2025-06-24 14:22:50 +02:00

53 lines
1.4 KiB
Python

import logging
import os
from dataclasses import dataclass
from pathlib import Path
import shutil
from typing_extensions import override
from core.extend import fs
from core.services.FtpService import FtpService
log = logging.getLogger(__name__)
@dataclass
class FsFtpService(FtpService):
@override
def download(self, path: Path):
ftpPath = fs.getFtpPath(path.name)
log.info(f"Download: '{path.name}' to '{path}'")
shutil.copyfile(src=ftpPath, dst=path)
@override
def upload(self, path: Path):
ftpPath = fs.getFtpPath(path.name)
log.info(f"Upload: '{path}' to '{path.name}'")
shutil.copyfile(src=path, dst=ftpPath)
@override
def rename(self, oldPath: Path, newPath: Path):
newFtpPath = fs.getFtpPath(newPath.name)
oldFtpPath = fs.getFtpPath(oldPath.name)
log.info(f"Rename: '{oldPath.name}' to '{newPath.name}'")
shutil.move(src=oldFtpPath, dst=newFtpPath)
@override
def delete(self, path: Path):
ftpPath = fs.getFtpPath(path.name)
log.info(f"Delete: '{path.name}'")
ftpPath.unlink(missing_ok=True)
@override
def copy(self, path: Path, newPath: Path):
newFtpPath = fs.getFtpPath(newPath.name)
oldFtpPath = fs.getFtpPath(path.name)
log.info(f"Copy: '{path.name}' to '{newPath.name}'")
shutil.copyfile(src=oldFtpPath, dst=newFtpPath)
@override
def scan(self) -> list[Path]:
ftpPath = fs.getFtpPath()
return list(ftpPath.iterdir())