# Source code for biosample_enricher.weather.service

"""
Weather enrichment service for biosample environmental context.

Orchestrates multiple weather providers to deliver day-specific weather data
with temporal precision tracking and standardized schema mapping.
"""

from datetime import date, datetime
from typing import Any, Protocol

from biosample_enricher.logging_config import get_logger
from biosample_enricher.weather.models import (
    ClimateNormalsResult,
    MultiProviderClimateNormals,
    TemporalQuality,
    WeatherResult,
)
from biosample_enricher.weather.providers.base import WeatherProviderBase
from biosample_enricher.weather.providers.meteostat import MeteostatProvider
from biosample_enricher.weather.providers.open_meteo import OpenMeteoProvider

logger = get_logger(__name__)


class ClimateNormalsProvider(Protocol):
    """Structural type for weather providers that can supply climate normals."""

    def get_climate_normals(
        self,
        lat: float,
        lon: float,
        start_year: int,
        end_year: int,
    ) -> ClimateNormalsResult:
        """
        Return climate normals for a location over a span of years.

        Args:
            lat: Latitude passed through by the caller.
            lon: Longitude passed through by the caller.
            start_year: First year of the requested period.
            end_year: Last year of the requested period.

        Returns:
            A ClimateNormalsResult produced by the implementing provider.
        """
        ...
class WeatherService:
    """
    Multi-provider weather enrichment service for biosample metadata.

    Queries every configured provider for day-specific weather, keeps the
    best observation per weather field, and exposes the merged result with
    temporal precision tracking and schema mapping.
    """

    def __init__(self, providers: list[WeatherProviderBase] | None = None):
        """
        Initialize weather service with provider chain.

        Args:
            providers: List of weather providers in priority order.
                If None, uses default Open-Meteo + MeteoStat providers.
        """
        if providers is None:
            # Default to Open-Meteo (primary) + MeteoStat (fallback)
            providers = [OpenMeteoProvider(), MeteostatProvider()]

        self.providers = providers
        logger.info(f"Weather service initialized with {len(providers)} providers")

    def get_weather_for_biosample(
        self, biosample: dict[str, Any], target_schema: str = "nmdc"
    ) -> dict[str, Any]:
        """
        Get weather data for a biosample and map to target schema.

        Args:
            biosample: Biosample dictionary with location and collection date
            target_schema: "nmdc" or "gold" for schema mapping

        Returns:
            On success, a dict with keys "weather_result", "schema_mapping",
            "coverage_metrics" and "enrichment_success". When the biosample
            lacks usable coordinates or a usable collection date, a dict with
            an "error" code and an empty "enrichment" mapping instead.
        """
        # Extract location and date from biosample
        location = self._extract_location(biosample)
        collection_date = self._extract_collection_date(biosample)

        if location is None:
            logger.warning("No valid coordinates found in biosample")
            return {"error": "no_coordinates", "enrichment": {}}

        if collection_date is None:
            logger.warning("No valid collection date found in biosample")
            return {"error": "no_collection_date", "enrichment": {}}

        # Get weather data
        weather_result = self.get_daily_weather(
            lat=location["lat"], lon=location["lon"], target_date=collection_date
        )

        # Map to target schema
        schema_mapping = weather_result.get_schema_mapping(target_schema)

        # Generate coverage metrics
        coverage_metrics = weather_result.get_coverage_metrics()

        return {
            "weather_result": weather_result,
            "schema_mapping": schema_mapping,
            "coverage_metrics": coverage_metrics,
            "enrichment_success": len(weather_result.successful_providers) > 0,
        }

    def get_daily_weather(
        self,
        lat: float,
        lon: float,
        target_date: date,
        parameters: list[str] | None = None,
    ) -> WeatherResult:
        """
        Get daily weather data by integrating results from all providers.

        Providers are queried one after another in the configured order. A
        provider error never aborts the loop; it is logged and the provider
        is recorded as failed.

        Args:
            lat: Latitude in decimal degrees
            lon: Longitude in decimal degrees
            target_date: Date for weather lookup
            parameters: Optional list of specific parameters to fetch

        Returns:
            WeatherResult with integrated data from all available providers.
            If no provider succeeds, an empty result with NO_DATA quality.
            Provider name lists are de-duplicated and keep first-seen order.
        """
        logger.info(
            f"Getting weather for ({lat}, {lon}) on {target_date} from all providers"
        )

        attempted: list[str] = []
        succeeded: list[str] = []
        failed: list[str] = []
        provider_results: list[WeatherResult] = []

        # Query every provider in turn (sequentially, in configured order)
        for provider in self.providers:
            provider_name = provider.provider_name
            attempted.append(provider_name)

            try:
                # Check if provider has data available
                if not provider.is_available(lat, lon, target_date):
                    logger.info(
                        f"Provider {provider_name} not available for {target_date}"
                    )
                    failed.append(provider_name)
                    continue

                # Fetch weather data
                result = provider.get_daily_weather(lat, lon, target_date, parameters)

                if result.successful_providers:
                    logger.info(f"Provider {provider_name} successful")
                    succeeded.extend(result.successful_providers)
                    provider_results.append(result)
                else:
                    logger.warning(f"Provider {provider_name} failed")
                    # Fall back to the provider's own name so a failure is
                    # never lost when the result lists no failed providers.
                    failed.extend(result.failed_providers or [provider_name])

            except Exception as e:
                # Deliberately broad: one misbehaving provider must not
                # prevent the remaining providers from being queried.
                logger.error(f"Provider {provider_name} error: {e}")
                failed.append(provider_name)

        # dict.fromkeys de-duplicates while keeping a deterministic order
        # (set() would shuffle the names between runs).
        succeeded = list(dict.fromkeys(succeeded))
        failed = list(dict.fromkeys(failed))

        if not provider_results:
            # Create empty result if all providers failed
            return self._create_empty_result(lat, lon, target_date, attempted, failed)

        # Integrate data from all successful providers
        integrated_result = self._integrate_provider_results(
            provider_results, lat, lon, target_date
        )
        integrated_result.providers_attempted = attempted
        integrated_result.successful_providers = succeeded
        integrated_result.failed_providers = failed
        return integrated_result

    def get_climate_normals(
        self,
        lat: float,
        lon: float,
        years_back: int = 30,
        providers: list[str] | None = None,
    ) -> MultiProviderClimateNormals:
        """
        Get climate averages (normals) for a location from all available providers.

        By default, queries ALL available providers and returns results from each
        successful provider in a MultiProviderClimateNormals object. This allows:

        - Comparing values across different data sources
        - Detecting provider outages/failures
        - Validating data quality by cross-checking
        - Computing consensus values across providers

        Supported providers:

        - Meteostat: Station-based 1991-2020 normals (30-year WMO standard)
        - NASA POWER: Satellite-based 2001-2020 climatologies (20-year MERRA-2)

        Climate normals represent typical conditions over a multi-year period,
        providing context for biosample environmental metadata like annual
        precipitation totals and average temperatures.

        For biosample enrichment:

        - Use this for annual_precpt, annual_temp slots
        - Use get_daily_weather() for collection-date weather

        Following general-purpose design (Issue #199): This method provides
        comprehensive climate data that ANY project can use. Use the
        `to_submission_schema()` method on the result to extract values in
        submission-schema format (Issue #193).

        Args:
            lat: Latitude in decimal degrees
            lon: Longitude in decimal degrees
            years_back: Number of years back from current year to request
                (default: 30). For example, if current year is 2025 and
                years_back=30, requests period 1995-2025. Providers will return
                whatever period they actually have available, which may differ.
            providers: Optional list of provider names to query
                (e.g., ["meteostat", "nasa_power"]). Names are matched
                case-insensitively and reported in lowercase; duplicates are
                queried once. If None, queries ALL available providers
                (default behavior).

        Returns:
            MultiProviderClimateNormals with results from all successful
            providers. The result includes both requested period and actual
            returned periods from each provider for transparency.

        Raises:
            ValueError: If no climate data available from any provider.

        Example:
            >>> service = WeatherService()
            >>> # Request 30 years back from current year (dynamic period)
            >>> normals = service.get_climate_normals(40.7128, -74.0060)
            >>>
            >>> # Get results from all successful providers
            >>> print(f"Successful providers: {normals.successful_providers}")
            Successful providers: ['meteostat', 'nasa_power']
            >>>
            >>> # Extract consensus values for submission-schema (Issue #191)
            >>> schema_values = normals.to_submission_schema(strategy="consensus")
            >>> print(f"annual_precpt: {schema_values['annual_precpt']} mm")
            annual_precpt: 907.8 mm
            >>>
            >>> # Or get result from specific provider
            >>> meteostat_result = normals.get_provider_result("meteostat")
            >>> if meteostat_result:
            ...     print(f"Meteostat: {meteostat_result.get_annual_precipitation()} mm/year")
            Meteostat: 1268.4 mm/year
            >>>
            >>> # Query only specific providers or different period
            >>> normals = service.get_climate_normals(40.7128, -74.0060,
            ...     years_back=20, providers=["nasa_power"])
        """
        # Compute requested period dynamically based on current year
        end_year = datetime.now().year
        start_year = end_year - years_back

        logger.info(
            f"Getting climate normals for ({lat}, {lon}) period {start_year}-{end_year} "
            f"(years_back={years_back})"
        )

        # Import NASA POWER provider (kept function-local, as in the original)
        from biosample_enricher.weather.providers.nasa_power import NASAPowerProvider

        # Determine which providers to try
        available_providers: dict[str, type[ClimateNormalsProvider]] = {
            "meteostat": MeteostatProvider,
            "nasa_power": NASAPowerProvider,
        }

        # Default: query ALL available providers, or use user-specified
        requested = list(available_providers) if providers is None else providers
        # Normalize to lowercase and drop duplicates (order-preserving) so the
        # keys of the result always match the canonical provider names that
        # the lookup below uses.
        provider_names = list(dict.fromkeys(name.lower() for name in requested))

        # Query ALL providers and collect results
        results: dict[str, ClimateNormalsResult] = {}
        failed: dict[str, str] = {}

        for provider_name in provider_names:
            provider_class = available_providers.get(provider_name)
            if not provider_class:
                logger.warning(f"Unknown provider: {provider_name}")
                failed[provider_name] = "Unknown provider"
                continue

            try:
                logger.info(f"Querying {provider_name} for climate normals")
                provider = provider_class()
                result = provider.get_climate_normals(lat, lon, start_year, end_year)
            except Exception as e:
                # Deliberately broad: keep going so other providers still run.
                error_msg = str(e)
                logger.warning(f"{provider_name} failed: {error_msg}")
                failed[provider_name] = error_msg
                continue

            logger.info(f"Successfully retrieved climate normals from {provider_name}")
            results[provider_name] = result

        # Check if we got any results
        if not results:
            error_summary = "; ".join(f"{k}: {v}" for k, v in failed.items())
            raise ValueError(
                f"No climate data available from any provider. Tried: {provider_names}. "
                f"Errors: {error_summary}"
            )

        # Return multi-provider results with requested period
        return MultiProviderClimateNormals(
            providers=results,
            location={"lat": lat, "lon": lon},
            requested_providers=provider_names,
            successful_providers=list(results.keys()),
            failed_providers=failed,
            requested_start_year=start_year,
            requested_end_year=end_year,
        )

    def _extract_location(self, biosample: dict[str, Any]) -> dict[str, float] | None:
        """
        Extract latitude and longitude from biosample.

        Tries the NMDC layout (`lat_lon` dict with `latitude`/`longitude`)
        first, then the GOLD layout (top-level `latitude`/`longitude`).
        Coordinates embedded in an `elev` field are not parsed.

        Returns:
            {"lat": float, "lon": float} for the first candidate that converts
            to numbers within [-90, 90] / [-180, 180], otherwise None.
        """
        candidates: list[tuple[Any, Any]] = []

        # Try NMDC format first
        lat_lon = biosample.get("lat_lon")
        if isinstance(lat_lon, dict):
            candidates.append((lat_lon.get("latitude"), lat_lon.get("longitude")))

        # Try GOLD format
        if "latitude" in biosample and "longitude" in biosample:
            candidates.append((biosample["latitude"], biosample["longitude"]))

        for raw_lat, raw_lon in candidates:
            if raw_lat is None or raw_lon is None:
                continue
            try:
                lat, lon = float(raw_lat), float(raw_lon)
            except (TypeError, ValueError):
                # Non-numeric placeholder (e.g. "n/a"); try the next layout
                continue
            # Chained comparisons are False for NaN, so this also rejects it
            if -90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0:
                return {"lat": lat, "lon": lon}

        return None

    def _extract_collection_date(self, biosample: dict[str, Any]) -> date | None:
        """
        Extract collection date from biosample.

        Tries the NMDC `collection_date` field (either a plain value or a dict
        with `has_raw_value`) and then the GOLD `dateCollected` field. A field
        that is present but cannot be parsed no longer blocks the next one.

        Returns:
            The first successfully parsed date, or None.
        """
        raw_values: list[Any] = []

        # Try NMDC format
        if "collection_date" in biosample:
            date_info = biosample["collection_date"]
            if isinstance(date_info, dict):
                raw_values.append(date_info.get("has_raw_value"))
            else:
                raw_values.append(date_info)

        # Try GOLD format
        if "dateCollected" in biosample:
            raw_values.append(biosample["dateCollected"])

        for raw in raw_values:
            if not raw:
                continue
            parsed = self._parse_date_string(raw)
            if parsed is not None:
                return parsed

        return None

    def _parse_date_string(self, date_str: str) -> date | None:
        """
        Parse a collection-date value to a date object.

        Accepts "YYYY-MM-DD", optionally followed by a time part separated by
        "T" or whitespace, and also passes through date/datetime objects.

        Returns:
            The calendar date, or None (with a warning) if it cannot be parsed.
        """
        # datetime is a subclass of date, so it must be checked first
        if isinstance(date_str, datetime):
            return date_str.date()
        if isinstance(date_str, date):
            return date_str
        if not isinstance(date_str, str):
            logger.warning(f"Could not parse date string: {date_str}")
            return None

        # Keep only the calendar-date part of an ISO-style timestamp
        parts = date_str.replace("T", " ").split()
        day_part = parts[0] if parts else ""

        try:
            return datetime.strptime(day_part, "%Y-%m-%d").date()
        except ValueError:
            logger.warning(f"Could not parse date string: {date_str}")
            return None

    def _integrate_provider_results(
        self,
        provider_results: list[WeatherResult],
        lat: float,
        lon: float,
        target_date: date,
    ) -> WeatherResult:
        """
        Integrate weather data from multiple providers for comprehensive coverage.

        For each weather field, the observation with the best temporal quality
        across all provider results is copied onto a fresh WeatherResult. On a
        tie, the earlier provider result wins. The overall quality is the best
        quality among the selected observations.
        """
        # Initialize integrated result
        integrated = WeatherResult(
            location={"lat": lat, "lon": lon},
            collection_date=target_date.strftime("%Y-%m-%d"),
            overall_quality=TemporalQuality.NO_DATA,
        )

        # For each weather parameter, select best observation from all providers
        weather_fields = [
            "temperature",
            "wind_speed",
            "wind_direction",
            "humidity",
            "solar_radiation",
            "precipitation",
            "pressure",
        ]

        best_quality = TemporalQuality.NO_DATA

        for field in weather_fields:
            best_obs = None
            best_obs_quality = TemporalQuality.NO_DATA

            # Compare observations across all providers for this field
            for result in provider_results:
                obs = getattr(result, field, None)
                if obs is None:
                    continue
                obs_quality = obs.temporal_precision.data_quality

                # Select observation with best temporal quality
                if self._is_better_quality(obs_quality, best_obs_quality):
                    best_obs = obs
                    best_obs_quality = obs_quality

            # Set the best observation for this field. Compare against None
            # explicitly so a falsy-but-valid observation is never dropped.
            if best_obs is not None:
                setattr(integrated, field, best_obs)
                if self._is_better_quality(best_obs_quality, best_quality):
                    best_quality = best_obs_quality

        integrated.overall_quality = best_quality
        return integrated

    def _is_better_quality(
        self, new_quality: TemporalQuality, current_quality: TemporalQuality
    ) -> bool:
        """
        Compare temporal quality levels.

        Returns:
            True only if new_quality ranks strictly better than current_quality.
        """
        # Ordered from best to worst; a lower index means better quality
        quality_order = [
            TemporalQuality.DAY_SPECIFIC_COMPLETE,
            TemporalQuality.DAY_SPECIFIC_PARTIAL,
            TemporalQuality.WEEKLY_COMPOSITE,
            TemporalQuality.MONTHLY_CLIMATOLOGY,
            TemporalQuality.NO_DATA,
        ]

        new_rank = quality_order.index(new_quality)
        current_rank = quality_order.index(current_quality)

        return new_rank < current_rank

    def _create_empty_result(
        self,
        lat: float,
        lon: float,
        target_date: date,
        attempted_providers: list[str],
        failed_providers: list[str],
    ) -> WeatherResult:
        """Create empty result when all providers fail."""
        return WeatherResult(
            location={"lat": lat, "lon": lon},
            collection_date=target_date.strftime("%Y-%m-%d"),
            providers_attempted=attempted_providers,
            successful_providers=[],
            failed_providers=failed_providers,
            overall_quality=TemporalQuality.NO_DATA,
        )

    def get_provider_info(self) -> list[dict[str, Any]]:
        """Get information about all configured providers."""
        return [provider.get_provider_info() for provider in self.providers]