import uuid
from dataclasses import dataclass
from typing import Optional

from sqlalchemy.engine import Engine
from sqlmodel import Field, Session, SQLModel, select
from typing_extensions import Self, override

from app.repos.sql import dbRetry
from core.domain.worker.Worker import Worker
from core.domain.worker.WorkerLog import WorkerLog
from core.domain.worker.WorkerLogLevel import WorkerLogLevel
from core.repos.WorkerLogRepo import WorkerLogRepo
from core.types.Id import Id


@dataclass
class WorkerLogSqlRepo(WorkerLogRepo):
    """SQL-backed implementation of ``WorkerLogRepo``.

    Each operation opens its own short-lived ``Session`` on ``db``.
    """

    # Engine that every session in this repository is bound to.
    db: Engine

    class Table(SQLModel, table=True):
        """Row model for the ``worker_log`` table.

        Identifiers are persisted as strings; ``toDomain`` parses them back
        into ``uuid.UUID`` values and ``toRow`` stringifies them on the way in.
        """

        __tablename__ = "worker_log"

        id: str = Field(primary_key=True)
        context: str
        data: str
        worker_id: str = Field(foreign_key="worker.id")
        level: str
        created_at: float

        def toDomain(self) -> WorkerLog:
            """Build the domain ``WorkerLog`` represented by this row.

            Raises:
                ValueError: if ``id`` or ``worker_id`` is not a valid UUID
                    string, or ``level`` is not a valid ``WorkerLogLevel``
                    value.
            """
            return WorkerLog(
                context=self.context,
                data=self.data,
                ownerId=Id(value=uuid.UUID(self.worker_id)),
                createdAt=self.created_at,
                level=WorkerLogLevel(self.level),
                id=Id(value=uuid.UUID(self.id)),
            )

        @classmethod
        def toRow(cls, obj: WorkerLog) -> Self:
            """Build the row that persists ``obj``.

            Args:
                obj: Domain log entry to convert.

            Returns:
                A new, not yet persisted row instance.
            """
            return cls(
                context=obj.context,
                data=obj.data,
                # The id columns are declared as ``str`` and table models are
                # not validated on construction, so convert explicitly instead
                # of handing the raw ``Id.value`` to the database driver.
                worker_id=str(obj.ownerId.value),
                created_at=obj.createdAt,
                level=obj.level.value,
                id=str(obj.id.value),
            )

    @override
    def getAll(self) -> list[WorkerLog]:
        """Return every stored worker log as a domain object."""
        with Session(self.db) as conn:
            query = select(self.Table)
            return [row.toDomain() for row in conn.exec(query).all()]

    @override
    def get(self, id: Id[WorkerLog]) -> Optional[WorkerLog]:
        """Return the worker log with the given id, or ``None`` if absent.

        Args:
            id: Identifier of the log entry to look up.
        """
        with Session(self.db) as conn:
            # Compare against the string form, matching how ids are stored.
            query = select(self.Table).filter_by(id=str(id.value))
            row = conn.exec(query).one_or_none()
            return row.toDomain() if row is not None else None

    @override
    @dbRetry
    def post(self, context: str, workerId: Id[Worker], data: str, level: WorkerLogLevel) -> WorkerLog:
        """Create a new worker log, persist it and return it.

        The ``WorkerLog`` is built without an explicit id or creation time, so
        this relies on ``WorkerLog`` supplying both itself. The call is wrapped
        by ``dbRetry`` (imported from ``app.repos.sql``), which defines the
        retry policy.

        Args:
            context: Context string stored with the entry.
            workerId: Identifier of the worker that owns the entry.
            data: Payload stored with the entry.
            level: Severity level of the entry.

        Returns:
            The newly created domain object.
        """
        obj = WorkerLog(context=context, data=data, ownerId=workerId, level=level)
        with Session(self.db) as conn:
            conn.add(self.Table.toRow(obj))
            conn.commit()
        return obj