Some checks failed
Python tests / tests (push) Failing after 8s
in order to allow for better efficiency and consistency
285 lines
9.0 KiB
Python
285 lines
9.0 KiB
Python
from pathlib import Path
|
|
from string import punctuation
|
|
import time
|
|
from datetime import datetime
|
|
from urllib.parse import urlparse
|
|
import re
|
|
|
|
import requests
|
|
from selenium import webdriver
|
|
from selenium.webdriver.common.by import By
|
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
try:
|
|
from settings import DOCUMENT_ID, WEBHOOK_URL, MODEL_NAME
|
|
except ImportError:
|
|
print("settings.py nicht gefunden, verwende settings_example.py")
|
|
from settings_example import DOCUMENT_ID, WEBHOOK_URL, MODEL_NAME
|
|
|
|
from transcription import transcribe_audio_with_whisper
|
|
|
|
# Upper bound for the number of captcha rounds tried by solve_captcha_flow().
MAX_CAPTCHA_ATTEMPTS = 3
# When True, _get_driver() passes "--headless" to Firefox.
USE_HEADLESS_MODE = False

# Entry page of the status query; the document id goes into the "d" query parameter.
start_url = f"https://olmera.verwalt-berlin.de/std/olav/antrag/passpa/1?d={DOCUMENT_ID}"
# Alternative entry page (presumably a locally served sample for offline testing):
# start_url = "http://127.0.0.1:8000/sample_1.html"

# Absolute path of the directory configured as the browser's download target.
download_dir = Path("./audio_captchas").resolve()

# Not yet really used, just for reference.
# Maps a short status label to the full sentence shown on the status page.
# Only the "in Produktion" entry is read (by normalize_status_text()).
statuses = {
    "in Produktion": "Das Dokument ist noch in Produktion.",
    "abholbereit (Reisepass)": "Ihr Reisepass liegt zur Abholung bereit.",
    "abholbereit (Personalausweis)": (
        "Ihr Personalausweis ist in der Ausweisbehörde eingetroffen. Das Dokument kann abgeholt werden."
    ),
}
|
|
|
|
|
|
def _extract_mp3_filename(url):
    """Return the last path segment of *url* (e.g. '1770057062035.mp3').

    Only the path component of the URL is considered, so a query string or
    fragment never ends up in the result. A path ending in '/' yields an
    empty string.
    """
    url_path = urlparse(url).path
    # Everything behind the final slash is the file name; without any slash
    # the whole path is returned.
    _, _, last_segment = url_path.rpartition("/")
    return last_segment
|
|
|
|
|
|
def _get_driver():
    """Create and return a Firefox WebDriver configured for this script.

    The browser is set up to request German content and to store downloads
    in ``download_dir``. Headless mode is enabled when ``USE_HEADLESS_MODE``
    is true. The caller is responsible for calling ``quit()`` on the result.
    """
    preferences = {
        "intl.accept_languages": "de-DE, de",
        "browser.download.useDownloadDir": True,
        # NOTE(review): folderList=2 is understood to mean "use the custom
        # directory from browser.download.dir" — confirm against Firefox docs.
        "browser.download.folderList": 2,
        "browser.download.dir": str(download_dir),
    }

    firefox_options = webdriver.FirefoxOptions()
    for pref_name, pref_value in preferences.items():
        firefox_options.set_preference(pref_name, pref_value)

    if USE_HEADLESS_MODE:
        firefox_options.add_argument("--headless")

    return webdriver.Firefox(options=firefox_options)
|
|
|
|
|
|
# Try the audio_captcha way
|
|
def _wait_for_download(path, timeout=10.0, poll_interval=0.25):
    """Poll until *path* looks like a completely downloaded file.

    The file counts as complete when it exists, is not empty and has no
    ``<name>.part`` sibling.

    Returns:
        True as soon as the file looks complete, False if *timeout* seconds
        pass first.
    """
    # NOTE(review): Firefox is understood to keep a "<name>.part" file next to
    # the target while a download is in progress — confirm for the version used.
    part_file = path.with_name(path.name + ".part")
    deadline = time.monotonic() + timeout
    while True:
        try:
            finished = (
                path.is_file()
                and path.stat().st_size > 0
                and not part_file.exists()
            )
        except OSError:
            # The file may appear/disappear between the checks; just poll again.
            finished = False
        if finished:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


def process_captcha_page_with_audio_captcha(driver):
    """Solve the captcha on the current page via its audio variant.

    Steps: save a screenshot of the captcha image as ``captcha.png``, click the
    audio-captcha link so the browser downloads the MP3 into ``download_dir``,
    transcribe that file and, if the transcription has exactly four
    characters, type it into the ``code`` field and submit the form.

    Args:
        driver: Selenium WebDriver currently showing the captcha page.

    Returns:
        True if a code was entered and the form was submitted. False if the
        audio file did not arrive in time or the transcription is missing or
        not exactly four characters long (nothing is submitted then).
    """
    captcha = driver.find_element(By.ID, "captcha")

    # Keep a picture of the visual captcha for reference.
    captcha.find_element(By.TAG_NAME, "img").screenshot("captcha.png")

    # Locate the audio link once; it is used for both the URL and the click.
    audio_link = captcha.find_element(By.CLASS_NAME, "audioCaptcha").find_element(
        By.TAG_NAME, "a"
    )
    audio_captcha = audio_link.get_attribute("href")
    print(audio_captcha)
    audio_link.click()

    print(f"Download dir: {str(download_dir)}")
    mp3_filename = _extract_mp3_filename(audio_captcha)
    print(f"Extracted MP3 filename: {mp3_filename}")

    mp3_path = download_dir / mp3_filename

    # Wait for the file to actually arrive instead of sleeping a fixed time:
    # a fixed delay is too short on slow connections and wasteful on fast ones.
    print("Warte auf den Download der Audiodatei...")
    if not _wait_for_download(mp3_path):
        print("Audiodatei wurde nicht rechtzeitig heruntergeladen. Abbruch.")
        return False

    print(f"Lokaler Pfad der Audiodatei: {mp3_path}")

    # Perform the transcription.
    transcribed_text = transcribe_audio_with_whisper(mp3_path)
    print(f"Transkribierter Text: {transcribed_text}")

    # Guard against a missing result as well as a wrong length
    # (assumes the helper returns a str or None — TODO confirm).
    if not transcribed_text or len(transcribed_text) != 4:
        print("Transkription hat nicht die erwartete Länge von 4 Zeichen. Abbruch.")
        return False

    # Fill in the form.
    driver.find_element(By.CLASS_NAME, "formtable").find_element(
        By.ID, "code"
    ).send_keys(transcribed_text)

    # Submit the form.
    driver.find_element(By.CLASS_NAME, "actionbuttons").find_element(
        By.CLASS_NAME, "button"
    ).click()

    return True
|
|
|
|
|
|
def normalize_status_text(raw_text: str):
    """Collapse whitespace in *raw_text* and shorten the known production status.

    Every run of whitespace becomes a single space and the result is stripped.
    If the cleaned text equals the "in Produktion" sentence from ``statuses``,
    the short form "noch in Produktion" is returned; any other text is
    returned in its cleaned form.
    """
    cleaned = re.compile(r"\s+").sub(" ", raw_text).strip()
    return "noch in Produktion" if cleaned == statuses["in Produktion"] else cleaned
|
|
|
|
|
|
# I tried the OCR way first, but was not successful.
|
|
def screenshot_captcha(save_dir="./captchas", prefix="captcha"):
    """Open the start page and save a screenshot of the captcha image.

    The image is written to ``{save_dir}/{prefix}_{YYYYmmdd_HHMMSS}.png``.
    The target directory is created if it does not exist yet, and the browser
    is always closed again, even when loading the page or locating the
    captcha fails.

    Args:
        save_dir: Directory the screenshot is stored in.
        prefix: File name prefix placed before the timestamp.
    """
    # Without the directory the screenshot cannot be written.
    Path(save_dir).mkdir(parents=True, exist_ok=True)

    driver = _get_driver()
    try:
        driver.get(start_url)

        # Build a unique file name from the current timestamp.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{save_dir}/{prefix}_{timestamp}.png"

        saved = (
            driver.find_element(By.ID, "captcha")
            .find_element(By.TAG_NAME, "img")
            .screenshot(filename)
        )
    finally:
        # Never leave a browser process behind.
        driver.quit()

    # Selenium's WebElement.screenshot() reports a failed write by returning
    # False instead of raising; only claim success when it did not.
    if saved is False:
        print(f"Captcha konnte nicht gespeichert werden: {filename}")
    else:
        print(f"Captcha gespeichert als {filename}")
|
|
|
|
|
|
def test_transcription():
    """Transcribe every MP3 in ``download_dir`` and record the results as CSV.

    Each ``*.mp3`` file in the download directory is transcribed with Whisper
    (the model is loaded once and reused). The results are written to
    'transcription_results.csv' with the columns 'filename' and
    'transcription'; a missing transcription (None) is written as an empty
    field. Finally the share of transcriptions with exactly 4 characters is
    printed as "accuracy". If no MP3 file is found, a notice is printed
    instead of a percentage.

    Returns:
        None
    """
    # Local imports: csv is only needed here and whisper is heavy to load.
    import csv

    import whisper

    accuracy = 0
    total = 0

    # Load the Whisper model once for efficiency.
    model = whisper.load_model(MODEL_NAME)

    # Create or overwrite the CSV file; csv.writer takes care of quoting
    # transcriptions that contain commas, quotes or line breaks.
    with open(
        "transcription_results.csv", "w", newline="", encoding="utf-8"
    ) as csvfile:
        writer = csv.writer(csvfile, lineterminator="\n")
        writer.writerow(["filename", "transcription"])

        # Sorted so that the CSV rows come out in a reproducible order.
        for mp3_path in sorted(download_dir.glob("*.mp3")):
            print(f"Verarbeite Datei: {mp3_path}")
            transcription = transcribe_audio_with_whisper(mp3_path, model=model)
            print(f"Transkription: {transcription}")

            writer.writerow([mp3_path.name, transcription])

            if transcription and len(transcription) == 4:
                accuracy += 1
            total += 1

    if total == 0:
        # Avoid dividing by zero when the directory holds no recordings.
        print(f"Keine MP3-Dateien in {download_dir} gefunden.")
        return

    print(f"Genauigkeit: {accuracy}/{total} = {accuracy/total*100:.2f}%")
|
|
|
|
|
|
def is_captcha_page(html):
    """Return True if *html* contains a <fieldset> with aria-label "CAPTCHA"."""
    document = BeautifulSoup(html, "html.parser")
    captcha_fieldset = document.find("fieldset", attrs={"aria-label": "CAPTCHA"})
    if captcha_fieldset is None:
        return False
    return True
|
|
|
|
|
|
def is_status_page(html):
    """Return True if *html* contains the status result fieldset.

    The page counts as a status page when a <fieldset> with the aria-label
    "Statusabfrage Suchergebnis" is present.
    """
    wanted_attributes = {"aria-label": "Statusabfrage Suchergebnis"}
    match = BeautifulSoup(html, "html.parser").find("fieldset", attrs=wanted_attributes)
    return match is not None
|
|
|
|
|
|
def parse_status_page(html):
    """Extract the status sentence and its timestamp from a status page.

    The status is the text of the first ``.warn p`` element inside the result
    fieldset, falling back to ``.info p``. The timestamp is taken from the
    next <p> element after it, with the "Stand: " label removed, and parsed
    as ``%d.%m.%Y %H:%M:%S``. Both values are also printed.

    Returns:
        Tuple ``(status_text, timestamp)`` with the status as str and the
        timestamp as a (naive) datetime.

    Raises:
        ValueError: If the result fieldset, the status paragraph or the
            following paragraph is missing, or the timestamp text does not
            match the expected format.
    """
    document = BeautifulSoup(html, "html.parser")

    result_box = document.find(
        "fieldset", attrs={"aria-label": "Statusabfrage Suchergebnis"}
    )
    if result_box is None:
        raise ValueError("Statusbereich nicht gefunden (fehlendes Fieldset).")

    # Prefer the warning paragraph, otherwise use the info paragraph.
    status_node = None
    for selector in (".warn p", ".info p"):
        status_node = result_box.select_one(selector)
        if status_node:
            break
    if status_node is None:
        raise ValueError("Statusabschnitt (.warn p oder .info p) nicht gefunden.")

    status_text = status_node.get_text(strip=True)

    # The paragraph following the status carries the "Stand: ..." timestamp.
    stamp_node = status_node.find_next("p")
    if stamp_node is None:
        raise ValueError("Zeitstempelabschnitt (nächstes <p>) nicht gefunden.")

    stamp_text = stamp_node.get_text(strip=True)
    stamp_text = stamp_text.replace("Stand: ", "")
    timestamp_dt = datetime.strptime(stamp_text, "%d.%m.%Y %H:%M:%S")

    print("Status:", status_text)
    print("Stand :", timestamp_dt.isoformat())

    return status_text, timestamp_dt
|
|
|
|
|
|
def test_parse_status_page():
    """Run the status-page helpers against ./sample_html/status.html.

    Reads the sample file, prints whether it is recognised as a status page,
    then parses it, normalizes the status text and prints the result.
    """
    sample_html = Path("./sample_html/status.html").read_text()

    print(f"is_status_page: {is_status_page(sample_html)}")

    raw_status, timestamp = parse_status_page(sample_html)
    status = normalize_status_text(raw_status)
    print(f"Status: {status}, Timestamp: {timestamp}")
|
|
|
|
|
|
def notify_webhook(status, last_updated, webhook_url, *, timeout=10):
    """POST the status as JSON to *webhook_url* and report whether it worked.

    The payload has the form
    ``{"status": <status>, "last_updated": <ISO 8601 timestamp>}``.

    Args:
        status: Status text to send.
        last_updated: Object with an ``isoformat()`` method (a datetime).
        webhook_url: Target URL of the webhook.
        timeout: Seconds to wait for the server before giving up, so that an
            unresponsive endpoint cannot block the script forever.

    Returns:
        True if the server answered with any 2xx status code, otherwise False
        (including timeouts and connection errors).
    """
    data = {"status": status, "last_updated": last_updated.isoformat()}

    try:
        response = requests.post(webhook_url, json=data, timeout=timeout)
    except requests.RequestException as exc:
        # Network problems are reported like any other failed delivery.
        print(f"Fehler beim Senden der Daten: {exc}")
        return False

    # Webhook endpoints commonly answer 201/202/204 as well as 200.
    success = 200 <= response.status_code < 300

    if success:
        print("Daten erfolgreich gesendet.")
    else:
        print(f"Fehler beim Senden der Daten: {response.status_code}, {response.text}")

    return success
|
|
|
|
|
|
def solve_captcha_flow(driver):
    """Try up to MAX_CAPTCHA_ATTEMPTS times to get past the captcha.

    Each round solves the audio captcha on the current page. If the
    transcription fails, the start page is reloaded and the next round
    begins. Otherwise the resulting page is inspected: a status page ends the
    flow, anything else leads to another round (after reloading the start
    page) as long as attempts are left and the page is still a captcha page.

    Returns:
        Tuple ``(status, last_updated)`` with the normalized status text and
        the timestamp parsed from the status page.

    Raises:
        RuntimeError: If the last attempt does not lead to a status page, if
            the page is neither a status nor a captcha page, or if all
            attempts are used up without a result.
    """
    attempt = 0
    while attempt < MAX_CAPTCHA_ATTEMPTS:
        attempt += 1
        print(f"[Attempt {attempt}/{MAX_CAPTCHA_ATTEMPTS}] Löse Audio-Captcha …")

        if not process_captcha_page_with_audio_captcha(driver):
            # Nothing was submitted; start over with a fresh captcha.
            print("Transkription fehlgeschlagen, lade Captcha-Seite neu …")
            driver.get(start_url)
            continue

        page_html = driver.page_source

        if is_status_page(page_html):
            raw_status, last_updated = parse_status_page(page_html)
            normalized_status = normalize_status_text(raw_status)
            print(f"Status ermittelt: {normalized_status} (Stand: {last_updated})")
            return normalized_status, last_updated

        # No status page: give up if this was the final round ...
        if attempt == MAX_CAPTCHA_ATTEMPTS:
            raise RuntimeError(
                "Maximale Anzahl an Captcha-Versuchen erreicht, ohne Statusseite zu erhalten."
            )

        # ... or if the site shows something unexpected.
        if not is_captcha_page(page_html):
            raise RuntimeError("Weder Status- noch Captcha-Seite erkannt. Abbruch.")

        print("Captcha nicht gelöst, versuche es erneut …")
        driver.get(start_url)

    # Reached when the remaining rounds all ended in a failed transcription.
    raise RuntimeError("Status konnte nicht ermittelt werden.")
|
|
|
|
|
|
def main():
    """Query the document status once and forward it to the webhook.

    Opens the start page, runs the captcha flow, prints the result and posts
    it to ``WEBHOOK_URL``. The browser is closed in every case.

    Raises:
        RuntimeError: If the webhook notification reports a failure (errors
            from the captcha flow propagate unchanged).
    """
    browser = _get_driver()
    try:
        browser.get(start_url)
        result = solve_captcha_flow(browser)
        status, last_updated = result

        print(f"Final Status: {status}, Timestamp: {last_updated}")

        delivered = notify_webhook(status, last_updated, WEBHOOK_URL)
        if not delivered:
            raise RuntimeError("Webhook konnte nicht benachrichtigt werden.")
    finally:
        # Always shut the browser down, whether or not the run succeeded.
        browser.quit()
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual development helpers; enable one of them to run it before the
    # regular flow:
    #   test_transcription()
    #   test_parse_status_page()
    main()
|