add index endpoint

This commit is contained in:
Matthias Jacob
2026-05-05 03:54:21 +02:00
parent 883e3aa9d1
commit 9b01145f07
2 changed files with 196 additions and 2 deletions
+152 -2
View File
@@ -2,12 +2,31 @@ from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
import feedparser
import re
import html
import datetime as dt
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
from email.utils import parsedate_to_datetime
from typing import TypedDict, Any
app = FastAPI()

# Per-station RSS feed URL; format with the numeric pegel (gauge) id.
BASE = "https://www.umwelt.sachsen.de/umwelt/infosysteme/hwims/portal/web/feed/wasserstand-pegel-{}"
# Overview page that is scraped to discover all available station ids.
DISCOVERY_URL = "https://www.umwelt.sachsen.de/umwelt/infosysteme/hwims/portal/web/wasserstand-uebersicht"

# Cache TTLs: the station index is refreshed at most every 15 minutes,
# per-station "latest" data at most every 2 minutes.
INDEX_CACHE_TTL_SECONDS = 900
LATEST_CACHE_TTL_SECONDS = 120


class IndexCache(TypedDict):
    """Shape of the module-level station-index cache."""

    # Discovered station dicts, or None before the first successful discovery.
    items: list[dict[str, Any]] | None
    # Timezone-aware UTC timestamp of the last successful discovery, or None.
    fetched_at: dt.datetime | None


_index_cache: IndexCache = {
    "items": None,
    "fetched_at": None,
}

# Short-lived per-station cache: station_id -> {"latest": ..., "fetched_at": ...}.
_latest_cache: dict[str, dict[str, Any]] = {}
def parse_number(pattern, text):
m = re.search(pattern, text, re.I)
@@ -22,13 +41,79 @@ def clean_text(text):
text = re.sub(r"<[^>]+>", " ", text)
return text
def extract_station_name(title: str):
def extract_station_name(title: str | None):
if not title:
return None
m = re.search(r"Pegel\s+(.+)$", title)
return m.group(1).strip() if m else title
def _now_utc():
return dt.datetime.now(dt.UTC)
def _is_cache_fresh(fetched_at, ttl_seconds: int):
if not fetched_at:
return False
return (_now_utc() - fetched_at).total_seconds() < ttl_seconds
def _set_index_cache(items):
    """Store *items* in the module-level index cache, stamped with the current UTC time."""
    _index_cache.update(items=items, fetched_at=_now_utc())
def _extract_station_title_from_id(pegel_id: str):
    """Fetch a station's feed and derive its name, or return None.

    Prefers the feed-level title; falls back to the first entry's title.
    """
    parsed = feedparser.parse(BASE.format(pegel_id))
    meta = getattr(parsed, "feed", None)
    meta_title = meta.get("title") if isinstance(meta, dict) else None
    if isinstance(meta_title, str) and meta_title:
        return extract_station_name(meta_title)
    entries = parsed.entries
    if entries:
        entry_title = entries[0].get("title")
        if isinstance(entry_title, str):
            return extract_station_name(entry_title)
    return None
def _parse_stations_from_overview(body: str):
stations: dict[str, str | None] = {}
paired_pattern = re.compile(
r'<a\s+href="wasserstand-pegel-(\d{6,})"[^>]*></a>\s*'
r'<div\s+class="popUp\s+popUpMs"[^>]*>.*?'
r'<div\s+class="popUpTitle">\s*<span\s+class="popUpTitleBold">(.*?)</span>',
re.I | re.S,
)
for pegel_id, raw_title in paired_pattern.findall(body):
title = re.sub(r"\s+", " ", html.unescape(raw_title)).strip() or None
stations[pegel_id] = title
for pegel_id in re.findall(r"wasserstand-pegel-(\d{6,})", body):
stations.setdefault(pegel_id, None)
return stations
def discover_stations():
    """Scrape the overview page and build the station index.

    Returns a sorted list of {"station_id", "station_title", "source"} dicts.
    Raises ValueError when no station ids can be found; network errors from
    urlopen propagate to the caller.
    """
    request = Request(
        DISCOVERY_URL,
        headers={"User-Agent": "sachsen-pegel-proxy/1.0"},
    )
    with urlopen(request, timeout=12) as response:
        page = response.read().decode("utf-8", errors="ignore")
    station_map = _parse_stations_from_overview(page)
    station_ids = sorted(station_map)
    if not station_ids:
        raise ValueError("Keine Pegel-IDs in Discovery-Quelle gefunden")
    return [
        {
            "station_id": station_id,
            # Fall back to fetching the station's own feed when the overview
            # page exposed an id without a title.
            "station_title": station_map.get(station_id)
            or _extract_station_title_from_id(station_id),
            "source": BASE.format(station_id),
        }
        for station_id in station_ids
    ]
def parse_timestamp_from_title(title: str):
if not title:
return None
@@ -72,6 +157,68 @@ def parse_entry(entry):
),
}
def _get_latest_for_station(station_id: str):
    """Return the newest parsed feed entry for *station_id*.

    Serves from the short-TTL per-station cache when fresh; otherwise fetches
    the feed, caches the result, and returns it. Raises ValueError when the
    upstream feed contains no entries.
    """
    cached = _latest_cache.get(station_id)
    if cached and _is_cache_fresh(cached.get("fetched_at"), LATEST_CACHE_TTL_SECONDS):
        return cached.get("latest")
    parsed = feedparser.parse(BASE.format(station_id))
    if not parsed.entries:
        raise ValueError("Keine Daten gefunden")
    newest = parse_entry(parsed.entries[0])
    _latest_cache[station_id] = {"latest": newest, "fetched_at": _now_utc()}
    return newest
@app.get("/api/pegel")
def list_pegel(limit: int | None = None):
if limit is not None and limit <= 0:
raise HTTPException(400, "Ungültiger limit-Parameter")
warnings = []
discovered_items: list[dict[str, Any]] | None = None
source_used = DISCOVERY_URL
cache_fallback = False
if _is_cache_fresh(_index_cache.get("fetched_at"), INDEX_CACHE_TTL_SECONDS):
discovered_items = _index_cache.get("items")
else:
try:
discovered_items = discover_stations()
_set_index_cache(discovered_items)
except (URLError, HTTPError, TimeoutError, ValueError, OSError) as e:
if _index_cache.get("items"):
discovered_items = _index_cache.get("items")
source_used = "cache"
cache_fallback = True
warnings.append(f"Discovery fehlgeschlagen, nutze Cache: {str(e)}")
else:
raise HTTPException(502, "Upstream-Discovery nicht verfügbar")
if discovered_items is None:
raise HTTPException(502, "Upstream-Discovery nicht verfügbar")
total_available = len(discovered_items)
items = discovered_items[:limit] if limit is not None else list(discovered_items)
response = {
"count": len(items),
"total_available": total_available,
"items": items,
"source": source_used,
"fetched_at": _now_utc().isoformat(),
}
if warnings:
response["warnings"] = warnings
cache_fetched_at = _index_cache.get("fetched_at")
if cache_fallback and cache_fetched_at:
response["cache_fetched_at"] = cache_fetched_at.isoformat()
return JSONResponse(response)
@app.get("/api/pegel/{pegel_id}")
def get_pegel(pegel_id: str):
if not re.fullmatch(r"\d{6,}", pegel_id):
@@ -85,9 +232,12 @@ def get_pegel(pegel_id: str):
entries = [parse_entry(entry) for entry in feed.entries]
feed_obj = getattr(feed, "feed", None)
feed_title = feed_obj.get("title") if isinstance(feed_obj, dict) else None
return JSONResponse({
"station_id": pegel_id,
"station_title": extract_station_name(feed.feed.get("title")),
"station_title": extract_station_name(feed_title),
"count": len(entries),
"latest": entries[0],
"entries": entries,