Files
sachsen_pegel_proxy/api/app.py
T
2026-05-05 03:54:21 +02:00

247 lines
7.7 KiB
Python

from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
import feedparser
import re
import html
import datetime as dt
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
from email.utils import parsedate_to_datetime
from typing import TypedDict, Any
app = FastAPI()
# Per-station RSS feed URL; format with the numeric gauge ("Pegel") id.
BASE = "https://www.umwelt.sachsen.de/umwelt/infosysteme/hwims/portal/web/feed/wasserstand-pegel-{}"
# Overview page scraped to discover all station ids (and popup titles).
DISCOVERY_URL = "https://www.umwelt.sachsen.de/umwelt/infosysteme/hwims/portal/web/wasserstand-uebersicht"
# Seconds the discovered station index is served from cache.
INDEX_CACHE_TTL_SECONDS = 900
# Seconds a single station's latest measurement is served from cache.
LATEST_CACHE_TTL_SECONDS = 120
class IndexCache(TypedDict):
    """Shape of the module-level station-index cache."""

    # Discovered station dicts, or None before the first successful discovery.
    items: list[dict[str, Any]] | None
    # UTC time of the last successful discovery, or None if never fetched.
    fetched_at: dt.datetime | None
# Process-wide cache of the discovered station index (see IndexCache).
_index_cache: IndexCache = {
    "items": None,
    "fetched_at": None,
}
# Per-station cache: id -> {"latest": parsed entry, "fetched_at": aware datetime}.
_latest_cache: dict[str, dict[str, Any]] = {}
def parse_number(pattern, text):
    """Search *text* for *pattern* (case-insensitive) and return the first
    capture group as a float, accepting a German decimal comma.

    Returns None when the pattern does not match.
    """
    match = re.search(pattern, text, re.I)
    if match is None:
        return None
    return float(match.group(1).replace(",", "."))
def parse_text(pattern, text):
    """Search *text* for *pattern* (case-insensitive) and return the first
    capture group with surrounding whitespace stripped, or None on no match."""
    match = re.search(pattern, text, re.I)
    if match is None:
        return None
    return match.group(1).strip()
def clean_text(text):
    """Flatten an HTML fragment to plain text.

    <br> (in any spelling) becomes a newline; every other tag becomes a
    single space so adjacent values do not run together.
    """
    with_breaks = re.sub(r"<br\s*/?>", "\n", text, flags=re.I)
    return re.sub(r"<[^>]+>", " ", with_breaks)
def extract_station_name(title: str | None):
if not title:
return None
m = re.search(r"Pegel\s+(.+)$", title)
return m.group(1).strip() if m else title
def _now_utc():
return dt.datetime.now(dt.UTC)
def _is_cache_fresh(fetched_at, ttl_seconds: int):
if not fetched_at:
return False
return (_now_utc() - fetched_at).total_seconds() < ttl_seconds
def _set_index_cache(items):
    """Store *items* in the module-level index cache, stamped with the
    current UTC time."""
    _index_cache.update(items=items, fetched_at=_now_utc())
def _extract_station_title_from_id(pegel_id: str):
    """Fetch a station's own RSS feed and derive its display name.

    Prefers the feed-level title, falls back to the first entry's title,
    and returns None when neither yields a usable string.
    """
    parsed = feedparser.parse(BASE.format(pegel_id))
    meta = getattr(parsed, "feed", None)
    # feedparser's FeedParserDict subclasses dict, so this guard holds.
    meta_title = meta.get("title") if isinstance(meta, dict) else None
    if isinstance(meta_title, str) and meta_title:
        return extract_station_name(meta_title)
    if parsed.entries:
        entry_title = parsed.entries[0].get("title")
        if isinstance(entry_title, str):
            return extract_station_name(entry_title)
    return None
def _parse_stations_from_overview(body: str):
stations: dict[str, str | None] = {}
paired_pattern = re.compile(
r'<a\s+href="wasserstand-pegel-(\d{6,})"[^>]*></a>\s*'
r'<div\s+class="popUp\s+popUpMs"[^>]*>.*?'
r'<div\s+class="popUpTitle">\s*<span\s+class="popUpTitleBold">(.*?)</span>',
re.I | re.S,
)
for pegel_id, raw_title in paired_pattern.findall(body):
title = re.sub(r"\s+", " ", html.unescape(raw_title)).strip() or None
stations[pegel_id] = title
for pegel_id in re.findall(r"wasserstand-pegel-(\d{6,})", body):
stations.setdefault(pegel_id, None)
return stations
def discover_stations():
    """Scrape the overview page and build the station index.

    Returns a list of {station_id, station_title, source} dicts sorted by
    station id. Titles missing from the overview popup are resolved via the
    station's own feed.

    Raises:
        ValueError: when the page yields no gauge ids at all.
        URLError/HTTPError/OSError/TimeoutError: propagated from the fetch.
    """
    request = Request(
        DISCOVERY_URL,
        headers={"User-Agent": "sachsen-pegel-proxy/1.0"},
    )
    with urlopen(request, timeout=12) as response:
        page = response.read().decode("utf-8", errors="ignore")
    titles_by_id = _parse_stations_from_overview(page)
    if not titles_by_id:
        raise ValueError("Keine Pegel-IDs in Discovery-Quelle gefunden")
    index = []
    for pegel_id in sorted(titles_by_id):
        # Fall back to a per-station feed fetch when the popup had no title.
        title = titles_by_id[pegel_id] or _extract_station_title_from_id(pegel_id)
        index.append({
            "station_id": pegel_id,
            "station_title": title,
            "source": BASE.format(pegel_id),
        })
    return index
def parse_timestamp_from_title(title: str):
    """Parse a German feed-title timestamp into an ISO-8601 string.

    Looks for "DD.MM.YYYY HH:MM Uhr (MESZ" or "(MEZ" and attaches the
    matching fixed offset (+02:00 for MESZ, +01:00 for MEZ). Returns None
    for an empty title or when no timestamp is found.
    """
    if not title:
        return None
    found = re.search(r"(\d{2}\.\d{2}\.\d{4})\s+(\d{2}:\d{2})\s+Uhr\s+\((MESZ|MEZ)", title)
    if found is None:
        return None
    day, clock, zone = found.groups()
    naive = dt.datetime.strptime(f"{day} {clock}", "%d.%m.%Y %H:%M")
    hours = 2 if zone == "MESZ" else 1
    aware = naive.replace(tzinfo=dt.timezone(dt.timedelta(hours=hours)))
    return aware.isoformat()
def parse_timestamp(entry):
    """Best-effort ISO-8601 timestamp for a feed entry.

    Tries, in order: the German timestamp embedded in the title, the raw
    dc:date field, and the RFC-2822 'published' date. Returns None when all
    three are absent.
    """
    from_title = parse_timestamp_from_title(entry.get("title"))
    if from_title:
        return from_title
    dc_date = entry.get("dc_date")
    if dc_date:
        return dc_date
    published = entry.get("published")
    if published:
        return parsedate_to_datetime(published).isoformat()
    return None
def parse_entry(entry):
    """Convert one RSS entry into the proxy's measurement dict.

    Extracts water level (cm), discharge (m³/s) and flood alert level from
    the entry's HTML description, plus a best-effort timestamp and the raw
    title. Missing values come back as None.
    """
    body = clean_text(entry.get("description", ""))
    level = parse_number(r"Wasserstand:\s*([0-9]+(?:[,.][0-9]+)?)\s*cm", body)
    flow = parse_number(r"Durchfluss:\s*([0-9]+(?:[,.][0-9]+)?)\s*m³/s", body)
    alert = parse_text(r"Meldestufe:\s*([^\n\r]+)", body)
    return {
        "timestamp": parse_timestamp(entry),
        "title": entry.get("title"),
        "water_level_cm": level,
        "flow_m3s": flow,
        "flood_alert_level": alert,
    }
def _get_latest_for_station(station_id: str):
    """Return the most recent parsed measurement for one station.

    Serves from the per-station cache while it is younger than
    LATEST_CACHE_TTL_SECONDS, otherwise refetches the feed and refills the
    cache.

    Raises:
        ValueError: when the upstream feed has no entries.
    """
    cached = _latest_cache.get(station_id)
    if cached and _is_cache_fresh(cached.get("fetched_at"), LATEST_CACHE_TTL_SECONDS):
        return cached.get("latest")
    feed = feedparser.parse(BASE.format(station_id))
    if not feed.entries:
        raise ValueError("Keine Daten gefunden")
    latest = parse_entry(feed.entries[0])
    _latest_cache[station_id] = {"latest": latest, "fetched_at": _now_utc()}
    return latest
@app.get("/api/pegel")
def list_pegel(limit: int | None = None):
    """List all discovered gauge stations.

    Query params:
        limit: optional positive cap on the number of items returned;
            None returns everything.

    Serves the discovery result from the module-level index cache while it
    is younger than INDEX_CACHE_TTL_SECONDS. On upstream failure a stale
    cache is used instead (flagged via "warnings", "source" == "cache" and
    "cache_fetched_at"); only when no cached copy exists does the endpoint
    answer 502.
    """
    # Reject zero/negative limits up front; None means "no cap".
    if limit is not None and limit <= 0:
        raise HTTPException(400, "Ungültiger limit-Parameter")
    warnings = []
    discovered_items: list[dict[str, Any]] | None = None
    source_used = DISCOVERY_URL
    cache_fallback = False
    # Fresh cache hit: skip the network entirely.
    if _is_cache_fresh(_index_cache.get("fetched_at"), INDEX_CACHE_TTL_SECONDS):
        discovered_items = _index_cache.get("items")
    else:
        try:
            discovered_items = discover_stations()
            _set_index_cache(discovered_items)
        except (URLError, HTTPError, TimeoutError, ValueError, OSError) as e:
            # Stale-while-error: prefer an outdated index over a 502.
            if _index_cache.get("items"):
                discovered_items = _index_cache.get("items")
                source_used = "cache"
                cache_fallback = True
                warnings.append(f"Discovery fehlgeschlagen, nutze Cache: {str(e)}")
            else:
                raise HTTPException(502, "Upstream-Discovery nicht verfügbar")
    if discovered_items is None:
        raise HTTPException(502, "Upstream-Discovery nicht verfügbar")
    total_available = len(discovered_items)
    # Always hand out a fresh list so response handling cannot mutate the cache.
    items = discovered_items[:limit] if limit is not None else list(discovered_items)
    response = {
        "count": len(items),
        "total_available": total_available,
        "items": items,
        "source": source_used,
        "fetched_at": _now_utc().isoformat(),
    }
    if warnings:
        response["warnings"] = warnings
    cache_fetched_at = _index_cache.get("fetched_at")
    # Tell clients how old the fallback data actually is.
    if cache_fallback and cache_fetched_at:
        response["cache_fetched_at"] = cache_fetched_at.isoformat()
    return JSONResponse(response)
@app.get("/api/pegel/{pegel_id}")
def get_pegel(pegel_id: str):
    """Return all available measurements for one gauge station.

    Path params:
        pegel_id: numeric station id with at least six digits.

    Raises:
        HTTPException: 400 for a malformed id, 404 when the upstream feed
            has no entries.
    """
    # The id is interpolated into an upstream URL — accept digits only.
    if not re.fullmatch(r"\d{6,}", pegel_id):
        raise HTTPException(400, "Ungültige Pegel-ID")
    url = BASE.format(pegel_id)
    feed = feedparser.parse(url)
    if not feed.entries:
        raise HTTPException(404, "Keine Daten gefunden")
    entries = [parse_entry(entry) for entry in feed.entries]
    # feedparser's feed metadata is dict-like (FeedParserDict); guard anyway.
    feed_obj = getattr(feed, "feed", None)
    feed_title = feed_obj.get("title") if isinstance(feed_obj, dict) else None
    return JSONResponse({
        "station_id": pegel_id,
        "station_title": extract_station_name(feed_title),
        "count": len(entries),
        "latest": entries[0],
        "entries": entries,
        "source": url,
        # Consistency fix: use the shared _now_utc() helper (as list_pegel
        # does) instead of an inline dt.datetime.now(dt.UTC) duplicate.
        "fetched_at": _now_utc().isoformat(),
    })