Files

136 lines
4.5 KiB
Python
Raw Permalink Normal View History

2026-05-25 01:12:43 +03:00
from __future__ import annotations
import argparse
import asyncio
from parser.discovery import discover_documents, build_session
from parser.fetcher import fetch_documents, load_manifest, load_raw_index
from parser.ingest import ingest_documents
from parser.normalizer import load_normalized_document, normalize_document, write_normalized_document
from shared import ORM
def parse_categories(value: str | None) -> set[str] | None:
if not value:
return None
return {item.strip() for item in value.split(",") if item.strip()}
def select_documents(categories: set[str] | None, limit: int | None) -> list[dict]:
    """Pick the manifest documents a sub-command should operate on.

    Args:
        categories: keep only documents whose ``category_key`` is in this set;
            ``None`` or an empty set keeps every document.
        limit: after category filtering, keep at most this many documents from
            the front of the list; ``None`` means no cap.
            NOTE(review): the value goes straight into a slice, so a negative
            limit drops documents from the end rather than capping the count.

    Returns:
        The selected document records from ``load_manifest()["documents"]``.
    """
    chosen = load_manifest()["documents"]
    if categories:
        chosen = [entry for entry in chosen if entry["category_key"] in categories]
    return chosen if limit is None else chosen[:limit]
def run_discover(_: argparse.Namespace) -> None:
    """Handle ``discover``: run document discovery and report what was found.

    The parsed arguments are accepted for a uniform sub-command signature but
    are not used.
    """
    session = build_session()
    manifest = discover_documents(session)
    found = len(manifest["documents"])
    print(f"discovered {found} documents from {manifest['source_page']}")
def run_fetch(args: argparse.Namespace) -> None:
    """Handle ``fetch``: fetch the selected documents and print how many were returned.

    ``--force`` and ``--dry-run`` are forwarded unchanged to ``fetch_documents``.
    """
    wanted = parse_categories(args.categories)
    targets = select_documents(wanted, args.limit)
    fetched = fetch_documents(targets, force=args.force, dry_run=args.dry_run)
    print(f"fetched {len(fetched)} documents")
def run_normalize(args: argparse.Namespace) -> None:
    """Handle ``normalize``: normalize the raw payloads of the selected documents.

    Every selected document is checked for a raw payload *before* anything is
    normalized or written. A missing payload therefore aborts the run without
    leaving a partially written set of normalized documents behind. This
    matches the ingest step, which loads all of its inputs before acting.

    ``--dry-run`` is forwarded to ``write_normalized_document``. ``--force`` is
    accepted by the CLI but not used here.

    Raises:
        FileNotFoundError: if any selected document has no entry in the raw
            index (i.e. it has not been fetched yet).
    """
    documents = select_documents(parse_categories(args.categories), args.limit)
    # Hoisted out of the loop: the per-document lookup table never changes.
    raw_documents = load_raw_index().get("documents", {})

    # Phase 1: resolve every raw payload up front so a missing one fails fast.
    raw_payloads = []
    for document in documents:
        raw_payload = raw_documents.get(document["key"])
        if raw_payload is None:
            raise FileNotFoundError(
                f"raw payload for {document['key']} not found; run `python -m parser fetch` first"
            )
        raw_payloads.append(raw_payload)

    # Phase 2: normalize and write only once all inputs are known to exist.
    for raw_payload in raw_payloads:
        normalized_document = normalize_document(raw_payload)
        write_normalized_document(normalized_document, dry_run=args.dry_run)
    print(f"normalized {len(raw_payloads)} documents")
async def _run_ingest_async(args: argparse.Namespace) -> None:
    """Load the selected normalized documents and pass them to ``ingest_documents``.

    All normalized payloads are loaded before any database work starts, so a
    missing payload aborts the run before an ``ORM`` is created. Outside
    dry-run mode an ``ORM`` is created, its schema initialised, and it is always
    closed afterwards. In dry-run mode no ``ORM`` is created and ``None`` is
    handed to ``ingest_documents``.

    Raises:
        FileNotFoundError: if a selected document has no normalized payload.
    """
    selected = select_documents(parse_categories(args.categories), args.limit)

    payloads = []
    for document in selected:
        key = document["key"]
        payload = load_normalized_document(key)
        if payload is None:
            raise FileNotFoundError(
                f"normalized payload for {key} not found; run `python -m parser normalize` first"
            )
        payloads.append(payload)

    # If ORM() itself raises there is nothing to close, so it can sit outside the try.
    orm = None if args.dry_run else ORM()
    try:
        if orm is not None:
            await orm.init_schema()
        results = await ingest_documents(orm, payloads, dry_run=args.dry_run)
    finally:
        if orm is not None:
            await orm.close()
    print(f"ingested {len(results)} documents")
def run_ingest(args: argparse.Namespace) -> None:
    """Handle ``ingest``: run the async ingest coroutine to completion via ``asyncio.run``."""
    coroutine = _run_ingest_async(args)
    asyncio.run(coroutine)
def run_pipeline(args: argparse.Namespace) -> None:
    """Handle ``run``: execute discover, fetch, normalize and ingest in order.

    With ``--dry-run`` the pipeline stops after the fetch step (which receives
    ``dry_run=True``); normalize and ingest are skipped entirely.
    NOTE(review): discovery runs even in dry-run mode; confirm whether
    ``discover_documents`` persists anything, since the message below only
    promises that raw files and the DB are untouched.
    """
    run_discover(args)
    run_fetch(args)
    if args.dry_run:
        print("dry-run stopped after fetch preview; raw files and DB were not changed")
        return
    run_normalize(args)
    run_ingest(args)
def _non_negative_int(text: str) -> int:
    """argparse ``type=`` converter that accepts only integers >= 0.

    A negative ``--limit`` would otherwise reach ``documents[:limit]`` in
    ``select_documents`` and silently drop documents from the *end* of the
    list instead of capping how many are processed.
    """
    try:
        value = int(text)
    except ValueError:
        # Re-raise as ArgumentTypeError so argparse shows this message verbatim
        # instead of naming this helper function in the error.
        raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
    if value < 0:
        raise argparse.ArgumentTypeError(f"must be a non-negative integer, got {value}")
    return value


def build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with the ``discover``/``fetch``/``normalize``/``ingest``/``run`` sub-commands.

    Each sub-parser stores its handler in ``func`` (via ``set_defaults``), which
    ``main`` calls with the parsed namespace. A sub-command is required.
    """
    parser = argparse.ArgumentParser(description="Consultant ingestion pipeline for LawBot.")
    subparsers = parser.add_subparsers(dest="command", required=True)

    def add_common_flags(command_parser: argparse.ArgumentParser) -> None:
        """Attach the document-selection and execution flags shared by most sub-commands."""
        command_parser.add_argument(
            "--categories",
            default=None,
            help="comma-separated category keys to process (default: all categories)",
        )
        command_parser.add_argument(
            "--force",
            action="store_true",
            help="forwarded to the fetch step as force=True; not used by normalize or ingest",
        )
        command_parser.add_argument(
            "--limit",
            type=_non_negative_int,
            default=None,
            help="process at most N documents, counted after category filtering",
        )
        command_parser.add_argument(
            "--dry-run",
            action="store_true",
            help="preview only; each step receives dry_run=True and `run` stops after fetch",
        )

    discover_parser = subparsers.add_parser(
        "discover", help="discover documents from the source page"
    )
    discover_parser.set_defaults(func=run_discover)

    fetch_parser = subparsers.add_parser(
        "fetch", help="fetch raw payloads for the selected documents"
    )
    add_common_flags(fetch_parser)
    fetch_parser.set_defaults(func=run_fetch)

    normalize_parser = subparsers.add_parser(
        "normalize", help="normalize previously fetched raw payloads"
    )
    add_common_flags(normalize_parser)
    normalize_parser.set_defaults(func=run_normalize)

    ingest_parser = subparsers.add_parser(
        "ingest", help="ingest normalized documents into the database"
    )
    add_common_flags(ingest_parser)
    ingest_parser.set_defaults(func=run_ingest)

    run_parser = subparsers.add_parser(
        "run", help="run discover, fetch, normalize and ingest in sequence"
    )
    add_common_flags(run_parser)
    run_parser.set_defaults(func=run_pipeline)

    return parser
def main(argv: list[str] | None = None) -> None:
    """Parse command-line arguments and dispatch to the chosen sub-command's handler.

    Args:
        argv: arguments to parse, without the program name. The default
            ``None`` makes argparse read ``sys.argv[1:]``, exactly as before.
            Passing an explicit list lets callers and tests drive the CLI
            without patching ``sys.argv``.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    # Every sub-parser sets ``func`` via set_defaults, and a sub-command is required.
    args.func(args)