53 lines
1.4 KiB
Python
53 lines
1.4 KiB
Python
import logging
|
|
import os
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
import shutil
|
|
from typing_extensions import override
|
|
|
|
from core.extend import fs
|
|
from core.services.FtpService import FtpService
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class FsFtpService(FtpService):
|
|
|
|
@override
|
|
def download(self, path: Path):
|
|
ftpPath = fs.getFtpPath(path.name)
|
|
log.info(f"Download: '{path.name}' to '{path}'")
|
|
shutil.copyfile(src=ftpPath, dst=path)
|
|
|
|
@override
|
|
def upload(self, path: Path):
|
|
ftpPath = fs.getFtpPath(path.name)
|
|
log.info(f"Upload: '{path}' to '{path.name}'")
|
|
shutil.copyfile(src=path, dst=ftpPath)
|
|
|
|
@override
|
|
def rename(self, oldPath: Path, newPath: Path):
|
|
newFtpPath = fs.getFtpPath(newPath.name)
|
|
oldFtpPath = fs.getFtpPath(oldPath.name)
|
|
log.info(f"Rename: '{oldPath.name}' to '{newPath.name}'")
|
|
shutil.move(src=oldFtpPath, dst=newFtpPath)
|
|
|
|
@override
|
|
def delete(self, path: Path):
|
|
ftpPath = fs.getFtpPath(path.name)
|
|
log.info(f"Delete: '{path.name}'")
|
|
ftpPath.unlink(missing_ok=True)
|
|
|
|
@override
|
|
def copy(self, path: Path, newPath: Path):
|
|
newFtpPath = fs.getFtpPath(newPath.name)
|
|
oldFtpPath = fs.getFtpPath(path.name)
|
|
log.info(f"Copy: '{path.name}' to '{newPath.name}'")
|
|
shutil.copyfile(src=oldFtpPath, dst=newFtpPath)
|
|
|
|
@override
|
|
def scan(self) -> list[Path]:
|
|
ftpPath = fs.getFtpPath()
|
|
return list(ftpPath.iterdir())
|