Files
check_pa/main.py
cyroxx 031a90a332
All checks were successful
Python tests / tests (push) Successful in 2m22s
make scraper more robust for "not found" error
2026-02-05 22:15:40 +01:00

297 lines
9.6 KiB
Python

from pathlib import Path
from string import punctuation
import time
from datetime import datetime
from urllib.parse import urlparse
import re
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webdriver import WebDriver
from bs4 import BeautifulSoup
try:
from settings import DOCUMENT_ID, WEBHOOK_URL, MODEL_NAME
except ImportError:
print("settings.py nicht gefunden, verwende settings_example.py")
from settings_example import DOCUMENT_ID, WEBHOOK_URL, MODEL_NAME
from transcription import transcribe_audio_with_whisper
# Upper bound on the number of captcha attempts made by solve_captcha_flow().
MAX_CAPTCHA_ATTEMPTS = 3
# When True, _get_driver() starts Firefox with the "--headless" argument.
USE_HEADLESS_MODE = False
# Entry URL of the status query; DOCUMENT_ID is imported from settings
# (or from settings_example when settings.py is missing).
start_url = f"https://olmera.verwalt-berlin.de/std/olav/antrag/passpa/1?d={DOCUMENT_ID}"
# start_url = "http://127.0.0.1:8000/sample_1.html"
# Absolute path of the directory Firefox is configured to download into;
# the audio-captcha MP3 files are read from here for transcription.
download_dir = Path("./audio_captchas").resolve()
# Known status texts, keyed by a short label. In the code visible here only
# the "in Produktion" entry is read (by normalize_status_text()); the other
# entries are kept for reference.
statuses = {
"in Produktion": "Das Dokument ist noch in Produktion.",
"abholbereit (Reisepass)": "Ihr Reisepass liegt zur Abholung bereit.",
"abholbereit (Personalausweis)": "Ihr Personalausweis ist in der Ausweisbehörde eingetroffen. Das Dokument kann abgeholt werden.",
}
def _extract_mp3_filename(url):
"""Extrahiert den Dateinamen (z.B. '1770057062035.mp3') aus einer URL."""
parsed_url = urlparse(url)
filename = parsed_url.path.split("/")[-1]
return filename
def _get_driver():
    """Create a Firefox WebDriver set up for German pages and local downloads.

    The browser is told to prefer German content and to store downloads in
    ``download_dir``. It runs headless when ``USE_HEADLESS_MODE`` is true.

    Returns:
        The newly started ``webdriver.Firefox`` instance.
    """
    firefox_options = webdriver.FirefoxOptions()

    # Applied in this order; the download-related entries point Firefox at
    # download_dir (folderList value 2 selects a custom directory according
    # to Firefox's preference semantics).
    preferences = (
        ("intl.accept_languages", "de-DE, de"),
        ("browser.download.useDownloadDir", True),
        ("browser.download.folderList", 2),
        ("browser.download.dir", str(download_dir)),
    )
    for pref_name, pref_value in preferences:
        firefox_options.set_preference(pref_name, pref_value)

    if USE_HEADLESS_MODE:
        firefox_options.add_argument("--headless")

    return webdriver.Firefox(options=firefox_options)
def _wait_for_download(path, timeout=15.0, poll_interval=0.5):
    """Wait until *path* exists, is non-empty and has stopped growing.

    Args:
        path: ``pathlib.Path`` of the file the browser is expected to write.
        timeout: Maximum number of seconds to wait.
        poll_interval: Seconds to sleep between two checks.

    Returns:
        True once the file looks complete, False if *timeout* elapsed first.
    """
    deadline = time.monotonic() + timeout
    # NOTE(review): Firefox is assumed to keep a "<name>.part" sibling while a
    # download is in progress - confirm for the Firefox version in use. The
    # size-stability check below covers the case where it does not.
    partial = path.with_name(path.name + ".part")
    previous_size = -1
    while time.monotonic() < deadline:
        try:
            size = -1 if partial.exists() else path.stat().st_size
        except OSError:
            # File not there yet (or vanished between checks): keep waiting.
            size = -1
        # Require the same non-zero size on two consecutive polls.
        if size > 0 and size == previous_size:
            return True
        previous_size = size
        time.sleep(poll_interval)
    return False


# Try the audio_captcha way
def process_captcha_page_with_audio_captcha(driver):
    """Solve the captcha on the current page via its audio variant.

    Saves a screenshot of the captcha image to ``captcha.png``, clicks the
    audio-captcha link so the browser downloads the MP3 into ``download_dir``,
    transcribes it with Whisper, and, if the transcription has exactly four
    characters, types it into the form and submits it.

    Args:
        driver: Selenium WebDriver currently showing the captcha page.

    Returns:
        True if the form was filled in and submitted; False if the audio file
        did not finish downloading in time or the transcription was missing
        or not exactly four characters long.
    """
    captcha_box = driver.find_element(By.ID, "captcha")
    captcha_box.find_element(By.TAG_NAME, "img").screenshot("captcha.png")

    # Look the link up once and reuse it for reading the href and clicking.
    audio_link = captcha_box.find_element(
        By.CLASS_NAME, "audioCaptcha"
    ).find_element(By.TAG_NAME, "a")
    audio_captcha = audio_link.get_attribute("href")
    print(audio_captcha)
    audio_link.click()

    print(f"Download dir: {str(download_dir)}")
    mp3_filename = _extract_mp3_filename(audio_captcha)
    print(f"Extracted MP3 filename: {mp3_filename}")
    audio_path = download_dir / mp3_filename

    # Poll for the finished file instead of sleeping a fixed two seconds: a
    # slow download would otherwise hand a missing or partial file to Whisper.
    print("Warte auf den Download der Audiodatei...")
    if not _wait_for_download(audio_path):
        print("Download der Audiodatei wurde nicht rechtzeitig abgeschlossen. Abbruch.")
        return False
    print(f"Lokaler Pfad der Audiodatei: {audio_path}")

    # perform transcription
    transcribed_text = transcribe_audio_with_whisper(audio_path)
    print(f"Transkribierter Text: {transcribed_text}")
    # Guard against an empty/None result before calling len() on it.
    if not transcribed_text or len(transcribed_text) != 4:
        print("Transkription hat nicht die erwartete Länge von 4 Zeichen. Abbruch.")
        return False

    # Fill in the form
    driver.find_element(By.CLASS_NAME, "formtable").find_element(
        By.ID, "code"
    ).send_keys(transcribed_text)
    # Submit the form
    driver.find_element(By.CLASS_NAME, "actionbuttons").find_element(
        By.CLASS_NAME, "button"
    ).click()
    return True
def normalize_status_text(raw_text: str):
    """Collapse whitespace in *raw_text* and shorten the "in production" text.

    Every run of whitespace becomes a single space and surrounding whitespace
    is removed. If the result equals ``statuses["in Produktion"]`` the short
    form "noch in Produktion" is returned; any other text is returned as is.
    """
    collapsed = re.sub(r"\s+", " ", raw_text).strip()
    return "noch in Produktion" if collapsed == statuses["in Produktion"] else collapsed
# I tried the OCR way first, but was not successful.
def screenshot_captcha(save_dir="./captchas", prefix="captcha"):
    """Open the start page and save a screenshot of the captcha image.

    The image is written to ``<save_dir>/<prefix>_<YYYYmmdd_HHMMSS>.png``.

    Args:
        save_dir: Directory for the screenshot; created if it does not exist.
        prefix: File name prefix placed before the timestamp.

    Returns:
        None
    """
    # Selenium reports a failed write only through the return value, so make
    # sure the target directory exists before taking the screenshot.
    Path(save_dir).mkdir(parents=True, exist_ok=True)

    driver = _get_driver()
    try:
        driver.get(start_url)
        # Build a unique file name from the current timestamp.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{save_dir}/{prefix}_{timestamp}.png"
        saved = (
            driver.find_element(By.ID, "captcha")
            .find_element(By.TAG_NAME, "img")
            .screenshot(filename)
        )
    finally:
        # Always shut the browser down, even if loading or locating failed.
        driver.quit()

    if saved:
        print(f"Captcha gespeichert als {filename}")
    else:
        print(f"Captcha konnte nicht gespeichert werden: {filename}")
def test_transcription():
    """Transcribe every MP3 in ``download_dir`` and record the results.

    Each file is transcribed with Whisper (the model is loaded once) and a row
    ``filename,transcription`` is written to ``transcription_results.csv``,
    which is created or overwritten. Rows are written with the ``csv`` module
    so that commas or quotes in a transcription cannot corrupt the file; a
    ``None`` transcription is written as an empty field.

    Afterwards the share of transcriptions with exactly four characters is
    printed as the "accuracy". If no MP3 files are found, a notice is printed
    instead of computing a percentage.

    Returns:
        None
    """
    import csv

    # Load Whisper model once for efficiency
    import whisper

    model = whisper.load_model(MODEL_NAME)

    accuracy = 0
    total = 0
    # create or overwrite CSV file for transcription results
    with open(
        "transcription_results.csv", "w", encoding="utf-8", newline=""
    ) as csvfile:
        # Keep "\n" line endings, matching the previous hand-written output.
        writer = csv.writer(csvfile, lineterminator="\n")
        writer.writerow(["filename", "transcription"])
        # Sorted so the CSV row order is reproducible between runs.
        for mp3_path in sorted(download_dir.glob("*.mp3")):
            print(f"Verarbeite Datei: {mp3_path}")
            transcription = transcribe_audio_with_whisper(mp3_path, model=model)
            print(f"Transkription: {transcription}")
            writer.writerow([mp3_path.name, transcription])
            if transcription and len(transcription) == 4:
                accuracy += 1
            total += 1

    if total == 0:
        # Avoid dividing by zero when the directory holds no MP3 files.
        print(f"Keine MP3-Dateien in {download_dir} gefunden.")
        return
    print(f"Genauigkeit: {accuracy}/{total} = {accuracy/total*100:.2f}%")
def is_captcha_page(html):
    """Return True if *html* contains a ``<fieldset aria-label="CAPTCHA">``."""
    document = BeautifulSoup(html, "html.parser")
    captcha_fieldset = document.find("fieldset", attrs={"aria-label": "CAPTCHA"})
    if captcha_fieldset is None:
        return False
    return True
def is_status_page(html):
    """Return True if *html* contains the status-result fieldset.

    The page counts as a status page when it has a ``<fieldset>`` whose
    ``aria-label`` is "Statusabfrage Suchergebnis".
    """
    document = BeautifulSoup(html, "html.parser")
    result_fieldset = document.find(
        "fieldset", attrs={"aria-label": "Statusabfrage Suchergebnis"}
    )
    if result_fieldset is None:
        return False
    return True
def parse_status_page(html):
    """Extract the status text and its timestamp from a status page.

    Args:
        html: Markup of a page containing the fieldset labelled
            "Statusabfrage Suchergebnis".

    Returns:
        A tuple ``(status_text, timestamp)``: the text of the first
        ``.warn p`` (or, failing that, ``.info p``) inside the fieldset, and
        the ``datetime`` parsed from the next ``<p>`` after it.

    Raises:
        ValueError: If the fieldset, the status paragraph or the following
            paragraph is missing, or if the timestamp text does not match
            ``%d.%m.%Y %H:%M:%S``.
    """
    result_box = BeautifulSoup(html, "html.parser").find(
        "fieldset", attrs={"aria-label": "Statusabfrage Suchergebnis"}
    )
    if result_box is None:
        raise ValueError("Statusbereich nicht gefunden (fehlendes Fieldset).")

    # Prefer the warning paragraph; fall back to the info paragraph.
    status_node = result_box.select_one(".warn p") or result_box.select_one(".info p")
    if status_node is None:
        raise ValueError("Statusabschnitt (.warn p oder .info p) nicht gefunden.")

    # The paragraph following the status text carries the "Stand: ..." line.
    stand_node = status_node.find_next("p")
    if stand_node is None:
        raise ValueError("Zeitstempelabschnitt (nächstes <p>) nicht gefunden.")

    status_text = status_node.get_text(strip=True)
    stand_text = stand_node.get_text(strip=True).replace("Stand: ", "")
    timestamp_dt = datetime.strptime(stand_text, "%d.%m.%Y %H:%M:%S")

    print("Status:", status_text)
    print("Stand :", timestamp_dt.isoformat())
    return status_text, timestamp_dt
def test_parse_status_page():
    """Run the status-page helpers against ``./sample_html/status.html``.

    Prints whether the sample is recognised as a status page, then parses it,
    normalises the status text and prints the status with its timestamp.
    """
    html = Path("./sample_html/status.html").read_text()
    print(f"is_status_page: {is_status_page(html)}")
    raw_status, timestamp = parse_status_page(html)
    status = normalize_status_text(raw_status)
    print(f"Status: {status}, Timestamp: {timestamp}")
def notify_webhook(status, last_updated, webhook_url, timeout=10):
    """POST the status and its timestamp as JSON to *webhook_url*.

    The payload is ``{"status": ..., "last_updated": <ISO 8601 string>}``.

    Args:
        status: Status text to report.
        last_updated: ``datetime`` of the status; sent via ``isoformat()``.
        webhook_url: Target URL of the webhook.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely.

    Returns:
        True if the server answered with a 2xx status code. False otherwise,
        including when the request could not be completed at all (connection
        error, timeout, invalid URL).
    """
    payload = {"status": status, "last_updated": last_updated.isoformat()}
    try:
        response = requests.post(webhook_url, json=payload, timeout=timeout)
    except requests.RequestException as exc:
        # Report transport failures through the return value, like HTTP errors.
        print(f"Fehler beim Senden der Daten: {exc}")
        return False

    # Any 2xx answer (e.g. 202 or 204) means the webhook accepted the data.
    success = 200 <= response.status_code < 300
    if success:
        print("Daten erfolgreich gesendet.")
    else:
        print(f"Fehler beim Senden der Daten: {response.status_code}, {response.text}")
    return success
def solve_captcha_flow(driver: WebDriver):
"""Try at most MAX_CAPTCHA_ATTEMPTS times until a status page is reached.

Each attempt solves the audio captcha on the current page and inspects the
resulting page.

Returns:
    A tuple ``(status, last_updated)``. For a recognised status page this is
    the normalised status text and the timestamp parsed from the page. When
    the page shows error messages, it is one of two fixed texts together
    with the current time.

Raises:
    RuntimeError: If the attempts are used up, or the page is not recognised.
"""
# NOTE(review): indentation is not visible in this view, so the nesting of the
# branches from the captcha-page check onward cannot be confirmed here -
# verify against the repository before restructuring this function.
for attempt in range(1, MAX_CAPTCHA_ATTEMPTS + 1):
print(f"[Attempt {attempt}/{MAX_CAPTCHA_ATTEMPTS}] Löse Audio-Captcha …")
transcription_successful = process_captcha_page_with_audio_captcha(driver)
if not transcription_successful:
# Nothing was submitted: reload the start page and use the next attempt.
print("Transkription fehlgeschlagen, lade Captcha-Seite neu …")
driver.get(start_url)
continue
# The form was submitted; examine the page the browser shows now.
html = driver.page_source
if is_status_page(html):
status_raw, last_updated = parse_status_page(html)
status = normalize_status_text(status_raw)
print(f"Status ermittelt: {status} (Stand: {last_updated})")
return status, last_updated
# No status page on the last permitted attempt: give up.
if attempt == MAX_CAPTCHA_ATTEMPTS:
raise RuntimeError(
"Maximale Anzahl an Captcha-Versuchen erreicht, ohne Statusseite zu erhalten."
)
if is_captcha_page(html):
print("Captcha nicht gelöst, versuche es erneut …")
driver.get(start_url)
# Collect elements with class "error_message" from the page currently loaded
# in the browser; only texts that are non-empty after stripping leading and
# trailing punctuation characters are kept.
if error_messages := driver.find_elements(By.CLASS_NAME, "error_message"):
errors = [e.text for e in error_messages if e.text.strip(punctuation)]
print(f"Es gab folgende Fehler: {errors}")
if "Das Dokument wurde nicht gefunden." in errors:
# Possible reasons: wrong document ID, the document has already been collected and is no longer in the system, etc.
return "Dokument nicht gefunden", datetime.now()
else:
return "Fehler beim Abrufen des Dokumentenstatus", datetime.now()
raise RuntimeError("Weder Status- noch Captcha-Seite erkannt. Abbruch.")
# Presumably reached only when the loop ends without a return or raise, e.g.
# when every attempt was skipped via `continue` - confirm once the nesting is known.
raise RuntimeError("Status konnte nicht ermittelt werden.")
def main():
    """Query the document status once and report it to the webhook.

    Opens the start page, runs the captcha flow to obtain the status and its
    timestamp, prints them, and posts them to ``WEBHOOK_URL``.

    Raises:
        RuntimeError: If the webhook could not be notified, or propagated
            from ``solve_captcha_flow`` when no status could be determined.
    """
    driver = _get_driver()
    try:
        driver.get(start_url)
        status, last_updated = solve_captcha_flow(driver)
        print(f"Final Status: {status}, Timestamp: {last_updated}")
        if not notify_webhook(status, last_updated, WEBHOOK_URL):
            raise RuntimeError("Webhook konnte nicht benachrichtigt werden.")
    finally:
        # A headless browser cannot be inspected, so leaving it running would
        # only leak a browser process on every run. A visible browser is left
        # open, as before, so the final page can still be looked at.
        if USE_HEADLESS_MODE:
            driver.quit()


if __name__ == "__main__":
    # Manual checks, enable as needed:
    # test_transcription()
    # test_parse_status_page()
    main()