Source code for biosample_enricher.weather.service

"""
Weather enrichment service for biosample environmental context.

Orchestrates multiple weather providers to deliver day-specific weather data
with temporal precision tracking and standardized schema mapping.
"""

from datetime import date, datetime
from typing import Any

from biosample_enricher.logging_config import get_logger
from biosample_enricher.weather.models import TemporalQuality, WeatherResult
from biosample_enricher.weather.providers.base import WeatherProviderBase
from biosample_enricher.weather.providers.meteostat import MeteostatProvider
from biosample_enricher.weather.providers.open_meteo import OpenMeteoProvider

logger = get_logger(__name__)


class WeatherService:
    """
    Multi-provider weather enrichment service for biosample metadata.

    Every configured provider is queried for the requested day and the
    per-parameter observations with the best temporal quality are merged
    into a single result, which can then be mapped to a target schema.
    """

    def __init__(self, providers: list[WeatherProviderBase] | None = None):
        """
        Initialize weather service with provider chain.

        Args:
            providers: List of weather providers in priority order.
                If None, uses default Open-Meteo + MeteoStat providers.
        """
        if providers is None:
            # Default to Open-Meteo (primary) + MeteoStat (fallback)
            providers = [OpenMeteoProvider(), MeteostatProvider()]

        self.providers = providers
        logger.info(f"Weather service initialized with {len(providers)} providers")

    def get_weather_for_biosample(
        self, biosample: dict[str, Any], target_schema: str = "nmdc"
    ) -> dict[str, Any]:
        """
        Get weather data for a biosample and map to target schema.

        Args:
            biosample: Biosample dictionary with location and collection date
            target_schema: "nmdc" or "gold" for schema mapping

        Returns:
            On success, a dict with ``weather_result``, ``schema_mapping``,
            ``coverage_metrics`` and ``enrichment_success``. When the
            biosample has no usable coordinates or collection date, a dict
            with ``error``, an empty ``enrichment`` and
            ``enrichment_success`` set to False.
        """
        # Extract location and date from biosample
        location = self._extract_location(biosample)
        collection_date = self._extract_collection_date(biosample)

        if not location:
            logger.warning("No valid coordinates found in biosample")
            return {
                "error": "no_coordinates",
                "enrichment": {},
                "enrichment_success": False,
            }

        if not collection_date:
            logger.warning("No valid collection date found in biosample")
            return {
                "error": "no_collection_date",
                "enrichment": {},
                "enrichment_success": False,
            }

        weather_result = self.get_daily_weather(
            lat=location["lat"], lon=location["lon"], target_date=collection_date
        )

        return {
            "weather_result": weather_result,
            "schema_mapping": weather_result.get_schema_mapping(target_schema),
            "coverage_metrics": weather_result.get_coverage_metrics(),
            "enrichment_success": bool(weather_result.successful_providers),
        }

    def get_daily_weather(
        self,
        lat: float,
        lon: float,
        target_date: date,
        parameters: list[str] | None = None,
    ) -> WeatherResult:
        """
        Get daily weather data by integrating results from all providers.

        Providers are queried one after another in their configured order;
        a provider that raises is logged and recorded as failed so the
        remaining providers are still tried.

        Args:
            lat: Latitude in decimal degrees
            lon: Longitude in decimal degrees
            target_date: Date for weather lookup
            parameters: Optional list of specific parameters to fetch

        Returns:
            WeatherResult with integrated data from all available providers,
            or an empty NO_DATA result when no provider succeeded.
        """
        logger.info(
            f"Getting weather for ({lat}, {lon}) on {target_date} from all providers"
        )

        all_providers_attempted: list[str] = []
        all_successful_providers: list[str] = []
        all_failed_providers: list[str] = []
        provider_results = []

        # Query every provider in turn (sequentially, in configured order)
        for provider in self.providers:
            provider_name = provider.provider_name
            all_providers_attempted.append(provider_name)

            try:
                # Check if provider has data available
                if not provider.is_available(lat, lon, target_date):
                    logger.info(
                        f"Provider {provider_name} not available for {target_date}"
                    )
                    all_failed_providers.append(provider_name)
                    continue

                # Fetch weather data
                result = provider.get_daily_weather(lat, lon, target_date, parameters)

                if result.successful_providers:
                    logger.info(f"Provider {provider_name} successful")
                    all_successful_providers.extend(result.successful_providers)
                    provider_results.append(result)
                else:
                    logger.warning(f"Provider {provider_name} failed")
                    # Fall back to the provider's own name so a failure is
                    # never lost when the result carries no failed list.
                    all_failed_providers.extend(
                        result.failed_providers or [provider_name]
                    )

            except Exception as e:
                # Boundary handler: one broken provider must not abort the rest.
                logger.error(f"Provider {provider_name} error: {e}")
                all_failed_providers.append(provider_name)

        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # reported provider lists are deterministic.
        successful = list(dict.fromkeys(all_successful_providers))
        failed = list(dict.fromkeys(all_failed_providers))

        if not provider_results:
            # Create empty result if all providers failed
            return self._create_empty_result(
                lat, lon, target_date, all_providers_attempted, failed
            )

        # Integrate data from all successful providers
        integrated_result = self._integrate_provider_results(
            provider_results, lat, lon, target_date
        )
        integrated_result.providers_attempted = all_providers_attempted
        integrated_result.successful_providers = successful
        integrated_result.failed_providers = failed
        return integrated_result

    @staticmethod
    def _coerce_coordinates(lat: Any, lon: Any) -> dict[str, float] | None:
        """Convert a raw lat/lon pair to floats, or None if missing/invalid."""
        if lat is None or lon is None:
            return None
        try:
            lat_value, lon_value = float(lat), float(lon)
        except (TypeError, ValueError):
            return None
        # The chained comparisons are also False for NaN, rejecting it.
        if not (-90.0 <= lat_value <= 90.0 and -180.0 <= lon_value <= 180.0):
            return None
        return {"lat": lat_value, "lon": lon_value}

    def _extract_location(self, biosample: dict[str, Any]) -> dict[str, float] | None:
        """
        Extract latitude and longitude from biosample.

        Tries the nested ``lat_lon`` mapping (NMDC format) first, then the
        top-level ``latitude``/``longitude`` keys (GOLD format).

        Returns:
            ``{"lat": float, "lon": float}``, or None when no pair is
            present, numeric and within valid geographic bounds.
        """
        # Try NMDC format first
        lat_lon = biosample.get("lat_lon")
        if isinstance(lat_lon, dict):
            location = self._coerce_coordinates(
                lat_lon.get("latitude"), lat_lon.get("longitude")
            )
            if location is not None:
                return location

        # Try GOLD format
        return self._coerce_coordinates(
            biosample.get("latitude"), biosample.get("longitude")
        )

    def _extract_collection_date(self, biosample: dict[str, Any]) -> date | None:
        """
        Extract collection date from biosample.

        Tries ``collection_date`` (NMDC format; either a raw value or a
        mapping with ``has_raw_value``) and then ``dateCollected`` (GOLD
        format), returning the first value that parses.
        """
        candidates = []

        # Try NMDC format
        if "collection_date" in biosample:
            date_info = biosample["collection_date"]
            if isinstance(date_info, dict):
                candidates.append(date_info.get("has_raw_value"))
            else:
                candidates.append(date_info)

        # Try GOLD format
        if "dateCollected" in biosample:
            candidates.append(biosample["dateCollected"])

        for raw_value in candidates:
            if not raw_value:
                continue
            parsed = self._parse_date_string(raw_value)
            if parsed is not None:
                return parsed

        return None

    def _parse_date_string(self, date_str: str) -> date | None:
        """
        Parse various date string formats to date object.

        Accepts ``YYYY-MM-DD`` and ``YYYY/MM/DD``, optionally followed by a
        time component separated by ``T`` or whitespace. ``date`` and
        ``datetime`` objects are accepted as already parsed.

        Returns:
            The parsed date, or None (with a warning logged) when the value
            cannot be interpreted.
        """
        # datetime is a subclass of date, so it must be checked first.
        if isinstance(date_str, datetime):
            return date_str.date()
        if isinstance(date_str, date):
            return date_str
        if not isinstance(date_str, str):
            logger.warning(f"Could not parse date string: {date_str}")
            return None

        # Keep only the date part of ISO / space-separated timestamps.
        tokens = date_str.strip().split("T", 1)[0].split()
        date_part = tokens[0] if tokens else ""

        for date_format in ("%Y-%m-%d", "%Y/%m/%d"):
            try:
                return datetime.strptime(date_part, date_format).date()
            except ValueError:
                continue

        logger.warning(f"Could not parse date string: {date_str}")
        return None

    def _integrate_provider_results(
        self,
        provider_results: list[WeatherResult],
        lat: float,
        lon: float,
        target_date: date,
    ) -> WeatherResult:
        """
        Integrate weather data from multiple providers for comprehensive coverage.

        For each weather parameter the observation with the best temporal
        quality is kept; on a tie the earlier provider result wins. The
        overall quality is the best quality among the selected observations.
        """
        # Initialize integrated result
        integrated = WeatherResult(
            location={"lat": lat, "lon": lon},
            collection_date=target_date.strftime("%Y-%m-%d"),
            overall_quality=TemporalQuality.NO_DATA,
        )

        # For each weather parameter, select best observation from all providers
        weather_fields = [
            "temperature",
            "wind_speed",
            "wind_direction",
            "humidity",
            "solar_radiation",
            "precipitation",
            "pressure",
        ]

        best_quality = TemporalQuality.NO_DATA

        for field in weather_fields:
            best_obs = None
            best_obs_quality = TemporalQuality.NO_DATA

            # Compare observations across all providers for this field
            for result in provider_results:
                obs = getattr(result, field, None)
                if obs is None:
                    continue
                obs_quality = obs.temporal_precision.data_quality
                # Select observation with best temporal quality
                if self._is_better_quality(obs_quality, best_obs_quality):
                    best_obs = obs
                    best_obs_quality = obs_quality

            # Set the best observation for this field
            if best_obs is not None:
                setattr(integrated, field, best_obs)
                if self._is_better_quality(best_obs_quality, best_quality):
                    best_quality = best_obs_quality

        integrated.overall_quality = best_quality
        return integrated

    def _is_better_quality(
        self, new_quality: TemporalQuality, current_quality: TemporalQuality
    ) -> bool:
        """
        Compare temporal quality levels.

        Returns True only when ``new_quality`` ranks strictly better than
        ``current_quality``. Raises ValueError for a quality level that is
        not part of the ranking below.
        """
        # Ordered from best to worst.
        quality_order = [
            TemporalQuality.DAY_SPECIFIC_COMPLETE,
            TemporalQuality.DAY_SPECIFIC_PARTIAL,
            TemporalQuality.WEEKLY_COMPOSITE,
            TemporalQuality.MONTHLY_CLIMATOLOGY,
            TemporalQuality.NO_DATA,
        ]

        return quality_order.index(new_quality) < quality_order.index(current_quality)

    def _create_empty_result(
        self,
        lat: float,
        lon: float,
        target_date: date,
        attempted_providers: list[str],
        failed_providers: list[str],
    ) -> WeatherResult:
        """Create empty result when all providers fail."""
        return WeatherResult(
            location={"lat": lat, "lon": lon},
            collection_date=target_date.strftime("%Y-%m-%d"),
            providers_attempted=attempted_providers,
            successful_providers=[],
            failed_providers=failed_providers,
            overall_quality=TemporalQuality.NO_DATA,
        )

    def get_provider_info(self) -> list[dict[str, Any]]:
        """Get information about all configured providers."""
        return [provider.get_provider_info() for provider in self.providers]