Source code for biosample_enricher.elevation.service

"""Main elevation service orchestrator."""

import hashlib
import json
import os
from datetime import UTC, datetime

from dotenv import load_dotenv

from biosample_enricher.elevation.classifier import CoordinateClassifier
from biosample_enricher.elevation.providers import (
    ElevationProvider,
    GoogleElevationProvider,
    OpenTopoDataProvider,
    OSMElevationProvider,
    USGSElevationProvider,
)
from biosample_enricher.elevation.utils import calculate_distance_m
from biosample_enricher.logging_config import get_logger
from biosample_enricher.models import (
    CoordinateClassification,
    ElevationRequest,
    ElevationResult,
    EnrichmentRun,
    FetchResult,
    GeoPoint,
    Observation,
    OutputEnvelope,
    ProviderRef,
    ValueStatus,
    Variable,
)

# Imported next to its use below: locating the project-level .env file.
from biosample_enricher.paths import get_project_root

# Load the project's .env file, if one exists, as an import-time side effect,
# so the os.getenv() lookups in ElevationService.from_env can see its values.
# NOTE(review): python-dotenv's load_dotenv() does not override variables that
# are already set in the process environment unless asked to -- confirm this
# precedence is what deployments expect.
env_path = get_project_root() / ".env"
if env_path.exists():
    load_dotenv(env_path)

# Module-level logger used by ElevationService for all of its log output.
logger = get_logger(__name__)


[docs] class ElevationService: """Orchestrates elevation lookups across multiple providers."""
[docs] def __init__( self, google_api_key: str | None = None, enable_google: bool = True, enable_usgs: bool = True, enable_osm: bool = True, enable_open_topo_data: bool = True, osm_endpoint: str = "https://api.open-elevation.com/api/v1/lookup", open_topo_data_endpoint: str = "https://api.opentopodata.org/v1", ) -> None: """ Initialize the elevation service. Args: google_api_key: Google API key (if None, reads from env) enable_google: Whether to enable Google provider enable_usgs: Whether to enable USGS provider enable_osm: Whether to enable OSM provider enable_open_topo_data: Whether to enable Open Topo Data provider osm_endpoint: OSM provider endpoint URL open_topo_data_endpoint: Open Topo Data endpoint URL """ self.classifier = CoordinateClassifier() self.providers: dict[str, ElevationProvider] = {} # Initialize providers based on configuration if enable_google: try: self.providers["google"] = GoogleElevationProvider( api_key=google_api_key ) logger.info("Google Elevation provider enabled") except ValueError as e: logger.warning(f"Google provider disabled: {e}") if enable_usgs: self.providers["usgs"] = USGSElevationProvider() logger.info("USGS Elevation provider enabled") if enable_osm: self.providers["osm"] = OSMElevationProvider(endpoint=osm_endpoint) logger.info("OSM Elevation provider enabled") if enable_open_topo_data: self.providers["open_topo_data"] = OpenTopoDataProvider( endpoint=open_topo_data_endpoint ) logger.info("Open Topo Data provider enabled") if not self.providers: raise ValueError("No elevation providers are enabled") logger.info( f"ElevationService initialized with {len(self.providers)} providers" )
[docs] @classmethod def from_env(cls) -> "ElevationService": """ Create elevation service from environment variables. Returns: Configured elevation service """ return cls( google_api_key=os.getenv("GOOGLE_MAIN_API_KEY"), enable_google=os.getenv("ELEVATION_ENABLE_GOOGLE", "true").lower() == "true", enable_usgs=os.getenv("ELEVATION_ENABLE_USGS", "true").lower() == "true", enable_osm=os.getenv("ELEVATION_ENABLE_OSM", "true").lower() == "true", enable_open_topo_data=os.getenv( "ELEVATION_ENABLE_OPEN_TOPO_DATA", "true" ).lower() == "true", osm_endpoint=os.getenv( "ELEVATION_OSM_ENDPOINT", "https://api.open-elevation.com/api/v1/lookup" ), open_topo_data_endpoint=os.getenv( "ELEVATION_OPEN_TOPO_DATA_ENDPOINT", "https://api.opentopodata.org/v1" ), )
[docs] def classify_coordinates(self, lat: float, lon: float) -> CoordinateClassification: """ Classify coordinates for provider routing. Args: lat: Latitude in decimal degrees lon: Longitude in decimal degrees Returns: Coordinate classification """ return self.classifier.classify(lat, lon)
[docs] def classify_biosample_location(self, lat: float, lon: float) -> dict: """ Classify a biosample location for routing and metadata. This method provides biosample-specific classification that can be stored with the sample metadata for efficient provider routing. Args: lat: Latitude in decimal degrees lon: Longitude in decimal degrees Returns: Dictionary with classification metadata """ return self.classifier.classify_biosample_location(lat, lon)
[docs] def select_providers( self, classification: CoordinateClassification, preferred: list[str] | None = None, ) -> list[ElevationProvider]: """ Select providers based on coordinate classification. Args: classification: Coordinate classification result preferred: Preferred provider names in order Returns: List of providers in priority order """ available_providers = list(self.providers.keys()) # Smart routing based on classification if classification.is_us_territory: if classification.is_land is False: # Ocean areas - USGS likely won't work default_order = ["google", "open_topo_data", "osm", "usgs"] else: # US land: USGS first, then others default_order = ["usgs", "google", "open_topo_data", "osm"] else: # International locations if classification.is_land is False: # International ocean - prioritize global providers default_order = ["google", "open_topo_data", "osm"] else: # International land - Open Topo Data has good global coverage default_order = ["google", "open_topo_data", "osm"] # Apply preferred providers if specified if preferred: # Filter preferred to only available providers preferred_available = [p for p in preferred if p in available_providers] # Add remaining providers not in preferred list remaining = [ p for p in default_order if p not in preferred_available and p in available_providers ] provider_order = preferred_available + remaining else: provider_order = [p for p in default_order if p in available_providers] # Return provider objects selected = [self.providers[name] for name in provider_order] logger.debug(f"Selected providers: {[p.name for p in selected]}") return selected
[docs] def get_elevation( self, request: ElevationRequest, *, read_from_cache: bool = True, write_to_cache: bool = True, timeout_s: float = 20.0, ) -> list[Observation]: """ Get elevation observations from multiple providers. Args: request: Elevation request read_from_cache: Whether to read from cache write_to_cache: Whether to write to cache timeout_s: Request timeout in seconds Returns: List of elevation observations """ lat, lon = request.latitude, request.longitude logger.info(f"Getting elevation for {lat:.6f}, {lon:.6f}") # Classify coordinates classification = self.classify_coordinates(lat, lon) # Select providers providers = self.select_providers(classification, request.preferred_providers) # Create request location request_location = GeoPoint(lat=lat, lon=lon, precision_digits=6) observations = [] for provider in providers: try: logger.debug(f"Fetching from {provider.name}") # Fetch from provider result = provider.fetch( lat, lon, read_from_cache=read_from_cache, write_to_cache=write_to_cache, timeout_s=timeout_s, ) # Convert to observation observation = self._create_observation( request_location, provider, result ) observations.append(observation) if result.ok: logger.debug( f"{provider.name} returned elevation: {result.elevation}m" ) else: logger.warning(f"{provider.name} failed: {result.error}") except Exception as e: logger.error(f"Error fetching from {provider.name}: {e}") # Create error observation error_observation = self._create_error_observation( request_location, provider, str(e) ) observations.append(error_observation) logger.info(f"Completed elevation lookup: {len(observations)} observations") return observations
[docs] def get_best_elevation( self, observations: list[Observation] ) -> ElevationResult | None: """ Select the best elevation from multiple observations. Args: observations: List of elevation observations Returns: Best elevation result, or None if no valid observations """ # Filter to successful observations valid_obs = [ obs for obs in observations if obs.value_status == ValueStatus.OK and obs.value_numeric is not None ] if not valid_obs: return None # Sort by distance to input, then by resolution (smaller is better) def sort_key(obs: Observation) -> tuple[float, float]: distance = obs.distance_to_input_m or 0.0 resolution = obs.spatial_resolution_m or 999999.0 return (distance, resolution) best_obs = min(valid_obs, key=sort_key) # Create classification from first observation (they should all be the same) classification = CoordinateClassification( is_us_territory=True, # This would need to be stored in observation confidence=1.0, ) return ElevationResult( latitude=best_obs.request_location.lat, longitude=best_obs.request_location.lon, elevation_meters=best_obs.value_numeric or 0.0, provider=best_obs.provider.name, accuracy_meters=best_obs.spatial_resolution_m, data_source=best_obs.provider.name, timestamp=best_obs.created_at or datetime.now(UTC), classification=classification, )
[docs] def create_output_envelope( self, subject_id: str, observations: list[Observation], read_from_cache: bool = True, write_to_cache: bool = True, ) -> OutputEnvelope: """ Create output envelope with observations. Args: subject_id: Subject identifier observations: List of observations read_from_cache: Whether cache was used for reading write_to_cache: Whether cache was used for writing Returns: Output envelope """ run = EnrichmentRun( started_at=datetime.now(UTC), ended_at=datetime.now(UTC), tool_version="biosample-enricher 0.1.0", read_from_cache=read_from_cache, write_to_cache=write_to_cache, ) return OutputEnvelope( schema_version="1.0.0", run=run, subject_id=subject_id, observations=observations, )
def _create_observation( self, request_location: GeoPoint, provider: ElevationProvider, result: FetchResult, ) -> Observation: """Create observation from fetch result.""" # Calculate distance if measurement location is different distance_m = None if result.location: distance_m = calculate_distance_m( request_location.lat, request_location.lon, result.location.lat, result.location.lon, ) # Create request ID request_id = self._create_request_id( provider.name, request_location.lat, request_location.lon ) # Calculate payload hash payload_hash = None if result.raw: payload_str = json.dumps(result.raw, sort_keys=True) payload_hash = hashlib.sha256(payload_str.encode()).hexdigest() return Observation( variable=Variable.ELEVATION, value_numeric=result.elevation, unit_ucum="m", value_status=ValueStatus.OK if result.ok else ValueStatus.ERROR, provider=ProviderRef( name=provider.name, endpoint=provider.endpoint, api_version=provider.api_version, ), request_location=request_location, measurement_location=result.location, distance_to_input_m=distance_m, spatial_resolution_m=result.resolution_m, vertical_datum=result.vertical_datum, raw_payload=json.dumps(result.raw) if result.raw else None, raw_payload_sha256=payload_hash, normalization_version="elev-2025-09-10", cache_used=False, # This would need to be passed from provider request_id=request_id, error_message=result.error if not result.ok else None, created_at=datetime.now(UTC), ) def _create_error_observation( self, request_location: GeoPoint, provider: ElevationProvider, error_message: str, ) -> Observation: """Create error observation.""" request_id = self._create_request_id( provider.name, request_location.lat, request_location.lon ) return Observation( variable=Variable.ELEVATION, value_status=ValueStatus.ERROR, provider=ProviderRef( name=provider.name, endpoint=provider.endpoint, api_version=provider.api_version, ), request_location=request_location, normalization_version="elev-2025-09-10", request_id=request_id, 
error_message=error_message, created_at=datetime.now(UTC), ) def _create_request_id(self, provider_name: str, lat: float, lon: float) -> str: """Create deterministic request ID.""" key = f"{provider_name}:{lat:.6f},{lon:.6f}" return hashlib.sha1(key.encode()).hexdigest()[:8]