261 lines
9.6 KiB
Python
261 lines
9.6 KiB
Python
import json
|
|
import logging
|
|
from datetime import timedelta
|
|
from typing import Literal, Optional
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
from pydantic import BaseModel, PositiveInt, PositiveFloat, NonNegativeInt, model_validator, ConfigDict
|
|
from pydantic.alias_generators import to_camel
|
|
from typing_extensions import Self
|
|
from typing_extensions import override
|
|
|
|
from app.algorithms import solver_or
|
|
from core.types.Logger import Logger
|
|
|
|
|
|
class BaseSchema(BaseModel):
    """Shared pydantic base class for every schema declared in this module.

    The configuration makes each field available under a camelCase alias,
    still lets callers populate fields by their Python attribute name, and
    keeps (rather than rejects) unknown extra fields.
    """

    model_config = ConfigDict(
        alias_generator=to_camel,
        populate_by_name=True,
        extra="allow",
    )
|
|
|
|
|
|
class OrToolsOptimizationVehicle(BaseSchema):
    """Description of a single vehicle that can be assigned a route."""

    # Identification.
    id: NonNegativeInt
    name: str

    # Capabilities and limits of the vehicle.
    route_type: str
    capacity: PositiveInt
    range_km: PositiveFloat
    working_time_h: Optional[PositiveFloat] = 8.0
    districts: list[str]
    priority: Optional[bool] = False

    @model_validator(mode="after")
    def check_values(self) -> Self:
        """Post-validation hook; currently accepts every vehicle unchanged.

        The upper-bound checks below are intentionally disabled and kept only
        as a record of the limits that were once considered:
          * 0 < range_km <= 1_000        ("Range should be between 0 and 1000 km.")
          * 0 < capacity <= 1_000
          * 0 < working_time_h <= 10     ("Max working time is 10h.")
        """
        return self
|
|
|
|
|
|
class OrToolsOptimizationPoint(BaseSchema):
    """A single location of the routing problem (customer, depot or refill)."""

    # Identification.
    id: NonNegativeInt
    hisa_id: str

    # Servicing parameters.
    service_time_sec: NonNegativeInt
    demand: Optional[NonNegativeInt] = 1
    freq: Optional[float] = 1.0
    type: Literal['crn', 'depot', 'refill']

    # Geographic position.
    lat: float
    lon: float

    district: Optional[str] = None

    @model_validator(mode="after")
    def check_values(self) -> Self:
        """Run cross-field sanity checks once the individual fields are parsed.

        Checks, in order: demand in [0, 1000), freq in [0, 1], a depot has no
        district, and the coordinates lie inside the 45..47 / 13..17 box.

        NOTE(review): ``demand`` and ``freq`` are declared Optional, but the
        range comparisons below raise ``TypeError`` when either is explicitly
        ``None`` -- confirm whether ``None`` is meant to be accepted.
        """
        # TODO: assert 0 <= self.service_time_sec <= 1200, f"Service time too large: {self.service_time_sec}."
        assert 0 <= self.demand < 1_000, f"Demand too large {self.demand}"
        assert 0 <= self.freq <= 1, f"Frequency not between 0 and 1.0 {self.freq}"

        # Equivalent to "type is not depot, or district is unset".
        assert not (self.type == 'depot' and self.district is not None), "Depot can't have an assigned district."

        assert (45 <= self.lat <= 47) and (13 <= self.lon <= 17), f"Invalid coordinates {self.lat}, {self.lon}"

        return self
|
|
|
|
|
|
def to_np_array_int(df, col):
    """Build a dense square integer matrix from a long-format pair table.

    Args:
        df: DataFrame with the columns ``start_hisa``, ``start_index``,
            ``end_index`` and ``col``; each row describes one (start, end) pair.
        col: Name of the column whose values fill the matrix cells.

    Returns:
        An ``n x n`` integer array, where ``n`` is the number of distinct
        ``start_hisa`` values. Cell ``[start_index, end_index]`` holds the
        row's ``col`` value; pairs missing from ``df`` keep the filler 10**9.
    """
    ordered = df.sort_values('start_hisa')
    size = len(pd.unique(ordered['start_hisa']))

    # Start from a "practically unreachable" filler for pairs without data.
    matrix = np.full((size, size), 10 ** 9, dtype=int)

    rows = ordered['start_index'].to_numpy()
    cols = ordered['end_index'].to_numpy()
    matrix[rows, cols] = ordered[col].to_numpy()
    return matrix
|
|
|
|
|
|
class OrToolsOptimizationInstance(BaseSchema):
    """Complete, validated input of one optimization run.

    Holds the vehicles, the points, the distance/time matrices (one numpy
    array per dictionary key), optional initial routes, the district
    percentage and the logger used for validation warnings.
    """

    # numpy arrays and the project Logger are not pydantic-native types.
    # (Replaces the deprecated nested ``class Config``; pydantic merges this
    # with the configuration inherited from the base class.)
    model_config = ConfigDict(arbitrary_types_allowed=True)

    class JsonEncoder(json.JSONEncoder):
        """JSON encoder that also handles this model and numpy arrays."""

        def default(self, obj):
            """Convert objects the stock encoder cannot serialize.

            * an ``OrToolsOptimizationInstance`` becomes its ``model_dump()``
            * a 2-D ``np.ndarray`` becomes ``{"<row>_<col>": value}``
            * a ``dict`` becomes its ``str()`` representation
            Anything else is delegated to ``json.JSONEncoder.default``.
            """
            if isinstance(obj, OrToolsOptimizationInstance):
                return obj.model_dump()
            if isinstance(obj, np.ndarray):
                flat = {}
                for y, line in enumerate(obj.tolist()):
                    for x, ele in enumerate(line):
                        flat[f"{y}_{x}"] = ele
                return flat
            if isinstance(obj, dict):
                return str(obj)
            return super().default(obj)

    vehicles: list[OrToolsOptimizationVehicle]
    points: list[OrToolsOptimizationPoint]
    distance_matrix: dict[str, np.ndarray]
    time_matrix: dict[str, np.ndarray]
    initial_routes: Optional[list[list[NonNegativeInt]]]
    district_percentage: Optional[float] = 0.0
    log: Logger

    # NOTE: an earlier design stored the raw per-mode DataFrames
    # (``time_dist_data``) and derived ``distance_matrix`` / ``time_matrix``
    # from them via ``to_np_array_int``; the matrices are now passed in directly.

    @model_validator(mode="after")
    def check_values(self) -> Self:
        """Validate the instance as a whole.

        Fails (assertion, reported by pydantic as a validation error) when:
          * total vehicle capacity is smaller than total point demand,
          * ``district_percentage`` is outside [0, 1],
          * a distance/time matrix is not square or does not match the number
            of points,
          * the point ids are not exactly ``0 .. n-1``,
          * a distance/time matrix does not have an integer dtype.
        Suspicious distance values only produce warnings on ``self.log``.
        """
        available_capacity = sum(o.capacity for o in self.vehicles)
        required_capacity = sum(o.demand for o in self.points)
        assert available_capacity >= required_capacity, f"Available capacity '{available_capacity}' is less than required capacity '{required_capacity}'"

        if self.district_percentage is not None:
            assert 0.0 <= self.district_percentage <= 1.0, "District percentage has to be between 0 and 1 (float)."

        # Loop-invariant: the sorted ids are the same for every matrix.
        sorted_ids = np.array(sorted(x.id for x in self.points))

        for v in self.distance_matrix.values():
            assert len(self.points) == v.shape[0], f"Number of points({len(self.points)} should be same as distance_matrix size({v.shape[0]})"
            assert v.shape[0] == v.shape[1], "Both dimensions of distance_matrix should be of equal size"
            assert all(sorted_ids == np.arange(v.shape[0])), "Point.id should be its index in distance_matrix."

        for v in self.time_matrix.values():
            assert len(self.points) == v.shape[0], "Number of points should be same as time_matrix size"
            assert v.shape[0] == v.shape[1], "Both dimensions of time_matrix should be of equal size"

        # The matrices must hold integers (as the messages state and as
        # ``to_np_array_int`` produces); the previous check tested for
        # floating dtypes and therefore rejected valid integer matrices.
        assert all(np.issubdtype(v.dtype, np.integer) for v in self.distance_matrix.values()), "Distance matrix should be of type np.integer"
        assert all(np.issubdtype(v.dtype, np.integer) for v in self.time_matrix.values()), "Time matrix should be of type np.integer"

        for k, v in self.distance_matrix.items():
            # Soft checks only: implausible values are logged, not rejected.
            if v.max() > 100_000:
                self.log.warning(f"Some values in distance_matrix '{k}' are to big: {v.max()}")
            if v.mean() < 1_000:
                self.log.warning(f"Mean of values in distance_matrix '{k}' is too small: {v.mean()}")

        # TODO: check matrix
        return self
|
|
|
|
|
|
class OrToolsOptimizationSolution(BaseSchema):
    """One route of a solution: the vehicle, its visited points and totals."""

    # Which vehicle drives the route and whether it is a duplicated ("dummy") one.
    vehicle_id: NonNegativeInt
    dummy: bool

    # Visited points, in route order, identified by their hisa_id.
    hisa_ids: list[str]

    # Route totals.
    distance: NonNegativeInt
    duration: timedelta
    cost: NonNegativeInt

    # TODO: solver_or needs to assign district names when doing exact optimization!!!
    district: Optional[str] = None
|
|
|
|
|
|
class OrToolsOptimizationConfig(BaseSchema):
    """Tunable options passed to the solver alongside the instance."""

    # Quantity the solver minimizes.
    objective: Literal['distance', 'time'] = 'time'
    vehicle_cost: Optional[int] = solver_or.VEHICLE_COST

    district_penalty: NonNegativeInt = 0
    # 'soft' is the default, so it has to be an accepted value as well;
    # previously passing the default explicitly failed validation.
    district_mode: Literal['single', 'subsets', 'hard', 'soft'] = 'soft'

    # When true, the instance's initial routes are used and the overlap with
    # them is reported by the service.
    set_initial: bool = False

    useDistrictCentrality: bool = True
|
|
|
|
|
|
class OrToolsOptimizationService:
    """
    Main class for doing optimization
    """

    # NOTE(review): no base class is visible for this service, so it is unclear
    # what ``@override`` refers to -- confirm against the intended interface.
    @override
    def vrpOptimization(
            self,
            solving_time_sec: int,
            instance: OrToolsOptimizationInstance,
            config: OrToolsOptimizationConfig,
            log: Logger,
            solution_callback_fn=lambda objective, raw_solution, overlapping: None,
            stop_callback_fn=lambda: False,
    ) -> tuple[int, list[OrToolsOptimizationSolution], Optional[dict[int, float]]]:
        """Solve the vehicle routing problem described by ``instance``.

        Args:
            solving_time_sec: Time budget forwarded to ``solver_or.solve``.
            instance: Validated vehicles, points and matrices.
            config: Solver options.
            log: Logger used for progress messages and passed to the solver.
            solution_callback_fn: Called for every intermediate solution with
                ``(objective, solutions, overlapping)`` in mapped form.
            stop_callback_fn: Forwarded to the solver unchanged.

        Returns:
            ``(objective, solutions, overlapping)`` for the final solution.
            ``overlapping`` is ``None`` unless ``config.set_initial`` is set
            and the instance carries initial routes.
        """
        log.info("Mapping optimizationVehicles")
        opta_vehicles = pd.DataFrame([x.__dict__ for x in instance.vehicles])
        opta_vehicles['cost'] = config.vehicle_cost
        # ``priority`` is Optional; coerce to bool so a None entry is treated
        # as "no priority" instead of breaking the boolean mask.
        priority_mask = opta_vehicles['priority'].astype(bool)
        opta_vehicles.loc[priority_mask, 'cost'] = solver_or.VEHICLE_PRIORITY_COST
        opta_vehicles['max_time'] = (opta_vehicles['working_time_h'] * 3600).astype(int)
        opta_vehicles['range'] = (opta_vehicles['range_km'] * 1000).astype(int)

        if solver_or.VEHICLE_DUPLICATE_FACTOR > 1:
            # Append copies of the whole fleet; the copies get their own cost.
            vn = len(opta_vehicles)
            opta_vehicles = pd.concat([opta_vehicles] * solver_or.VEHICLE_DUPLICATE_FACTOR).reset_index(drop=True)
            opta_vehicles.loc[opta_vehicles.index[vn:], 'cost'] = solver_or.VEHICLE_DUPLICATE_COST

        log.info("Mapping optimization points")
        opta_points = pd.DataFrame([x.__dict__ for x in instance.points])
        opta_points['service_time'] = opta_points['service_time_sec']
        opta_points['base_point'] = np.arange(len(opta_points))

        opta_instance = solver_or.VrpInstance(
            opta_vehicles,
            opta_points,
            instance.distance_matrix,
            instance.time_matrix,
            instance.initial_routes,
            instance.district_percentage,
        )

        def calculate_overlapping(solution, instance) -> Optional[dict[int, float]]:
            """Per vehicle: points not in its initial route, as % of that route's size."""
            initial_routes = instance.initial_routes
            if not config.set_initial or initial_routes is None:
                return None

            overlapping: dict[int, float] = {}
            # Access columns by name instead of unpacking whole records, so the
            # loop does not depend on the exact column order of ``solution``.
            for vehicle, route, orig_id in zip(solution['vehicle'], solution['route'], solution['orig_id']):
                # ``>=`` (not ``==``): an id past the end must never be used as
                # an index, even when the id equal to the length was skipped.
                if orig_id >= len(initial_routes):
                    break
                initial_route = set(initial_routes[orig_id])
                added_points = set(route) - initial_route
                if initial_route:
                    overlapping[int(vehicle)] = round(100 * len(added_points) / len(initial_route), 3)
            return overlapping

        def map_raw_solution(objective: int, solution: pd.DataFrame) -> tuple[int, list[OrToolsOptimizationSolution], Optional[dict[int, float]]]:
            """Turn the solver's raw route table into solution objects."""
            # Drop routes with zero total distance.
            solution = solution[solution['total_distance'] > 0].copy()

            # Duplicated vehicles map back to the original vehicle they copy.
            solution['orig_id'] = solution['vehicle'] % len(instance.vehicles)
            solution = solution.reset_index()
            # Every route after the first one of the same original vehicle is a dummy.
            solution['dummy'] = solution['orig_id'].duplicated()

            id2point = {x.id: x for x in instance.points}

            optimizationSolutions = [
                OrToolsOptimizationSolution(
                    vehicle_id=reading['orig_id'],
                    dummy=reading['dummy'],
                    hisa_ids=[id2point[i].hisa_id for i in reading['route']],
                    distance=reading['total_distance'],
                    duration=timedelta(seconds=reading['total_time']),
                    cost=reading['total_cost'],
                )
                for reading in solution.to_dict(orient='records')
            ]

            return objective, optimizationSolutions, calculate_overlapping(solution=solution, instance=instance)

        log.info(f"Solving VRP with points (without depot): {len(opta_instance.nodes) - 1}")
        objective, solution = solver_or.solve(
            opta_instance, config, solving_time_sec,
            solution_callback_fn=lambda objec, raw_solution: solution_callback_fn(*map_raw_solution(objective=objec, solution=raw_solution)),
            stop_callback_fn=stop_callback_fn,
            log=log
        )

        obj, sol, overlap = map_raw_solution(objective=objective, solution=solution)
        log.info(f"VRP solved with points (without depot): {sum([len(s.hisa_ids) - 1 for s in sol])}")
        return obj, sol, overlap
|