"""FastAPI proxy for Saxony (Sachsen) water-level ("Pegel") RSS feeds.

Discovers station IDs from the public overview page, fetches per-station
RSS feeds via feedparser, extracts water level / flow / flood-alert level
from the entries, and serves JSON — with small in-process TTL caches for
the station index and each station's latest reading.
"""

from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
import feedparser
import re
import html
import datetime as dt
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
from email.utils import parsedate_to_datetime
from typing import TypedDict, Any

app = FastAPI()

# Per-station RSS feed URL; {} is the numeric Pegel ID.
BASE = "https://www.umwelt.sachsen.de/umwelt/infosysteme/hwims/portal/web/feed/wasserstand-pegel-{}"
# Overview page used to discover all available station IDs.
DISCOVERY_URL = "https://www.umwelt.sachsen.de/umwelt/infosysteme/hwims/portal/web/wasserstand-uebersicht"
INDEX_CACHE_TTL_SECONDS = 900   # station index: refresh every 15 minutes
LATEST_CACHE_TTL_SECONDS = 120  # per-station latest value: 2 minutes


class IndexCache(TypedDict):
    # Discovered station records, or None before the first successful fetch.
    items: list[dict[str, Any]] | None
    # UTC time the items were fetched; None before the first fetch.
    fetched_at: dt.datetime | None


_index_cache: IndexCache = {
    "items": None,
    "fetched_at": None,
}
# station_id -> {"latest": parsed entry, "fetched_at": datetime}
_latest_cache: dict[str, dict[str, Any]] = {}


def parse_number(pattern: str, text: str) -> float | None:
    """Return the first captured group of *pattern* as a float, or None.

    Accepts German decimal commas (``"1,5"`` -> 1.5). Matching is
    case-insensitive.
    """
    m = re.search(pattern, text, re.I)
    return float(m.group(1).replace(",", ".")) if m else None


def parse_text(pattern: str, text: str) -> str | None:
    """Return the first captured group of *pattern*, stripped, or None."""
    m = re.search(pattern, text, re.I)
    return m.group(1).strip() if m else None


def clean_text(text: str) -> str:
    """Strip HTML from a feed description while keeping line structure.

    ``<br>`` tags become newlines so line-anchored patterns (e.g. the
    Meldestufe line) terminate correctly; all other tags become spaces and
    HTML entities are decoded.
    """
    # Fix: the original pattern here was empty (r""), which would have
    # inserted "\n" between every character. Restored to a <br> substitution.
    text = re.sub(r"<br\s*/?>", "\n", text, flags=re.I)
    text = re.sub(r"<[^>]+>", " ", text)
    return html.unescape(text)


def extract_station_name(title: str | None) -> str | None:
    """Extract the station name from a feed title like ``"Pegel Dresden"``.

    Falls back to the full title when the ``Pegel`` prefix is absent.
    """
    if not title:
        return None
    m = re.search(r"Pegel\s+(.+)$", title)
    return m.group(1).strip() if m else title


def _now_utc() -> dt.datetime:
    """Return the current timezone-aware UTC time."""
    return dt.datetime.now(dt.UTC)


def _is_cache_fresh(fetched_at: dt.datetime | None, ttl_seconds: int) -> bool:
    """True when *fetched_at* is set and younger than *ttl_seconds*."""
    if not fetched_at:
        return False
    return (_now_utc() - fetched_at).total_seconds() < ttl_seconds


def _set_index_cache(items: list[dict[str, Any]]) -> None:
    """Store the discovered station list with the current timestamp."""
    _index_cache["items"] = items
    _index_cache["fetched_at"] = _now_utc()


def _extract_station_title_from_id(pegel_id: str) -> str | None:
    """Fetch the station's feed to resolve its human-readable title.

    Used as a fallback when the overview page yields no title. Returns
    None when the feed carries neither a feed title nor entries.
    """
    feed = feedparser.parse(BASE.format(pegel_id))
    feed_obj = getattr(feed, "feed", None)
    feed_title = feed_obj.get("title") if isinstance(feed_obj, dict) else None
    if isinstance(feed_title, str) and feed_title:
        return extract_station_name(feed_title)
    if feed.entries:
        first_title = feed.entries[0].get("title")
        if isinstance(first_title, str):
            return extract_station_name(first_title)
    return None


def _parse_stations_from_overview(body: str) -> dict[str, str | None]:
    """Extract ``{pegel_id: title-or-None}`` from the overview page HTML."""
    stations: dict[str, str | None] = {}
    # NOTE(review): the original regex was garbled (its HTML tags were lost
    # in extraction) and captured only one group while two were unpacked.
    # Reconstructed as "anchor linking to the station page/feed, whose inner
    # text is the station title" — confirm against the live page markup.
    paired_pattern = re.compile(
        r'<a[^>]*wasserstand-pegel-(\d{6,})[^>]*>(.*?)</a>',
        re.I | re.S,
    )
    for pegel_id, raw_title in paired_pattern.findall(body):
        # Anchor text may contain nested tags; strip them before collapsing.
        title_text = re.sub(r"<[^>]+>", " ", raw_title)
        title = re.sub(r"\s+", " ", html.unescape(title_text)).strip() or None
        stations[pegel_id] = title
    # Fallback: any remaining IDs mentioned anywhere in the page, untitled.
    for pegel_id in re.findall(r"wasserstand-pegel-(\d{6,})", body):
        stations.setdefault(pegel_id, None)
    return stations


def discover_stations() -> list[dict[str, Any]]:
    """Fetch the overview page and build the station index.

    Returns a list of ``{"station_id", "station_title", "source"}`` dicts,
    sorted by station ID. Titles missing from the overview are resolved by
    fetching the station's own feed.

    Raises:
        URLError/HTTPError/TimeoutError/OSError: network failure.
        ValueError: the page contained no recognizable Pegel IDs.
    """
    req = Request(
        DISCOVERY_URL,
        headers={"User-Agent": "sachsen-pegel-proxy/1.0"},
    )
    with urlopen(req, timeout=12) as resp:
        body = resp.read().decode("utf-8", errors="ignore")
    stations = _parse_stations_from_overview(body)
    ids = sorted(stations.keys())
    if not ids:
        raise ValueError("Keine Pegel-IDs in Discovery-Quelle gefunden")
    items = []
    for pegel_id in ids:
        station_title = stations.get(pegel_id)
        items.append({
            "station_id": pegel_id,
            "station_title": station_title or _extract_station_title_from_id(pegel_id),
            "source": BASE.format(pegel_id),
        })
    return items


def parse_timestamp_from_title(title: str | None) -> str | None:
    """Parse ``"DD.MM.YYYY HH:MM Uhr (MESZ|MEZ…"`` out of an entry title.

    Returns an ISO-8601 string with the correct fixed UTC offset
    (MESZ = +02:00, MEZ = +01:00), or None when the title does not match.
    """
    if not title:
        return None
    m = re.search(r"(\d{2}\.\d{2}\.\d{4})\s+(\d{2}:\d{2})\s+Uhr\s+\((MESZ|MEZ)", title)
    if not m:
        return None
    date_part, time_part, tz_hint = m.groups()
    parsed = dt.datetime.strptime(f"{date_part} {time_part}", "%d.%m.%Y %H:%M")
    offset = dt.timedelta(hours=2 if tz_hint == "MESZ" else 1)
    return parsed.replace(tzinfo=dt.timezone(offset)).isoformat()


def parse_timestamp(entry: dict[str, Any]) -> str | None:
    """Best-effort timestamp for a feed entry.

    Preference order: German timestamp embedded in the title, then the
    ``dc:date`` field, then the RFC-2822 ``published`` header.
    """
    ts_from_title = parse_timestamp_from_title(entry.get("title"))
    if ts_from_title:
        return ts_from_title
    if entry.get("dc_date"):
        return entry.get("dc_date")
    if entry.get("published"):
        # Fix: guard against malformed date headers instead of raising.
        try:
            return parsedate_to_datetime(entry.get("published")).isoformat()
        except (TypeError, ValueError):
            return None
    return None


def parse_entry(entry: dict[str, Any]) -> dict[str, Any]:
    """Extract timestamp, water level, flow and alert level from one entry."""
    text = clean_text(entry.get("description", ""))
    return {
        "timestamp": parse_timestamp(entry),
        "title": entry.get("title"),
        "water_level_cm": parse_number(
            r"Wasserstand:\s*([0-9]+(?:[,.][0-9]+)?)\s*cm", text
        ),
        "flow_m3s": parse_number(
            r"Durchfluss:\s*([0-9]+(?:[,.][0-9]+)?)\s*m³/s", text
        ),
        "flood_alert_level": parse_text(
            r"Meldestufe:\s*([^\n\r]+)", text
        ),
    }


def _get_latest_for_station(station_id: str) -> dict[str, Any]:
    """Return the most recent parsed entry for a station, with a TTL cache.

    Raises:
        ValueError: the station's feed contains no entries.
    """
    cache_hit = _latest_cache.get(station_id)
    if cache_hit and _is_cache_fresh(cache_hit.get("fetched_at"), LATEST_CACHE_TTL_SECONDS):
        return cache_hit.get("latest")
    feed = feedparser.parse(BASE.format(station_id))
    if not feed.entries:
        raise ValueError("Keine Daten gefunden")
    latest = parse_entry(feed.entries[0])
    _latest_cache[station_id] = {
        "latest": latest,
        "fetched_at": _now_utc(),
    }
    return latest


@app.get("/api/pegel")
def list_pegel(limit: int | None = None):
    """List all discovered stations, optionally truncated to *limit*.

    Serves from the index cache while fresh; on a failed re-discovery it
    falls back to the stale cache (flagged via ``warnings`` and
    ``cache_fetched_at``) and only returns 502 when no cache exists.
    """
    if limit is not None and limit <= 0:
        raise HTTPException(400, "Ungültiger limit-Parameter")
    warnings = []
    discovered_items: list[dict[str, Any]] | None = None
    source_used = DISCOVERY_URL
    cache_fallback = False
    if _is_cache_fresh(_index_cache.get("fetched_at"), INDEX_CACHE_TTL_SECONDS):
        discovered_items = _index_cache.get("items")
    else:
        try:
            discovered_items = discover_stations()
            _set_index_cache(discovered_items)
        # HTTPError/URLError are OSError subclasses; listed explicitly for clarity.
        except (URLError, HTTPError, TimeoutError, ValueError, OSError) as e:
            if _index_cache.get("items"):
                discovered_items = _index_cache.get("items")
                source_used = "cache"
                cache_fallback = True
                warnings.append(f"Discovery fehlgeschlagen, nutze Cache: {str(e)}")
            else:
                raise HTTPException(502, "Upstream-Discovery nicht verfügbar")
    if discovered_items is None:
        raise HTTPException(502, "Upstream-Discovery nicht verfügbar")
    total_available = len(discovered_items)
    items = discovered_items[:limit] if limit is not None else list(discovered_items)
    response = {
        "count": len(items),
        "total_available": total_available,
        "items": items,
        "source": source_used,
        "fetched_at": _now_utc().isoformat(),
    }
    if warnings:
        response["warnings"] = warnings
    cache_fetched_at = _index_cache.get("fetched_at")
    if cache_fallback and cache_fetched_at:
        response["cache_fetched_at"] = cache_fetched_at.isoformat()
    return JSONResponse(response)


@app.get("/api/pegel/{pegel_id}")
def get_pegel(pegel_id: str):
    """Return all parsed entries (and the latest) for one station.

    Raises:
        HTTPException 400: the ID is not a 6+ digit number.
        HTTPException 404: the feed has no entries.
    """
    if not re.fullmatch(r"\d{6,}", pegel_id):
        raise HTTPException(400, "Ungültige Pegel-ID")
    url = BASE.format(pegel_id)
    feed = feedparser.parse(url)
    if not feed.entries:
        raise HTTPException(404, "Keine Daten gefunden")
    entries = [parse_entry(entry) for entry in feed.entries]
    feed_obj = getattr(feed, "feed", None)
    feed_title = feed_obj.get("title") if isinstance(feed_obj, dict) else None
    return JSONResponse({
        "station_id": pegel_id,
        "station_title": extract_station_name(feed_title),
        "count": len(entries),
        "latest": entries[0],
        "entries": entries,
        "source": url,
        "fetched_at": _now_utc().isoformat(),
    })