Admiral/admiral-worker/app/repos/sql/WorkerSqlRepo.py
2025-06-24 14:22:50 +02:00

84 lines
2.1 KiB
Python

import uuid
from dataclasses import dataclass
from typing import Optional
from sqlalchemy import delete
from sqlalchemy.engine import Engine
from sqlmodel import SQLModel, Field, Session, select
from typing_extensions import override, Self
from app.repos.sql import dbRetry
from core.domain.worker.Worker import Worker
from core.domain.worker.WorkerState import WorkerState
from core.domain.worker.WorkerType import WorkerType
from core.repos.WorkerRepo import WorkerRepo
from core.types.Id import Id
@dataclass
class WorkerSqlRepo(WorkerRepo):
db: Engine
class Table(SQLModel, table=True):
__tablename__ = "worker"
id: str = Field(primary_key=True)
type: str
ip: str
state: str
def toDomain(self) -> Worker:
return Worker(
ip=self.ip,
type=WorkerType(self.type),
state=WorkerState(self.state),
id=Id(value=uuid.UUID(self.id))
)
@classmethod
def toRow(cls, obj: Worker) -> Self:
return cls(
ip=obj.ip,
type=obj.type,
id=obj.id.value,
state=obj.state.value
)
@override
def getAll(self) -> list[Worker]:
with Session(self.db) as conn:
query = select(self.Table)
return [row.toDomain() for row in conn.exec(query).all()]
@override
def get(self, id: Id[Worker]) -> Optional[Worker]:
with Session(self.db) as conn:
query = select(self.Table).filter_by(id=id.value)
row = conn.exec(query).one_or_none()
return row.toDomain() if row is not None else None
@override
def post(self, ip: str, type: WorkerType) -> Worker:
worker = Worker(ip=ip, type=type, state=WorkerState.NORMAL)
with Session(self.db) as conn:
conn.add(self.Table.toRow(worker))
conn.commit()
return worker
@override
def getByIp(self, ip: str, type: WorkerType) -> Optional[Worker]:
with Session(self.db) as conn:
query = select(self.Table).filter_by(ip=ip, type=type)
row = conn.exec(query).one_or_none()
return row.toDomain() if row is not None else None
@override
@dbRetry
def deleteByIp(self, ip: str, type: WorkerType) -> int:
with Session(self.db) as conn:
query = delete(self.Table).filter_by(ip=ip, type=type)
result = conn.exec(query).rowcount
conn.commit()
return result