from collections import Counter from datetime import timedelta from typing import Callable, Literal, Optional import numpy as np from sklearn.neighbors import BallTree from app.algorithms.OrToolsOptimizationService import OrToolsOptimizationVehicle, OrToolsOptimizationPoint, OrToolsOptimizationInstance, \ OrToolsOptimizationSolution, OrToolsOptimizationService, OrToolsOptimizationConfig from core.Utils import percentage from core.domain.map.CrnPoint import CrnPoint from core.domain.map.GeoLocation import GeoLocation from core.domain.map.RouteMatrix import RouteMatrix from core.domain.optimization.Optimization import Optimization from core.domain.optimization.OptimizationPoint import OptimizationPoint from core.domain.optimization.OptimizationPointType import OptimizationPointType from core.domain.optimization.OptimizationResultData import OptimizationResultData from core.domain.optimization.OptimizationSolution import OptimizationSolution from core.domain.optimization.OptimizationType import OptimizationType from core.domain.optimization.OptimizationVehicle import OptimizationVehicle from core.domain.optimization.TransportMode import TransportMode from core.services.OptimizationService import OptimizationService from core.types.Logger import Logger class SolvesallOptimizationService(OptimizationService): def config(self, setInitial: bool, district_centering: bool) -> OrToolsOptimizationConfig: return OrToolsOptimizationConfig( district_mode='subsets', district_penalty=0, vehicle_cost=16 * 3600, # Two working days. set_initial=setInitial, useDistrictCentrality=district_centering, ) def vrpOptimization( self, optimization: Optimization, optimizationVehicles: list[OptimizationVehicle], optimizationPoints: list[OptimizationPoint], routeMatrices: dict[TransportMode, RouteMatrix], solutionCallback: Callable[[int, list[OptimizationSolution], bool, list[OptimizationPoint], Optional[dict[int, float]]], None], terminationCallback: Callable[[], bool], log: Logger, initialOptimizationResultData: Optional[OptimizationResultData] = None ): config = self.config(setInitial=initialOptimizationResultData is not None, district_centering=optimization.useDistrictCentrality) crn_initialDistrict: dict[int, str] = {} initialOptimizationPoints: list[OptimizationPoint] = [] initialRoutePointBallTree: Optional[BallTree] = None if config.set_initial: log.info('Setting optimization mode to initial solution.') log.info('Creating crn_initialDistrict map and initial optimization points ball tree.') for initialRoute in initialOptimizationResultData.optimizationResult.routes: for initialRoutePoint in initialRoute.points: if initialRoutePoint.crnPoint.hisa != 0: initialOptimizationPoints.append(initialRoutePoint) crn_initialDistrict[initialRoutePoint.crnPoint.hisa] = initialRoute.name initialRoutePointBallTree = BallTree([iop.crnPoint.location.ballVector for iop in initialOptimizationPoints], metric='haversine') log.info('Mapping optimization points') orToolsOptimizationPoints: list[OrToolsOptimizationPoint] = [] for i, point in enumerate(optimizationPoints): # Construct OrToolsOptimizationPoint list crnPoint = point.crnPoint microLocation = crnPoint.microLocation district = None if crnPoint.district == '' else crnPoint.district orPoint = OrToolsOptimizationPoint( id=i, hisa_id=str(crnPoint.hisa), service_time_sec=int(point.serviceTime.total_seconds()), demand=point.demand, freq=point.visitFrequency, type=self.__crn_type(point.type), lat=microLocation.lat, lon=microLocation.lon, district=district ) orToolsOptimizationPoints.append(orPoint) if crnPoint.hisa != 0: # Insert additional crn points which does not exists in initial routes to crn_initialDistrict initialDistrict = crn_initialDistrict.get(point.crnPoint.hisa, None) if initialDistrict is None and config.set_initial: ballVector = GeoLocation(lat=orPoint.lat, lon=orPoint.lon).ballVector nearestInitialPointsIndex = initialRoutePointBallTree.query([ballVector], k=1, return_distance=False)[0][0] nearestInitialCrn = initialOptimizationPoints[nearestInitialPointsIndex].crnPoint.hisa nearestInitialDistrict = crn_initialDistrict[nearestInitialCrn] crn_initialDistrict[crnPoint.hisa] = nearestInitialDistrict log.warning(f"Crn point '{crnPoint.hisa}' is missing in initial routes, nearest crn district: {nearestInitialDistrict}") # Log first 10 points if i < 10: log.info(orPoint) log.info('Mapping optimization vehicles') orToolsOptimizationVehicles: list[OrToolsOptimizationVehicle] = [] optimizationVehicleAll: list[OptimizationVehicle] = [] tempOrVehicleIndex_district: dict[int, str] = {} orVehicleIndex_district: dict[int, str] = {} for vehicle in optimizationVehicles: districts = vehicle.districts.split(",") for i in range(vehicle.maxQuantity): orVehicle = OrToolsOptimizationVehicle( id=len(orToolsOptimizationVehicles), name=vehicle.name, route_type=self.__route_type(vehicle.type), capacity=vehicle.capacity, range_km=vehicle.range / 1000, working_time_h=vehicle.deliveryTime, priority=i < vehicle.minQuantity, districts=vehicle.districts.split(",") ) # Assign district to vehicle if len(districts) > 0: district = districts.pop(0) tempOrVehicleIndex_district[orVehicle.id] = district orVehicleIndex_district[orVehicle.id] = district log.info(orVehicle) optimizationVehicleAll.append(vehicle) orToolsOptimizationVehicles.append(orVehicle) # TODO: !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! On backend get initial district from crn if no confirmed optimization allready exists otherwise get initial district from last confirmed optimization. initialRoutes: list[list[CrnPoint]] = [] if config.set_initial: log.info("Construct initial routes for vehicles") for i, vehicle in enumerate(orToolsOptimizationVehicles): vehicleDistrict = tempOrVehicleIndex_district.pop(i, None) initialRoutes.append([]) if vehicleDistrict is None: continue for route in initialOptimizationResultData.optimizationResult.routes: if route.name == vehicleDistrict: for routePoint in route.points: if routePoint.crnPoint.hisa != 0: initialRoutes[-1].append(routePoint.crnPoint) if len(initialRoutes[-1]) > 0: route = [ir.hisa for ir in initialRoutes[-1]] log.info([ f"{i}. {vehicleDistrict}.{vehicle.name}[{len(route)}]:", f"{route[0]}", f"->", f"{route[-1]}", "...", f"{route}" ]) log.info('Mapping optimization matrices') time_matrix: dict[str, np.ndarray] = {} distance_matrix: dict[str, np.ndarray] = {} for vehicle_type, routeMatrix in routeMatrices.items(): vehicle_type_name = self.__route_type(type=vehicle_type) time_matrix[vehicle_type_name] = routeMatrix.durationMatrix() distance_matrix[vehicle_type_name] = routeMatrix.distanceMatrix() # Creating configuration for optimization orToolsInstance = OrToolsOptimizationInstance( vehicles=orToolsOptimizationVehicles, points=orToolsOptimizationPoints, distance_matrix=distance_matrix, time_matrix=time_matrix, initial_routes=[], # <---------------------------- SET THIS LATER!!! district_percentage=optimization.weight / 100, log=log ) log.info(f"Use unvisited crns: {optimization.useUnvisitedCrn}") unvisitedOptimizationPoints: list[OptimizationPoint] = [] if not optimization.useUnvisitedCrn: visitedOptimizationPoints = list(filter(lambda op: op.isVisited, optimizationPoints)) unvisitedOptimizationPoints = list(filter(lambda op: not op.isVisited, optimizationPoints)) log.warning(f"Unvisited crns[{len(unvisitedOptimizationPoints)}]: {percentage(unvisitedOptimizationPoints, optimizationPoints)}%") orToolsInstance = self.__filteredOrToolsInstance(orToolsInstance=orToolsInstance, optimizationPoints=visitedOptimizationPoints) initialRoutes = self.__checkAndBalanceInitialRoutes(initialRoutes=initialRoutes, optimizationPoints=optimizationPoints, log=log) initialRoutes = self.__filterInitialRoutes(initialRoutes=initialRoutes, optimizationPoints=visitedOptimizationPoints) log.info("Put initial route crn indexes to initial routes as is their place in optimization points list") crn_optimizationPointIndex: dict[int, int] = {} for i, op in enumerate(orToolsInstance.points): hisa = int(op.hisa_id) if hisa != 0: crn_optimizationPointIndex[hisa] = i log.info("Set initial routes") orToolsInstance.initial_routes = [[crn_optimizationPointIndex[crnPoint.hisa] for crnPoint in route] for route in initialRoutes] # Stop callback def stop_callback_fn() -> bool: return terminationCallback() # Solution callback def solution_callback_fn(objective: int, solution: list[OrToolsOptimizationSolution], finished: bool, overlapping: dict[int, float] | None): mappedSolution = [] for os in solution: optimizationVehicle = optimizationVehicleAll[os.vehicle_id] district = None if optimization.weight > 0: district = os.district if os.district is not None else orVehicleIndex_district.get(os.vehicle_id, None) kwargs = dict( isExtra=os.dummy, optimizationVehicleId=optimizationVehicle.id, hise=[int(hi) for hi in os.hisa_ids], distance=os.distance, duration=os.duration, district=district, cost=os.cost ) mappedSolution.append(OptimizationSolution(**kwargs)) solutionCallback(objective, mappedSolution, finished, unvisitedOptimizationPoints, overlapping) if optimization.type == OptimizationType.INITIAL: return self.__generateInitialSolution( solvingTime=optimization.optimizationTime, orToolsInstance=orToolsInstance, solutionCallback=solution_callback_fn, district_centrality=optimization.useDistrictCentrality, log=log, ) elif optimization.type == OptimizationType.TEST: return self.__generateTestSolution( testingOptimizationPoints=self.__filterOptimizationPoints( optimization=optimization, optimizationPoints=optimizationPoints, log=log), solvingTime=optimization.optimizationTime, orToolsInstance=orToolsInstance, solutionCallback=solution_callback_fn, log=log, district_centrality=optimization.useDistrictCentrality, ) # Starting optimization and getting final solution objective, finalSolution, overlapping = OrToolsOptimizationService().vrpOptimization( solving_time_sec=int(optimization.optimizationTime.total_seconds()), instance=orToolsInstance, config=config, solution_callback_fn=lambda obj, sol, over: solution_callback_fn(objective=obj, solution=sol, finished=False, overlapping=over), stop_callback_fn=stop_callback_fn, log=log ) solution_callback_fn(objective=objective, solution=finalSolution, finished=True, overlapping=overlapping) def __filterInitialRoutes(self, initialRoutes: list[list[CrnPoint]], optimizationPoints: list[OptimizationPoint]) -> list[list[CrnPoint]]: """ Filter initial crns that are present inside optimization points """ allowedHise = [op.crnPoint.hisa for op in optimizationPoints] filteredInitialRoutes = [] for route in initialRoutes: filteredInitialRoute = [] for crnPoint in route: if crnPoint.hisa in allowedHise: filteredInitialRoute.append(crnPoint) filteredInitialRoutes.append(filteredInitialRoute) return filteredInitialRoutes def __checkAndBalanceInitialRoutes( self, initialRoutes: list[list[CrnPoint]], optimizationPoints: list[OptimizationPoint], log: Logger ) -> list[list[CrnPoint]]: if len(initialRoutes) == 0: return [] """ Add missing initial crn points, remove not needed crn points """ log.warning("Start balancing initial routes") log.info("Create crn mapping with optimization points as priority") hisa_crn: dict[int, CrnPoint] = {} hisa_initial_district: dict[int, int] = {} for district, initialRoute in enumerate(initialRoutes): for ip in initialRoute: hisa_crn[ip.hisa] = ip hisa_initial_district[ip.hisa] = district for op in optimizationPoints: hisa_crn[op.crnPoint.hisa] = op.crnPoint log.info("Get all initial crns") initialHise = [] for initialRoute in initialRoutes: for ip in initialRoute: initialHise.append(ip.hisa) log.info("Get all optimization crns") optimizationHise = {op.crnPoint.hisa for op in optimizationPoints} uniqueInitialHise = set(initialHise) # Check for duplicates if len(uniqueInitialHise) != len(initialHise): Exception(f"Initial routes contains duplicates: {[k for (k, v) in Counter(initialHise).items() if v > 1]} ") if len(optimizationHise) != len(optimizationPoints): opHise = [op.crnPoint.hisa for op in optimizationPoints] raise Exception(f"Optimization points contains duplicates: {[k for (k, v) in Counter(opHise).items() if v > 1]} ") allCrns = list(hisa_crn.values()) allCrnLocations = [crn.location.ballVector for crn in allCrns] crnBallTree = BallTree(allCrnLocations, metric='haversine') missingInitialHise = optimizationHise - uniqueInitialHise notUsedInitialHise = uniqueInitialHise - optimizationHise if len(missingInitialHise) > 0: log.warning(f"Missing initial crns: {len(missingInitialHise)}: {missingInitialHise}") if len(notUsedInitialHise) > 0: log.warning(f"Not used initial crns: {len(notUsedInitialHise)}: {notUsedInitialHise}") # Insert missing crns to initial routes log.info("Insert missing crns to initial routes") for mih in missingInitialHise: if mih == 0: # DO NOT INSERT POST OFFICE TO INITIAL ROUTES!!!!!!!!!!!!!!! continue missingCrn = hisa_crn[mih] closestCrnIndexes = crnBallTree.query([missingCrn.location.ballVector], k=int(len(optimizationPoints) / 2), return_distance=False)[0][:1] # Find to which district we can insert missing district inserted = False for closestCrnIndex in closestCrnIndexes: closestCrn = allCrns[closestCrnIndex] # We found closest crn that exists in initial districts we know where to insert it... if closestCrn.hisa in hisa_initial_district: closestCrnDistrict = hisa_initial_district[closestCrn.hisa] initialRoutes[closestCrnDistrict].append(missingCrn) inserted = True break # If I could not inserted crn insert in random initial route if not inserted: initialRoutes[0].append(missingCrn) # Remove not used initial crns for nuih in notUsedInitialHise: notUsedCrn = hisa_crn[nuih] notUsedCrnDistrict = hisa_initial_district[nuih] initialRoutes[notUsedCrnDistrict].remove(notUsedCrn) return initialRoutes def __route_type(self, type: TransportMode) -> str: match type: case TransportMode.BIKE: return 'bike' case TransportMode.CAR: return 'car' case TransportMode.EV: return 'ev' case TransportMode.KM: return 'km' case TransportMode.KPM: return 'kpm' case TransportMode.MK: return 'mk' case TransportMode.WALK: return 'foot' case _: raise TypeError(f"Mapping for transport mode does not exists: {type}") def __crn_type(self, type: OptimizationPointType) -> Literal['crn', 'depot', 'refill']: if type == OptimizationPointType.CRN: return 'crn' elif type == OptimizationPointType.POSTA: return 'depot' elif type == OptimizationPointType.DOSTAVNIK: return 'refill' raise TypeError(f"CRN type '{type}' currently not supported!") def __filteredOrToolsInstance( self, orToolsInstance: OrToolsOptimizationInstance, optimizationPoints: list[OptimizationPoint] ) -> OrToolsOptimizationInstance: depotOptPoint = orToolsInstance.points[0] crnOptPoints = orToolsInstance.points[1:] filteredHisaIds = [tOptP.crnPoint.hisa for tOptP in optimizationPoints] optPointIndexes = [0] newOtimizationPoints = [depotOptPoint] # Depot must be on the first index!!!!!!!!!!! # Fetch district optimization points and indexes for generating matrixes for i, crnOptPoint in enumerate(crnOptPoints): if int(crnOptPoint.hisa_id) in filteredHisaIds: optPointIndexes.append(i + 1) newOtimizationPoints.append(crnOptPoint) # Reset index to match new distance and time matrix for i, optPoint in enumerate(newOtimizationPoints): optPoint.id = i # Generate new distance matrices distance_matrix: dict[str, np.ndarray] = {} for vehicleType, matrix in orToolsInstance.distance_matrix.items(): distance_matrix[vehicleType] = matrix[np.ix_(optPointIndexes, optPointIndexes)] # Generate new time matrices time_matrix: dict[str, np.ndarray] = {} for vehicleType, matrix in orToolsInstance.time_matrix.items(): time_matrix[vehicleType] = matrix[np.ix_(optPointIndexes, optPointIndexes)] orToolsInstance.points = newOtimizationPoints orToolsInstance.distance_matrix = distance_matrix orToolsInstance.time_matrix = time_matrix return orToolsInstance def __generateTestSolution( self, solvingTime: timedelta, testingOptimizationPoints: list[OptimizationPoint], orToolsInstance: OrToolsOptimizationInstance, solutionCallback: Callable[[int, list[OrToolsOptimizationSolution], bool, Optional[dict[int, float]]], None], log: Logger, district_centrality: bool ): log.info("Generating test solution") orToolsInstance = self.__filteredOrToolsInstance(orToolsInstance=orToolsInstance, optimizationPoints=testingOptimizationPoints) # Starting optimization and getting final solution objective, solution, overlapping = OrToolsOptimizationService().vrpOptimization( solving_time_sec=int(solvingTime.total_seconds()), instance=orToolsInstance, config=self.config(setInitial=False, district_centering=district_centrality), log=log ) solutionCallback(objective, solution, True, overlapping) def __generateInitialSolution( self, solvingTime: timedelta, orToolsInstance: OrToolsOptimizationInstance, solutionCallback: Callable[[int, list[OrToolsOptimizationSolution], bool, Optional[dict[int, float]]], None], log: Logger, district_centrality: bool ): log.info("Generating initial solution") # Remove vehicles constraints for vehicle in orToolsInstance.vehicles: vehicle.working_time_h = 1e3 vehicle.range_km = 1e3 vehicle.capacity = 1e3 depotOptPoint = orToolsInstance.points[0] crnOptPoints = orToolsInstance.points[1:] districts = set([optPoint.district for optPoint in crnOptPoints]) # Depot is on the first index!!!!!!!!! solvingTimeSec = int((solvingTime / len(districts)).total_seconds()) combinedSolutions = [] combinedObjective = 0 for districtI, district in enumerate(sorted(list(districts))): log.info(f"Optimizing district[{districtI}/{len(districts)}] = '{district}'") log.info(f"Searching for appropriate vehicles for district '{district}'") districtVehicles = [] for vehicle in orToolsInstance.vehicles: if district in vehicle.districts: log.info(f"Found vehicle: {vehicle}") districtVehicles.append(vehicle) districtVehicles = districtVehicles[:1] log.info(f"Force one vehicle for district '{district}': {districtVehicles}") if len(districtVehicles) == 0: log.warning(f"No vehicles found for district '{district}' (using any free vehicle that has no district assigned) instead") districtVehicles = [vehicle for vehicle in orToolsInstance.vehicles if len(vehicle.districts) == 0] districtOptPointIndexes = [0] districtOptPoints = [depotOptPoint] # Depot must be on the first index!!!!!!!!!!! # Fetch district optimization points and indexes for generating matrixes for crnI, crnOptPoint in enumerate(crnOptPoints): if crnOptPoint.district == district: districtOptPointIndexes.append(crnI + 1) districtOptPoints.append(crnOptPoint) elif crnOptPoint.district not in districts: log.warning(f"CRN without district: {crnOptPoint}") # Reset index to match new distance and time matrix for optPointI, optPoint in enumerate(districtOptPoints): optPoint.id = optPointI # Generate new distance matrices district_distance_matrix: dict[str, np.ndarray] = {} for vehicleType, matrix in orToolsInstance.distance_matrix.items(): district_distance_matrix[vehicleType] = matrix[np.ix_(districtOptPointIndexes, districtOptPointIndexes)] # Generate new time matrices district_time_matrix: dict[str, np.ndarray] = {} for vehicleType, matrix in orToolsInstance.distance_matrix.items(): district_time_matrix[vehicleType] = matrix[np.ix_(districtOptPointIndexes, districtOptPointIndexes)] districtOrToolsInstance = OrToolsOptimizationInstance( vehicles=districtVehicles, points=districtOptPoints, distance_matrix=district_distance_matrix, time_matrix=district_time_matrix, initial_routes=[[]], log=log ) # Starting optimization and getting final solution objective, districtSolutions, overlapping = OrToolsOptimizationService().vrpOptimization( solving_time_sec=solvingTimeSec, instance=districtOrToolsInstance, config=self.config(setInitial=False, district_centering=district_centrality), log=log ) numOfDistrictSolutions = len(districtSolutions) if numOfDistrictSolutions != 1: raise Exception(f"Solution for one district should have one solution but instead has: {numOfDistrictSolutions}") for solution in districtSolutions: solution.vehicle_id = districtVehicles[solution.vehicle_id].id solution.district = district combinedSolutions.append(solution) combinedObjective += objective solutionCallback(objective, combinedSolutions, False, None) solutionCallback(combinedObjective, combinedSolutions, True, None) def __filterOptimizationPoints( self, optimization: Optimization, optimizationPoints: list[OptimizationPoint], log: Logger ) -> list[OptimizationPoint]: optPoints = [] titleInfo = optimization.title.split() log.info(f"Optimization parameters: {titleInfo}") match titleInfo[0]: case "RADIUS": radius = float(titleInfo[1]) depot = optimizationPoints[0] for optPoint in optimizationPoints[1:]: if depot.crnPoint.location.distance(optPoint.crnPoint.location) < radius: optPoints.append(optPoint) case "SQUARE": lats = [float(titleInfo[1]), float(titleInfo[3])] lons = [float(titleInfo[2]), float(titleInfo[4])] for optPoint in optimizationPoints[1:]: if lats[0] < optPoint.crnPoint.location.lat < lats[1] and lons[0] < optPoint.crnPoint.location.lon < lons[1]: optPoints.append(optPoint) case "STREET": streetName = titleInfo[1] for optPoint in optimizationPoints[1:]: if streetName in optPoint.crnPoint.naslov: optPoints.append(optPoint) case _: raise Exception(f"Unknown testing category '{titleInfo[0]}'") log.info(f"Testing optimization points: {len(optPoints)}") return optPoints