from __future__ import annotations import time from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone from pathlib import Path import requests from bs4 import BeautifulSoup, Tag from parser.config import ( MANIFEST_PATH, MAX_RETRIES, MAX_WORKERS, RAW_INDEX_PATH, RAW_ROOT, REQUEST_TIMEOUT, ) from parser.discovery import build_session, discover_documents from parser.utils import ensure_dir, read_json, sha256_text, to_absolute_url, write_json def fetch_with_retry(session: requests.Session, url: str) -> str: last_error: Exception | None = None for attempt in range(1, MAX_RETRIES + 1): try: response = session.get(url, timeout=REQUEST_TIMEOUT) response.raise_for_status() return response.text except Exception as exc: # pragma: no cover - network branch last_error = exc if attempt < MAX_RETRIES: time.sleep(attempt) raise last_error # type: ignore[misc] def extract_toc_articles(root_html: str, source_url: str) -> list[dict]: soup = BeautifulSoup(root_html, "html.parser") toc = soup.select_one(".document-page__toc > ul") if toc is None: return [] articles: list[dict] = [] def walk_list(node: Tag, stack: list[str]) -> None: items = [child for child in node.children if isinstance(child, Tag) and child.name == "li"] for li in items: anchor = li.find("a", href=True, recursive=False) if anchor is None: continue title = anchor.get_text(" ", strip=True) url = to_absolute_url(anchor["href"], source_url) next_ul = li.find_next_sibling("ul") if title.startswith("Статья "): article_number, _, article_title = title.partition(". ") articles.append( { "article_number": article_number.replace("Статья", "").strip(), "article_title": article_title.strip() or title, "article_url": url, "section_title": next((value for value in reversed(stack) if value.startswith("Раздел")), None), "chapter_title": next((value for value in reversed(stack) if value.startswith("Глава")), None), "part_title": next((value for value in reversed(stack) if value.startswith("Часть")), None), "breadcrumb": [*stack, title], } ) else: if next_ul is not None: walk_list(next_ul, [*stack, title]) walk_list(toc, []) return articles def _fetch_article(session: requests.Session, article: dict) -> tuple[dict, str]: return article, fetch_with_retry(session, article["article_url"]) def load_manifest() -> dict: manifest = read_json(MANIFEST_PATH) if manifest is None: manifest = discover_documents(build_session()) return manifest def load_raw_index() -> dict: return read_json(RAW_INDEX_PATH, default={"documents": {}}) def save_raw_index(index_payload: dict) -> None: write_json(RAW_INDEX_PATH, index_payload) def fetch_documents( selected_documents: list[dict], force: bool = False, dry_run: bool = False ) -> list[dict]: session = build_session() raw_index = load_raw_index() raw_index.setdefault("documents", {}) fetched_payloads: list[dict] = [] for document in selected_documents: root_html = fetch_with_retry(session, document["source_url"]) root_hash = sha256_text(root_html) previous = raw_index["documents"].get(document["key"]) if ( not force and previous and previous.get("root_hash") == root_hash and Path(previous["raw_dir"]).exists() ): fetched_payloads.append(previous) continue toc_articles = extract_toc_articles(root_html, document["source_url"]) timestamp = datetime.now(timezone.utc) raw_dir = RAW_ROOT / timestamp.date().isoformat() / document["key"] article_dir = raw_dir / "articles" article_payloads: list[dict] = [] article_hash_parts: list[str] = [] if not dry_run: ensure_dir(article_dir) (raw_dir / "root.html").write_text(root_html, encoding="utf-8") with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: futures = { executor.submit(_fetch_article, session, article): index for index, article in enumerate(toc_articles) } article_results = [None] * len(futures) for future in as_completed(futures): article, article_html = future.result() index = futures[future] article_hash = sha256_text(article_html) article_hash_parts.append(article_hash) article_payload = { **article, "file_name": f"{index:04d}.html", "sha256": article_hash, } article_results[index] = (article_payload, article_html) for article_payload, article_html in article_results: article_payloads.append(article_payload) if not dry_run: (article_dir / article_payload["file_name"]).write_text(article_html, encoding="utf-8") payload = { **document, "fetched_at": timestamp.isoformat(), "raw_dir": str(raw_dir), "root_file": "root.html", "root_hash": root_hash, "version_hash": sha256_text(root_hash + "".join(article_hash_parts)), "article_count": len(article_payloads), "articles": article_payloads, } if not dry_run: write_json(raw_dir / "sidecar.json", payload) raw_index["documents"][document["key"]] = payload save_raw_index(raw_index) fetched_payloads.append(payload) return fetched_payloads