add working implementation
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -188,3 +188,4 @@ cython_debug/
|
||||
# Built Visual Studio Code Extensions
|
||||
*.vsix
|
||||
|
||||
settings.py
|
||||
|
||||
261
main.py
Normal file
261
main.py
Normal file
@@ -0,0 +1,261 @@
|
||||
from pathlib import Path
|
||||
from string import punctuation
|
||||
import time
|
||||
from datetime import datetime
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import requests
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.common.by import By
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from settings import DOCUMENT_ID, WEBHOOK_URL
|
||||
|
||||
from transcription import transcribe_audio_with_whisper
|
||||
|
||||
MAX_CAPTCHA_ATTEMPTS = 3
|
||||
USE_HEADLESS_MODE = False
|
||||
|
||||
start_url = f"https://olmera.verwalt-berlin.de/std/olav/antrag/passpa/1?d={DOCUMENT_ID}"
|
||||
# start_url = "http://127.0.0.1:8000/sample_1.html"
|
||||
download_dir = Path("./audio_captchas").resolve()
|
||||
|
||||
|
||||
def _extract_mp3_filename(url):
|
||||
"""Extrahiert den Dateinamen (z.B. '1770057062035.mp3') aus einer URL."""
|
||||
parsed_url = urlparse(url)
|
||||
filename = parsed_url.path.split("/")[-1]
|
||||
return filename
|
||||
|
||||
|
||||
def _get_driver():
|
||||
options = webdriver.FirefoxOptions()
|
||||
options.set_preference("intl.accept_languages", "de-DE, de")
|
||||
options.set_preference("browser.download.useDownloadDir", True)
|
||||
options.set_preference("browser.download.folderList", 2)
|
||||
options.set_preference("browser.download.dir", str(download_dir))
|
||||
|
||||
if USE_HEADLESS_MODE:
|
||||
options.add_argument("--headless")
|
||||
|
||||
driver = webdriver.Firefox(options=options)
|
||||
return driver
|
||||
|
||||
|
||||
# Try the audio_captcha way
|
||||
def process_captcha_page_with_audio_captcha(driver):
|
||||
|
||||
# Download audio captcha
|
||||
driver.find_element(By.ID, "captcha").find_element(By.TAG_NAME, "img").screenshot(
|
||||
"captcha.png"
|
||||
)
|
||||
audio_captcha = (
|
||||
driver.find_element(By.ID, "captcha")
|
||||
.find_element(By.CLASS_NAME, "audioCaptcha")
|
||||
.find_element(By.TAG_NAME, "a")
|
||||
.get_attribute("href")
|
||||
)
|
||||
print(audio_captcha)
|
||||
driver.find_element(By.ID, "captcha").find_element(
|
||||
By.CLASS_NAME, "audioCaptcha"
|
||||
).find_element(By.TAG_NAME, "a").click()
|
||||
|
||||
print(f"Download dir: {str(download_dir)}")
|
||||
mp3_filename = _extract_mp3_filename(audio_captcha)
|
||||
print(f"Extracted MP3 filename: {mp3_filename}")
|
||||
|
||||
print("Warte auf den Download der Audiodatei...")
|
||||
time.sleep(2) # Warte 2 Sekunden auf den Download (anpassen je nach Bedarf)
|
||||
|
||||
print(f"Lokaler Pfad der Audiodatei: {download_dir / mp3_filename}")
|
||||
|
||||
# perform transcription
|
||||
transcribed_text = transcribe_audio_with_whisper(download_dir / mp3_filename)
|
||||
print(f"Transkribierter Text: {transcribed_text}")
|
||||
|
||||
if len(transcribed_text) != 4:
|
||||
print("Transkription hat nicht die erwartete Länge von 4 Zeichen. Abbruch.")
|
||||
return False
|
||||
|
||||
# Fill in the form
|
||||
driver.find_element(By.CLASS_NAME, "formtable").find_element(
|
||||
By.ID, "code"
|
||||
).send_keys(transcribed_text)
|
||||
|
||||
# Submit the form
|
||||
driver.find_element(By.CLASS_NAME, "actionbuttons").find_element(
|
||||
By.CLASS_NAME, "button"
|
||||
).click()
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def normalize_status_text(raw_text: str):
|
||||
return raw_text.replace("Das Dokument ist", "").strip(punctuation).strip()
|
||||
|
||||
|
||||
# I tried the OCR way first, but was not successful.
|
||||
def screenshot_captcha(save_dir="./captchas", prefix="captcha"):
|
||||
driver = _get_driver()
|
||||
driver.get(start_url)
|
||||
|
||||
# Erzeuge einen eindeutigen Dateinamen mit Zeitstempel
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
filename = f"{save_dir}/{prefix}_{timestamp}.png"
|
||||
|
||||
driver.find_element(By.ID, "captcha").find_element(By.TAG_NAME, "img").screenshot(
|
||||
filename
|
||||
)
|
||||
driver.quit()
|
||||
|
||||
print(f"Captcha gespeichert als {filename}")
|
||||
|
||||
|
||||
def test_transcription():
|
||||
"""
|
||||
Processes all MP3 files in the specified download directory, transcribes each using Whisper,
|
||||
and writes the results to a CSV file. Calculates and prints the accuracy based on the number
|
||||
of transcriptions with exactly 4 characters.
|
||||
|
||||
The CSV file 'transcription_results.csv' will contain two columns: 'filename' and 'transcription'.
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
|
||||
accuracy = 0
|
||||
total = 0
|
||||
|
||||
# create or overwrite CSV file for transcription results
|
||||
with open("transcription_results.csv", "w") as csvfile:
|
||||
csvfile.write("filename,transcription\n")
|
||||
|
||||
for mp3_path in download_dir.glob("*.mp3"):
|
||||
print(f"Verarbeite Datei: {mp3_path}")
|
||||
transcription = transcribe_audio_with_whisper(mp3_path)
|
||||
|
||||
print(f"Transkription: {transcription}")
|
||||
|
||||
csvfile.write(f"{mp3_path.name},{transcription}\n")
|
||||
|
||||
if transcription and len(transcription) == 4:
|
||||
accuracy += 1
|
||||
|
||||
total += 1
|
||||
|
||||
print(f"Genauigkeit: {accuracy}/{total} = {accuracy/total*100:.2f}%")
|
||||
|
||||
|
||||
def is_captcha_page(html):
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
fieldset = soup.find("fieldset", attrs={"aria-label": "CAPTCHA"})
|
||||
|
||||
return fieldset is not None
|
||||
|
||||
|
||||
def is_status_page(html):
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
fieldset = soup.find("fieldset", attrs={"aria-label": "Statusabfrage Suchergebnis"})
|
||||
|
||||
return fieldset is not None
|
||||
|
||||
|
||||
def parse_status_page(html):
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
|
||||
box = soup.find("fieldset", attrs={"aria-label": "Statusabfrage Suchergebnis"})
|
||||
if box is None:
|
||||
raise ValueError("Statusbereich nicht gefunden (fehlendes Fieldset).")
|
||||
|
||||
warn = box.select_one(".warn p")
|
||||
if warn is None:
|
||||
raise ValueError("Statusabschnitt (.warn p) nicht gefunden.")
|
||||
|
||||
status_text = warn.get_text(strip=True)
|
||||
|
||||
timestamp_node = warn.find_next("p")
|
||||
if timestamp_node is None:
|
||||
raise ValueError("Zeitstempelabschnitt (nächstes <p>) nicht gefunden.")
|
||||
|
||||
timestamp_raw = timestamp_node.get_text(strip=True).replace("Stand: ", "")
|
||||
timestamp_dt = datetime.strptime(timestamp_raw, "%d.%m.%Y %H:%M:%S")
|
||||
|
||||
print("Status:", status_text)
|
||||
print("Stand :", timestamp_dt.isoformat())
|
||||
|
||||
return status_text, timestamp_dt
|
||||
|
||||
|
||||
def test_parse_status_page():
|
||||
with open("./sample_html/status.html", "r") as f:
|
||||
html = f.read()
|
||||
is_status = is_status_page(html)
|
||||
print(f"is_status_page: {is_status}")
|
||||
status, timestamp = parse_status_page(html)
|
||||
status = normalize_status_text(status)
|
||||
print(f"Status: {status}, Timestamp: {timestamp}")
|
||||
|
||||
|
||||
def notify_webhook(status, last_updated, webhook_url):
|
||||
data = {"status": status, "last_updated": last_updated.isoformat()}
|
||||
response = requests.post(webhook_url, json=data)
|
||||
success = response.status_code == 200
|
||||
|
||||
if success:
|
||||
print("Daten erfolgreich gesendet.")
|
||||
else:
|
||||
print(f"Fehler beim Senden der Daten: {response.status_code}, {response.text}")
|
||||
|
||||
return success
|
||||
|
||||
|
||||
def solve_captcha_flow(driver):
|
||||
"""Versucht höchstens MAX_CAPTCHA_ATTEMPTS-mal, bis eine Statusseite erreicht wird."""
|
||||
for attempt in range(1, MAX_CAPTCHA_ATTEMPTS + 1):
|
||||
print(f"[Attempt {attempt}/{MAX_CAPTCHA_ATTEMPTS}] Löse Audio-Captcha …")
|
||||
transcription_successful = process_captcha_page_with_audio_captcha(driver)
|
||||
|
||||
if not transcription_successful:
|
||||
print("Transkription fehlgeschlagen, lade Captcha-Seite neu …")
|
||||
driver.get(start_url)
|
||||
continue
|
||||
|
||||
html = driver.page_source
|
||||
if is_status_page(html):
|
||||
status_raw, last_updated = parse_status_page(html)
|
||||
status = normalize_status_text(status_raw)
|
||||
print(f"Status ermittelt: {status} (Stand: {last_updated})")
|
||||
return status, last_updated
|
||||
|
||||
if attempt == MAX_CAPTCHA_ATTEMPTS:
|
||||
raise RuntimeError(
|
||||
"Maximale Anzahl an Captcha-Versuchen erreicht, ohne Statusseite zu erhalten."
|
||||
)
|
||||
|
||||
if not is_captcha_page(html):
|
||||
raise RuntimeError("Weder Status- noch Captcha-Seite erkannt. Abbruch.")
|
||||
|
||||
print("Captcha nicht gelöst, versuche es erneut …")
|
||||
driver.get(start_url)
|
||||
|
||||
raise RuntimeError("Status konnte nicht ermittelt werden.")
|
||||
|
||||
|
||||
def main():
|
||||
driver = _get_driver()
|
||||
try:
|
||||
driver.get(start_url)
|
||||
status, last_updated = solve_captcha_flow(driver)
|
||||
|
||||
print(f"Final Status: {status}, Timestamp: {last_updated}")
|
||||
if not notify_webhook(status, last_updated, WEBHOOK_URL):
|
||||
raise RuntimeError("Webhook konnte nicht benachrichtigt werden.")
|
||||
finally:
|
||||
driver.quit()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
#test_transcription()
|
||||
# test_parse_status_page()
|
||||
main()
|
||||
19
requirements.txt
Normal file
19
requirements.txt
Normal file
@@ -0,0 +1,19 @@
|
||||
# data acquisition and web scraping
|
||||
selenium
|
||||
beautifulsoup4
|
||||
|
||||
# audio processing
|
||||
openai-whisper
|
||||
|
||||
# traditional OCR
|
||||
pillow
|
||||
pytesseract
|
||||
opencv-python
|
||||
tqdm
|
||||
streamlit
|
||||
python-Levenshtein
|
||||
|
||||
# ocr with keras/tensorflow
|
||||
tensorflow
|
||||
keras
|
||||
matplotlib
|
||||
5
settings.example.py
Normal file
5
settings.example.py
Normal file
@@ -0,0 +1,5 @@
|
||||
# required
|
||||
DOCUMENT_ID = ""
|
||||
|
||||
# optional: Webhook URL to send notifications to
|
||||
#WEBHOOK_URL = "https://example.com/webhook/your_webhook_id"
|
||||
72
transcription.py
Normal file
72
transcription.py
Normal file
@@ -0,0 +1,72 @@
|
||||
import re
|
||||
|
||||
# Mapping von gesprochenen Buchstaben (deutsch) auf Zeichen
|
||||
SPOKEN_TO_CHAR = {
|
||||
"a": "a", "ah": "a",
|
||||
"be": "b", "bee": "b", "bei": "b",
|
||||
"ce": "c", "see": "c", "ze": "c", "cheet": "c", "ci": "c",
|
||||
"de": "d", "dee": "d",
|
||||
"e": "e", "eh": "e",
|
||||
"ef": "f", "eff": "f",
|
||||
"ge": "g", "geh": "g",
|
||||
"ha": "h", "hah": "h",
|
||||
"i": "i", "ih": "i",
|
||||
"jot": "j", "jay": "j", "yacht": "j", "jöt": "j",
|
||||
"ka": "k", "kah": "k", "kar": "k", "car": "k",
|
||||
"el": "l", "ell": "l",
|
||||
"em": "m", "emm": "m",
|
||||
"en": "n", "enn": "n",
|
||||
"o": "o", "oh": "o",
|
||||
"pe": "p", "peh": "p", "pi": "p", "pee": "p",
|
||||
"ku": "q", "kuh": "q", "queue": "q", "coup": "q",
|
||||
"er": "r", "err": "r",
|
||||
"es": "s", "ess": "s",
|
||||
"te": "t", "teh": "t", "ti": "t",
|
||||
"u": "u", "uh": "u",
|
||||
"vau": "v", "fau": "v", "faul": "v",
|
||||
"we": "w", "weh": "w",
|
||||
"ix": "x", "iks": "x",
|
||||
"ypsilon": "y", "üpsilon": "y",
|
||||
"zet": "z", "zett": "z", "set": "z", "fett": "z", "sedt": "z",
|
||||
# Zahlen
|
||||
"null": "0", "zero": "0",
|
||||
"eins": "1", "one": "1",
|
||||
"zwei": "2", "two": "2", "zwo": "2", "svi": "2", "svay": "2", "swei": "2",
|
||||
"drei": "3", "three": "3",
|
||||
"vier": "4", "four": "4", "fia": "4","fiar": "4", "sier": "4", "fier": "4",
|
||||
"fünf": "5", "five": "5", "fönz": "5", "fünfs": "5", "fins": "5",
|
||||
"sechs": "6", "six": "6",
|
||||
"sieben": "7", "seven": "7", "zieben": "7", "riben": "7",
|
||||
"acht": "8", "eight": "8",
|
||||
"neun": "9", "nine": "9", "noin": "9",
|
||||
}
|
||||
|
||||
def _normalize_transcription(raw_text):
|
||||
"""Wandelt eine Whisper-Transkription in die tatsächlichen Captcha-Zeichen um."""
|
||||
# Entferne Satzzeichen und splitte in Tokens
|
||||
tokens = re.split(r'[,.\s]+', raw_text.lower().strip())
|
||||
result = []
|
||||
for token in tokens:
|
||||
if not token:
|
||||
continue
|
||||
# Prüfe, ob das Token ein bekanntes gesprochenes Wort ist
|
||||
if token in SPOKEN_TO_CHAR:
|
||||
result.append(SPOKEN_TO_CHAR[token])
|
||||
# Falls es ein einzelnes Zeichen ist (a-z, 0-9), direkt übernehmen
|
||||
elif len(token) == 1 and token.isalnum():
|
||||
result.append(token)
|
||||
# Sonst ignorieren oder loggen
|
||||
else:
|
||||
print(f"Unbekanntes Token: '{token}'")
|
||||
return ''.join(result)
|
||||
|
||||
def transcribe_audio_with_whisper(mp3_path):
|
||||
import whisper
|
||||
model = whisper.load_model("small")
|
||||
result = model.transcribe(str(mp3_path), language='de')
|
||||
raw_text = result["text"]
|
||||
print("Raw transcription:", raw_text)
|
||||
|
||||
cleaned = _normalize_transcription(raw_text)
|
||||
print("Cleaned transcription:", cleaned)
|
||||
return cleaned
|
||||
Reference in New Issue
Block a user