310 lines
10 KiB
Python
310 lines
10 KiB
Python
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import json
|
|||
|
|
import re
|
|||
|
|
from datetime import datetime, timezone
|
|||
|
|
from pathlib import Path
|
|||
|
|
from urllib.parse import urljoin
|
|||
|
|
|
|||
|
|
import httpx
|
|||
|
|
from bs4 import BeautifulSoup
|
|||
|
|
|
|||
|
|
from .config import settings
|
|||
|
|
from .models import MenuItem, MenuSnapshot
|
|||
|
|
|
|||
|
|
|
|||
|
|
# JavaScript call prefixes that are searched for in the page HTML; the first
# "{" found at or after a marker is treated as the start of a JSON payload.
# Markers are tried in the order listed here.
SHOP_PAYLOAD_MARKERS = ("MsJsShop.init(", "MsJsPublishedManager.addJsData(")
|
|||
|
|
# Matches a size expression: a number followed by a unit ("см", "г" or "мл"),
# optionally continued by further "/ <number> <unit>" parts, e.g.
# "30 см / 450 г". Matching is case-insensitive.
SIZE_PATTERN = re.compile(
    r"(\d+\s*(?:см|г|мл)(?:\s*/\s*\d+\s*(?:см|г|мл))*)",
    flags=re.IGNORECASE,
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def normalize_spaces(value: str) -> str:
    """Return *value* with every whitespace run collapsed to a single space.

    Non-breaking spaces (U+00A0) are turned into ordinary spaces first, and
    leading/trailing whitespace is dropped.
    """
    without_nbsp = value.replace("\xa0", " ")
    words = without_nbsp.split()
    return " ".join(words)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def compact_text(value: str) -> str:
    """Return *value* lower-cased with all whitespace removed.

    Used to compare two strings while ignoring spacing and letter case.
    """
    without_nbsp = value.replace("\xa0", " ")
    squeezed = re.sub(r"\s+", "", without_nbsp)
    return squeezed.lower()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def parse_price(price_label: str) -> int | None:
    """Extract an integer price from a human-readable price label.

    Returns ``None`` when the label contains the word "бесплатно" (free) or
    holds no digits at all. Otherwise every non-digit character is dropped
    and the remaining digits are read as a single integer.
    """
    lowered = normalize_spaces(price_label).lower()
    is_free = "бесплатно" in lowered
    digits_only = "" if is_free else re.sub(r"[^\d]", "", lowered)
    if not digits_only:
        return None
    return int(digits_only)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def parse_ingredients(description: str) -> list[str]:
    """Split a product description into a list of ingredient names.

    A leading "состав:" prefix (any letter case) is removed, then the text is
    split on commas; empty pieces are discarded and the rest are trimmed.
    An empty or whitespace-only description yields an empty list.
    """
    text = normalize_spaces(description)
    if text.lower().startswith("состав:"):
        # Keep only what follows the first colon.
        _, _, text = text.partition(":")
    pieces = (chunk.strip() for chunk in text.split(","))
    return [piece for piece in pieces if piece]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def extract_size(*values: str) -> str | None:
    """Return the first size expression found in *values*, without whitespace.

    The values are scanned in order with ``SIZE_PATTERN``; the first match
    wins. Returns ``None`` when no value contains a size expression.
    """
    for value in values:
        match = SIZE_PATTERN.search(value)
        if match:
            # The pattern's ``\s*`` also matches tabs and non-breaking spaces,
            # so remove every kind of whitespace, not just ASCII spaces.
            return "".join(match.group(1).split())
    return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def is_size_only_line(value: str) -> bool:
    """Tell whether *value* consists of nothing but a size expression.

    The line qualifies when a size can be extracted from it and, ignoring
    whitespace and letter case, that size equals the whole line.
    """
    detected = extract_size(value)
    if detected is None:
        return False
    return compact_text(detected) == compact_text(value)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def extract_first_json_object(html: str, marker: str) -> dict[str, object]:
    """Decode the first JSON object that follows *marker* in *html*.

    The first occurrence of *marker* is located, then the first "{" at or
    after it is taken as the start of the object. Decoding stops at the end
    of that object; whatever follows it in the page is ignored.

    Raises:
        ValueError: if the marker is absent, if no "{" follows it, or if the
            text starting at that "{" is not a complete, valid JSON object.
    """
    marker_index = html.find(marker)
    if marker_index == -1:
        raise ValueError(f"{marker} payload not found in page")

    object_start = html.find("{", marker_index)
    if object_start == -1:
        raise ValueError("Shop payload start not found")

    # raw_decode parses exactly one JSON value starting at the given index and
    # tolerates trailing data, so no manual brace/string scanning is needed.
    try:
        payload, _end = json.JSONDecoder().raw_decode(html, object_start)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Shop payload is not valid JSON: {exc}") from exc
    return payload
|
|||
|
|
|
|||
|
|
|
|||
|
|
def find_shop_container(payload: object) -> dict[str, object] | None:
    """Search *payload* depth-first for a dictionary describing the shop.

    Two layouts are recognised on a dictionary:

    * a ``"shop"`` dict holding a ``"products"`` list — the dictionary itself
      is returned unchanged;
    * a ``"dsShop"`` dict holding a ``"data"`` list — a new dictionary in the
      first layout is built from its ``"data"`` and ``"settings"`` entries.

    Otherwise dictionary values and list items are searched recursively.
    Returns ``None`` when nothing matches.
    """
    if isinstance(payload, dict):
        direct = payload.get("shop")
        if isinstance(direct, dict) and isinstance(direct.get("products"), list):
            return payload

        legacy = payload.get("dsShop")
        if isinstance(legacy, dict) and isinstance(legacy.get("data"), list):
            normalized_shop = {
                "products": legacy.get("data", []),
                "settings": legacy.get("settings", {}),
            }
            return {"shop": normalized_shop}

        children = payload.values()
    elif isinstance(payload, list):
        children = payload
    else:
        # Scalars cannot contain a shop container.
        return None

    for child in children:
        located = find_shop_container(child)
        if located:
            return located
    return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def extract_shop_payload(html: str) -> dict[str, object]:
    """Locate the shop container embedded in the page HTML.

    Each marker in ``SHOP_PAYLOAD_MARKERS`` is tried in order: the JSON object
    following it is decoded and searched for a shop container. The first
    container found is returned.

    Raises:
        ValueError: if no marker yields a shop container; the message joins
            the reasons collected for every marker with "; ".
    """
    problems: list[str] = []
    for marker in SHOP_PAYLOAD_MARKERS:
        try:
            candidate = extract_first_json_object(html, marker)
        except ValueError as exc:
            problems.append(str(exc))
        else:
            container = find_shop_container(candidate)
            if container is not None:
                return container
            problems.append(f"{marker} found, but shop container is missing")

    message = "; ".join(problems) or "Shop payload not found in page"
    raise ValueError(message)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def html_fragment_to_lines(fragment: str) -> list[str]:
    """Convert an HTML fragment into a list of non-empty text lines.

    The fragment is parsed with BeautifulSoup's ``html.parser``, its text is
    extracted with newline separators, and every resulting line is
    whitespace-normalised. Lines that end up empty are dropped. An empty
    fragment yields an empty list.
    """
    if not fragment:
        return []

    text = BeautifulSoup(fragment, "html.parser").get_text("\n", strip=True)
    normalized = (normalize_spaces(raw_line) for raw_line in text.splitlines())
    return [line for line in normalized if line]
|
|||
|
|
|
|||
|
|
|
|||
|
|
class GorichMenuScraper:
    """Fetch the configured shop page and turn its embedded payload into a snapshot.

    The site URL, output path and request timeout are read from the project
    ``settings`` object when the scraper is constructed.
    """

    # Sort position given to categories whose "pos" is missing or unusable.
    _FALLBACK_CATEGORY_POSITION = 10_000
    # Category label used when a product has no usable category name.
    _FALLBACK_CATEGORY_NAME = "прочее"

    def __init__(self) -> None:
        self.site_url = settings.site_url
        self.output_path = Path(settings.output_path)
        self.timeout = settings.request_timeout

    async def fetch_html(self) -> str:
        """Download ``self.site_url`` and return the response body as text.

        Raises whatever ``response.raise_for_status()`` raises for an error
        status code.
        """
        async with self._build_client() as client:
            response = await client.get(self.site_url)
            response.raise_for_status()
            return response.text

    def _build_client(self) -> httpx.AsyncClient:
        """Create the HTTP client used for a single fetch."""
        return httpx.AsyncClient(
            headers={"User-Agent": "Mozilla/5.0"},
            follow_redirects=True,
            timeout=self.timeout,
        )

    def parse_menu(self, html: str) -> MenuSnapshot:
        """Parse the page HTML into a ``MenuSnapshot``.

        Products that are not dictionaries, are marked invisible, or lack an
        id or a name are skipped. All items share one UTC ``scraped_at``
        timestamp taken at parse time.

        Raises:
            ValueError: if the shop payload cannot be found or its ``shop``,
                ``settings``, ``categories`` or ``products`` entries do not
                have the expected container types.
        """
        payload = extract_shop_payload(html)
        shop = payload.get("shop") or {}
        if not isinstance(shop, dict):
            raise ValueError("Shop payload has unexpected format")

        shop_settings = shop.get("settings") or {}
        if not isinstance(shop_settings, dict):
            # Without this check a non-dict value would fail with an
            # AttributeError on ``.get`` instead of a format error.
            raise ValueError("Shop settings have unexpected format")

        categories = shop_settings.get("categories") or []
        products = shop.get("products") or []
        if not isinstance(categories, list) or not isinstance(products, list):
            raise ValueError("Shop categories or products have unexpected format")

        # Only categories with an integer id can be referenced by products.
        category_by_id: dict[int, dict[str, object]] = {
            category["id"]: category
            for category in categories
            if isinstance(category, dict) and isinstance(category.get("id"), int)
        }

        scraped_at = datetime.now(timezone.utc)
        items: list[MenuItem] = []
        for product in products:
            item = self._build_item(product, category_by_id, scraped_at)
            if item is not None:
                items.append(item)

        return MenuSnapshot(
            source_url=self.site_url,
            scraped_at=scraped_at,
            total_items=len(items),
            items=items,
        )

    def _build_item(
        self,
        product: object,
        category_by_id: dict[int, dict[str, object]],
        scraped_at: datetime,
    ) -> MenuItem | None:
        """Build one ``MenuItem`` from a raw product entry, or ``None`` to skip it."""
        if not isinstance(product, dict):
            return None
        if not product.get("is_visible", True):
            return None

        product_id = product.get("id")
        raw_name = product.get("name")
        # A JSON null must not turn into the literal name "None".
        name = "" if raw_name is None else normalize_spaces(str(raw_name))
        if not product_id or not name:
            return None

        raw_description = str(product.get("short_description", "") or "")
        description_lines = html_fragment_to_lines(raw_description)
        size = extract_size(name, *description_lines)

        # Lines that only repeat the size are left out of the description,
        # unless that would leave the description empty.
        description_parts = [
            line for line in description_lines if not is_size_only_line(line)
        ]
        description = " ".join(description_parts).strip()
        if not description and description_lines:
            description = " ".join(description_lines).strip()

        sorted_category_ids = self._sorted_category_ids(product, category_by_id)
        primary_category_id: int | None = None
        category_name = self._FALLBACK_CATEGORY_NAME
        if sorted_category_ids:
            primary_category_id = sorted_category_ids[0]
            category_name = self._category_name(
                category_by_id.get(primary_category_id)
            )

        price = product.get("price")
        # Non-integer prices are reported as "not specified".
        numeric_price = int(price) if isinstance(price, int) else None
        currency = normalize_spaces(str(product.get("currency", "руб.") or "руб."))
        price_label = (
            f"{numeric_price} {currency}"
            if numeric_price is not None
            else "Цена не указана"
        )

        description_url = str(product.get("description_url", "") or "")
        source_url = (
            urljoin(self.site_url, description_url)
            if description_url
            else self.site_url
        )

        return MenuItem(
            item_id=str(product_id),
            name=name,
            category=category_name,
            description=description,
            ingredients=parse_ingredients(description),
            price=parse_price(price_label),
            price_label=price_label,
            size=size,
            photo_url=self._first_image_url(product),
            source_url=source_url,
            scraped_at=scraped_at,
            metadata={
                "category_id": primary_category_id,
                "category_ids": sorted_category_ids,
                "raw_short_description": raw_description,
                "amount": product.get("amount"),
                "sku": product.get("sku"),
            },
        )

    def _sorted_category_ids(
        self,
        product: dict[str, object],
        category_by_id: dict[int, dict[str, object]],
    ) -> list[int]:
        """Return the product's integer category ids ordered by category position."""
        raw_ids = product.get("category_list")
        if not isinstance(raw_ids, list):
            # Missing, null or malformed list: the product has no categories.
            return []
        integer_ids = [
            category_id for category_id in raw_ids if isinstance(category_id, int)
        ]
        return sorted(
            integer_ids,
            key=lambda category_id: self._category_position(
                category_by_id.get(category_id)
            ),
        )

    @classmethod
    def _category_position(cls, category: dict[str, object] | None) -> int:
        """Return the category's "pos" as an int, or the fallback when unusable."""
        raw_position = category.get("pos") if category else None
        try:
            return int(raw_position)
        except (TypeError, ValueError, OverflowError):
            return cls._FALLBACK_CATEGORY_POSITION

    @classmethod
    def _category_name(cls, category: dict[str, object] | None) -> str:
        """Return the lower-cased category name, or the fallback when blank."""
        raw_name = category.get("name") if category else None
        label = normalize_spaces(str(raw_name or "")).lower()
        return label or cls._FALLBACK_CATEGORY_NAME

    def _first_image_url(self, product: dict[str, object]) -> str:
        """Return the absolute URL of the first image with a URL, or ""."""
        image_list = product.get("image_list", [])
        if not isinstance(image_list, list):
            return ""
        for image in image_list:
            if not isinstance(image, dict):
                continue
            raw_url = str(image.get("url", "") or "")
            if raw_url:
                return urljoin(self.site_url, raw_url)
        return ""

    def save_snapshot(self, snapshot: MenuSnapshot) -> None:
        """Write *snapshot* as indented UTF-8 JSON to ``self.output_path``.

        Missing parent directories are created first.
        """
        self.output_path.parent.mkdir(parents=True, exist_ok=True)
        self.output_path.write_text(
            json.dumps(snapshot.model_dump(mode="json"), ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

    async def scrape_and_save(self) -> MenuSnapshot:
        """Fetch the page, parse it, save the snapshot and return it."""
        html = await self.fetch_html()
        snapshot = self.parse_menu(html)
        self.save_snapshot(snapshot)
        return snapshot
|