"""Scrape a shop menu that a site embeds as a JSON payload in its HTML page."""

from __future__ import annotations

import json
import re
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urljoin

import httpx
from bs4 import BeautifulSoup

from .config import settings
from .models import MenuItem, MenuSnapshot

# Text markers in the page source; the shop JSON object is expected to start
# at the first "{" that follows one of them.
SHOP_PAYLOAD_MARKERS = (
    "MsJsShop.init(",
    "MsJsPublishedManager.addJsData(",
)

# One or more "<digits><unit>" groups separated by "/", e.g. "30 см / 40 см".
SIZE_PATTERN = re.compile(
    r"(\d+\s*(?:см|г|мл)(?:\s*/\s*\d+\s*(?:см|г|мл))*)",
    re.IGNORECASE,
)

# Fallbacks used by the menu parser when the payload omits a value.
_DEFAULT_CATEGORY_NAME = "прочее"
_DEFAULT_CATEGORY_POS = 10_000
_DEFAULT_CURRENCY = "руб."
_NO_PRICE_LABEL = "Цена не указана"


def normalize_spaces(value: str) -> str:
    """Collapse every whitespace run (including NBSP) into one space and trim."""
    return " ".join(value.replace("\xa0", " ").split())


def compact_text(value: str) -> str:
    """Return *value* lower-cased with all whitespace (including NBSP) removed."""
    return re.sub(r"\s+", "", value.replace("\xa0", " ")).lower()


def parse_price(price_label: str) -> int | None:
    """Extract an integer price from a human-readable label.

    Returns ``None`` when the label contains "бесплатно" or has no digits.
    All digits found in the label are concatenated, so "1 200 руб." gives 1200.
    """
    cleaned = normalize_spaces(price_label).lower()
    if "бесплатно" in cleaned:
        return None
    digits = re.sub(r"[^\d]", "", cleaned)
    return int(digits) if digits else None


def parse_ingredients(description: str) -> list[str]:
    """Split a description into a list of comma-separated ingredients.

    A leading "состав:" prefix (case-insensitive) is dropped before splitting.
    Empty parts are discarded; an empty description yields an empty list.
    """
    cleaned = normalize_spaces(description)
    if not cleaned:
        return []
    if cleaned.lower().startswith("состав:"):
        cleaned = cleaned.split(":", 1)[1].strip()
    return [part.strip() for part in cleaned.split(",") if part.strip()]


def extract_size(*values: str) -> str | None:
    """Return the first size found in *values*, with inner whitespace removed.

    Values are searched in order with ``SIZE_PATTERN``; ``None`` is returned
    when none of them contains a size.
    """
    for value in values:
        match = SIZE_PATTERN.search(value)
        if match:
            # The pattern's ``\s`` also matches NBSP, tabs and newlines, so
            # strip every whitespace character rather than only ASCII spaces.
            return re.sub(r"\s+", "", match.group(1))
    return None


def is_size_only_line(value: str) -> bool:
    """Return True when *value* consists of nothing but a size expression."""
    size = extract_size(value)
    return size is not None and compact_text(value) == compact_text(size)


def extract_first_json_object(html: str, marker: str) -> dict[str, object]:
    """Parse the first ``{...}`` object that follows *marker* in *html*.

    The end of the object is found by counting braces, ignoring braces that
    appear inside double-quoted strings (backslash escapes are honoured).

    Raises:
        ValueError: if the marker, the opening brace or the matching closing
            brace is not found, or if the extracted text is not valid JSON
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    marker_index = html.find(marker)
    if marker_index == -1:
        raise ValueError(f"{marker} payload not found in page")

    object_start = html.find("{", marker_index)
    if object_start == -1:
        raise ValueError("Shop payload start not found")

    depth = 0
    in_string = False
    escaped = False
    object_end = None
    for index in range(object_start, len(html)):
        char = html[index]
        if in_string:
            if escaped:
                # The character after a backslash is always literal.
                escaped = False
            elif char == "\\":
                escaped = True
            elif char == '"':
                in_string = False
            continue

        if char == '"':
            in_string = True
        elif char == "{":
            depth += 1
        elif char == "}":
            depth -= 1
            if depth == 0:
                object_end = index + 1
                break

    if object_end is None:
        raise ValueError("Shop payload end not found")
    return json.loads(html[object_start:object_end])


def find_shop_container(payload: object) -> dict[str, object] | None:
    """Recursively search *payload* for the shop data.

    Two layouts are recognised:

    * a dict whose ``"shop"`` value is a dict with a ``"products"`` list; the
      dict itself is returned;
    * a dict whose ``"dsShop"`` value is a dict with a ``"data"`` list; it is
      converted to the first layout (``data`` -> ``products``).

    Returns ``None`` when nothing matches.
    """
    if isinstance(payload, dict):
        shop = payload.get("shop")
        if isinstance(shop, dict) and isinstance(shop.get("products"), list):
            return payload

        ds_shop = payload.get("dsShop")
        if isinstance(ds_shop, dict) and isinstance(ds_shop.get("data"), list):
            return {
                "shop": {
                    "products": ds_shop.get("data", []),
                    "settings": ds_shop.get("settings", {}),
                }
            }
        children = payload.values()
    elif isinstance(payload, list):
        children = payload
    else:
        return None

    for child in children:
        found = find_shop_container(child)
        if found:
            return found
    return None


def extract_shop_payload(html: str) -> dict[str, object]:
    """Return the shop container found after any of ``SHOP_PAYLOAD_MARKERS``.

    Markers are tried in order; the first one that yields a shop container
    wins.

    Raises:
        ValueError: if no marker produces a shop container. The message joins
            the individual failure reasons with "; ".
    """
    errors: list[str] = []
    for marker in SHOP_PAYLOAD_MARKERS:
        try:
            payload = extract_first_json_object(html, marker)
        except ValueError as exc:
            errors.append(str(exc))
            continue

        shop_container = find_shop_container(payload)
        if shop_container is not None:
            return shop_container
        errors.append(f"{marker} found, but shop container is missing")

    raise ValueError("; ".join(errors) or "Shop payload not found in page")


def html_fragment_to_lines(fragment: str) -> list[str]:
    """Convert an HTML fragment to a list of non-empty, normalised text lines."""
    if not fragment:
        return []
    text = BeautifulSoup(fragment, "html.parser").get_text("\n", strip=True)
    normalized = (normalize_spaces(line) for line in text.splitlines())
    return [line for line in normalized if line]


def _index_categories(categories: list[object]) -> dict[int, dict[str, object]]:
    """Map integer category ids to their category dicts, skipping bad entries."""
    return {
        category["id"]: category
        for category in categories
        if isinstance(category, dict) and isinstance(category.get("id"), int)
    }


def _category_position(category: dict[str, object]) -> int:
    """Return the category's ``pos`` as an int, or the default when unusable."""
    try:
        return int(category.get("pos", _DEFAULT_CATEGORY_POS))
    except (TypeError, ValueError, OverflowError):
        # ``pos`` may be null or non-numeric; sort such categories last
        # instead of aborting the whole parse.
        return _DEFAULT_CATEGORY_POS


def _coerce_price(value: object) -> int | None:
    """Return *value* as an integer price, or ``None`` when it is not one.

    Accepts ints and floats with an integral value (e.g. ``450.0``); rejects
    booleans, which are technically ints in Python.
    """
    if isinstance(value, bool):
        return None
    if isinstance(value, int):
        return value
    if isinstance(value, float) and value.is_integer():
        return int(value)
    return None


def _first_image_url(image_list: object, base_url: str) -> str:
    """Return the first non-empty image URL resolved against *base_url*."""
    if not isinstance(image_list, list):
        return ""
    for image in image_list:
        if not isinstance(image, dict):
            continue
        raw_url = str(image.get("url", "") or "")
        if raw_url:
            return urljoin(base_url, raw_url)
    return ""


class GorichMenuScraper:
    """Download the configured site page and turn its shop payload into a menu."""

    def __init__(self) -> None:
        """Read the site URL, output path and request timeout from ``settings``."""
        self.site_url = settings.site_url
        self.output_path = Path(settings.output_path)
        self.timeout = settings.request_timeout

    async def fetch_html(self) -> str:
        """GET ``self.site_url`` and return the response body as text.

        ``response.raise_for_status()`` is called, so an error status
        propagates as an httpx exception.
        """
        async with self._build_client() as client:
            response = await client.get(self.site_url)
            response.raise_for_status()
            return response.text

    def _build_client(self) -> httpx.AsyncClient:
        """Create the async HTTP client used by :meth:`fetch_html`."""
        return httpx.AsyncClient(
            headers={"User-Agent": "Mozilla/5.0"},
            follow_redirects=True,
            timeout=self.timeout,
        )

    def parse_menu(self, html: str) -> MenuSnapshot:
        """Build a ``MenuSnapshot`` from the shop payload embedded in *html*.

        Products that are not dicts, are marked invisible, or lack an id or a
        name are skipped.

        Raises:
            ValueError: if the payload cannot be located or its shop,
                settings, categories or products have an unexpected type.
        """
        payload = extract_shop_payload(html)
        shop = payload.get("shop") or {}
        if not isinstance(shop, dict):
            raise ValueError("Shop payload has unexpected format")

        shop_settings = shop.get("settings") or {}
        if not isinstance(shop_settings, dict):
            # Previously surfaced as an AttributeError on ``.get``.
            raise ValueError("Shop payload has unexpected format")

        categories = shop_settings.get("categories") or []
        products = shop.get("products") or []
        if not isinstance(categories, list) or not isinstance(products, list):
            raise ValueError("Shop categories or products have unexpected format")

        category_by_id = _index_categories(categories)
        # One timestamp is shared by the snapshot and all of its items.
        scraped_at = datetime.now(timezone.utc)

        items: list[MenuItem] = []
        for product in products:
            if not isinstance(product, dict):
                continue
            item = self._build_item(product, category_by_id, scraped_at)
            if item is not None:
                items.append(item)

        return MenuSnapshot(
            source_url=self.site_url,
            scraped_at=scraped_at,
            total_items=len(items),
            items=items,
        )

    def _build_item(
        self,
        product: dict[str, object],
        category_by_id: dict[int, dict[str, object]],
        scraped_at: datetime,
    ) -> MenuItem | None:
        """Convert one product dict to a ``MenuItem``; ``None`` if it is skipped."""
        if not product.get("is_visible", True):
            return None

        product_id = product.get("id")
        # ``or ""`` keeps a null name from turning into the string "None".
        name = normalize_spaces(str(product.get("name") or ""))
        if not product_id or not name:
            return None

        raw_description = str(product.get("short_description", "") or "")
        description_lines = html_fragment_to_lines(raw_description)
        size = extract_size(name, *description_lines)

        # Drop lines that only repeat the size, unless that would leave the
        # description empty.
        description = " ".join(
            line for line in description_lines if not is_size_only_line(line)
        ).strip()
        if not description and description_lines:
            description = " ".join(description_lines).strip()

        # ``category_list`` may be null or otherwise not a list in the payload.
        raw_category_list = product.get("category_list") or []
        if not isinstance(raw_category_list, list):
            raw_category_list = []
        sorted_category_ids = sorted(
            (cid for cid in raw_category_list if isinstance(cid, int)),
            key=lambda cid: _category_position(category_by_id.get(cid, {})),
        )

        # The category with the lowest position is treated as the primary one.
        category_name = _DEFAULT_CATEGORY_NAME
        primary_category_id: int | None = None
        if sorted_category_ids:
            primary_category_id = sorted_category_ids[0]
            primary_category = category_by_id.get(primary_category_id, {})
            category_name = normalize_spaces(
                str(primary_category.get("name") or _DEFAULT_CATEGORY_NAME)
            ).lower()

        image_url = _first_image_url(product.get("image_list", []), self.site_url)

        numeric_price = _coerce_price(product.get("price"))
        currency = normalize_spaces(
            str(product.get("currency", _DEFAULT_CURRENCY) or _DEFAULT_CURRENCY)
        )
        price_label = (
            f"{numeric_price} {currency}"
            if numeric_price is not None
            else _NO_PRICE_LABEL
        )

        description_url = str(product.get("description_url", "") or "")
        source_url = (
            urljoin(self.site_url, description_url)
            if description_url
            else self.site_url
        )

        return MenuItem(
            item_id=str(product_id),
            name=name,
            category=category_name,
            description=description,
            ingredients=parse_ingredients(description),
            # Use the numeric value directly instead of re-parsing the label,
            # which would also pick up any digits present in the currency.
            price=numeric_price,
            price_label=price_label,
            size=size,
            photo_url=image_url,
            source_url=source_url,
            scraped_at=scraped_at,
            metadata={
                "category_id": primary_category_id,
                "category_ids": sorted_category_ids,
                "raw_short_description": raw_description,
                "amount": product.get("amount"),
                "sku": product.get("sku"),
            },
        )

    def save_snapshot(self, snapshot: MenuSnapshot) -> None:
        """Write *snapshot* to ``self.output_path`` as indented UTF-8 JSON.

        Missing parent directories are created first.
        """
        self.output_path.parent.mkdir(parents=True, exist_ok=True)
        self.output_path.write_text(
            json.dumps(snapshot.model_dump(mode="json"), ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

    async def scrape_and_save(self) -> MenuSnapshot:
        """Fetch the page, parse the menu, save it, and return the snapshot."""
        html = await self.fetch_html()
        snapshot = self.parse_menu(html)
        self.save_snapshot(snapshot)
        return snapshot