227 lines
7.9 KiB
Python
227 lines
7.9 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
from datetime import datetime, timezone
|
|
|
|
import httpx
|
|
from bs4 import BeautifulSoup
|
|
|
|
from .config import settings
|
|
from .models import KnowledgeDocument
|
|
|
|
|
|
# Matches an inline ``yandexMaps.addMap('<a>', '<b>', '<c>')`` call and captures
# its three single-quoted arguments. Only the second and third groups are read
# by the scraper (as the two map coordinates); the first group is unused.
MAP_PATTERN = re.compile(
    r"yandexMaps\.addMap\('([^']+)'\s*,\s*'([^']+)'\s*,\s*'([^']+)'\)"
)
|
|
|
|
|
|
def normalize_spaces(value: str) -> str:
    """Collapse every run of whitespace in *value* into a single space.

    Non-breaking spaces (``\\xa0``) are treated as ordinary spaces, and
    leading/trailing whitespace is removed.
    """
    return " ".join(value.replace("\xa0", " ").split())
|
|
|
|
|
|
def deduplicate_preserving_order(values: list[str]) -> list[str]:
    """Return the non-empty items of *values* without duplicates.

    The first occurrence of each item wins and the original relative order
    is kept. Empty strings are dropped.
    """
    # dict keys are unique and keep insertion order, which gives an
    # order-preserving de-duplication in one pass.
    return list(dict.fromkeys(value for value in values if value))
|
|
|
|
|
|
def is_meaningful_value(value: str) -> bool:
    """Return True when *value* contains at least one letter or digit.

    Strings made only of punctuation, symbols or whitespace (and the empty
    string) are considered meaningless.
    """
    return any(char.isalnum() for char in value)
|
|
|
|
|
|
class SiteKnowledgeScraper:
    """Download the site homepage and split it into knowledge documents.

    The homepage is reduced to a flat list of visible strings; sections are
    then located by looking for exact marker strings in that list.
    """

    # Exact visible strings that open the respective homepage sections.
    ABOUT_MARKER = "ТЕРРИТОРИЯ БЫСТРОГО ПИТАНИЯ В ВОЛГОГРАДЕ"
    MENU_MARKER = "МЕНЮ"
    DELIVERY_MARKER = "ДОСТАВКА"
    CONTACT_MARKER = "КОНТАКТЫ"
    # The contact section stops at the first of these strings that follows it.
    CONTACT_END_MARKERS = ("Закрыть", "OK")

    def __init__(self) -> None:
        """Take the target URL and the request timeout from the settings."""
        self.site_url = settings.site_url
        self.timeout = settings.request_timeout

    async def fetch_homepage(self) -> str:
        """Fetch ``self.site_url`` and return the response body as text.

        Redirects are followed. A non-success HTTP status raises through
        ``response.raise_for_status()``.
        """
        async with httpx.AsyncClient(
            headers={"User-Agent": "Mozilla/5.0"},
            follow_redirects=True,
            timeout=self.timeout,
        ) as client:
            response = await client.get(self.site_url)
            response.raise_for_status()
            return response.text

    def visible_strings(self, soup: BeautifulSoup) -> list[str]:
        """Return the whitespace-normalized strings of *soup*, in order.

        Strings without any letter or digit (including ones that become
        empty after normalization) are dropped.
        """
        normalized = (normalize_spaces(text) for text in soup.stripped_strings)
        return [value for value in normalized if is_meaningful_value(value)]

    def find_marker(self, values: list[str], marker: str, start: int = 0) -> int | None:
        """Return the first index ``>= start`` where *values* equals *marker*.

        Returns None when there is no such index.
        """
        return next(
            (index for index in range(start, len(values)) if values[index] == marker),
            None,
        )

    def find_last_marker(self, values: list[str], marker: str, start: int = 0) -> int | None:
        """Return the last index ``>= start`` where *values* equals *marker*.

        Returns None when there is no such index.
        """
        return next(
            (
                index
                for index in range(len(values) - 1, start - 1, -1)
                if values[index] == marker
            ),
            None,
        )

    def slice_between_markers(
        self,
        values: list[str],
        start_marker: str,
        end_markers: tuple[str, ...],
        start_at: int = 0,
    ) -> list[str]:
        """Return the slice of *values* that starts at *start_marker*.

        The slice begins at the first occurrence of *start_marker* at or after
        *start_at* (the marker itself is included) and ends just before the
        nearest following occurrence of any of *end_markers*, or at the end of
        *values* when none of them follows. Returns an empty list when
        *start_marker* is not found.
        """
        start_index = self.find_marker(values, start_marker, start_at)
        if start_index is None:
            return []

        end_index = len(values)
        for marker in end_markers:
            marker_index = self.find_marker(values, marker, start_index + 1)
            if marker_index is not None:
                end_index = min(end_index, marker_index)

        return values[start_index:end_index]

    def extract_social_links(self, soup: BeautifulSoup) -> list[str]:
        """Collect the ``data-page-link`` targets found in *soup*.

        Each entry is ``"<label>: <href>"`` when the node has visible text and
        just ``"<href>"`` otherwise. Nodes with an empty attribute are skipped
        and duplicate entries are removed, keeping first-seen order.
        """
        links: list[str] = []
        for node in soup.select("[data-page-link]"):
            href = node.get("data-page-link")
            if not href:
                continue
            label = normalize_spaces(node.get_text(" ", strip=True))
            links.append(f"{label}: {href}" if label else str(href))
        return deduplicate_preserving_order(links)

    def extract_map_coordinates(self, html: str) -> str | None:
        """Return the map coordinates embedded in *html* as ``"<2nd>, <3rd>"``.

        The values are the second and third arguments of the first call that
        ``MAP_PATTERN`` matches; None is returned when nothing matches.
        """
        match = MAP_PATTERN.search(html)
        if not match:
            return None
        # NOTE(review): the arguments are presumably latitude then longitude;
        # confirm against the page's map script.
        latitude = normalize_spaces(match.group(2))
        longitude = normalize_spaces(match.group(3))
        return f"{latitude}, {longitude}"

    def _make_document(
        self,
        doc_id: str,
        title: str,
        text: str,
        source_type: str,
        metadata: dict[str, str],
    ) -> KnowledgeDocument:
        """Build a document whose source URL is this scraper's site URL."""
        return KnowledgeDocument(
            doc_id=doc_id,
            title=title,
            text=text,
            source_type=source_type,
            source_url=self.site_url,
            metadata=metadata,
        )

    def _make_section_document(
        self,
        doc_id: str,
        section: list[str],
        source_type: str,
        metadata: dict[str, str],
    ) -> KnowledgeDocument:
        """Build a document from a non-empty section: first string is the title."""
        return self._make_document(
            doc_id,
            section[0],
            "\n".join(deduplicate_preserving_order(section[1:])),
            source_type,
            metadata,
        )

    def parse_homepage(self, html: str) -> list[KnowledgeDocument]:
        """Parse homepage *html* into knowledge documents.

        Up to five documents are produced, in this order and only when the
        corresponding data is present: meta description, "about" section,
        social/external links, delivery section, contact section. Every
        document's metadata carries the same UTC ``scraped_at`` timestamp;
        the contact document additionally gets ``map_coordinates`` when they
        can be extracted from *html*.
        """
        soup = BeautifulSoup(html, "html.parser")
        strings = self.visible_strings(soup)
        scraped_at = datetime.now(timezone.utc).isoformat()
        documents: list[KnowledgeDocument] = []

        meta_description = soup.select_one('meta[name="description"]')
        if meta_description and meta_description.get("content"):
            documents.append(
                self._make_document(
                    "site-meta-description",
                    "Краткое описание заведения",
                    normalize_spaces(meta_description["content"]),
                    "about",
                    {"scraped_at": scraped_at},
                )
            )

        about_section = self.slice_between_markers(
            strings,
            self.ABOUT_MARKER,
            (self.MENU_MARKER,),
        )
        if about_section:
            documents.append(
                self._make_section_document(
                    "site-about",
                    about_section,
                    "about",
                    {"scraped_at": scraped_at},
                )
            )

        social_links = self.extract_social_links(soup)
        if social_links:
            documents.append(
                self._make_document(
                    "site-links",
                    "Соцсети и внешние площадки",
                    "\n".join(social_links),
                    "links",
                    {"scraped_at": scraped_at},
                )
            )

        # The delivery and contact markers are searched from the end of the
        # page: the LAST occurrence after the menu marker is the one used.
        menu_index = self.find_marker(strings, self.MENU_MARKER)
        delivery_start = self.find_last_marker(
            strings,
            self.DELIVERY_MARKER,
            start=0 if menu_index is None else menu_index + 1,
        )
        contact_start = self.find_last_marker(
            strings,
            self.CONTACT_MARKER,
            start=0 if delivery_start is None else delivery_start + 1,
        )

        # When both markers exist, contact_start was searched strictly after
        # delivery_start, so the slice below is never empty.
        if delivery_start is not None and contact_start is not None:
            documents.append(
                self._make_section_document(
                    "site-delivery",
                    strings[delivery_start:contact_start],
                    "delivery",
                    {"scraped_at": scraped_at},
                )
            )

        if contact_start is not None:
            # strings[contact_start] is the contact marker itself, so starting
            # the search there selects exactly that occurrence.
            contact_section = self.slice_between_markers(
                strings,
                self.CONTACT_MARKER,
                self.CONTACT_END_MARKERS,
                start_at=contact_start,
            )
            metadata = {"scraped_at": scraped_at}
            coordinates = self.extract_map_coordinates(html)
            if coordinates:
                metadata["map_coordinates"] = coordinates
            documents.append(
                self._make_section_document(
                    "site-contact",
                    contact_section,
                    "contact",
                    metadata,
                )
            )

        return documents

    async def scrape(self) -> list[KnowledgeDocument]:
        """Fetch the homepage and return the documents parsed from it."""
        html = await self.fetch_homepage()
        return self.parse_homepage(html)
|