Files
GorichBot/menu_scraper/app/scraper.py
T
2026-05-12 23:37:04 +03:00

310 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urljoin
import httpx
from bs4 import BeautifulSoup
from .config import settings
from .models import MenuItem, MenuSnapshot
# Substrings after which the page is searched for the JSON shop payload.
# extract_shop_payload tries them in the order listed here.
SHOP_PAYLOAD_MARKERS = ("MsJsShop.init(", "MsJsPublishedManager.addJsData(")

# A number followed by a unit ("см", "г" or "мл" -- Russian abbreviations for
# cm, g and ml), optionally repeated with "/" separators ("25 см / 30 см").
# Matching is case-insensitive; the whole size expression is group 1.
SIZE_PATTERN = re.compile(
    r"(\d+\s*(?:см|г|мл)(?:\s*/\s*\d+\s*(?:см|г|мл))*)",
    re.IGNORECASE,
)
def normalize_spaces(value: str) -> str:
    """Collapse every whitespace run in *value* into one ordinary space.

    Non-breaking spaces (U+00A0) are turned into regular spaces first, and
    leading/trailing whitespace is dropped.
    """
    without_nbsp = value.replace("\xa0", " ")
    words = without_nbsp.split()
    return " ".join(words)
def compact_text(value: str) -> str:
    """Return *value* lowercased with all whitespace removed.

    Useful for comparing two strings while ignoring spacing and case.
    """
    spaced = value.replace("\xa0", " ")
    squeezed = re.sub(r"\s+", "", spaced)
    return squeezed.lower()
def parse_price(price_label: str) -> int | None:
    """Extract an integer price from a human-readable price label.

    Returns ``None`` when the label contains the word "бесплатно" ("free")
    or has no digits at all. Otherwise every digit in the label is
    concatenated and parsed as a single integer, so "1 200 руб." gives 1200.
    """
    label = normalize_spaces(price_label).lower()
    if "бесплатно" in label:
        return None

    only_digits = re.sub(r"[^\d]", "", label)
    if not only_digits:
        return None
    return int(only_digits)
def parse_ingredients(description: str) -> list[str]:
    """Split a product description into a list of ingredient names.

    The description is whitespace-normalized. A leading "состав:"
    ("ingredients:") label, matched case-insensitively, is removed. The rest
    is split on commas; empty pieces are discarded and each piece is stripped.
    An empty description yields an empty list.
    """
    text = normalize_spaces(description)
    if not text:
        return []

    if text.lower().startswith("состав:"):
        # Keep only what follows the first colon, i.e. drop the label.
        _, _, remainder = text.partition(":")
        text = remainder.strip()

    stripped_parts = (piece.strip() for piece in text.split(","))
    return [piece for piece in stripped_parts if piece]
def extract_size(*values: str) -> str | None:
    """Return the first size expression found in *values*, in compact form.

    The strings are searched in order with ``SIZE_PATTERN``. The first match
    is returned with all whitespace removed (e.g. "30 см" -> "30см").

    Returns:
        The compact size string, or ``None`` when no value contains a size.
    """
    for value in values:
        match = SIZE_PATTERN.search(value)
        if match:
            # The pattern's ``\s*`` also matches non-breaking spaces, tabs and
            # newlines, so strip every kind of whitespace rather than only
            # U+0020; otherwise the "compact" result could still contain gaps.
            return "".join(match.group(1).split())
    return None
def is_size_only_line(value: str) -> bool:
    """Tell whether *value* consists of a size expression and nothing else.

    The comparison ignores whitespace and letter case on both sides.
    """
    found_size = extract_size(value)
    if found_size is None:
        return False
    return compact_text(found_size) == compact_text(value)
def extract_first_json_object(html: str, marker: str) -> dict[str, object]:
    """Parse the first JSON object that follows *marker* in *html*.

    The search starts at the first occurrence of *marker*; the object begins
    at the first "{" at or after that position. Text after the object's
    closing brace is ignored.

    Args:
        html: Page source to search.
        marker: Literal substring that precedes the JSON payload.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If *marker* is absent, no "{" follows it, or the text at
            that position is not a complete, valid JSON object
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    marker_index = html.find(marker)
    if marker_index == -1:
        raise ValueError(f"{marker} payload not found in page")

    object_start = html.find("{", marker_index)
    if object_start == -1:
        raise ValueError("Shop payload start not found")

    # raw_decode parses exactly one JSON value starting at the given index and
    # tolerates trailing data, so the object's end does not have to be located
    # by hand-counting braces and tracking string escapes.
    payload, _end = json.JSONDecoder().raw_decode(html, object_start)
    return payload
def find_shop_container(payload: object) -> dict[str, object] | None:
if isinstance(payload, dict):
shop = payload.get("shop")
if isinstance(shop, dict) and isinstance(shop.get("products"), list):
return payload
ds_shop = payload.get("dsShop")
if isinstance(ds_shop, dict) and isinstance(ds_shop.get("data"), list):
return {
"shop": {
"products": ds_shop.get("data", []),
"settings": ds_shop.get("settings", {}),
}
}
for value in payload.values():
found = find_shop_container(value)
if found:
return found
if isinstance(payload, list):
for value in payload:
found = find_shop_container(value)
if found:
return found
return None
def extract_shop_payload(html: str) -> dict[str, object]:
    """Locate the shop container inside the page's embedded JSON.

    Each marker in ``SHOP_PAYLOAD_MARKERS`` is tried in order. The first one
    whose JSON payload contains a shop container wins.

    Raises:
        ValueError: If no marker yields a shop container. The message joins
            the reason recorded for every marker that was tried.
    """
    problems: list[str] = []
    for marker in SHOP_PAYLOAD_MARKERS:
        try:
            candidate = extract_first_json_object(html, marker)
        except ValueError as exc:
            problems.append(str(exc))
        else:
            container = find_shop_container(candidate)
            if container is not None:
                return container
            problems.append(f"{marker} found, but shop container is missing")

    message = "; ".join(problems)
    if not message:
        message = "Shop payload not found in page"
    raise ValueError(message)
def html_fragment_to_lines(fragment: str) -> list[str]:
    """Convert an HTML fragment into a list of non-empty text lines.

    The fragment's text is extracted with BeautifulSoup using a newline
    separator, split into lines, whitespace-normalized, and blank lines are
    dropped. An empty fragment yields an empty list.
    """
    if not fragment:
        return []

    text = BeautifulSoup(fragment, "html.parser").get_text("\n", strip=True)
    lines: list[str] = []
    for raw_line in text.splitlines():
        line = normalize_spaces(raw_line)
        if line:
            lines.append(line)
    return lines
class GorichMenuScraper:
    """Fetch the menu page, parse its embedded shop payload, and save a snapshot.

    The site URL, output path and request timeout are read from the
    module-level ``settings`` object when the scraper is constructed.
    """

    # Sort position used for categories that have no usable "pos" value, so
    # they are ordered after categories with a regular position.
    _FALLBACK_CATEGORY_POS = 10_000

    def __init__(self) -> None:
        self.site_url = settings.site_url
        self.output_path = Path(settings.output_path)
        self.timeout = settings.request_timeout

    async def fetch_html(self) -> str:
        """Download ``site_url`` and return the response body as text.

        ``raise_for_status`` is called on the response, so an HTTP error
        status surfaces as an exception instead of being parsed as a menu.
        """
        async with self._build_client() as client:
            response = await client.get(self.site_url)
            response.raise_for_status()
            return response.text

    def _build_client(self) -> httpx.AsyncClient:
        """Create the HTTP client (User-Agent header, redirects, timeout)."""
        return httpx.AsyncClient(
            headers={"User-Agent": "Mozilla/5.0"},
            follow_redirects=True,
            timeout=self.timeout,
        )

    @classmethod
    def _category_position(cls, category: dict[str, object]) -> int:
        """Return the category's sort position, tolerating a bad "pos" value."""
        try:
            return int(category.get("pos", cls._FALLBACK_CATEGORY_POS))
        except (TypeError, ValueError, OverflowError):
            # A null or non-numeric "pos" must not abort the whole parse.
            return cls._FALLBACK_CATEGORY_POS

    @classmethod
    def _resolve_category(
        cls,
        product: dict[str, object],
        category_by_id: dict[int, dict[str, object]],
    ) -> tuple[int | None, list[int], str]:
        """Return (primary category id, sorted category ids, category name).

        Integer ids from the product's "category_list" are sorted by category
        position; the first one is the primary category. Without any ids the
        primary id is ``None`` and the name is "прочее".
        """
        raw_ids = product.get("category_list")
        if not isinstance(raw_ids, list):
            # The key may be absent or null; treat both as "no categories".
            raw_ids = []

        sorted_ids = sorted(
            (category_id for category_id in raw_ids if isinstance(category_id, int)),
            key=lambda category_id: cls._category_position(category_by_id.get(category_id, {})),
        )
        if not sorted_ids:
            return None, sorted_ids, "прочее"

        primary_id = sorted_ids[0]
        # "or" also covers an explicit null name, which str() would otherwise
        # turn into the literal text "None".
        raw_name = category_by_id.get(primary_id, {}).get("name") or "прочее"
        return primary_id, sorted_ids, normalize_spaces(str(raw_name)).lower()

    def _first_image_url(self, product: dict[str, object]) -> str:
        """Return the absolute URL of the product's first image, or ""."""
        image_list = product.get("image_list", [])
        if not isinstance(image_list, list):
            return ""
        for image in image_list:
            if not isinstance(image, dict):
                continue
            raw_url = str(image.get("url", "") or "")
            if raw_url:
                return urljoin(self.site_url, raw_url)
        return ""

    def parse_menu(self, html: str) -> MenuSnapshot:
        """Build a ``MenuSnapshot`` from the page source.

        Products that are not dicts, are marked invisible, or lack an id or a
        name are skipped. All items share one UTC ``scraped_at`` timestamp.

        Raises:
            ValueError: If the shop payload cannot be found or its "shop",
                "settings", "categories" or "products" parts have an
                unexpected type.
        """
        payload = extract_shop_payload(html)
        shop = payload.get("shop") or {}
        if not isinstance(shop, dict):
            raise ValueError("Shop payload has unexpected format")

        shop_settings = shop.get("settings") or {}
        if not isinstance(shop_settings, dict):
            # Without this check a list/str here would fail with an
            # AttributeError instead of the ValueError used for format errors.
            raise ValueError("Shop settings have unexpected format")

        categories = shop_settings.get("categories") or []
        products = shop.get("products") or []
        if not isinstance(categories, list) or not isinstance(products, list):
            raise ValueError("Shop categories or products have unexpected format")

        category_by_id: dict[int, dict[str, object]] = {
            category["id"]: category
            for category in categories
            if isinstance(category, dict) and isinstance(category.get("id"), int)
        }

        scraped_at = datetime.now(timezone.utc)
        items: list[MenuItem] = []
        for product in products:
            if not isinstance(product, dict):
                continue
            if not product.get("is_visible", True):
                continue

            product_id = product.get("id")
            # "or" keeps a null name from becoming the literal string "None".
            name = normalize_spaces(str(product.get("name") or ""))
            if not product_id or not name:
                continue

            raw_description = str(product.get("short_description", "") or "")
            description_lines = html_fragment_to_lines(raw_description)
            size = extract_size(name, *description_lines)

            # Lines that only state the size are left out of the description,
            # unless that would leave the description empty.
            description_parts = [
                line for line in description_lines if not is_size_only_line(line)
            ]
            description = " ".join(description_parts).strip()
            if not description and description_lines:
                description = " ".join(description_lines).strip()

            primary_category_id, sorted_category_ids, category_name = self._resolve_category(
                product, category_by_id
            )
            image_url = self._first_image_url(product)

            price = product.get("price")
            numeric_price = int(price) if isinstance(price, int) else None
            currency = normalize_spaces(str(product.get("currency", "руб.") or "руб."))
            price_label = (
                f"{numeric_price} {currency}" if numeric_price is not None else "Цена не указана"
            )

            description_url = str(product.get("description_url", "") or "")
            source_url = (
                urljoin(self.site_url, description_url) if description_url else self.site_url
            )

            items.append(
                MenuItem(
                    item_id=str(product_id),
                    name=name,
                    category=category_name,
                    description=description,
                    ingredients=parse_ingredients(description),
                    price=parse_price(price_label),
                    price_label=price_label,
                    size=size,
                    photo_url=image_url,
                    source_url=source_url,
                    scraped_at=scraped_at,
                    metadata={
                        "category_id": primary_category_id,
                        "category_ids": sorted_category_ids,
                        "raw_short_description": raw_description,
                        "amount": product.get("amount"),
                        "sku": product.get("sku"),
                    },
                )
            )

        return MenuSnapshot(
            source_url=self.site_url,
            scraped_at=scraped_at,
            total_items=len(items),
            items=items,
        )

    def save_snapshot(self, snapshot: MenuSnapshot) -> None:
        """Write *snapshot* to ``output_path`` as indented UTF-8 JSON.

        Missing parent directories are created. Non-ASCII text is written
        as is rather than escaped.
        """
        self.output_path.parent.mkdir(parents=True, exist_ok=True)
        self.output_path.write_text(
            json.dumps(snapshot.model_dump(mode="json"), ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

    async def scrape_and_save(self) -> MenuSnapshot:
        """Fetch the page, parse the menu, save it, and return the snapshot."""
        html = await self.fetch_html()
        snapshot = self.parse_menu(html)
        self.save_snapshot(snapshot)
        return snapshot