from __future__ import annotations import asyncio from contextlib import asynccontextmanager import logging from time import perf_counter from fastapi import FastAPI, Request from api.config import settings from api.deps import get_indexing_service, get_orm, get_vector_store from api.logging import configure_logging from api.routers.health import router as health_router from api.routers.indexing import router as indexing_router from api.routers.rag import router as rag_router configure_logging() logger = logging.getLogger(__name__) async def run_startup_indexing_task() -> None: if not settings.auto_index_on_startup: logger.info("Startup auto-index is disabled") return indexing_service = get_indexing_service() vector_store = get_vector_store() for attempt in range(1, settings.auto_index_max_attempts + 1): try: current_count = vector_store.count() db_chunk_count = await indexing_service.get_indexable_chunks_count() should_reset_collection = settings.auto_index_reset_collection logger.info( "Startup auto-index check: attempt=%s/%s chroma_count=%s postgres_chunks=%s", attempt, settings.auto_index_max_attempts, current_count, db_chunk_count, ) if db_chunk_count == 0: logger.warning( "No indexable chunks found in Postgres yet, retrying in %ss", settings.auto_index_retry_delay_seconds, ) await asyncio.sleep(settings.auto_index_retry_delay_seconds) continue if settings.auto_index_only_if_empty and current_count == db_chunk_count and current_count > 0: logger.info( "Skipping startup auto-index because Chroma is already in sync with Postgres: %s items", current_count, ) return if current_count not in {0, db_chunk_count}: should_reset_collection = True logger.warning( "Chroma/Postgres count mismatch detected, forcing collection reset: chroma_count=%s postgres_chunks=%s", current_count, db_chunk_count, ) result = await indexing_service.rebuild( reset_collection=should_reset_collection, ) logger.info("Startup auto-index completed successfully: %s", result) return except asyncio.CancelledError: logger.info("Startup auto-index task cancelled") raise except Exception: logger.exception( "Startup auto-index attempt %s/%s failed", attempt, settings.auto_index_max_attempts, ) await asyncio.sleep(settings.auto_index_retry_delay_seconds) logger.error( "Startup auto-index exhausted all %s attempts", settings.auto_index_max_attempts, ) @asynccontextmanager async def lifespan(app: FastAPI): orm = get_orm() startup_index_task: asyncio.Task | None = None logger.info("API startup initiated") await orm.init_schema() logger.info("Database schema is ready") startup_index_task = asyncio.create_task(run_startup_indexing_task()) app.state.startup_index_task = startup_index_task yield if startup_index_task is not None and not startup_index_task.done(): startup_index_task.cancel() try: await startup_index_task except asyncio.CancelledError: pass await orm.close() logger.info("API shutdown completed") app = FastAPI(title="LawBot RAG API", version="0.1.0", lifespan=lifespan) @app.middleware("http") async def log_requests(request: Request, call_next): started_at = perf_counter() logger.info( "HTTP request started: method=%s path=%s client=%s", request.method, request.url.path, request.client.host if request.client else "unknown", ) try: response = await call_next(request) except Exception: duration_ms = round((perf_counter() - started_at) * 1000, 2) logger.exception( "HTTP request failed: method=%s path=%s duration_ms=%s", request.method, request.url.path, duration_ms, ) raise duration_ms = round((perf_counter() - started_at) * 1000, 2) logger.info( "HTTP request completed: method=%s path=%s status=%s duration_ms=%s", request.method, request.url.path, response.status_code, duration_ms, ) return response app.include_router(health_router) app.include_router(indexing_router) app.include_router(rag_router)