import logging from random import random import numpy as np import requests from typing_extensions import override from core.domain.map.GeoLocation import GeoLocation from core.domain.map.RouteInfo import RouteInfo from core.domain.map.RouteMatrix import RouteMatrix from core.domain.optimization.TransportMode import TransportMode from core.services.RoutingService import RoutingService log = logging.getLogger(__name__) class OsrmRoutingService(RoutingService): def __init__(self, domain: str): self.domain = domain def __getCoordinates(self, locations : list[GeoLocation]) -> str: coordinates = [] for location in locations: coordinates.append(f"{location.lon},{location.lat}") return ";".join(coordinates) @override def getRouteMatrix(self, geoLocations: list[GeoLocation], transportMode: TransportMode) -> RouteMatrix: coordinates = self.__getCoordinates(locations=[gl for gl in geoLocations]) port, profile = self.__getProfile(transportMode=transportMode) res = requests.get(url=f"{self.domain}:{port}/table/v1/{profile}/{coordinates}", params=dict(annotations="distance,duration")) if res.status_code != 200: raise Exception(f"OSRM routing engine failed to create matrix: {res.text}") matrixes = res.json() return RouteMatrix.init( distances=np.matrix(matrixes['distances'], dtype=np.float32), durations=np.matrix(matrixes['durations'], dtype=np.float32) ) @override def getRouteInfo(self, transportMode: TransportMode, legs: list[GeoLocation]) -> RouteInfo: coordinates = self.__getCoordinates(locations=legs) port, profile = self.__getProfile(transportMode=transportMode) res = requests.get( url=f"{self.domain}:{port}/route/v1/{profile}/{coordinates}", params=dict( geometries="geojson", alternatives='false', steps='true', continue_straight='false', ) ) if res.status_code != 200: raise Exception(f"OSRM routing engine failed to find route: {res.text}") data = res.json() route = data['routes'][0] steps = [legs[0]] for i, leg in enumerate(route['legs']): legSteps = [legs[i]] for step in leg['steps']: legSteps += [GeoLocation(lat=c[1], lon=c[0]) for c in step['geometry']['coordinates']] legSteps.append(legs[i + 1]) steps += legSteps return RouteInfo( distance=route['distance'], duration=route['duration'], steps=steps ) @override def getAverageRouteInfo(self, transportMode: TransportMode, legs: list[GeoLocation], probability: list[float], iterations: int) -> RouteInfo: averageDistance = 0 averageDuration = 0 for i in range(iterations): randomLegs: list[GeoLocation] = [] for leg in legs: if random() < probability[legs.index(leg)]: randomLegs.append(leg) if len(randomLegs) < 2: randomLegs = legs routeInfo = self.getRouteInfo(transportMode=transportMode, legs=randomLegs) averageDistance += routeInfo.distance averageDuration += routeInfo.duration return RouteInfo( distance=averageDistance / iterations, duration=averageDuration / iterations, steps=[] ) def __getProfile(self, transportMode: TransportMode) -> tuple[int, str]: match transportMode: case TransportMode.BIKE: return 5000, 'bike' case TransportMode.CAR: return 5001, 'car' case TransportMode.EV: return 5002, 'ev' case TransportMode.KM: return 5003, 'km' case TransportMode.KPM: return 5004, 'kpm' case TransportMode.MK: return 5005, 'mk' case TransportMode.WALK: return 5006, 'walk' case _: raise Exception(f"Mapping for transport mode does not exists: {transportMode.value}") def _getPolyline(self, transportMode: TransportMode, legs: list[GeoLocation]) -> str: coordinates = ";".join([f"{l.lon},{l.lat}" for l in legs]) port, profile = self.__getProfile(transportMode=transportMode) res = requests.get( url=f"{self.domain}:{port}/route/v1/{profile}/{coordinates}", params=dict( geometries="polyline", alternatives='false', steps='true', continue_straight='false', ) ) return res.json()['routes'][0]['geometry']