Admiral/admiral-worker/app/repos/sql/OptimizationMetricsSqlRepo.py
2025-06-24 14:22:50 +02:00

72 lines
2.3 KiB
Python

from dataclasses import dataclass
from sqlalchemy import Engine, BLOB
from sqlalchemy import PrimaryKeyConstraint
from sqlmodel import SQLModel, Field, Session, select
from typing_extensions import override, Self, Optional
from app.repos.sql import dbRetry
from core import Utils
from core.domain.optimization.Optimization import Optimization
from core.domain.optimization.OptimizationMetrics import OptimizationMetrics
from core.repos.OptimizationMetricsRepo import OptimizationMetricsRepo
from core.types.Id import Id
@dataclass
class OptimizationMetricsSqlRepo(OptimizationMetricsRepo):
    """SQL-backed implementation of ``OptimizationMetricsRepo``.

    Rows are stored in the ``optimization_metrics`` table and every operation
    runs inside its own short-lived ``Session`` opened on ``db``.
    """

    # Engine that every session of this repository is bound to.
    db: Engine

    class Table(SQLModel, table=True):
        """Row mapping for the ``optimization_metrics`` table."""

        __tablename__ = "optimization_metrics"
        # Composite primary key: the table has no single-column identifier.
        __table_args__ = (PrimaryKeyConstraint("optimization_id", "solution", "created_at"),)

        optimization_id: str = Field(foreign_key="optimization.id")
        solution: int
        vehicles: int
        cost: float
        distance: float
        duration: float
        # NOTE(review): toRow() fills this from ``createdAt.timestamp()``; if
        # createdAt is a datetime that value is a float, not an int. Left as-is
        # because truncating could collide on the composite primary key --
        # confirm the intended column type.
        created_at: int
        # JSON payload stored as raw bytes. Optional so the mapped column is
        # nullable, matching toRow(), which writes None when the domain object
        # carries no overlapping data.
        overlapping: Optional[bytes] = Field(sa_type=BLOB)

        @classmethod
        def toRow(cls, obj: OptimizationMetrics) -> Self:
            """Build a table row from a domain ``OptimizationMetrics`` object.

            ``obj.overlapping`` is serialised with ``Utils.json_dumps`` and
            encoded as ASCII bytes; a ``None`` value is stored as ``None``.
            """
            overlapping = None
            if obj.overlapping is not None:
                overlapping = Utils.json_dumps(obj.overlapping).encode('ascii')
            return cls(
                optimization_id=obj.optimizationId.value,
                solution=obj.solution,
                cost=obj.cost,
                vehicles=obj.vehicles,
                distance=obj.distance,
                duration=obj.duration,
                created_at=obj.createdAt.timestamp(),
                overlapping=overlapping,
            )

        # NOTE(review): the repository methods below call ``row.toDomain()``,
        # but no ``toDomain`` is defined in the visible part of this class --
        # confirm it exists, otherwise every read raises AttributeError.

    @override
    def getAll(self) -> list[OptimizationMetrics]:
        """Return every stored metrics row converted to its domain object."""
        with Session(self.db) as session:
            query = select(self.Table)
            return [row.toDomain() for row in session.exec(query).all()]

    @override
    def get(self, id: Id[OptimizationMetrics]) -> Optional[OptimizationMetrics]:
        """Return the metrics matching ``id``, or ``None`` when no row matches.

        NOTE(review): the query filters on an ``id`` column, but ``Table``
        declares none (its primary key is composite). As written the lookup
        cannot be resolved against the visible columns -- confirm which
        column(s) this identifier is meant to match.
        """
        with Session(self.db) as session:
            query = select(self.Table).filter_by(id=id.value)
            row = session.exec(query).one_or_none()
            return row.toDomain() if row is not None else None

    @override
    def getAllByOptimizationId(self, optimizationId: Id[Optimization]) -> list[OptimizationMetrics]:
        """Return all metrics rows whose ``optimization_id`` equals the given id."""
        with Session(self.db) as session:
            query = select(self.Table).filter_by(optimization_id=optimizationId.value)
            return [row.toDomain() for row in session.exec(query).all()]

    @override
    @dbRetry
    def post(self, optimizationMetrics: OptimizationMetrics) -> None:
        """Insert ``optimizationMetrics`` as a new row and commit.

        The call is wrapped by ``dbRetry`` (imported from ``app.repos.sql``).
        """
        with Session(self.db) as session:
            session.add(self.Table.toRow(optimizationMetrics))
            session.commit()