diff --git a/.gitignore b/.gitignore
index 555e90e..62424dd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -188,3 +188,4 @@ cython_debug/
 
 # Built Visual Studio Code Extensions
 *.vsix
+settings.py
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..200b08f
--- /dev/null
+++ b/main.py
@@ -0,0 +1,360 @@
+"""Query the document status page and forward the result to a webhook.
+
+The status page sits behind a CAPTCHA. The audio variant of the CAPTCHA is
+downloaded, transcribed with Whisper and submitted to reach the status page.
+"""
+
+from pathlib import Path
+from string import punctuation, whitespace
+import csv
+import time
+from datetime import datetime
+from urllib.parse import urlparse
+
+import requests
+from selenium import webdriver
+from selenium.webdriver.common.by import By
+
+from bs4 import BeautifulSoup
+
+import settings
+
+from transcription import transcribe_audio_with_whisper
+
+DOCUMENT_ID = settings.DOCUMENT_ID
+# settings.example.py marks WEBHOOK_URL as optional and ships it commented
+# out, so a missing attribute must not break the import of this module.
+WEBHOOK_URL = getattr(settings, "WEBHOOK_URL", None)
+
+MAX_CAPTCHA_ATTEMPTS = 3
+USE_HEADLESS_MODE = False
+CAPTCHA_CODE_LENGTH = 4
+DOWNLOAD_TIMEOUT_SECONDS = 15
+WEBHOOK_TIMEOUT_SECONDS = 10
+
+STATUS_FIELDSET_LABEL = "Statusabfrage Suchergebnis"
+CAPTCHA_FIELDSET_LABEL = "CAPTCHA"
+
+start_url = f"https://olmera.verwalt-berlin.de/std/olav/antrag/passpa/1?d={DOCUMENT_ID}"
+# start_url = "http://127.0.0.1:8000/sample_1.html"
+download_dir = Path("./audio_captchas").resolve()
+
+
+def _extract_mp3_filename(url):
+    """Return the last path segment of *url* (e.g. '1770057062035.mp3')."""
+    return urlparse(url).path.split("/")[-1]
+
+
+def _get_driver():
+    """Create a Firefox WebDriver that downloads into ``download_dir``."""
+    # Create the target directory up front so the browser can write into it.
+    download_dir.mkdir(parents=True, exist_ok=True)
+
+    options = webdriver.FirefoxOptions()
+    options.set_preference("intl.accept_languages", "de-DE, de")
+    options.set_preference("browser.download.useDownloadDir", True)
+    # folderList=2 tells Firefox to use the custom browser.download.dir.
+    options.set_preference("browser.download.folderList", 2)
+    options.set_preference("browser.download.dir", str(download_dir))
+
+    if USE_HEADLESS_MODE:
+        options.add_argument("--headless")
+
+    return webdriver.Firefox(options=options)
+
+
+def _wait_for_download(path, timeout=DOWNLOAD_TIMEOUT_SECONDS, poll_interval=0.25):
+    """Wait until *path* exists and is non-empty.
+
+    Returns:
+        True once the file is ready, False if *timeout* seconds elapse first.
+    """
+    # NOTE(review): assumes Firefox keeps in-progress data in a ".part" file
+    # and fills the target file only when done - confirm for the version used.
+    partial = path.with_name(path.name + ".part")
+    deadline = time.monotonic() + timeout
+    while time.monotonic() < deadline:
+        if path.exists() and path.stat().st_size > 0 and not partial.exists():
+            return True
+        time.sleep(poll_interval)
+    return False
+
+
+# Try the audio_captcha way
+def process_captcha_page_with_audio_captcha(driver):
+    """Solve the CAPTCHA on the current page via its audio variant.
+
+    Downloads the audio file, transcribes it and, if the transcription has
+    the expected length, types the code into the form and submits it.
+
+    Returns:
+        True if a code was submitted, False if the download timed out or the
+        transcription does not have CAPTCHA_CODE_LENGTH characters.
+    """
+    captcha = driver.find_element(By.ID, "captcha")
+
+    # The image variant is saved as well; it is not used for solving.
+    captcha.find_element(By.TAG_NAME, "img").screenshot("captcha.png")
+
+    audio_link = captcha.find_element(By.CLASS_NAME, "audioCaptcha").find_element(
+        By.TAG_NAME, "a"
+    )
+    audio_url = audio_link.get_attribute("href")
+    print(audio_url)
+    # Clicking the link is what starts the download of the audio file.
+    audio_link.click()
+
+    print(f"Download dir: {str(download_dir)}")
+    mp3_filename = _extract_mp3_filename(audio_url)
+    print(f"Extracted MP3 filename: {mp3_filename}")
+    mp3_path = download_dir / mp3_filename
+
+    # Poll for the file instead of sleeping a fixed time: a slow download
+    # would otherwise hand a missing or partial file to the transcriber.
+    print("Warte auf den Download der Audiodatei...")
+    if not _wait_for_download(mp3_path):
+        print(f"Download der Audiodatei nicht abgeschlossen: {mp3_path}")
+        return False
+
+    print(f"Lokaler Pfad der Audiodatei: {mp3_path}")
+
+    transcribed_text = transcribe_audio_with_whisper(mp3_path)
+    print(f"Transkribierter Text: {transcribed_text}")
+
+    if len(transcribed_text) != CAPTCHA_CODE_LENGTH:
+        print("Transkription hat nicht die erwartete Länge von 4 Zeichen. Abbruch.")
+        return False
+
+    # Fill in the form
+    driver.find_element(By.CLASS_NAME, "formtable").find_element(
+        By.ID, "code"
+    ).send_keys(transcribed_text)
+
+    # Submit the form
+    driver.find_element(By.CLASS_NAME, "actionbuttons").find_element(
+        By.CLASS_NAME, "button"
+    ).click()
+
+    return True
+
+
+def normalize_status_text(raw_text: str):
+    """Reduce a raw status sentence to the bare status word(s).
+
+    Removes the 'Das Dokument ist' lead-in and strips punctuation and
+    whitespace from both ends in a single pass, so trailing whitespace
+    cannot shield a final period from being removed.
+    """
+    return raw_text.replace("Das Dokument ist", "").strip(punctuation + whitespace)
+
+
+# I tried the OCR way first, but was not successful.
+def screenshot_captcha(save_dir="./captchas", prefix="captcha"):
+    """Save a timestamped screenshot of the image captcha into *save_dir*."""
+    Path(save_dir).mkdir(parents=True, exist_ok=True)
+
+    # Build a unique file name from the current timestamp
+    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+    filename = f"{save_dir}/{prefix}_{timestamp}.png"
+
+    driver = _get_driver()
+    try:
+        driver.get(start_url)
+        driver.find_element(By.ID, "captcha").find_element(
+            By.TAG_NAME, "img"
+        ).screenshot(filename)
+    finally:
+        # Always close the browser, even if the page or element is missing.
+        driver.quit()
+
+    print(f"Captcha gespeichert als {filename}")
+
+
+def test_transcription():
+    """Transcribe every MP3 in ``download_dir`` and report the hit rate.
+
+    Writes one ``filename,transcription`` row per file to
+    'transcription_results.csv' and prints how many transcriptions have
+    exactly CAPTCHA_CODE_LENGTH characters. Only the length is checked, not
+    whether the characters themselves are correct.
+    """
+    hits = 0
+    total = 0
+
+    # create or overwrite CSV file for transcription results
+    with open(
+        "transcription_results.csv", "w", encoding="utf-8", newline=""
+    ) as csvfile:
+        writer = csv.writer(csvfile, lineterminator="\n")
+        writer.writerow(["filename", "transcription"])
+
+        for mp3_path in sorted(download_dir.glob("*.mp3")):
+            print(f"Verarbeite Datei: {mp3_path}")
+            transcription = transcribe_audio_with_whisper(mp3_path)
+            print(f"Transkription: {transcription}")
+
+            writer.writerow([mp3_path.name, transcription])
+
+            if len(transcription) == CAPTCHA_CODE_LENGTH:
+                hits += 1
+            total += 1
+
+    # Without this guard an empty directory raised ZeroDivisionError.
+    if total == 0:
+        print(f"Keine MP3-Dateien in {download_dir} gefunden.")
+        return
+
+    print(f"Genauigkeit: {hits}/{total} = {hits / total * 100:.2f}%")
+
+
+def _has_fieldset(html, aria_label):
+    """Return True if *html* contains a <fieldset> with this aria-label."""
+    soup = BeautifulSoup(html, "html.parser")
+    return soup.find("fieldset", attrs={"aria-label": aria_label}) is not None
+
+
+def is_captcha_page(html):
+    """Return True if *html* contains the CAPTCHA fieldset."""
+    return _has_fieldset(html, CAPTCHA_FIELDSET_LABEL)
+
+
+def is_status_page(html):
+    """Return True if *html* contains the status result fieldset."""
+    return _has_fieldset(html, STATUS_FIELDSET_LABEL)
+
+
+def parse_status_page(html):
+    """Extract the status text and its timestamp from the status page.
+
+    Returns:
+        A ``(status_text, timestamp)`` tuple; the timestamp is a naive
+        ``datetime`` parsed from the ``Stand: dd.mm.YYYY HH:MM:SS`` line.
+
+    Raises:
+        ValueError: if an expected element is missing or the timestamp does
+            not match the expected format.
+    """
+    soup = BeautifulSoup(html, "html.parser")
+
+    box = soup.find("fieldset", attrs={"aria-label": STATUS_FIELDSET_LABEL})
+    if box is None:
+        raise ValueError("Statusbereich nicht gefunden (fehlendes Fieldset).")
+
+    warn = box.select_one(".warn p")
+    if warn is None:
+        raise ValueError("Statusabschnitt (.warn p) nicht gefunden.")
+
+    status_text = warn.get_text(strip=True)
+
+    # The timestamp is read from the next <p> following the status text.
+    timestamp_node = warn.find_next("p")
+    if timestamp_node is None:
+        raise ValueError("Zeitstempelabschnitt (nächstes <p>) nicht gefunden.")
+
+    timestamp_raw = timestamp_node.get_text(strip=True).replace("Stand: ", "")
+    timestamp_dt = datetime.strptime(timestamp_raw, "%d.%m.%Y %H:%M:%S")
+
+    print("Status:", status_text)
+    print("Stand :", timestamp_dt.isoformat())
+
+    return status_text, timestamp_dt
+
+
+def test_parse_status_page():
+    """Run the status page helpers against ./sample_html/status.html."""
+    with open("./sample_html/status.html", "r", encoding="utf-8") as f:
+        html = f.read()
+    is_status = is_status_page(html)
+    print(f"is_status_page: {is_status}")
+    status, timestamp = parse_status_page(html)
+    status = normalize_status_text(status)
+    print(f"Status: {status}, Timestamp: {timestamp}")
+
+
+def notify_webhook(status, last_updated, webhook_url):
+    """POST the status as JSON to *webhook_url*.
+
+    Returns:
+        True if the request completed with a non-error HTTP status, False on
+        an error status or any ``requests`` exception (e.g. timeout).
+    """
+    data = {"status": status, "last_updated": last_updated.isoformat()}
+    try:
+        # requests applies no timeout by default; bound the wait explicitly.
+        response = requests.post(
+            webhook_url, json=data, timeout=WEBHOOK_TIMEOUT_SECONDS
+        )
+    except requests.RequestException as exc:
+        print(f"Fehler beim Senden der Daten: {exc}")
+        return False
+
+    if response.ok:
+        print("Daten erfolgreich gesendet.")
+        return True
+
+    print(f"Fehler beim Senden der Daten: {response.status_code}, {response.text}")
+    return False
+
+
+def solve_captcha_flow(driver):
+    """Try up to MAX_CAPTCHA_ATTEMPTS times to get past the CAPTCHA.
+
+    Returns:
+        A ``(status, last_updated)`` tuple once a status page is reached.
+
+    Raises:
+        RuntimeError: if no status page is reached within the allowed
+            attempts or an unrecognised page is returned.
+    """
+    for attempt in range(1, MAX_CAPTCHA_ATTEMPTS + 1):
+        print(f"[Attempt {attempt}/{MAX_CAPTCHA_ATTEMPTS}] Löse Audio-Captcha …")
+        transcription_successful = process_captcha_page_with_audio_captcha(driver)
+
+        if not transcription_successful:
+            print("Transkription fehlgeschlagen, lade Captcha-Seite neu …")
+            driver.get(start_url)
+            continue
+
+        html = driver.page_source
+        if is_status_page(html):
+            status_raw, last_updated = parse_status_page(html)
+            status = normalize_status_text(status_raw)
+            print(f"Status ermittelt: {status} (Stand: {last_updated})")
+            return status, last_updated
+
+        if attempt == MAX_CAPTCHA_ATTEMPTS:
+            raise RuntimeError(
+                "Maximale Anzahl an Captcha-Versuchen erreicht, ohne Statusseite zu erhalten."
+            )
+
+        if not is_captcha_page(html):
+            raise RuntimeError("Weder Status- noch Captcha-Seite erkannt. Abbruch.")
+
+        print("Captcha nicht gelöst, versuche es erneut …")
+        driver.get(start_url)
+
+    # Reached when the final attempt returned False from the captcha step.
+    raise RuntimeError("Status konnte nicht ermittelt werden.")
+
+
+def main():
+    """Fetch the current status and, if configured, push it to the webhook."""
+    driver = _get_driver()
+    try:
+        driver.get(start_url)
+        status, last_updated = solve_captcha_flow(driver)
+    finally:
+        driver.quit()
+
+    print(f"Final Status: {status}, Timestamp: {last_updated}")
+    if not WEBHOOK_URL:
+        print("Keine WEBHOOK_URL konfiguriert, Benachrichtigung übersprungen.")
+        return
+    if not notify_webhook(status, last_updated, WEBHOOK_URL):
+        raise RuntimeError("Webhook konnte nicht benachrichtigt werden.")
+
+
+if __name__ == "__main__":
+    # test_transcription()
+    # test_parse_status_page()
+    main()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..2a7d88d
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,20 @@
+# data acquisition and web scraping
+selenium
+beautifulsoup4
+requests
+
+# audio processing
+openai-whisper
+
+# traditional OCR
+pillow
+pytesseract
+opencv-python
+tqdm
+streamlit
+python-Levenshtein
+
+# ocr with keras/tensorflow
+tensorflow
+keras
+matplotlib
\ No newline at end of file
diff --git a/settings.example.py b/settings.example.py
new file mode 100644
index 0000000..de2e9c4
--- /dev/null
+++ b/settings.example.py
@@ -0,0 +1,5 @@
+# required
+DOCUMENT_ID = ""
+
+# optional: Webhook URL to send notifications to
+#WEBHOOK_URL = "https://example.com/webhook/your_webhook_id"
\ No newline at end of file
diff --git a/transcription.py b/transcription.py
new file mode 100644
index 0000000..e109336
--- /dev/null
+++ b/transcription.py
@@ -0,0 +1,91 @@
+"""Convert Whisper transcriptions of spoken CAPTCHA characters to text."""
+
+import functools
+import re
+
+# Mapping of spoken (German) letters to characters
+SPOKEN_TO_CHAR = {
+    "a": "a", "ah": "a",
+    "be": "b", "bee": "b", "bei": "b",
+    "ce": "c", "see": "c", "ze": "c", "cheet": "c", "ci": "c",
+    "de": "d", "dee": "d",
+    "e": "e", "eh": "e",
+    "ef": "f", "eff": "f",
+    "ge": "g", "geh": "g",
+    "ha": "h", "hah": "h",
+    "i": "i", "ih": "i",
+    "jot": "j", "jay": "j", "yacht": "j", "jöt": "j",
+    "ka": "k", "kah": "k", "kar": "k", "car": "k",
+    "el": "l", "ell": "l",
+    "em": "m", "emm": "m",
+    "en": "n", "enn": "n",
+    "o": "o", "oh": "o",
+    "pe": "p", "peh": "p", "pi": "p", "pee": "p",
+    "ku": "q", "kuh": "q", "queue": "q", "coup": "q",
+    "er": "r", "err": "r",
+    "es": "s", "ess": "s",
+    "te": "t", "teh": "t", "ti": "t",
+    "u": "u", "uh": "u",
+    "vau": "v", "fau": "v", "faul": "v",
+    "we": "w", "weh": "w",
+    "ix": "x", "iks": "x",
+    "ypsilon": "y", "üpsilon": "y",
+    "zet": "z", "zett": "z", "set": "z", "fett": "z", "sedt": "z",
+    # Digits
+    "null": "0", "zero": "0",
+    "eins": "1", "one": "1",
+    "zwei": "2", "two": "2", "zwo": "2", "svi": "2", "svay": "2", "swei": "2",
+    "drei": "3", "three": "3",
+    "vier": "4", "four": "4", "fia": "4", "fiar": "4", "sier": "4", "fier": "4",
+    "fünf": "5", "five": "5", "fönz": "5", "fünfs": "5", "fins": "5",
+    "sechs": "6", "six": "6",
+    "sieben": "7", "seven": "7", "zieben": "7", "riben": "7",
+    "acht": "8", "eight": "8",
+    "neun": "9", "nine": "9", "noin": "9",
+}
+
+# Runs of word characters; everything else (punctuation, whitespace) separates
+# tokens. In str patterns \w is Unicode-aware, so umlauts stay inside a token.
+_TOKEN_PATTERN = re.compile(r"\w+")
+
+
+def _normalize_transcription(raw_text):
+    """Convert a Whisper transcription into the actual CAPTCHA characters.
+
+    Known spoken words are mapped through SPOKEN_TO_CHAR, single alphanumeric
+    characters are taken as-is and anything else is reported and skipped.
+    """
+    result = []
+    for token in _TOKEN_PATTERN.findall(raw_text.lower()):
+        # Known spoken word for a letter or digit
+        if token in SPOKEN_TO_CHAR:
+            result.append(SPOKEN_TO_CHAR[token])
+        # A single character (letter or digit) is taken over directly
+        elif len(token) == 1 and token.isalnum():
+            result.append(token)
+        # Anything else is logged and ignored
+        else:
+            print(f"Unbekanntes Token: '{token}'")
+    return "".join(result)
+
+
+@functools.lru_cache(maxsize=None)
+def _load_model(name):
+    """Load the Whisper model *name* once and reuse it on later calls."""
+    # Imported inside the function, as before, so that importing this module
+    # does not import whisper.
+    import whisper
+
+    return whisper.load_model(name)
+
+
+def transcribe_audio_with_whisper(mp3_path):
+    """Transcribe *mp3_path* (German) and return the normalised characters."""
+    model = _load_model("small")
+    result = model.transcribe(str(mp3_path), language="de")
+    raw_text = result["text"]
+    print("Raw transcription:", raw_text)
+
+    cleaned = _normalize_transcription(raw_text)
+    print("Cleaned transcription:", cleaned)
+    return cleaned