132 lines
4.0 KiB
Python
132 lines
4.0 KiB
Python
import logging
|
|
from random import random
|
|
|
|
import numpy as np
|
|
import requests
|
|
from typing_extensions import override
|
|
|
|
from core.domain.map.GeoLocation import GeoLocation
|
|
from core.domain.map.RouteInfo import RouteInfo
|
|
from core.domain.map.RouteMatrix import RouteMatrix
|
|
from core.domain.optimization.TransportMode import TransportMode
|
|
from core.services.RoutingService import RoutingService
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class OsrmRoutingService(RoutingService):
|
|
|
|
|
|
def __init__(self, domain: str):
|
|
self.domain = domain
|
|
|
|
def __getCoordinates(self, locations : list[GeoLocation]) -> str:
|
|
coordinates = []
|
|
for location in locations:
|
|
coordinates.append(f"{location.lon},{location.lat}")
|
|
return ";".join(coordinates)
|
|
|
|
@override
|
|
def getRouteMatrix(self, geoLocations: list[GeoLocation], transportMode: TransportMode) -> RouteMatrix:
|
|
coordinates = self.__getCoordinates(locations=[gl for gl in geoLocations])
|
|
port, profile = self.__getProfile(transportMode=transportMode)
|
|
|
|
res = requests.get(url=f"{self.domain}:{port}/table/v1/{profile}/{coordinates}", params=dict(annotations="distance,duration"))
|
|
|
|
if res.status_code != 200:
|
|
raise Exception(f"OSRM routing engine failed to create matrix: {res.text}")
|
|
|
|
matrixes = res.json()
|
|
|
|
return RouteMatrix.init(
|
|
distances=np.matrix(matrixes['distances'], dtype=np.float32),
|
|
durations=np.matrix(matrixes['durations'], dtype=np.float32)
|
|
)
|
|
|
|
@override
|
|
def getRouteInfo(self, transportMode: TransportMode, legs: list[GeoLocation]) -> RouteInfo:
|
|
coordinates = self.__getCoordinates(locations=legs)
|
|
port, profile = self.__getProfile(transportMode=transportMode)
|
|
res = requests.get(
|
|
url=f"{self.domain}:{port}/route/v1/{profile}/{coordinates}",
|
|
params=dict(
|
|
geometries="geojson",
|
|
alternatives='false',
|
|
steps='true',
|
|
continue_straight='false',
|
|
)
|
|
)
|
|
if res.status_code != 200:
|
|
raise Exception(f"OSRM routing engine failed to find route: {res.text}")
|
|
|
|
data = res.json()
|
|
route = data['routes'][0]
|
|
|
|
steps = [legs[0]]
|
|
for i, leg in enumerate(route['legs']):
|
|
legSteps = [legs[i]]
|
|
for step in leg['steps']:
|
|
legSteps += [GeoLocation(lat=c[1], lon=c[0]) for c in step['geometry']['coordinates']]
|
|
legSteps.append(legs[i + 1])
|
|
steps += legSteps
|
|
|
|
return RouteInfo(
|
|
distance=route['distance'],
|
|
duration=route['duration'],
|
|
steps=steps
|
|
)
|
|
|
|
@override
|
|
def getAverageRouteInfo(self, transportMode: TransportMode, legs: list[GeoLocation], probability: list[float], iterations: int) -> RouteInfo:
|
|
averageDistance = 0
|
|
averageDuration = 0
|
|
for i in range(iterations):
|
|
randomLegs: list[GeoLocation] = []
|
|
for leg in legs:
|
|
if random() < probability[legs.index(leg)]:
|
|
randomLegs.append(leg)
|
|
if len(randomLegs) < 2:
|
|
randomLegs = legs
|
|
routeInfo = self.getRouteInfo(transportMode=transportMode, legs=randomLegs)
|
|
averageDistance += routeInfo.distance
|
|
averageDuration += routeInfo.duration
|
|
|
|
return RouteInfo(
|
|
distance=averageDistance / iterations,
|
|
duration=averageDuration / iterations,
|
|
steps=[]
|
|
)
|
|
|
|
def __getProfile(self, transportMode: TransportMode) -> tuple[int, str]:
|
|
match transportMode:
|
|
case TransportMode.BIKE:
|
|
return 5000, 'bike'
|
|
case TransportMode.CAR:
|
|
return 5001, 'car'
|
|
case TransportMode.EV:
|
|
return 5002, 'ev'
|
|
case TransportMode.KM:
|
|
return 5003, 'km'
|
|
case TransportMode.KPM:
|
|
return 5004, 'kpm'
|
|
case TransportMode.MK:
|
|
return 5005, 'mk'
|
|
case TransportMode.WALK:
|
|
return 5006, 'walk'
|
|
case _:
|
|
raise Exception(f"Mapping for transport mode does not exists: {transportMode.value}")
|
|
|
|
def _getPolyline(self, transportMode: TransportMode, legs: list[GeoLocation]) -> str:
|
|
coordinates = ";".join([f"{l.lon},{l.lat}" for l in legs])
|
|
port, profile = self.__getProfile(transportMode=transportMode)
|
|
res = requests.get(
|
|
url=f"{self.domain}:{port}/route/v1/{profile}/{coordinates}",
|
|
params=dict(
|
|
geometries="polyline",
|
|
alternatives='false',
|
|
steps='true',
|
|
continue_straight='false',
|
|
)
|
|
)
|
|
return res.json()['routes'][0]['geometry']
|