"""Query a document status page behind an audio captcha and report it to a webhook.

The script opens the status page in Firefox, downloads the audio captcha,
transcribes it with Whisper, submits the code, parses the resulting status
page and posts the status to ``WEBHOOK_URL``.
"""

import csv
from pathlib import Path
from string import punctuation
import time
from datetime import datetime
from urllib.parse import urlparse

import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup

try:
    from settings import DOCUMENT_ID, WEBHOOK_URL
except ImportError:
    print("settings.py nicht gefunden, verwende settings_example.py")
    from settings_example import DOCUMENT_ID, WEBHOOK_URL

from transcription import transcribe_audio_with_whisper

MAX_CAPTCHA_ATTEMPTS = 3
USE_HEADLESS_MODE = False
# Number of characters a usable captcha transcription must have.
CAPTCHA_LENGTH = 4
# Upper bound (seconds) for waiting on the audio captcha download.
DOWNLOAD_TIMEOUT_SECONDS = 10
# Upper bound (seconds) for the webhook HTTP request.
WEBHOOK_TIMEOUT_SECONDS = 10

start_url = f"https://olmera.verwalt-berlin.de/std/olav/antrag/passpa/1?d={DOCUMENT_ID}"
# start_url = "http://127.0.0.1:8000/sample_1.html"
download_dir = Path("./audio_captchas").resolve()


def _extract_mp3_filename(url):
    """Return the last path segment of *url* (e.g. '1770057062035.mp3')."""
    parsed_url = urlparse(url)
    return parsed_url.path.split("/")[-1]


def _get_driver():
    """Create a Firefox driver that saves downloads into ``download_dir``.

    Runs headless when ``USE_HEADLESS_MODE`` is true.
    """
    # Make sure the download target exists before Firefox is pointed at it.
    download_dir.mkdir(parents=True, exist_ok=True)

    options = webdriver.FirefoxOptions()
    options.set_preference("intl.accept_languages", "de-DE, de")
    options.set_preference("browser.download.useDownloadDir", True)
    options.set_preference("browser.download.folderList", 2)
    options.set_preference("browser.download.dir", str(download_dir))
    if USE_HEADLESS_MODE:
        options.add_argument("--headless")
    return webdriver.Firefox(options=options)


def _wait_for_download(path, timeout=DOWNLOAD_TIMEOUT_SECONDS, poll_interval=0.25):
    """Poll until *path* exists, is non-empty and has no '.part' sibling.

    Returns True once the file looks complete, False if *timeout* seconds
    elapse first.
    """
    # NOTE(review): assumes Firefox stages downloads as '<name>.part' next to
    # the target file - confirm for the Firefox version in use.
    partial = path.with_name(path.name + ".part")
    deadline = time.monotonic() + timeout
    while True:
        try:
            complete = path.stat().st_size > 0 and not partial.exists()
        except OSError:
            complete = False
        if complete:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


# Try the audio_captcha way
def process_captcha_page_with_audio_captcha(driver):
    """Download, transcribe and submit the audio captcha on the current page.

    Saves a screenshot of the captcha image to 'captcha.png', clicks the
    audio captcha link to download the MP3, transcribes it and, when the
    transcription has ``CAPTCHA_LENGTH`` characters, types it into the form
    and submits it.

    Returns:
        True if the form was submitted, False if the download did not finish
        in time or the transcription was unusable.
    """
    captcha = driver.find_element(By.ID, "captcha")
    captcha.find_element(By.TAG_NAME, "img").screenshot("captcha.png")

    # Download audio captcha
    audio_link = captcha.find_element(By.CLASS_NAME, "audioCaptcha").find_element(
        By.TAG_NAME, "a"
    )
    audio_captcha = audio_link.get_attribute("href")
    print(audio_captcha)
    audio_link.click()

    print(f"Download dir: {str(download_dir)}")
    mp3_filename = _extract_mp3_filename(audio_captcha)
    print(f"Extracted MP3 filename: {mp3_filename}")
    mp3_path = download_dir / mp3_filename

    print("Warte auf den Download der Audiodatei...")
    # Poll instead of sleeping a fixed time so slow downloads are not
    # transcribed while incomplete or missing.
    if not _wait_for_download(mp3_path):
        print("Audiodatei wurde nicht rechtzeitig heruntergeladen. Abbruch.")
        return False
    print(f"Lokaler Pfad der Audiodatei: {mp3_path}")

    # perform transcription
    transcribed_text = transcribe_audio_with_whisper(mp3_path)
    print(f"Transkribierter Text: {transcribed_text}")
    # Guard against an empty/None result before taking its length.
    if not transcribed_text or len(transcribed_text) != CAPTCHA_LENGTH:
        print("Transkription hat nicht die erwartete Länge von 4 Zeichen. Abbruch.")
        return False

    # Fill in the form
    driver.find_element(By.CLASS_NAME, "formtable").find_element(
        By.ID, "code"
    ).send_keys(transcribed_text)

    # Submit the form
    driver.find_element(By.CLASS_NAME, "actionbuttons").find_element(
        By.CLASS_NAME, "button"
    ).click()
    return True


def normalize_status_text(raw_text: str) -> str:
    """Drop the 'Das Dokument ist' prefix and surrounding punctuation/whitespace."""
    return raw_text.replace("Das Dokument ist", "").strip(punctuation).strip()


# I tried the OCR way first, but was not successful.
def screenshot_captcha(save_dir="./captchas", prefix="captcha"):
    """Open the start page and save a timestamped screenshot of the captcha image.

    Args:
        save_dir: Directory the PNG is written to; created if missing.
        prefix: Filename prefix placed before the timestamp.
    """
    Path(save_dir).mkdir(parents=True, exist_ok=True)
    driver = _get_driver()
    try:
        driver.get(start_url)
        # Build a unique filename from the current timestamp.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{save_dir}/{prefix}_{timestamp}.png"
        driver.find_element(By.ID, "captcha").find_element(
            By.TAG_NAME, "img"
        ).screenshot(filename)
    finally:
        # Always release the browser, even if loading or the screenshot fails.
        driver.quit()
    print(f"Captcha gespeichert als {filename}")


def test_transcription():
    """Transcribe every MP3 in ``download_dir`` and write the results to a CSV.

    Writes 'transcription_results.csv' with the columns 'filename' and
    'transcription', then prints the share of transcriptions that have
    exactly ``CAPTCHA_LENGTH`` characters. Prints a notice instead of a ratio
    when no MP3 files are found.

    Returns:
        None
    """
    accuracy = 0
    total = 0
    # create or overwrite CSV file for transcription results
    with open(
        "transcription_results.csv", "w", encoding="utf-8", newline=""
    ) as csvfile:
        # csv.writer quotes fields, so commas in a transcription stay intact.
        writer = csv.writer(csvfile, lineterminator="\n")
        writer.writerow(["filename", "transcription"])
        for mp3_path in sorted(download_dir.glob("*.mp3")):
            print(f"Verarbeite Datei: {mp3_path}")
            transcription = transcribe_audio_with_whisper(mp3_path)
            print(f"Transkription: {transcription}")
            writer.writerow([mp3_path.name, transcription])
            if transcription and len(transcription) == CAPTCHA_LENGTH:
                accuracy += 1
            total += 1

    if total == 0:
        # Avoid dividing by zero when the directory holds no MP3 files.
        print(f"Keine MP3-Dateien in {download_dir} gefunden.")
        return
    print(f"Genauigkeit: {accuracy}/{total} = {accuracy/total*100:.2f}%")


def is_captcha_page(html):
    """Return True if *html* contains a fieldset labelled 'CAPTCHA'."""
    soup = BeautifulSoup(html, "html.parser")
    fieldset = soup.find("fieldset", attrs={"aria-label": "CAPTCHA"})
    return fieldset is not None


def is_status_page(html):
    """Return True if *html* contains the 'Statusabfrage Suchergebnis' fieldset."""
    soup = BeautifulSoup(html, "html.parser")
    fieldset = soup.find("fieldset", attrs={"aria-label": "Statusabfrage Suchergebnis"})
    return fieldset is not None


def parse_status_page(html):
    """Extract the status text and its timestamp from a status page.

    Returns:
        A ``(status_text, timestamp)`` tuple; ``timestamp`` is a naive
        ``datetime`` parsed with the format '%d.%m.%Y %H:%M:%S'.

    Raises:
        ValueError: If the result fieldset, the '.warn p' status paragraph or
            the following paragraph is missing, or the timestamp does not
            match the expected format.
    """
    soup = BeautifulSoup(html, "html.parser")
    box = soup.find("fieldset", attrs={"aria-label": "Statusabfrage Suchergebnis"})
    if box is None:
        raise ValueError("Statusbereich nicht gefunden (fehlendes Fieldset).")

    warn = box.select_one(".warn p")
    if warn is None:
        raise ValueError("Statusabschnitt (.warn p) nicht gefunden.")
    status_text = warn.get_text(strip=True)

    # The timestamp is read from the first <p> following the status paragraph.
    timestamp_node = warn.find_next("p")
    if timestamp_node is None:
        raise ValueError("Zeitstempelabschnitt (nächstes <p>) nicht gefunden.")
    timestamp_raw = timestamp_node.get_text(strip=True).replace("Stand: ", "")
    timestamp_dt = datetime.strptime(timestamp_raw, "%d.%m.%Y %H:%M:%S")

    print("Status:", status_text)
    print("Stand :", timestamp_dt.isoformat())
    return status_text, timestamp_dt


def test_parse_status_page():
    """Parse './sample_html/status.html' and print the extracted values."""
    with open("./sample_html/status.html", "r", encoding="utf-8") as f:
        html = f.read()
    is_status = is_status_page(html)
    print(f"is_status_page: {is_status}")
    status, timestamp = parse_status_page(html)
    status = normalize_status_text(status)
    print(f"Status: {status}, Timestamp: {timestamp}")


def notify_webhook(status, last_updated, webhook_url, timeout=WEBHOOK_TIMEOUT_SECONDS):
    """POST the status as JSON to *webhook_url*.

    Args:
        status: Status text to send.
        last_updated: ``datetime`` sent in ISO format as 'last_updated'.
        webhook_url: Target URL.
        timeout: Seconds to wait for the request before giving up.

    Returns:
        True if the webhook answered with a 2xx status code, False on any
        other status or when the request itself failed.
    """
    data = {"status": status, "last_updated": last_updated.isoformat()}
    try:
        # Without a timeout a stalled endpoint would block the script forever.
        response = requests.post(webhook_url, json=data, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Fehler beim Senden der Daten: {exc}")
        return False

    success = 200 <= response.status_code < 300
    if success:
        print("Daten erfolgreich gesendet.")
    else:
        print(f"Fehler beim Senden der Daten: {response.status_code}, {response.text}")
    return success


def solve_captcha_flow(driver):
    """Try up to MAX_CAPTCHA_ATTEMPTS times to reach and parse the status page.

    Returns:
        A ``(status, last_updated)`` tuple with the normalized status text.

    Raises:
        RuntimeError: If the attempts are exhausted, or the page after
            submitting is neither a status page nor a captcha page.
    """
    for attempt in range(1, MAX_CAPTCHA_ATTEMPTS + 1):
        print(f"[Attempt {attempt}/{MAX_CAPTCHA_ATTEMPTS}] Löse Audio-Captcha …")
        transcription_successful = process_captcha_page_with_audio_captcha(driver)
        if not transcription_successful:
            print("Transkription fehlgeschlagen, lade Captcha-Seite neu …")
            driver.get(start_url)
            continue

        html = driver.page_source
        if is_status_page(html):
            status_raw, last_updated = parse_status_page(html)
            status = normalize_status_text(status_raw)
            print(f"Status ermittelt: {status} (Stand: {last_updated})")
            return status, last_updated

        if attempt == MAX_CAPTCHA_ATTEMPTS:
            raise RuntimeError(
                "Maximale Anzahl an Captcha-Versuchen erreicht, ohne Statusseite zu erhalten."
            )
        if not is_captcha_page(html):
            raise RuntimeError("Weder Status- noch Captcha-Seite erkannt. Abbruch.")

        print("Captcha nicht gelöst, versuche es erneut …")
        driver.get(start_url)

    # Reached when every attempt ended in a failed transcription.
    raise RuntimeError("Status konnte nicht ermittelt werden.")


def main():
    """Fetch the document status and report it to the configured webhook.

    Raises:
        RuntimeError: If the status cannot be determined or the webhook
            notification fails.
    """
    driver = _get_driver()
    try:
        driver.get(start_url)
        status, last_updated = solve_captcha_flow(driver)
        print(f"Final Status: {status}, Timestamp: {last_updated}")
        if not notify_webhook(status, last_updated, WEBHOOK_URL):
            raise RuntimeError("Webhook konnte nicht benachrichtigt werden.")
    finally:
        driver.quit()


if __name__ == "__main__":
    # test_transcription()
    # test_parse_status_page()
    main()