# grosse_haufen/scrape.py
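"""
Scraper for the haustechnikdialog.de forum thread "Grosse Haufen".

Fetches every page of the thread, extracts author, timestamp, likes, post id
and body text from the post table, de-duplicates by post id and writes the
result to thread_posts.json.
"""
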
import re
import time
import json
from dataclasses import dataclass, asdict
from typing import List, Optional
from urllib.parse import urljoin, urlparse, parse_qs
import requests
from bs4 import BeautifulSoup

THREAD_URL = "https://www.haustechnikdialog.de/Forum/t/19886/Grosse-Haufen"
HEADERS = {
    "User-Agent": "Mozilla/5.0 (compatible; HTD-ThreadScraper/3.0)"
}


@dataclass
class Post:
    page: int
    author: str
    timestamp: str
    likes: int
    post_id: str
    text: str


TIME_RE = re.compile(r"^Zeit:\s*(\d{2}\.\d{2}\.\d{4}\s+\d{2}:\d{2}:\d{2})\s*$")


def fetch_html(url: str, session: requests.Session, timeout: int = 30) -> str:
    r = session.get(url, headers=HEADERS, timeout=timeout)
    r.raise_for_status()
    return r.text


def detect_max_page(html: str, base_url: str) -> int:
    soup = BeautifulSoup(html, "html.parser")
    pages = set()
    for a in soup.select('a[href*="page="]'):
        href = a.get("href")
        if not href:
            continue
        full = urljoin(base_url, href)
        qs = parse_qs(urlparse(full).query)
        for v in qs.get("page", []):
            if v.isdigit():
                pages.add(int(v))
    return max(pages) if pages else 1
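

def _demo_detect_max_page() -> None:
    # Illustrative only (not called anywhere): a minimal sketch of the kind of
    # pagination links detect_max_page looks for; the real forum markup may differ.
    html = '<a href="?page=2">2</a> <a href="?page=7">7</a>'
    print(detect_max_page(html, THREAD_URL))  # -> 7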


def page_url(base: str, page: int) -> str:
    return base if page == 1 else f"{base}?page={page}"


def table_to_lines(table) -> List[str]:
    """
    Converts ONLY the table content into lines.
    Advantage: no footer/nav/sidebar text leaks in.
    """
    # get_text with separator="\n" is much more robust than .text
    txt = table.get_text("\n")
    txt = txt.replace("\r", "\n")
    # normalise whitespace
    txt = re.sub(r"[ \t]+", " ", txt)
    txt = re.sub(r"\n{3,}", "\n\n", txt)
    lines = [ln.strip() for ln in txt.split("\n")]
    # don't drop empty lines entirely, but trimming them is fine
    return lines
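

def _demo_table_to_lines() -> None:
    # Illustrative only (not called anywhere): whitespace normalisation on a
    # hand-written mini table; the real post table is far larger.
    html = "<table><tr><td>Verfasser:</td><td>  Alice  </td></tr></table>"
    soup = BeautifulSoup(html, "html.parser")
    print(table_to_lines(soup.find("table")))  # -> ['Verfasser:', 'Alice']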


def parse_posts_from_lines(lines: List[str], page_num: int) -> List[Post]:
    """
    State machine based on the known 'Verfasser:' / 'Zeit:' structure,
    but ONLY inside table.tablebeitraege.
    """
    posts: List[Post] = []
    i = 0
    seen_ids = set()

    def skip_empty(idx: int) -> int:
        while idx < len(lines) and lines[idx] == "":
            idx += 1
        return idx

    while i < len(lines):
        if lines[i] != "Verfasser:":
            i += 1
            continue
        # author
        i += 1
        i = skip_empty(i)
        if i >= len(lines):
            break
        author = lines[i].strip()
        # Some author lines still carry "Image: Registrierter..." next to / below
        # the name -> keep only the name. (Heuristic, but works well in practice.)
        author = re.split(r"\s{2,}|Image:|Registrierter", author, maxsplit=1)[0].strip()
        i += 1
        # time line
        i = skip_empty(i)
        if i >= len(lines):
            break
        # Some pages put the label "Zeit:" in its own cell/line and the actual
        # time is on the next line. Handle that case.
        if lines[i].strip() == "Zeit:":
            # use the next non-empty line as the time
            j = i + 1
            while j < len(lines) and lines[j].strip() == "":
                j += 1
            if j >= len(lines):
                i = j
                continue
            candidate_line = lines[j].strip()
            mm = re.search(r"\d{1,2}\.\d{1,2}\.\d{2,4}[,]?\s*\d{1,2}:\d{2}(?::\d{2})?", candidate_line)
            ts = mm.group(0) if mm else candidate_line
            i = j + 1
        else:
            m = TIME_RE.match(lines[i])
            if not m:
                # in case the layout differs: look for "Zeit:" anywhere in the line
                if "Zeit:" in lines[i]:
                    candidate = lines[i].split("Zeit:", 1)[1].strip()
                    mm = re.search(r"\d{1,2}\.\d{1,2}\.\d{2,4}[,]?\s*\d{1,2}:\d{2}(?::\d{2})?", candidate)
                    ts = mm.group(0) if mm else candidate
                else:
                    # search this line and the next two for a date
                    ts = ""
                    for j in range(i, min(i + 3, len(lines))):
                        mm = re.search(r"\d{1,2}\.\d{1,2}\.\d{2,4}[,]?\s*\d{1,2}:\d{2}(?::\d{2})?", lines[j])
                        if mm:
                            ts = mm.group(0)
                            break
                    if not ts:
                        i += 1
                        continue
            else:
                ts = m.group(1)
            i += 1
        # likes (next purely numeric line)
        i = skip_empty(i)
        while i < len(lines) and not lines[i].isdigit():
            i += 1
        if i >= len(lines):
            break
        likes = int(lines[i])
        i += 1
        # post_id (next purely numeric line)
        i = skip_empty(i)
        while i < len(lines) and not lines[i].isdigit():
            i += 1
        if i >= len(lines):
            break
        post_id = lines[i]
        i += 1
        # dedup (within one page)
        if post_id in seen_ids:
            # skip the body
            while i < len(lines) and lines[i] != "Verfasser:":
                i += 1
            continue
        seen_ids.add(post_id)
        # skip optional "Image:" lines and empty lines
        i = skip_empty(i)
        while i < len(lines) and lines[i].startswith("Image:"):
            i += 1
        i = skip_empty(i)
        # collect the body up to the next "Verfasser:" or the end
        body_lines: List[str] = []
        while i < len(lines) and lines[i] != "Verfasser:":
            if lines[i] != "":
                body_lines.append(lines[i])
            i += 1
        body = "\n".join(body_lines).strip()
        body = re.sub(r"\n{3,}", "\n\n", body)
        posts.append(Post(
            page=page_num,
            author=author,
            timestamp=ts,
            likes=likes,
            post_id=post_id,
            text=body
        ))
    return posts
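

def _demo_parse_posts_from_lines() -> None:
    # Illustrative only (not called anywhere): a hand-written line sequence in the
    # shape the state machine expects ("Verfasser:", author, "Zeit: ...", likes,
    # post id, body). Real pages contain many more incidental lines.
    sample = [
        "Verfasser:",
        "MaxMustermann",
        "Zeit: 01.02.2023 12:34:56",
        "3",        # likes
        "123456",   # post id
        "Hallo Forum!",
    ]
    for post in parse_posts_from_lines(sample, page_num=1):
        print(post)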


def parse_page_posts(html: str, page_num: int) -> List[Post]:
    soup = BeautifulSoup(html, "html.parser")
    # exactly the table(s) that contain the posts
    tables = soup.select("table.tablebeitraege")
    if not tables:
        raise RuntimeError("No table.tablebeitraege found (possibly a cookie wall or changed layout).")
    all_posts: List[Post] = []
    for table in tables:
        rows = table.find_all("tr")
        seen_ids = set()
        for idx, tr in enumerate(rows):
            tr_id = tr.get("id") or ""
            # Detect post header rows: the row id contains 'trPostHead'
            # or the cell contains 'Verfasser:'
            is_head = "trPostHead" in tr_id or (
                tr.select_one("span.fontcolor") is not None and "Verfasser:" in tr.get_text()
            )
            if not is_head:
                continue
            tds = tr.find_all("td")
            if not tds:
                continue
            # left: author
            left_td = tds[0]
            # prefer the visible user name in .hl or an <a>
            author_el = left_td.select_one(".hl") or left_td.select_one("a") or left_td.find("span")
            author = author_el.get_text(strip=True) if author_el else left_td.get_text(" ", strip=True)
            # clean-up: strip a leading 'Verfasser:' label
            author = re.sub(r"^Verfasser:\s*", "", author, flags=re.I).strip()
            # right: time / likes / post id
            right_td = tds[1] if len(tds) > 1 else left_td
            right_text = right_td.get_text(" ", strip=True)
            # timestamp: look for a date/time in the text
            m = re.search(r"\d{1,2}\.\d{1,2}\.\d{2,4}[,]?\s*\d{1,2}:\d{2}(?::\d{2})?", right_text)
            ts = m.group(0) if m else ""
            # likes: often in .fr-buttons > span
            likes = 0
            fr_buttons = right_td.select_one(".fr-buttons")
            if fr_buttons:
                num = fr_buttons.find("span")
                if num and num.get_text(strip=True).isdigit():
                    likes = int(num.get_text(strip=True))
            # post_id: hidden in an input (hfPostId) or as a number in the right-hand area
            post_id = ""
            hid = tr.find("input", attrs={"id": re.compile(r"hfPostId$")}) or right_td.find(
                "input", attrs={"id": re.compile(r"hfPostId$")}
            )
            if hid and hid.get("value"):
                post_id = hid.get("value")
            else:
                m2 = re.search(r"\b(\d{5,9})\b", right_text)
                if m2:
                    post_id = m2.group(1)
            if not post_id:
                # no valid post id -> skip
                continue
            if post_id in seen_ids:
                continue
            seen_ids.add(post_id)
            # the body is (usually) in the next TR; look for .divB
            body = ""
            if idx + 1 < len(rows):
                nexttr = rows[idx + 1]
                divb = nexttr.select_one(".divB")
                if divb is None:
                    next_tds = nexttr.find_all("td")
                    divb = next_tds[0] if next_tds else None
                if divb:
                    body = divb.get_text("\n", strip=True)
                    body = re.sub(r"\n{2,}", "\n\n", body)
            all_posts.append(Post(
                page=page_num,
                author=author,
                timestamp=ts,
                likes=likes,
                post_id=post_id,
                text=body,
            ))
    return all_posts
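

def _demo_parse_page_posts() -> None:
    # Illustrative only (not called anywhere): a synthetic snippet shaped like the
    # selectors above expect (table.tablebeitraege, trPostHead row, hfPostId input,
    # .divB body). This markup is an assumption; the real forum HTML may differ.
    html = (
        '<table class="tablebeitraege">'
        '<tr id="x_trPostHead"><td><span class="hl">Alice</span></td>'
        '<td>Zeit: 01.02.2023 12:34:56'
        '<input id="x_hfPostId" value="123456"/></td></tr>'
        '<tr><td><div class="divB">Hallo Forum!</div></td></tr>'
        '</table>'
    )
    for post in parse_page_posts(html, page_num=1):
        print(post)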


def scrape_thread(thread_url: str, sleep_s: float = 1.0) -> List[Post]:
    out: List[Post] = []
    seen_global = set()
    with requests.Session() as session:
        html1 = fetch_html(thread_url, session)
        max_page = detect_max_page(html1, thread_url)
        for p in range(1, max_page + 1):
            url = page_url(thread_url, p)
            html = html1 if p == 1 else fetch_html(url, session)
            posts = parse_page_posts(html, p)
            # global dedup by post_id (with the table scoped cleanly this should
            # normally not filter out anything further)
            for post in posts:
                if post.post_id not in seen_global:
                    seen_global.add(post.post_id)
                    out.append(post)
            print(f"[OK] page {p}/{max_page}: {len(posts)} posts (unique so far: {len(out)})")
            time.sleep(sleep_s)
    # optional: sort by post id
    out.sort(key=lambda x: int(x.post_id))
    return out


if __name__ == "__main__":
    posts = scrape_thread(THREAD_URL, sleep_s=1.0)
    with open("thread_posts.json", "w", encoding="utf-8") as f:
        json.dump([asdict(p) for p in posts], f, ensure_ascii=False, indent=2)
    print(f"Done. Total unique posts: {len(posts)}")