Admiral/admiral-worker/app/services/OsrmRoutingService.py
2025-06-24 14:22:50 +02:00

132 lines
4.0 KiB
Python

import logging
from random import random
import numpy as np
import requests
from typing_extensions import override
from core.domain.map.GeoLocation import GeoLocation
from core.domain.map.RouteInfo import RouteInfo
from core.domain.map.RouteMatrix import RouteMatrix
from core.domain.optimization.TransportMode import TransportMode
from core.services.RoutingService import RoutingService
log = logging.getLogger(__name__)
class OsrmRoutingService(RoutingService):
def __init__(self, domain: str):
self.domain = domain
def __getCoordinates(self, locations : list[GeoLocation]) -> str:
coordinates = []
for location in locations:
coordinates.append(f"{location.lon},{location.lat}")
return ";".join(coordinates)
@override
def getRouteMatrix(self, geoLocations: list[GeoLocation], transportMode: TransportMode) -> RouteMatrix:
coordinates = self.__getCoordinates(locations=[gl for gl in geoLocations])
port, profile = self.__getProfile(transportMode=transportMode)
res = requests.get(url=f"{self.domain}:{port}/table/v1/{profile}/{coordinates}", params=dict(annotations="distance,duration"))
if res.status_code != 200:
raise Exception(f"OSRM routing engine failed to create matrix: {res.text}")
matrixes = res.json()
return RouteMatrix.init(
distances=np.matrix(matrixes['distances'], dtype=np.float32),
durations=np.matrix(matrixes['durations'], dtype=np.float32)
)
@override
def getRouteInfo(self, transportMode: TransportMode, legs: list[GeoLocation]) -> RouteInfo:
coordinates = self.__getCoordinates(locations=legs)
port, profile = self.__getProfile(transportMode=transportMode)
res = requests.get(
url=f"{self.domain}:{port}/route/v1/{profile}/{coordinates}",
params=dict(
geometries="geojson",
alternatives='false',
steps='true',
continue_straight='false',
)
)
if res.status_code != 200:
raise Exception(f"OSRM routing engine failed to find route: {res.text}")
data = res.json()
route = data['routes'][0]
steps = [legs[0]]
for i, leg in enumerate(route['legs']):
legSteps = [legs[i]]
for step in leg['steps']:
legSteps += [GeoLocation(lat=c[1], lon=c[0]) for c in step['geometry']['coordinates']]
legSteps.append(legs[i + 1])
steps += legSteps
return RouteInfo(
distance=route['distance'],
duration=route['duration'],
steps=steps
)
@override
def getAverageRouteInfo(self, transportMode: TransportMode, legs: list[GeoLocation], probability: list[float], iterations: int) -> RouteInfo:
averageDistance = 0
averageDuration = 0
for i in range(iterations):
randomLegs: list[GeoLocation] = []
for leg in legs:
if random() < probability[legs.index(leg)]:
randomLegs.append(leg)
if len(randomLegs) < 2:
randomLegs = legs
routeInfo = self.getRouteInfo(transportMode=transportMode, legs=randomLegs)
averageDistance += routeInfo.distance
averageDuration += routeInfo.duration
return RouteInfo(
distance=averageDistance / iterations,
duration=averageDuration / iterations,
steps=[]
)
def __getProfile(self, transportMode: TransportMode) -> tuple[int, str]:
match transportMode:
case TransportMode.BIKE:
return 5000, 'bike'
case TransportMode.CAR:
return 5001, 'car'
case TransportMode.EV:
return 5002, 'ev'
case TransportMode.KM:
return 5003, 'km'
case TransportMode.KPM:
return 5004, 'kpm'
case TransportMode.MK:
return 5005, 'mk'
case TransportMode.WALK:
return 5006, 'walk'
case _:
raise Exception(f"Mapping for transport mode does not exists: {transportMode.value}")
def _getPolyline(self, transportMode: TransportMode, legs: list[GeoLocation]) -> str:
coordinates = ";".join([f"{l.lon},{l.lat}" for l in legs])
port, profile = self.__getProfile(transportMode=transportMode)
res = requests.get(
url=f"{self.domain}:{port}/route/v1/{profile}/{coordinates}",
params=dict(
geometries="polyline",
alternatives='false',
steps='true',
continue_straight='false',
)
)
return res.json()['routes'][0]['geometry']