import uuid from dataclasses import dataclass from typing import Optional from sqlmodel import SQLModel, Field, Session, select from typing_extensions import override, Self from sqlalchemy.engine import Engine from app.repos.sql import dbRetry from core.domain.optimization.OptimizationState import OptimizationState from core.domain.worker.WorkerJob import WorkerJob from core.repos.WorkerJobRepo import WorkerJobRepo from core.types.Id import Id @dataclass class WorkerJobSqlRepo(WorkerJobRepo): db: Engine class Table(SQLModel, table=True): __tablename__ = "worker_job" id: str = Field(primary_key=True) optimization_id: str = Field(foreign_key="optimization.id") worker_id: str = Field(foreign_key="worker.id") name: str state: str def toDomain(self) -> WorkerJob: return WorkerJob( id=Id(value=uuid.UUID(self.id)), optimizationId=Id(value=uuid.UUID(self.optimization_id)), workerId=Id(value=uuid.UUID(self.worker_id)), name=self.name, state=OptimizationState(self.state) ) @classmethod def toRow(cls, obj: WorkerJob) -> Self: return cls( id=obj.id.value, optimization_id=obj.optimizationId.value, worker_id=obj.workerId.value, name=obj.name, state=obj.state.value, ) @override def getAll(self) -> list[WorkerJob]: with Session(self.db) as conn: query = select(self.Table) return [row.toDomain() for row in conn.exec(query).all()] @override def get(self, id: Id[WorkerJob]) -> Optional[WorkerJob]: with Session(self.db) as conn: query = select(self.Table).filter_by(id=id.value) row = conn.exec(query).one_or_none() return row.toDomain() if row is not None else None @override @dbRetry def post(self, obj: WorkerJob): with Session(self.db) as conn: conn.add(self.Table.toRow(obj)) conn.commit() return obj