from datetime import datetime, timedelta
import uuid
from dataclasses import dataclass
from typing import Optional

from sqlalchemy import Engine, update
from sqlmodel import SQLModel, Field, Session, select
from typing_extensions import override

from app.repos.sql import dbRetry
from core.domain.optimization.Optimization import Optimization
from core.domain.optimization.OptimizationState import OptimizationState
from core.domain.optimization.OptimizationType import OptimizationType
from core.repos.OptimizationRepo import OptimizationRepo
from core.types.Id import Id
from core.types.IntId import IntId


@dataclass
class OptimizationSqlRepo(OptimizationRepo):
    """SQL-backed ``OptimizationRepo`` that reads and updates the ``optimization`` table.

    Every method opens its own short-lived ``Session`` on ``db`` and maps
    rows to ``Optimization`` domain objects through ``Table.toDomain``.
    """

    # Engine used to open a new Session for each repository call.
    db: Engine

    class Table(SQLModel, table=True):
        """Row model for the ``optimization`` table."""

        __tablename__ = "optimization"

        # UUID of the optimization, stored in string form.
        id: str = Field(primary_key=True)
        posta: int
        title: str
        description: str
        # Duration in seconds (converted to a timedelta in toDomain).
        optimization_time: float
        weight: int
        # Comma-separated Unix timestamps (seconds); see toDomain.
        dates: str
        # Unix timestamp in seconds.
        created_at: int
        authorized_by_user_id: str
        # Unix timestamp in seconds of the last state transition.
        state_changed_at: int
        use_frequency: bool
        use_unvisited_crn: bool
        district_centering: bool
        static_service_times: int
        # Raw value of an OptimizationState member.
        state: str
        # Raw value of an OptimizationType member.
        type: str
        # UUID string of the parent optimization; toDomain treats None as
        # "no parent", so the field has to be optional.
        parent: Optional[str] = None

        def toDomain(self) -> Optimization:
            """Convert this row into an ``Optimization`` domain object.

            Returns:
                The domain object built from the row's columns.

            Raises:
                ValueError: If ``id`` or a non-null ``parent`` is not a valid
                    UUID string, or ``state``/``type`` is not a known enum value.
            """
            # Tokens that are not plain decimal integers are skipped rather
            # than failing the whole row; stripping first keeps values that
            # were stored with whitespace around the separator.
            parsedDates = [
                datetime.fromtimestamp(int(token)).date()
                for token in (raw.strip() for raw in self.dates.split(","))
                if token.isdecimal()
            ]
            parentId = Id(value=uuid.UUID(self.parent)) if self.parent is not None else None

            return Optimization(
                posta=IntId(value=self.posta),
                title=self.title,
                description=self.description,
                weight=self.weight,
                dates=parsedDates,
                optimizationTime=timedelta(seconds=self.optimization_time),
                createdAt=datetime.fromtimestamp(self.created_at),
                authorizedByUserId=self.authorized_by_user_id,
                state=OptimizationState(self.state),
                type=OptimizationType(self.type),
                useFrequency=self.use_frequency,
                useUnvisitedCrn=self.use_unvisited_crn,
                useDistrictCentrality=self.district_centering,
                stateChangedAt=datetime.fromtimestamp(self.state_changed_at),
                staticServiceTimes=self.static_service_times,
                parent=parentId,
                id=Id(value=uuid.UUID(self.id)),
            )

    @override
    def getAll(self) -> list[Optimization]:
        """Return every optimization stored in the table."""
        with Session(self.db) as conn:
            query = select(self.Table)
            return [row.toDomain() for row in conn.exec(query).all()]

    @override
    def get(self, id: Id[Optimization]) -> Optional[Optimization]:
        """Return the optimization with the given id, or ``None`` if absent."""
        with Session(self.db) as conn:
            # The primary key column is a string, so compare against the
            # string form of the identifier.
            query = select(self.Table).filter_by(id=str(id.value))
            row = conn.exec(query).one_or_none()
            return row.toDomain() if row is not None else None

    @override
    def getWithState(self, state: OptimizationState) -> list[Optimization]:
        """Return all optimizations whose state equals ``state``."""
        with Session(self.db) as conn:
            query = select(self.Table).filter_by(state=state.value)
            return [row.toDomain() for row in conn.exec(query).all()]

    @override
    @dbRetry
    def updateFirst(self, fromState: OptimizationState, toState: OptimizationState) -> Optional[Optimization]:
        """Move one optimization from ``fromState`` to ``toState``.

        Returns:
            The updated optimization, or ``None`` when no row is in
            ``fromState`` or the chosen row changed state before the update
            could be applied (nothing is committed in that case).
        """
        with Session(self.db) as conn:
            # Get candidate for update
            selectQuery = select(self.Table).filter_by(state=fromState.value).limit(1)
            row = conn.exec(selectQuery).one_or_none()
            if row is None:
                return None

            # Update candidate but only if its state is still unchanged.
            # state_changed_at is an integer column, so drop the fractional
            # part of the timestamp.
            updateQuery = (
                update(self.Table)
                .filter_by(state=fromState.value, id=row.id)
                .values(state=toState.value, state_changed_at=int(datetime.now().timestamp()))
            )

            # If the candidate was updated by someone else first, give up
            # without committing.
            if conn.exec(updateQuery).rowcount != 1:
                return None

            # Read the candidate again so the returned entity reflects the update
            selectQuery = select(self.Table).filter_by(id=row.id).limit(1)
            row = conn.exec(selectQuery).one_or_none()
            if row is None:
                return None

            # Commit changes
            conn.commit()
            return row.toDomain()

    @override
    @dbRetry
    def setState(self, id: Id[Optimization], toState: OptimizationState) -> Optional[Optimization]:
        """Set the state of the optimization with the given id.

        Returns:
            The optimization as stored after the update, or ``None`` when no
            optimization with that id exists.
        """
        with Session(self.db) as conn:
            rowId = str(id.value)
            updateQuery = (
                update(self.Table)
                .filter_by(id=rowId)
                .values(state=toState.value, state_changed_at=int(datetime.now().timestamp()))
            )
            conn.exec(updateQuery)
            conn.commit()

            # Honour the declared return type by handing back the updated row.
            row = conn.exec(select(self.Table).filter_by(id=rowId)).one_or_none()
            return row.toDomain() if row is not None else None

    @override
    def getLatestConfirmedByPosta(self, posta: int) -> Optional[Optimization]:
        """Return the most recently confirmed optimization for ``posta``.

        "Most recent" is decided by ``state_changed_at``; ``None`` is returned
        when ``posta`` has no optimization in the CONFIRMED state.
        """
        with Session(self.db) as conn:
            query = (
                select(self.Table)
                .filter_by(posta=posta, state=OptimizationState.CONFIRMED.value)
                .order_by(self.Table.state_changed_at.desc())
                .limit(1)
            )
            row = conn.exec(query).one_or_none()
            if row is None:
                return None
            return row.toDomain()