from __future__ import annotations import argparse import asyncio from parser.discovery import discover_documents, build_session from parser.fetcher import fetch_documents, load_manifest, load_raw_index from parser.ingest import ingest_documents from parser.normalizer import load_normalized_document, normalize_document, write_normalized_document from shared import ORM def parse_categories(value: str | None) -> set[str] | None: if not value: return None return {item.strip() for item in value.split(",") if item.strip()} def select_documents(categories: set[str] | None, limit: int | None) -> list[dict]: manifest = load_manifest() documents = manifest["documents"] if categories: documents = [doc for doc in documents if doc["category_key"] in categories] if limit is not None: documents = documents[:limit] return documents def run_discover(_: argparse.Namespace) -> None: manifest = discover_documents(build_session()) print(f"discovered {len(manifest['documents'])} documents from {manifest['source_page']}") def run_fetch(args: argparse.Namespace) -> None: documents = select_documents(parse_categories(args.categories), args.limit) payloads = fetch_documents(documents, force=args.force, dry_run=args.dry_run) print(f"fetched {len(payloads)} documents") def run_normalize(args: argparse.Namespace) -> None: documents = select_documents(parse_categories(args.categories), args.limit) raw_index = load_raw_index() normalized_count = 0 for document in documents: raw_payload = raw_index.get("documents", {}).get(document["key"]) if raw_payload is None: raise FileNotFoundError( f"raw payload for {document['key']} not found; run `python -m parser fetch` first" ) normalized_document = normalize_document(raw_payload) write_normalized_document(normalized_document, dry_run=args.dry_run) normalized_count += 1 print(f"normalized {normalized_count} documents") async def _run_ingest_async(args: argparse.Namespace) -> None: documents = select_documents(parse_categories(args.categories), args.limit) normalized_documents = [] for document in documents: normalized = load_normalized_document(document["key"]) if normalized is None: raise FileNotFoundError( f"normalized payload for {document['key']} not found; run `python -m parser normalize` first" ) normalized_documents.append(normalized) orm = None try: if not args.dry_run: orm = ORM() await orm.init_schema() results = await ingest_documents(orm, normalized_documents, dry_run=args.dry_run) finally: if orm is not None: await orm.close() print(f"ingested {len(results)} documents") def run_ingest(args: argparse.Namespace) -> None: asyncio.run(_run_ingest_async(args)) def run_pipeline(args: argparse.Namespace) -> None: run_discover(args) if args.dry_run: run_fetch(args) print("dry-run stopped after fetch preview; raw files and DB were not changed") return run_fetch(args) run_normalize(args) run_ingest(args) def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Consultant ingestion pipeline for LawBot.") subparsers = parser.add_subparsers(dest="command", required=True) def add_common_flags(command_parser: argparse.ArgumentParser) -> None: command_parser.add_argument("--categories", default=None) command_parser.add_argument("--force", action="store_true") command_parser.add_argument("--limit", type=int, default=None) command_parser.add_argument("--dry-run", action="store_true") discover_parser = subparsers.add_parser("discover") discover_parser.set_defaults(func=run_discover) fetch_parser = subparsers.add_parser("fetch") add_common_flags(fetch_parser) fetch_parser.set_defaults(func=run_fetch) normalize_parser = subparsers.add_parser("normalize") add_common_flags(normalize_parser) normalize_parser.set_defaults(func=run_normalize) ingest_parser = subparsers.add_parser("ingest") add_common_flags(ingest_parser) ingest_parser.set_defaults(func=run_ingest) run_parser = subparsers.add_parser("run") add_common_flags(run_parser) run_parser.set_defaults(func=run_pipeline) return parser def main() -> None: parser = build_parser() args = parser.parse_args() args.func(args)