Source code for biosample_enricher.reverse_geocoding.service

"""Reverse geocoding service for coordinating multiple providers."""

from typing import Any

from biosample_enricher.logging_config import get_logger
from biosample_enricher.reverse_geocoding.providers.base import ReverseGeocodingProvider
from biosample_enricher.reverse_geocoding.providers.google import (
    GoogleReverseGeocodingProvider,
)
from biosample_enricher.reverse_geocoding.providers.osm import (
    OSMReverseGeocodingProvider,
)
from biosample_enricher.reverse_geocoding_models import ReverseGeocodeResult

logger = get_logger(__name__)


class ReverseGeocodingService:
    """Service for managing reverse geocoding providers.

    Providers are registered by short name (``"osm"``, ``"google"``) in
    ``self.providers`` at construction time. All lookup methods are
    best-effort: failures are logged and reported as ``None`` / a missing
    entry rather than raised to the caller.
    """

    def __init__(self) -> None:
        """Initialize the reverse geocoding service."""
        # Maps provider name -> provider instance; filled by
        # _initialize_providers().
        self.providers: dict[str, ReverseGeocodingProvider] = {}
        self._initialize_providers()

    def _initialize_providers(self) -> None:
        """Initialize available reverse geocoding providers.

        Each provider is constructed independently; a failure in one is
        logged and does not prevent the other from being registered.
        """
        # Initialize OSM provider (always available)
        try:
            osm_provider = OSMReverseGeocodingProvider()
            self.providers["osm"] = osm_provider
            logger.info("Initialized OSM reverse geocoding provider")
        except Exception as e:
            logger.error(f"Failed to initialize OSM provider: {e}")

        # Initialize Google provider if API key is available.
        # A ValueError from the constructor is treated as "not configured"
        # (warning only); anything else is an unexpected failure (error).
        try:
            google_provider = GoogleReverseGeocodingProvider()
            self.providers["google"] = google_provider
            logger.info("Initialized Google reverse geocoding provider")
        except ValueError as e:
            logger.warning(f"Google provider not available: {e}")
        except Exception as e:
            logger.error(f"Failed to initialize Google provider: {e}")

    def get_available_providers(self) -> list[str]:
        """Get list of available provider names."""
        return list(self.providers)

    def get_provider(self, name: str) -> ReverseGeocodingProvider | None:
        """
        Get a specific provider by name.

        Args:
            name: Provider name

        Returns:
            Provider instance or None if not found
        """
        return self.providers.get(name)

    def reverse_geocode(
        self,
        lat: float,
        lon: float,
        provider: str | None = None,
        *,
        read_from_cache: bool = True,
        write_to_cache: bool = True,
        timeout_s: float = 20.0,
        language: str = "en",
        limit: int = 10,
    ) -> ReverseGeocodeResult | None:
        """
        Perform reverse geocoding using specified or default provider.

        Args:
            lat: Latitude in decimal degrees
            lon: Longitude in decimal degrees
            provider: Provider name (None or empty for auto-selection)
            read_from_cache: Whether to read from cache
            write_to_cache: Whether to write to cache
            timeout_s: Request timeout in seconds
            language: Language code for results
            limit: Maximum number of results

        Returns:
            Reverse geocoding result, or None if the provider is unknown,
            no provider is registered, the fetch reports failure, or the
            fetch raises. Failures are logged, never propagated.
        """
        # Select provider. Presence is tested with ``is None`` so that a
        # registered provider is never skipped because of its truthiness.
        if provider:
            provider_instance = self.providers.get(provider)
            if provider_instance is None:
                logger.error(f"Provider '{provider}' not found")
                return None
        else:
            # Auto-select: prefer Google if available, else OSM
            provider_instance = self.providers.get("google")
            if provider_instance is None:
                provider_instance = self.providers.get("osm")
            if provider_instance is None:
                logger.error("No providers available")
                return None

        # Perform reverse geocoding
        try:
            logger.info(
                f"Reverse geocoding {lat:.6f}, {lon:.6f} using {provider_instance.name}"
            )
            result = provider_instance.fetch(
                lat,
                lon,
                read_from_cache=read_from_cache,
                write_to_cache=write_to_cache,
                timeout_s=timeout_s,
                language=language,
                limit=limit,
            )

            if result.ok and result.result:
                return result.result

            logger.error(f"Reverse geocoding failed: {result.error}")
            return None

        except Exception as e:
            # Best-effort API: any provider exception becomes a None result.
            logger.error(f"Error during reverse geocoding: {e}")
            return None

    def reverse_geocode_multiple(
        self,
        lat: float,
        lon: float,
        providers: list[str] | None = None,
        *,
        read_from_cache: bool = True,
        write_to_cache: bool = True,
        timeout_s: float = 20.0,
        language: str = "en",
        limit: int = 10,
    ) -> dict[str, ReverseGeocodeResult]:
        """
        Perform reverse geocoding using multiple providers sequentially.

        Args:
            lat: Latitude in decimal degrees
            lon: Longitude in decimal degrees
            providers: List of provider names (None or empty for all
                available). Unknown names are logged and skipped.
            read_from_cache: Whether to read from cache
            write_to_cache: Whether to write to cache
            timeout_s: Request timeout in seconds
            language: Language code for results
            limit: Maximum number of results per provider

        Returns:
            Dictionary mapping provider names to results. Providers that
            fail or raise are logged and omitted, so the dictionary may be
            empty.
        """
        # Select providers
        provider_instances: dict[str, ReverseGeocodingProvider] = {}
        if providers:
            for name in providers:
                instance = self.providers.get(name)
                if instance is None:
                    # Report unknown names instead of dropping them
                    # silently; the remaining providers are still queried.
                    logger.warning(f"Provider '{name}' not found")
                    continue
                provider_instances[name] = instance
        else:
            provider_instances = dict(self.providers)

        if not provider_instances:
            logger.error("No providers available")
            return {}

        # Execute providers sequentially; one provider failing must not
        # prevent the others from being tried.
        output: dict[str, ReverseGeocodeResult] = {}
        for name, provider_instance in provider_instances.items():
            try:
                fetch_result = provider_instance.fetch(
                    lat,
                    lon,
                    read_from_cache=read_from_cache,
                    write_to_cache=write_to_cache,
                    timeout_s=timeout_s,
                    language=language,
                    limit=limit,
                )
                if fetch_result.ok and fetch_result.result:
                    output[name] = fetch_result.result
                else:
                    logger.error(f"Provider {name} failed: {fetch_result.error}")
            except Exception as e:
                logger.error(f"Provider {name} failed with exception: {e}")

        return output

    def compare_providers(
        self,
        lat: float,
        lon: float,
        *,
        language: str = "en",
        limit: int = 5,
    ) -> dict[str, Any]:
        """
        Compare results from all available providers.

        Args:
            lat: Latitude in decimal degrees
            lon: Longitude in decimal degrees
            language: Language code for results
            limit: Maximum number of results per provider

        Returns:
            ``{"error": ...}`` if no provider returned a result. Otherwise
            a dictionary with:

            * ``"query"``: the requested coordinates,
            * ``"providers"``: per-provider summary of the best match
              (providers without a best match are omitted),
            * ``"consensus"``: for each compared field, the most common
              non-empty value, the providers reporting it, and
              ``"agreement"`` = reporting providers / providers that
              returned a result. A value reported by a single provider is
              still listed; check ``"agreement"`` to judge its support.
        """
        # Get results from all providers
        results = self.reverse_geocode_multiple(
            lat, lon, language=language, limit=limit
        )

        if not results:
            return {"error": "No providers returned results"}

        # Extract best matches from each provider
        comparison: dict[str, Any] = {
            "query": {"lat": lat, "lon": lon},
            "providers": {},
            "consensus": {},
        }

        # Process each provider's results
        for provider_name, result in results.items():
            best_match = result.get_best_match()
            if best_match:
                comparison["providers"][provider_name] = {
                    "formatted_address": best_match.formatted_address,
                    "country": best_match.country,
                    "country_code": best_match.country_code,
                    "state": best_match.state,
                    "city": best_match.city,
                    "postcode": best_match.postcode,
                    "confidence": best_match.confidence,
                    "distance_m": best_match.distance_m,
                    "response_time_ms": result.response_time_ms,
                    "cache_hit": result.cache_hit,
                    "num_results": len(result.locations),
                }

        # Find the most common value per field across providers
        fields_to_compare = ["country", "country_code", "state", "city", "postcode"]

        for field in fields_to_compare:
            # value -> names of the providers that reported it
            values: dict[str, list[str]] = {}
            for provider_name, provider_data in comparison["providers"].items():
                value = provider_data.get(field)
                if value:
                    values.setdefault(value, []).append(provider_name)

            # Find most common value (ties resolve to the value seen first)
            if values:
                most_common = max(values.items(), key=lambda x: len(x[1]))
                comparison["consensus"][field] = {
                    "value": most_common[0],
                    "providers": most_common[1],
                    "agreement": len(most_common[1]) / len(results),
                }

        return comparison