72 lines
2.3 KiB
Python
72 lines
2.3 KiB
Python
from dataclasses import dataclass
|
|
|
|
from sqlalchemy import Engine, BLOB
|
|
from sqlalchemy import PrimaryKeyConstraint
|
|
from sqlmodel import SQLModel, Field, Session, select
|
|
from typing_extensions import override, Self, Optional
|
|
|
|
from app.repos.sql import dbRetry
|
|
from core import Utils
|
|
from core.domain.optimization.Optimization import Optimization
|
|
from core.domain.optimization.OptimizationMetrics import OptimizationMetrics
|
|
from core.repos.OptimizationMetricsRepo import OptimizationMetricsRepo
|
|
from core.types.Id import Id
|
|
|
|
|
|
@dataclass
class OptimizationMetricsSqlRepo(OptimizationMetricsRepo):
    """SQL-backed implementation of ``OptimizationMetricsRepo``.

    Every operation opens its own short-lived ``Session`` on ``db``. Rows are
    stored in the ``optimization_metrics`` table described by the nested
    ``Table`` model.
    """

    # Engine from which a session is opened for each repository call.
    db: Engine

    class Table(SQLModel, table=True):
        """Row model for the ``optimization_metrics`` table."""

        __tablename__ = "optimization_metrics"

        # Composite primary key: the model declares no single-column id.
        __table_args__ = (PrimaryKeyConstraint("optimization_id", "solution", "created_at"),)

        optimization_id: str = Field(foreign_key="optimization.id")
        solution: int
        vehicles: int
        cost: float
        distance: float
        duration: float
        # NOTE(review): filled from ``createdAt.timestamp()`` in ``toRow``;
        # presumably ``createdAt`` is a datetime, whose timestamp is a float,
        # while this column is declared int -- confirm the intended precision.
        created_at: int
        # JSON serialised to ASCII bytes, or NULL when the domain object has no
        # overlapping data. ``toRow`` produces ``bytes | None``, so the field
        # must be nullable and typed as bytes rather than a required ``str``.
        overlapping: Optional[bytes] = Field(default=None, sa_type=BLOB)

        # NOTE(review): the repository read methods call ``row.toDomain()``,
        # but this model defines no ``toDomain`` in this file. As written those
        # reads will fail with AttributeError as soon as a row is returned --
        # the inverse mapping of ``toRow`` still needs to be provided.

        @classmethod
        def toRow(cls, obj: OptimizationMetrics) -> Self:
            """Build a table row from a domain ``OptimizationMetrics`` object.

            ``obj.overlapping`` is JSON-encoded via ``Utils.json_dumps`` and
            stored as ASCII bytes; ``None`` is stored as NULL.
            """
            return cls(
                optimization_id=obj.optimizationId.value,
                solution=obj.solution,
                cost=obj.cost,
                vehicles=obj.vehicles,
                distance=obj.distance,
                duration=obj.duration,
                created_at=obj.createdAt.timestamp(),
                overlapping=(
                    Utils.json_dumps(obj.overlapping).encode('ascii')
                    if obj.overlapping is not None
                    else None
                ),
            )

    @override
    def getAll(self) -> list[OptimizationMetrics]:
        """Return every stored metrics row converted to its domain object."""
        with Session(self.db) as conn:
            query = select(self.Table)
            return [row.toDomain() for row in conn.exec(query).all()]

    @override
    def get(self, id: Id[OptimizationMetrics]) -> Optional[OptimizationMetrics]:
        """Return the metrics entry matching ``id``, or ``None`` if none matches.

        NOTE(review): the query filters on an ``id`` attribute, but ``Table``
        declares no ``id`` column (its key is the composite of
        ``optimization_id``, ``solution`` and ``created_at``). This lookup
        likely fails at query-build time -- confirm the intended key.
        """
        with Session(self.db) as conn:
            query = select(self.Table).filter_by(id=id.value)
            row = conn.exec(query).one_or_none()
            return row.toDomain() if row is not None else None

    @override
    def getAllByOptimizationId(self, optimizationId: Id[Optimization]) -> list[OptimizationMetrics]:
        """Return all metrics rows whose ``optimization_id`` equals ``optimizationId.value``."""
        with Session(self.db) as conn:
            query = select(self.Table).filter_by(optimization_id=optimizationId.value)
            return [row.toDomain() for row in conn.exec(query).all()]

    @override
    @dbRetry
    def post(self, optimizationMetrics: OptimizationMetrics):
        """Insert ``optimizationMetrics`` as a new row and commit.

        Wrapped by ``dbRetry`` (imported from ``app.repos.sql``; presumably it
        retries on transient database errors -- confirm there).
        """
        with Session(self.db) as conn:
            conn.add(self.Table.toRow(optimizationMetrics))
            conn.commit()
|