Files
LawBot/api/services/legal_ai.py
T
2026-05-25 01:12:43 +03:00

725 lines
27 KiB
Python

from __future__ import annotations
import json
import logging
import re
from openai import AsyncOpenAI
from api.prompts.rag_prompts import (
ANSWER_PROMPT,
CLASSIFIER_PROMPT,
CONSULTATION_TITLE_PROMPT,
FOLLOW_UP_ANSWER_PROMPT,
)
from api.schemas import ClassificationResult, StructuredInitialAnswer
# Module-level logger used by the helper functions and LegalAIService in this file.
logger = logging.getLogger(__name__)
# Keyword -> law-type codes used by the heuristic classifiers in this module.
# Keys are matched as lowercase substrings and the first matching key wins,
# so the insertion order of this dict is significant.
CATEGORY_MAP = {
    # Labour
    "работа": ["labor"],
    "труд": ["labor"],
    # Consumer protection
    "защита прав потребителей": ["consumer", "civil"],
    "потребител": ["consumer", "civil"],
    # Housing
    "жилье": ["housing", "civil", "mortgage"],
    "аренда": ["housing", "civil"],
    # Family
    "семья": ["family"],
    # Debts, loans and contracts
    "долги": ["civil", "enforcement"],
    "займы": ["civil"],
    "договоры": ["civil"],
    "договор": ["civil"],
    # Court procedure
    "суд": ["procedural"],
    "процесс": ["procedural"],
    # Administrative and criminal matters
    "административ": ["administrative"],
    "уголов": ["criminal"],
    "краж": ["criminal"],
    "мошеннич": ["criminal"],
}
# Alias (lowercase substring) -> canonical law-type code.
# Lookups scan this dict in insertion order and stop at the first alias that
# occurs in the value being normalized, so the order must be preserved.
LAW_TYPE_ALIASES = {
    # labor
    "labor": "labor",
    "труд": "labor",
    "трудовое право": "labor",
    # criminal
    "criminal": "criminal",
    "уголов": "criminal",
    # civil
    "civil": "civil",
    "граждан": "civil",
    "договор": "civil",
    # consumer
    "consumer": "consumer",
    "защита прав потребителей": "consumer",
    "потребител": "consumer",
    # housing
    "housing": "housing",
    "жилищ": "housing",
    "аренда": "housing",
    # family
    "family": "family",
    "семейн": "family",
    # procedural
    "procedural": "procedural",
    "процесс": "procedural",
    "суд": "procedural",
    # administrative
    "administrative": "administrative",
    "административ": "administrative",
    # enforcement
    "enforcement": "enforcement",
    "исполнительн": "enforcement",
    # mortgage
    "mortgage": "mortgage",
    "ипотек": "mortgage",
}
# `response_format` payload for the first answer of a consultation: a strict
# JSON schema with one text conclusion and three lists of strings.
INITIAL_ANSWER_RESPONSE_FORMAT = {
    "type": "json_schema",
    "json_schema": {
        "name": "lawbot_initial_answer",
        "strict": True,
        "schema": {
            "type": "object",
            "additionalProperties": False,
            "properties": {
                "short_conclusion": {"type": "string"},
                "legal_points": {"type": "array", "items": {"type": "string"}},
                "action_steps": {"type": "array", "items": {"type": "string"}},
                "risks": {"type": "array", "items": {"type": "string"}},
            },
            "required": ["short_conclusion", "legal_points", "action_steps", "risks"],
        },
    },
}
# `response_format` payload for the classifier request (strict JSON schema).
# In strict mode every object must list all of its properties in `required`
# and set `additionalProperties` to False; optional values are expressed as a
# union with "null" instead of being omitted. The nested `filters` object
# therefore declares `law_type` as required-but-nullable.
CLASSIFIER_RESPONSE_FORMAT = {
    "type": "json_schema",
    "json_schema": {
        "name": "lawbot_classifier",
        "strict": True,
        "schema": {
            "type": "object",
            "additionalProperties": False,
            "properties": {
                "legal_domain": {"type": "string"},
                "issue_type": {"type": "string"},
                "jurisdiction": {"type": "string"},
                "region": {
                    "type": ["string", "null"],
                },
                "needs_clarification": {"type": "boolean"},
                "clarification_questions": {
                    "type": "array",
                    "items": {"type": "string"},
                },
                "search_queries": {
                    "type": "array",
                    "items": {"type": "string"},
                },
                "filters": {
                    "type": "object",
                    "additionalProperties": False,
                    "properties": {
                        "law_type": {
                            "type": ["array", "null"],
                            "items": {"type": "string"},
                        },
                    },
                    # Without this list a strict-mode provider can reject the
                    # whole schema, forcing the heuristic fallback every time.
                    "required": ["law_type"],
                },
            },
            "required": [
                "legal_domain",
                "issue_type",
                "jurisdiction",
                "region",
                "needs_clarification",
                "clarification_questions",
                "search_queries",
                "filters",
            ],
        },
    },
}
def extract_json(content: str, purpose: str = "response") -> dict:
    """Parse an LLM reply into a JSON object.

    The whole string is parsed first. If that fails, the span from the first
    ``{`` to the last ``}`` is parsed instead, which copes with replies that
    wrap the object in prose or a Markdown code fence.

    Args:
        content: Raw text returned by the model.
        purpose: Short label used in log and error messages.

    Returns:
        The decoded JSON object.

    Raises:
        RuntimeError: If no JSON can be decoded, or the decoded value is not
            a JSON object (for example a list, string or number).
    """
    try:
        parsed = json.loads(content)
    except json.JSONDecodeError:
        fragment = re.search(r"\{.*\}", content, re.S)
        if not fragment:
            logger.error("LLM %s returned non-JSON content: %s", purpose, content)
            raise RuntimeError(f"LLM {purpose} returned invalid JSON.")
        try:
            parsed = json.loads(fragment.group(0))
        except json.JSONDecodeError as exc:
            logger.error("LLM %s returned malformed JSON fragment: %s", purpose, content)
            raise RuntimeError(f"LLM {purpose} returned malformed JSON.") from exc

    # Callers use dict methods on the result, so valid JSON that is not an
    # object is reported through the same exception type as unparsable text.
    if not isinstance(parsed, dict):
        logger.error("LLM %s returned JSON that is not an object: %s", purpose, content)
        raise RuntimeError(f"LLM {purpose} returned JSON that is not an object.")
    return parsed
def looks_like_llm_refusal(content: str) -> bool:
    """Return True when *content* contains a known refusal phrase.

    The text is lower-cased, typographic apostrophes are folded to the ASCII
    apostrophe and whitespace runs are collapsed, so "I can’t  assist" is
    recognised the same way as "i can't assist".

    Args:
        content: Raw model reply.

    Returns:
        True if any English or Russian refusal marker occurs in the text.
    """
    refusal_markers = (
        "i cannot assist",
        "i can't assist",
        "i cannot help",
        "i'm sorry, but i cannot",
        "не могу помочь с этим",
        "не могу помочь в этом",
        "не могу содействовать",
        "не могу помочь с запросом",
        "не могу ответить на этот запрос",
    )
    # Models frequently emit curly apostrophes; the markers use the ASCII one.
    apostrophes = str.maketrans({"\u2019": "'", "\u2018": "'", "\u02bc": "'"})
    normalized = " ".join(content.lower().translate(apostrophes).split())
    return any(marker in normalized for marker in refusal_markers)
def infer_law_types(category: str | None) -> list[str] | None:
if not category:
return None
normalized = category.lower().strip()
for key, law_types in CATEGORY_MAP.items():
if key in normalized:
return law_types
return None
def normalize_law_type_values(value) -> list[str] | None:
if value is None:
return None
raw_values = value if isinstance(value, list) else [value]
normalized_values: list[str] = []
for raw_value in raw_values:
if not isinstance(raw_value, str):
continue
raw_normalized = raw_value.strip().lower()
for alias, code in LAW_TYPE_ALIASES.items():
if alias in raw_normalized:
if code not in normalized_values:
normalized_values.append(code)
break
return normalized_values or None
def extract_message_content(completion, purpose: str) -> str:
    """Return the message content of the first choice of a completion.

    Attributes are read with ``getattr`` so that an unexpectedly shaped
    response is reported as a ``RuntimeError`` rather than an
    ``AttributeError``.

    Args:
        completion: Chat-completion response object.
        purpose: Short label used in log messages.

    Returns:
        The ``content`` of ``completion.choices[0].message``.

    Raises:
        RuntimeError: If there are no choices, the first choice has no
            message, or the message content is None.
    """
    model_name = getattr(completion, "model", None)
    response_id = getattr(completion, "id", None)

    choice_list = getattr(completion, "choices", None)
    if not choice_list:
        logger.error(
            "LLM %s returned empty choices: model=%s id=%s usage=%s raw=%s",
            purpose,
            model_name,
            response_id,
            getattr(completion, "usage", None),
            completion,
        )
        raise RuntimeError(
            "LLM provider returned an empty response. Check OPENROUTER model name and provider response."
        )

    leading_choice = choice_list[0]
    reply = getattr(leading_choice, "message", None)
    if reply is None:
        logger.error(
            "LLM %s returned choice without message: model=%s id=%s choice=%s",
            purpose,
            model_name,
            response_id,
            leading_choice,
        )
        raise RuntimeError("LLM provider returned a malformed response without message.")

    text = getattr(reply, "content", None)
    if text is None:
        logger.error(
            "LLM %s returned empty message content: model=%s id=%s finish_reason=%s message=%s",
            purpose,
            model_name,
            response_id,
            getattr(leading_choice, "finish_reason", None),
            reply,
        )
        raise RuntimeError("LLM provider returned an empty message content.")
    return text
def build_fallback_title(question: str, limit: int = 70) -> str:
    """Build a short single-line title from free text.

    Whitespace runs are collapsed and trailing punctuation is removed. Text
    longer than *limit* is cut to ``limit - 1`` characters and terminated
    with an ellipsis, so the result never exceeds *limit* characters (for
    ``limit >= 1``).

    Args:
        question: Source text for the title.
        limit: Maximum length of the returned title.

    Returns:
        The cleaned title, or "Юридическая консультация" when nothing but
        whitespace and punctuation remains.
    """
    trailing_punctuation = " .,!?:;"
    title = " ".join(question.split()).rstrip(trailing_punctuation)
    # Checked after stripping punctuation so that input such as "?!" does not
    # produce an empty title.
    if not title:
        return "Юридическая консультация"
    if len(title) <= limit:
        return title
    # One character is reserved for the ellipsis that marks the truncation.
    trimmed = title[: max(limit - 1, 0)].rstrip(trailing_punctuation)
    return f"{trimmed}…"
def infer_primary_law_type(category: str | None, question: str) -> str:
    """Pick a single law-type code for a question.

    The category is tried first through ``infer_law_types``. If it yields
    nothing, the lower-cased question is scanned for the first
    ``CATEGORY_MAP`` key it contains.

    Returns:
        The first code of the matching entry, or "other" when neither the
        category nor the question matches.
    """
    from_category = infer_law_types(category)
    if from_category:
        return from_category[0]

    lowered_question = question.lower()
    return next(
        (
            law_types[0]
            for keyword, law_types in CATEGORY_MAP.items()
            if keyword in lowered_question
        ),
        "other",
    )
def sanitize_answer_text(answer: str) -> str:
    """Remove retrieval jargon from a free-text answer and tidy whitespace.

    Internal terms ("SOURCES", "chunk", "retrieval", ...) and a few Russian
    phrases that refer to the retrieval context are replaced with
    user-facing wording. Afterwards runs of spaces and tabs are collapsed to
    one space, spaces around line breaks are dropped, and three or more
    consecutive line breaks are reduced to one blank line. Paragraph breaks
    in the answer are preserved.

    Args:
        answer: Raw answer text produced by the model.

    Returns:
        The cleaned text with surrounding whitespace removed.
    """
    replacements = (
        (r"(?i)\bSOURCES\b", "нормах закона"),
        (r"(?i)\bsource\b", "нормах закона"),
        (r"(?i)\bchunk(?:s)?\b", "нормах закона"),
        (r"(?i)\bretrieval\b", "поиске норм"),
        (
            r"(?i)в ваших нормах закона",
            "в найденных нормах закона",
        ),
        (
            r"(?i)на основании этих источников",
            "по найденным нормам закона",
        ),
        (
            r"(?i)по этим источникам",
            "по найденным нормам закона",
        ),
        (
            r"(?i)в базе нет",
            "прямого ответа в найденных нормах нет",
        ),
        (
            r"(?i)в контексте нет",
            "в найденных нормах прямо не указано",
        ),
    )
    sanitized = answer.strip()
    for pattern, replacement in replacements:
        sanitized = re.sub(pattern, replacement, sanitized)

    # `[^\S\n]` is "whitespace except a line feed": only horizontal runs are
    # collapsed, so multi-paragraph answers keep their line structure.
    sanitized = re.sub(r"[^\S\n]*\n[^\S\n]*", "\n", sanitized)
    sanitized = re.sub(r"\n{3,}", "\n\n", sanitized)
    sanitized = re.sub(r"[^\S\n]{2,}", " ", sanitized)
    return sanitized.strip()
def format_numbered_lines(items: list[str]) -> str:
    """Render *items* as a "1. ...", "2. ..." list, one entry per line.

    Empty and whitespace-only entries are dropped before numbering, and
    whitespace runs inside each entry are collapsed to single spaces.
    """
    cleaned: list[str] = []
    for raw in items:
        if not raw:
            continue
        words = raw.split()
        if words:
            cleaned.append(" ".join(words))

    numbered = [f"{position}. {text}" for position, text in enumerate(cleaned, start=1)]
    return "\n".join(numbered)
def build_sources_section(sources: list[dict], limit: int = 5) -> list[str]:
    """Build human-readable citation lines from retrieved sources.

    Each source is expected to be a mapping that may carry ``source_title``,
    ``article_number`` and ``article_title``. Sources without a title and
    repeated (title, number, article title) combinations are skipped.

    Args:
        sources: Retrieved source records.
        limit: Maximum number of lines to return.

    Returns:
        Up to *limit* lines in one of the forms
        "<title>, ст. <number>. <article title>", "<title>, ст. <number>"
        or "<title>".
    """
    lines: list[str] = []
    seen: set[tuple[str, str, str]] = set()
    for source in sources:
        title = str(source.get("source_title") or "").strip()
        article_number = str(source.get("article_number") or "").strip()
        article_title = str(source.get("article_title") or "").strip()
        key = (title, article_number, article_title)
        if not title or key in seen:
            continue
        seen.add(key)
        if article_number and article_title:
            # Separate the number from the heading; they were previously
            # glued together ("ст. 80Расторжение ...").
            lines.append(f"{title}, ст. {article_number}. {article_title}")
        elif article_number:
            lines.append(f"{title}, ст. {article_number}")
        else:
            lines.append(title)
        if len(lines) >= limit:
            break
    return lines
def render_structured_initial_answer(
    payload: StructuredInitialAnswer,
    sources: list[dict],
) -> str:
    """Render a structured initial answer as the user-facing message text.

    The message has six titled sections (conclusion, legal points, action
    steps, risks, found sources, disclaimer) separated by blank lines. Empty
    lists in *payload*, and an empty source list, are replaced with a
    placeholder sentence so that no section is left blank.
    """
    citations = build_sources_section(sources) or [
        "Подходящие нормы закона по этому вопросу автоматически не выделились."
    ]
    sections = (
        ("⚖️ Краткий вывод", payload.short_conclusion.strip()),
        (
            "📌 Что говорит закон",
            format_numbered_lines(
                payload.legal_points or ["В найденных нормах прямой ответ на вопрос не раскрыт."]
            ),
        ),
        (
            "✅ Что можно сделать",
            format_numbered_lines(
                payload.action_steps or ["Уточните обстоятельства и проверьте формулировку вопроса."]
            ),
        ),
        (
            "⚠️ Риски и ограничения",
            format_numbered_lines(
                payload.risks or ["Ответ зависит от деталей ситуации и содержания применимых норм."]
            ),
        ),
        ("📚 Найденные источники", format_numbered_lines(citations)),
        (
            "❗ Важно",
            "Ответ носит информационный характер и не заменяет консультацию юриста.",
        ),
    )
    # Heading and body share a block; blocks are separated by one blank line.
    rendered = "\n\n".join(f"{heading}\n{body}" for heading, body in sections)
    return rendered.strip()
def first_sentence(text: str, limit: int = 220) -> str:
    """Return the first sentence of *text*, shortened to at most *limit* chars.

    Whitespace is collapsed, a leading list number ("12. ") is removed and
    spaces before punctuation are dropped. The sentence ends at the first
    ".", "!" or "?" that is followed by whitespace. A sentence longer than
    *limit* is cut to ``limit - 1`` characters and terminated with an
    ellipsis so the truncation is visible.

    Args:
        text: Source text, possibly multi-line.
        limit: Maximum length of the returned string.

    Returns:
        The first sentence, or "" when *text* has no content.
    """
    normalized = " ".join(text.split())
    normalized = re.sub(r"^\d+\s*\.\s*", "", normalized)
    normalized = re.sub(r"\s+([,.;:!?])", r"\1", normalized)
    if not normalized:
        return ""
    sentence = re.split(r"(?<=[.!?])\s+", normalized, maxsplit=1)[0].strip()
    if len(sentence) <= limit:
        return sentence
    # One character is reserved for the ellipsis that marks the truncation.
    trimmed = sentence[: max(limit - 1, 0)].rstrip(" ,;:")
    return f"{trimmed}…"
def build_structured_answer_fallback(
    *,
    question: str,
    category: str | None,
    sources: list[dict],
) -> StructuredInitialAnswer:
    """Assemble a canned structured answer without calling the LLM.

    Legal points are derived from the first three sources: the article number
    and title (when both are present) plus the first sentence of the chunk
    text. The conclusion, action steps and risks come from one of two fixed
    templates; the criminal template is used when the category contains
    "уголов" or any source has ``law_type == "criminal"``.

    Args:
        question: Accepted for interface symmetry; not read by this function.
        category: User-supplied category label, or None.
        sources: Retrieved source records.
    """
    legal_points: list[str] = []
    for item in sources[:3]:
        number = str(item.get("article_number") or "").strip()
        heading = str(item.get("article_title") or "").strip()
        summary = first_sentence(str(item.get("chunk_text") or "").strip())
        if number and heading:
            label = f"Статья {number} {heading}"
            legal_points.append(f"{label}: {summary}" if summary else f"{label}.")
        elif summary:
            legal_points.append(summary)
    if not legal_points:
        legal_points.append("В найденных нормах есть общие ориентиры, но прямой ответ зависит от деталей ситуации.")

    hint = (category or "").lower()
    criminal_matter = "уголов" in hint or any(
        str(item.get("law_type") or "") == "criminal" for item in sources
    )

    if not criminal_matter:
        return StructuredInitialAnswer(
            short_conclusion=(
                "По найденным нормам можно дать только общий ориентир; "
                "точный вывод зависит от фактических обстоятельств вопроса."
            ),
            legal_points=legal_points,
            action_steps=[
                "Уточните ключевые обстоятельства и формулировку вопроса.",
                "Соберите документы и доказательства, которые относятся к ситуации.",
                "При необходимости получите очную консультацию профильного юриста.",
            ],
            risks=[
                "Ответ может измениться, если появятся новые существенные детали.",
                "Без полного набора обстоятельств правовая оценка будет предварительной.",
            ],
        )

    return StructuredInitialAnswer(
        short_conclusion=(
            "По найденным нормам возможна уголовная ответственность, "
            "но точная квалификация и последствия зависят от обстоятельств дела."
        ),
        legal_points=legal_points,
        action_steps=[
            "Как можно быстрее обратитесь за очной помощью адвоката по уголовным делам.",
            "Соберите и сохраните документы, повестки, протоколы и другие материалы, которые у вас уже есть.",
            "Подготовьте точную хронологию событий, потому что для оценки важны обстоятельства и формулировка обвинения.",
        ],
        risks=[
            "Точная статья и возможное наказание зависят от обстоятельств, мотива, последствий и процессуального статуса.",
            "Без изучения материалов дела нельзя надёжно оценить квалификацию и линию защиты.",
        ],
    )
def build_classification_fallback(
    *,
    question: str,
    category: str | None,
    region: str | None,
) -> ClassificationResult:
    """Build a heuristic classification without calling the LLM.

    The law type comes from ``infer_primary_law_type``. It becomes the legal
    domain and, unless it is "other", the only ``law_type`` filter. The
    original question is used as the single search query, the jurisdiction is
    fixed to "RU" and no clarification is requested.
    """
    law_type = infer_primary_law_type(category, question)
    if law_type == "other":
        filters = {}
    else:
        filters = {"law_type": [law_type]}

    return ClassificationResult(
        legal_domain=law_type,
        issue_type="general_question",
        jurisdiction="RU",
        region=region,
        needs_clarification=False,
        clarification_questions=[],
        search_queries=[question],
        filters=filters,
    )
class LegalAIService:
    """LLM-backed classification, answer generation and titling.

    Every request is sent through the injected ``AsyncOpenAI`` client using
    the model name supplied at construction time.
    """

    # Number of most recent history messages copied into a prompt.
    _HISTORY_WINDOW = 6

    def __init__(self, client: AsyncOpenAI, llm_model: str) -> None:
        """Store the chat-completions client and the model identifier."""
        self.client = client
        self.llm_model = llm_model

    @classmethod
    def _format_history(cls, history: list[dict[str, str]] | None) -> str:
        """Render the latest history messages as "role: content" lines.

        Returns "нет" when there is no history.
        """
        recent = (history or [])[-cls._HISTORY_WINDOW:]
        if not recent:
            return "нет"
        return "\n".join(
            f"{item.get('role', 'user')}: {item.get('content', '')}" for item in recent
        )

    @staticmethod
    def _render_fallback_answer(
        question: str,
        category: str | None,
        sources: list[dict],
    ) -> str:
        """Render the canned structured answer used when the LLM is unusable."""
        structured_answer = build_structured_answer_fallback(
            question=question,
            category=category,
            sources=sources,
        )
        answer = render_structured_initial_answer(structured_answer, sources)
        logger.info("LLM answer generation completed via fallback: answer_length=%s", len(answer))
        return answer

    @staticmethod
    def _parse_initial_answer(
        raw_answer: str,
        question: str,
        category: str | None,
        sources: list[dict],
    ) -> StructuredInitialAnswer:
        """Turn the raw initial reply into a structured answer.

        A refusal, unparsable JSON or a payload that fails validation all
        yield the heuristic structured fallback instead of an exception.
        """
        if looks_like_llm_refusal(raw_answer):
            logger.warning(
                "LLM returned refusal for initial answer, using structured fallback: category=%s question=%s",
                category,
                question,
            )
        else:
            try:
                payload = extract_json(raw_answer, "answer")
                return StructuredInitialAnswer.model_validate(payload)
            except (RuntimeError, ValueError) as exc:
                logger.warning(
                    "LLM initial answer schema response was invalid, using structured fallback: category=%s question=%s error=%s",
                    category,
                    question,
                    exc,
                )
        return build_structured_answer_fallback(
            question=question,
            category=category,
            sources=sources,
        )

    async def classify(
        self,
        question: str,
        category: str | None,
        region: str | None,
        user_type: str | None = None,
        history: list[dict[str, str]] | None = None,
    ) -> ClassificationResult:
        """Classify a question and derive search queries and filters.

        The model is asked for a JSON object matching
        ``CLASSIFIER_RESPONSE_FORMAT``. Whenever the request fails, the
        completion is empty or malformed, or the reply is not a JSON object,
        the heuristic ``build_classification_fallback`` result is returned
        instead of raising.

        Args:
            question: The user's question.
            category: Category chosen by the user, if any.
            region: Region supplied by the user, if any.
            user_type: Kind of user, if known.
            history: Earlier consultation messages with "role" and "content"
                keys; only the most recent ones are sent to the model.

        Returns:
            The classification, with ``law_type`` filters normalized to
            canonical codes and back-filled from the category when the model
            supplied none.
        """
        logger.info(
            "LLM classification started: category=%s region=%s user_type=%s question_length=%s history_items=%s",
            category,
            region,
            user_type,
            len(question),
            len(history or []),
        )

        def heuristic_fallback() -> ClassificationResult:
            return build_classification_fallback(
                question=question,
                category=category,
                region=region,
            )

        user_prompt = (
            f"Категория пользователя: {category or 'не указана'}\n"
            f"Регион: {region or 'не указан'}\n"
            f"Тип пользователя: {user_type or 'не указан'}\n"
            f"История консультации:\n{self._format_history(history)}\n"
            f"Вопрос: {question}\n"
        )
        try:
            completion = await self.client.chat.completions.create(
                model=self.llm_model,
                temperature=0,
                response_format=CLASSIFIER_RESPONSE_FORMAT,
                messages=[
                    {"role": "system", "content": CLASSIFIER_PROMPT},
                    {"role": "user", "content": user_prompt},
                ],
            )
        except Exception as exc:
            logger.warning(
                "LLM classification request with schema failed, using heuristic fallback: category=%s question=%s error=%s",
                category,
                question,
                exc,
            )
            return heuristic_fallback()

        # An empty or malformed completion raises RuntimeError just like
        # unparsable JSON does; both take the same heuristic fallback.
        try:
            content = extract_message_content(completion, "classification") or "{}"
            payload = extract_json(content, "classification")
        except RuntimeError:
            payload = None
        if not isinstance(payload, dict):
            logger.warning(
                "LLM classification schema response was invalid, using heuristic fallback: category=%s question=%s",
                category,
                question,
            )
            return heuristic_fallback()

        search_queries = payload.get("search_queries") or [question]

        raw_filters = payload.get("filters")
        filters = dict(raw_filters) if isinstance(raw_filters, dict) else {}
        normalized_law_types = normalize_law_type_values(filters.get("law_type"))
        if normalized_law_types:
            filters["law_type"] = normalized_law_types
        else:
            filters.pop("law_type", None)
            fallback_law_types = infer_law_types(category)
            if fallback_law_types:
                # Copy so the result never aliases the CATEGORY_MAP entry.
                filters["law_type"] = list(fallback_law_types)

        result = ClassificationResult(
            legal_domain=payload.get("legal_domain", "other"),
            issue_type=payload.get("issue_type", "general_question"),
            jurisdiction=payload.get("jurisdiction", "RU"),
            region=payload.get("region") or region,
            needs_clarification=bool(payload.get("needs_clarification", False)),
            clarification_questions=payload.get("clarification_questions") or [],
            search_queries=search_queries,
            filters=filters,
        )
        logger.info(
            "LLM classification completed: legal_domain=%s issue_type=%s queries=%s needs_clarification=%s",
            result.legal_domain,
            result.issue_type,
            result.search_queries,
            result.needs_clarification,
        )
        return result

    async def answer(
        self,
        question: str,
        category: str | None,
        region: str | None,
        user_type: str | None,
        history: list[dict[str, str]] | None,
        sources: list[dict],
    ) -> str:
        """Generate the answer text for a question.

        Without history this is the initial answer: the model is asked for
        JSON matching ``INITIAL_ANSWER_RESPONSE_FORMAT`` and the result is
        rendered into titled sections. Any failure on this path (request
        error, empty completion, refusal, invalid JSON) produces the
        heuristic structured fallback. With history this is a follow-up: the
        model replies in free text, which is sanitized, and failures are
        propagated to the caller.

        Args:
            question: The user's question.
            category: Category chosen by the user, if any.
            region: Region supplied by the user, if any.
            user_type: Kind of user, if known.
            history: Earlier consultation messages; a non-empty value selects
                the follow-up path.
            sources: Retrieved source records, serialized into the prompt.

        Returns:
            The answer text to show to the user.

        Raises:
            Exception: On the follow-up path only, whatever the client raises
                and ``RuntimeError`` for an empty or malformed completion.
        """
        logger.info(
            "LLM answer generation started: category=%s region=%s user_type=%s sources=%s question_length=%s history_items=%s",
            category,
            region,
            user_type,
            len(sources),
            len(question),
            len(history or []),
        )
        serialized_sources = json.dumps(sources, ensure_ascii=False, indent=2)
        has_consultation_history = bool(history)
        answer_prompt = FOLLOW_UP_ANSWER_PROMPT if has_consultation_history else ANSWER_PROMPT
        user_prompt = (
            f"Категория: {category or 'не указана'}\n"
            f"Регион: {region or 'не указан'}\n"
            f"Тип пользователя: {user_type or 'не указан'}\n"
            f"История консультации:\n{self._format_history(history)}\n"
            f"Вопрос пользователя: {question}\n\n"
            f"SOURCES:\n{serialized_sources}"
        )

        request_kwargs = {
            "model": self.llm_model,
            "temperature": 0.2,
            "messages": [
                {"role": "system", "content": answer_prompt},
                {"role": "user", "content": user_prompt},
            ],
        }
        if not has_consultation_history:
            # Only the initial answer is requested as structured JSON.
            request_kwargs["response_format"] = INITIAL_ANSWER_RESPONSE_FORMAT

        try:
            completion = await self.client.chat.completions.create(**request_kwargs)
        except Exception as exc:
            if has_consultation_history:
                raise
            logger.warning(
                "LLM initial answer request with schema failed, using structured fallback: category=%s question=%s error=%s",
                category,
                question,
                exc,
            )
            return self._render_fallback_answer(question, category, sources)

        try:
            raw_answer = extract_message_content(completion, "answer").strip()
        except RuntimeError as exc:
            if has_consultation_history:
                raise
            # A completion with no choices, message or content gets the same
            # fallback as a failed request on the initial path.
            logger.warning(
                "LLM initial answer response had no usable content, using structured fallback: category=%s question=%s error=%s",
                category,
                question,
                exc,
            )
            return self._render_fallback_answer(question, category, sources)

        if has_consultation_history:
            answer = sanitize_answer_text(raw_answer)
        else:
            structured_answer = self._parse_initial_answer(raw_answer, question, category, sources)
            answer = render_structured_initial_answer(structured_answer, sources)
        logger.info("LLM answer generation completed: answer_length=%s", len(answer))
        return answer

    async def generate_consultation_title(
        self,
        *,
        question: str,
        category: str | None,
        answer: str,
    ) -> str:
        """Ask the model for a short consultation title.

        The prompt contains the category, the question and the first 1500
        characters of the answer. The reply is collapsed to one line,
        stripped of surrounding quotes and passed through
        ``build_fallback_title`` with a 70-character limit.

        Raises:
            Exception: Whatever the client raises for a failed request.
            RuntimeError: If the completion has no usable message content.
        """
        logger.info(
            "LLM consultation title generation started: category=%s question_length=%s answer_length=%s",
            category,
            len(question),
            len(answer),
        )
        user_prompt = (
            f"Категория: {category or 'не указана'}\n"
            f"Вопрос пользователя: {question}\n"
            f"Краткое содержание ответа:\n{answer[:1500]}"
        )
        completion = await self.client.chat.completions.create(
            model=self.llm_model,
            temperature=0,
            messages=[
                {"role": "system", "content": CONSULTATION_TITLE_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
        )
        content = extract_message_content(completion, "consultation_title")
        title = " ".join(content.strip().split()).strip("\"' ")
        title = build_fallback_title(title, limit=70)
        logger.info("LLM consultation title generation completed: title=%s", title)
        return title