После того как мы разобрались с парсингом Wildberries, логично двигаться дальше и освоить Ozon. Но здесь нас ждёт сюрприз: Ozon гораздо сложнее парсить из-за динамической загрузки контента и более строгих политик автоматизированного доступа.
В этой статье мы разберём, почему для Ozon нужен браузерный парсинг, как использовать Playwright для успешного парсинга и как обернем решение в Telegram-бота, который по запросу пользователя парсит товары и отправляет CSV-файл.
Статья создана исключительно в образовательных целях и содержит описание работы с общедоступной информацией на Ozon. Для автоматизированного получения информации рекомендуется придерживаться правил Ozon по работе API, используя официальные методы, предоставляемые маркетплейсом.
Почему парсить Ozon сложно
При парсинге Ozon возникает ряд сложностей. Большая часть данных подгружается динамически через JavaScript после загрузки страницы, cайт проверяет, что запросы идут от настоящего браузера, а не от скрипта, cелекторы часто меняются, от чего классы генерируются автоматически.
Поэтому для Ozon нам понадобится браузерная автоматизация через Playwright. Инструмент, который управляет настоящим браузером и может обходить большинство защит.
Нам пригодятся ротируемые IP и возможность частого и быстрого обновления кода. Для этого мы воспользуемся облаком для простого хостинга приложений - Amvera. Это даст возможность обновлять код тремя командами в IDE или через закидывания файлов в интерфейсе и ротирование IP-адресов, снижающее риск блокировки по числу запросов.
Как устроен парсинг Ozon?
Общая схема работы
Поиск товаров. Открываем страницу поиска по запросу пользователя.
Сбор ссылок. Извлекаем ссылки на карточки товаров (обычно парсим 2 страницы — до 15 товаров).
Парсинг каждого товара. Переходим по ссылкам и собираем, название, цену в рублях, рейтинг, количество отзывов.
Формирование результата. Cохраняем данные в DataFrame и конвертируем в CSV.
Подготовка окружения
Для работы нам понадобятся следующие библиотеки:
python-telegram-bot==20.7
pandas==2.1.4
playwright==1.40.0
aiohttp==3.9.1
urllib3==2.1.0
Создайте файл requirements.txt, в который скопируем и вставим приведённые выше библиотеки.
Установка Playwright
Playwright требует дополнительной установки браузеров:
pip install playwright
playwright install chromium
Приступим к написанию самого кода парсера.
1. Создаём класс для парсинга Ozon
Начнём с основы — создадим класс, который будет управлять браузером и парсить данные.
1.1. Инициализация класса
создадим класс OzonParser и его конструктор init, который вызывается при создании объекта класса.
from playwright.async_api import async_playwright
import asyncio
import random
class OzonParser:
def __init__(self):
self.playwright = None
self.browser = None
self.page = None
В нём мы создали переменную для хранения драйвера Playwright. Для управления браузером.также создали переменные для хранения экземпляра браузера Chromium, через который в полседующем будем парсить и создаём переменную для хранения самой вкладки браузера
1.2. Имитация человеческого поведения
У Ozon антибот-системы всегда отслеживают слишком быстрые действия (т.е. люди не кликают моментально, между действиями всегда есть задержки на размышление, движение мыши и т.д.). Рандомная задержка делает бота менее заметным для данных систем. Для этого создадим функцию, которая будет создавать случайную задержку между действиями.
async def human_delay(self, min_sec=1, max_sec=3):
await asyncio.sleep(random.uniform(min_sec, max_sec))
Объявляем асинхронную функцию с параметрами минималльной и максимальной задержки. После чего создаём асинхронную паузу на случайное количество секунд в заданных пределах. Метод await ждёт завершения операции, но не блокирует программу
1.3. Запуск Playwright
Без запуска драйвера мы не сможем открыть браузер и управлять им. Для этого будем использовать фреймворк Playwright
async def setup_browser(self):
self.playwright = await async_playwright().start()
В данной функции мы создаём объект Playwright, который запускает драйвер.
Метод self.playwright сохраняет объект в переменную класса.
1.4. Запуск Chromium
Для обхода блокировки мы будем запускать браузер Chromium с параметрами, которые маскируют автоматизацию. Эти параметры помогут сделать браузер похожим на обычный.
self.browser = await self.playwright.chromium.launch(
headless=True,
args=[
"--disable-blink-features=AutomationControlled",
"--disable-dev-shm-usage",
"--no-sandbox",
]
)
Метод self.playwright.chromium обращается к модулю Chromium в Playwright, после чего с помощью .launch() мы запускаем браузер.
headless=True отвечает за запуск браузера без графического интерфейса. Используйте для отладки значение False, чтобы видеть окно браузера.
Далее вписываем значения аргументов в командной строке Chrome:
"--disable-blink-features=AutomationControlled" убираем управление автоматизацией
"--disable-dev-shm-usage" отключаем использование /dev/shm для Docker/облака
"--no-sandbox" отключаем sandbox для работы в контейнерах html
1.5. Создание контекста
Контекст позволяет работать с браузером как отдельный пользователь (контекст = отдельный профиль со своими куками, localStorage и т.д.). Если создать несколько контекстов, они не будут влиять друг на друга (разные куки, сессии и т.д.).
Далее мы пропишем код, который создаёт изолированную сессию браузера с настройками реального пользователя.
self.context = await self.browser.new_context(
viewport={"width": 1920, "height": 1080},
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
java_script_enabled=True,
ignore_https_errors=True
)
Создадим новый контекст с помощью self.browser.new_context() и для него размер окна браузераviewport={"width": 1920, "height": 1080}
Также сайты проверяют User-Agent, чтобы понять откуда пришёл запрос. Поэтому пропишем идентификатор нашего браузера user_agent="Mozilla/5.0..." (в примере прописан для действующей версии Chrome на Mac OS).
Методjava_script_enabled=True включает выполнение JavaScript. Он обязателен для обработки современных сайтов, имеющих скрипт js.
Так как мы используем самоподписанный сертификат, то пропишем ignore_https_errors=True, который будет игнорировать ошибки SSL-сертификатов.
1.6. Маскировка автоматизации
Когда сайт проверяет свойство navigator.webdriver и видит значение true, он определяет браузер как управляемый программой и может заблокировать доступ. Далее пропишем код, который будет внедрять JavaScript-скрипт, чтобы маскировать браузер под пользовательский, скрывая признаки пасинга.
await self.context.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5]
});
""")
Метод self.context.add_init_script("""...""") добавляет скрипт инициализации, который будет выполняться перед создание�� любого документа в контексте страницы.
Далее, в строке Object.defineProperty(navigator, 'webdriver', {...}) мы переопределяем свойство webdriver в объекте navigator, который содержит информацию о браузере (в автоматизированных браузерах это свойство обычно равно true). С помощью get: () => undefined мы заставляем его возвращать undefined, как в обычном браузере.
Строка Object.defineProperty(navigator, 'plugins', {...}) изменяет свойство plugins. У автоматизированных браузеров массив плагинов часто пустой. Ozon использует это для детектирования бота. Метод get: () => [1, 2, 3, 4, 5] возвращает массив, имитируя наличие установленных плагинов.
1.7. Создание страницы
self.page = await self.context.new_page()
self.page.set_default_timeout(15000)
self.page.set_default_navigation_timeout(20000)
Таймауты защищают от бесконечного ожидания. Если страница не загрузится за 20 секунд или элемент не появится за 15 секунд, выбросится исключение и мы сможем обработать ошибку.
Разница между таймаутами:
default_timeout— для всех операций (клики, ожидание, ввод текста).navigation_timeout— только для загрузки страниц.
1.8. Формирование URL поиска
Настроим создание URL для поиска товаров на Ozon, правильно кодируя специальные символы.
from urllib.parse import quote
async def fetch_product_links(self, query, pages=2):
encoded_query = quote(query)
search_url = f"https://www.ozon.ru/search/?text={encoded\\_query}&from\\_global=true"
from urllib.parse import quote импортирует функцию для кодирования URL
После этого объявим функцию async def fetch_product_links(self, query, pages=2):, которая служит для получения ссылок на товары.
Функция quote(query) выполняет кодирование строки запроса в URL. Например, запрос "игровая мышь" преобразуется в "игровая%20мышь".
В переменной encoded_query сохраняем результат кодирования для дальнейшего использования в создании URL
Конструкция f"https://www.ozon.ru/search/?text={encoded\_query}&from\_global=true" создаёт строку, которая позволяет динамически вставлять значения переменных.
1.9. Открытие страницы поиска
Далее откроем страницу поиска Ozon и проверим, не перенаправил ли Ozon на страницу категории вместо поиска.
logger.info(f"Поиск: {query}")
await self.page.goto(search_url, wait_until="domcontentloaded", timeout=15000)
await self.human_delay(2, 3)
current_url = self.page.url
if "category" in current_url and "text" not in current_url:
logger.warning("Перенаправление на категорию")
search_url = f"https://www.ozon.ru/search/?text={encoded\\_query}"
await self.page.goto(search_url, wait_until="domcontentloaded", timeout=15000)
await self.human_delay(2, 3)
Свойство self.page.url возвращает текущий адрес страницы.
Условие if "category" in current_url and "text" not in current_url: проверяет, произошло ли перенаправление в категорию вместо отображения результатов поиска. В таком случае код записывает предупреждение в лог через logger.warning(), формирует новый URL без параметра &from_global=true и повторно выполняет переход.
1.10. Ожидание загрузки товаров
Как мы уже сказали, товары в Ozon загружаются динамически через JavaScript. Нужно дождаться их появления прежде чем искать ссылки. Используем множество селекторов для надёжности.
selectors_to_wait = [
"[data-widget='searchResults']",
".widget-search-result-container",
".search-container",
".tile-root",
"div[data-widget*='search']",
".a0c6"
]
for selector in selectors_to_wait:
try:
await self.page.wait_for_selector(selector, timeout=5000)
break
except:
continue
Мы прописали цикл, который перебирает возможные CSS-селекторы контейнеров с товарами, пытаясь найти хотя бы один работающий вариант. Для каждого селектора система ожидает его появление в течение 5 секунд. При успешном обнаружении элемента цикл прерывается, иначе проверяется следующий селектор из списка.
1.11. Подготовка к сбору ссылок
Пропишем код, который инициализирует хранилище для ссылок и ожидает их появления перед началом парсинга.
links = set()
for page_num in range(1, pages + 1):
logger.info(f"Страница {page_num}/{pages}")
try:
await self.page.wait_for_selector("a[href*='/product/']", timeout=10000)
Cоздаём множество объект links для хранения ссылок, после чего начинается цикл по страницам поиска. На каждой странице ожидаем появления хотя бы одной ссылки на товар с помощью селектора a[href*='/product/'] перед продолжением работы.
set() идеален для ссылок, потому что автоматически убирает дубликаты. Один товар может несколько раз появиться на странице (в рекламе, в результатах), но в set он сохранится только один раз.
1.12. Сбор и фильтрация ссылок
Мы подготовили нашу программу к началу парсинга. Теперь можем написать сам код, который находит все элементы с ссылками на странице, используя несколько селекторов, и удаляет дубликаты для получения уникального списка
product_selectors = [
"a[href*='/product/']",
".tile-root a[href*='/product/']",
"[data-widget*='searchResults'] a[href*='/product/']",
"div a[href*='/product/']"
]
elements = []
for selector in product_selectors:
found_elements = await self.page.query_selector_all(selector)
elements.extend(found_elements)
if found_elements:
break
seen_hrefs = set()
unique_elements = []
for e in elements:
href = await e.get_attribute("href")
if href and href not in seen_hrefs:
seen_hrefs.add(href)
unique_elements.append(e)
elements = unique_elements
Здесь наша система последовательно проверяет различные CSS-селекторы для поиска товарных ссылок, начиная с самых общих и переходя к более специфичным. После сбора всех элементов выполняется фильтрация дубликатов путем проверки их атрибутов href
Множество селекторов обеспечивает надежность поиска при возможных изменениях структуры страницы Ozon. Фильтрация по href устраняет дублирующиеся ссылки на один и тот же товар, который может появляться в разных блоках страницы (основные результаты, рекламные вставки, рекомендации).
1.13. Извлечение и обработка URL
Далее, в цикле, мы пройдём по всем найденным HTML-элементам для получения ссылки на товар.
for e in elements:
href = await e.get_attribute("href")
if href and "/product/" in href:
if href.startswith("/"):
full_url = "https://www.ozon.ru" + href
else:
full_url = href
clean_url = full_url.split("?")[0]
if "product" in clean_url.lower() and len(clean_url) > 30:
links.add(clean_url)
logger.info(f"Найдено ссылок: {len(links)}")
Здесь, в строке href = await e.get_attribute("href") мы извлекаем значение атрибута href из каждого элемента.
URL могут быть относительными (/product/123) или абсолютными (https://...). Мы приводим их к единому виду. Параметры запроса (?ref=abc) не нужны — они для аналитики, но товар тот же. Убираем их для получения канонического URL.
Для этого пропишем условия if href and "/product/" in href: и if href.startswith("/"):. Они проверяют, что ссылка существует и содержит путь к товару. Определяем, является ли URL относительным и преобразуем его в абсолютные, добавляя домен:full_url = "https://www.ozon.ru" + href
Далее удаляем все параметры запроса после знака вопроса, оставляя только чистый URL.
И добавляем обработанный URL в множество для исключения дубликатов.
1.14. Переход на следующую страницу
if page_num < pages:
next_selectors = [
"a[aria-label*='Следующая']",
"a[data-widget*='paginator-next']",
"[data-widget='paginator'] a:last-child",
".paginator .next",
"a:has-text('Далее')",
]
next_btn = None
for selector in next_selectors:
next_btn = await self.page.query_selector(selector)
if next_btn:
break
Проверяем if page_num < pages: нужно ли нам вообще переходить дальше или мы уже собрали достаточно страниц.
После чего создаём список из нескольких способов найти кнопку "Далее". Сайты часто меняют дизайн, поэтому используем различные варианты:
ищем по тексту в кнопке ("Следующая", "Далее").
ищем по специальным атрибутам, которые используются в HTML.
ищем по классам стилей СSS.
пробуем найти последнюю кнопку в блоке пагинации.
Начинаем поиск с самого надёжного селектора и постепенно переходим к более общим. Как только находим подходящую кнопку, сразу останавливаем поиск, чтобы не тратить лишнее время.
1.15. Клик по кнопке "Далее"
Клик по кнопке должен быть максимально похож на человеческий и содержать прокрутку, паузу, клик, ожидание. Прописанный нами блок кода позволяет продолжить работу даже если одна страница не загрузилась.
if not next_btn:
logger.warning("Кнопка 'Далее' не найдена")
break
await next_btn.scroll_into_view_if_needed()
await self.human_delay(1, 2)
await next_btn.click()
await self.human_delay(3, 4)
except Exception as e:
logger.warning(f"Ошибка на странице {page_num}: {e}")
continue
Построчное объяснение
Проверяем, если кнопка "Далее" так и не нашлась, значит, страницы закончились. Предупреждаем об этом в логах и прекращаем сбор данных.
Если кнопка нашлась, то, прокручиваем страницу до кнопки, чтобы она оказалась в зоне видимости (браузеры могут игнорировать клики по невидимым элементам), затем делаем небольшую паузу await self.human_delay(1, 2) перед кликом нажимаем по кнопке "Далее" чтобы перейти на следующую страницу, ждём после нажатия, чтобы страница полностью загрузилась.continue — переходим к следующей итерации цикла
1.16. Возврат результата
Пропишем защиту от случайных ссылок на другие сайты или служебные страницы Ozon, которые могли попасть в результаты
filtered_links = [
link for link in links
if "ozon.ru" in link and "/product/" in link
]
logger.info(f"Всего товаров: {len(filtered_links)}")
return filtered_links
Создаём отфильтрованный список filtered_links = [...]
После чего перебираем все ссылки из множества и возвращаем список ссылок
2. Парсинг данных товара
Теперь разберём функции для извлечения информации со страницы товара
2.1. Открытие страницы товара
На случай медленного ответа сервера или тяжёлой страницы с множеством картинок пропишем случайную задержку
async def parse_product(self, url):
try:
logger.info(f"Парсим товар: {url}")
await self.page.goto(url, wait_until="domcontentloaded", timeout=20000)
await self.human_delay(2, 3)
2.2. Ожидание и получение контента
try:
await self.page.wait_for_selector("h1", timeout=10000)
except:
logger.warning(f"Заголовок не найден: {url}")
content = await self.page.content()
Здесь мы ожидаем тега h1 в подгруженной странице await self.page.wait_for_selector("h1", timeout=10000), после чего получаем весь HTML.
Зачем это нужно:
Заголовок h1 обычно загружается одним из первых — если его нет, страница загрузилась некорректно.
2.3. Извлечение названия товара
Название всегда находится в теге h1. Если его нет, записываем "Неизвестно" вместо ошибки.
title_elem = await self.page.query_selector("h1")
if title_elem:
title = await title_elem.text_content()
else:
title = "Неизвестно"
2.4. Вызов парсинга цены и рейтинга
Выносим сложную логику в отдельные функции для читаемости кода.
price = await self.parse_price(content)
rating, reviews = await self.parse_rating_and_reviews()
Здесь мы вызываем функции парсинга цены, рейтинга и отзывов
2.5. Форматирование и возврат результата
Далее мы бдуем правильно форматировать данные для удобного отображения в CSV
formatted_price = f"{price} ₽" if price is not None else None
logger.info(
f"Результат парсинга: {title[:30]}... | "
f"Цена: {formatted_price} | "
f"Рейтинг: {rating} | "
f"Отзывы: {reviews}"
)
return {
"Название": title.strip(),
"Цена": formatted_price,
"Рейтинг": rating,
"Количество отзывов": reviews,
"Ссылка": url
}
except Exception as e:
logger.warning(f"Ошибка парсинга {url}: {e}")
return {
"Название": "Ошибка парсинга",
"Цена": None,
"Рейтинг": None,
"Количество отзывов": None,
"Ссылка": url
}
Здесь мы форматируем цену, после чего в title.strip() удаляем пробелы с краёв и обрабатываем ошибки.
3. Парсинг цены товара
Самая сложная часть в том, что цены могут быть в разных местах и форматах.
3.1. Инициализация и подготовка
async def parse_price(self, content):
import re
import json
price = None
3.2. Поиск цены в JSON
Рекурсивный поиск позволяет найти цену в любом месте вложенной структуры JSON. Мы будем проверять разные варианты названий полей (price, currentPrice и т.д.).
try:
json_patterns = [
r'window\.__INITIAL_STATE__\s*=\s*({.*?});',
r']*data-widget[^>]*>([^<]*)',
]
for pattern in json_patterns:
matches = re.findall(pattern, content, re.DOTALL)
for match in matches:
try:
if isinstance(match, str) and match.startswith('{'):
data = json.loads(match)
def find_price(obj):
if isinstance(obj, dict):
for key, value in obj.items():
if key in ['price', 'currentPrice', 'finalPrice', 'amount'] and value:
if isinstance(value, (int, float)):
return int(value)
elif isinstance(value, str) and value.isdigit():
return int(value)
if isinstance(value, (dict, list)):
result = find_price(value)
if result:
return result
elif isinstance(obj, list):
for item in obj:
result = find_price(item)
if result:
return result
return None
found_price = find_price(data)
if found_price:
price = found_price
break
except Exception as parse_error:
continue
Сначала мы проверяем два самых распространённых места, где сайты хранят информацию о товарах.
Когда находим подходящий блок, пытаемся его "распаковать", а именно преобразовать текстовые данные в структурированный формат, с которым можно работать.
Дальше мы рекурсивно просматриваем всю структуру данных, ищем любые упоминания цены под разными именами (price, currentPrice, finalPrice, amount).
Если находим цену, сохраняем её и прекращаем поиск. Если в одном месте не нашли, то пробуем в другом, пока не переберём все возможные варианты.
А именно
json_patterns = [...] — паттерны для поиска JSON в HTML
re.findall(pattern, content, re.DOTALL) — ищем все совпадения
json.loads(match) — парсим JSON-строку в объект
def find_price(obj): — рекурсивная функция поиска
В if key in ['price', 'currentPrice', ...] проверяем известные ключи
3.3. Поиск цены через селекторы
Пропишем запасной метод на случай если JSON не найден. Ищем визуальный элемент с ценой через CSS-селекторы.
if not price:
price_selectors = [
"[data-widget='webPrice']",
"[data-widget='price']",
".price",
"[class*='price']",
".c3118",
".yo3",
]
for selector in price_selectors:
try:
price_elem = await self.page.query_selector(selector)
if price_elem:
price_text = await price_elem.text_content()
if price_text:
price_match = re.search(
r'(\d[\d\s]*)\s*[₽рубRUB]',
price_text.replace(',', '')
)
if price_match:
price_str = price_match.group(1).replace(' ', '').replace('\u2009', '').replace('\xa0', '')
if price_str.isdigit():
price = int(price_str)
break
except Exception as selector_error:
continue
except Exception as e:
logger.warning(f"Ошибка парсинга цены: {e}")
return price
if not price: — если не нашли в JSON, ищем через селекторы.
re.search(r'(\d[\d\s]*)\s*[₽рубRUB]', ...) регулярное выражение для поиска числа.
price_str.replace(' ', '') убираем пробелы-разделители.
Проверяем что это число с помощью price_str.isdigit().
4. Парсинг рейтинга и отзывов на Ozon
Ещё одна сложная часть, которую нам нужно решить это то, что данные тоже загружаются динамически.
4.1. Инициализация функции
async def parse_rating_and_reviews(self):
rating = None
reviews = None
try:
await self.human_delay(2, 3)
4.2. Поиск рейтинга через селекторы
rating_selectors = [
"span[data-widget='webReviewRating']",
"div[data-widget='webReviewRating']",
"[class*='rating']",
"[class*='Rating']",
".a0c8",
".a2a0",
]
for selector in rating_selectors:
try:
rating_elem = await self.page.query_selector(selector)
if rating_elem:
rating_text = await rating_elem.text_content()
if rating_text:
logger.info(f"Найден текст рейтинга: '{rating_text}'")
rating_match = re.search(r'(\d+\.\d+|\d+)', rating_text.replace(',', '.'))
if rating_match:
rating_val = rating_match.group(1)
try:
rating = float(rating_val)
logger.info(f"Найден рейтинг через селектор {selector}: {rating}")
break
except ValueError:
continue
except Exception as e:
continue
В начале кода прописываем список возможных селекторов
re.search(r'(\d+\.\d+|\d+)', ...) ищем число с точкой или без. Рейтинг может быть в формате "4,8" или "4.8". Поэтому проверяем разные селекторы для надёжности: rating_text.replace(',', '.').
Конвертируем рейтинг в число с плавающей точкой: float(rating_val).
4.3. Поиск рейтинга через regex в HTML
На случай если визуальные элементы не найдены, пропишем запасной метод. Ищем рейтинг в исходном коде страницы.
if rating is None:
try:
content = await self.page.content()
rating_patterns = [
r'"rating":\s*["]?(\d+\.\d+|\d+)["]?',
r'"ratingValue":\s*["]?(\d+\.\d+|\d+)["]?',
r'"averageRating":\s*["]?(\d+\.\d+|\d+)["]?',
r'рейтинг[^"]*?(\d+\.\d+|\d+)',
r'rating[^"]*?(\d+\.\d+|\d+)',
]
for pattern in rating_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
for match in matches:
try:
rating_candidate = float(match)
if 1 <= rating_candidate <= 5:
rating = rating_candidate
logger.info(f"Найден рейтинг через regex: {rating}")
break
except ValueError:
continue
if rating:
break
except Exception as e:
logger.warning(f"Ошибка при поиске рейтинга в тексте: {e}")
Условие if rating is None:, если не нашли через селекторre.findall(pattern, content, re.IGNORECASE) ищем рейтинг без учёта регистра и проверяем соответствие рейтинга: if 1 <= rating_candidate <= 5:
4.4. Поиск отзывов через селекторы
reviews_selectors = [
"span[data-widget='webReviewCount']",
"a[href*='reviews'] span",
"[class*='review-count']",
"[class*='reviewCount']",
".a0c9",
".a2a1",
]
for selector in reviews_selectors:
try:
reviews_elem = await self.page.query_selector(selector)
if reviews_elem:
reviews_text = await reviews_elem.text_content()
if reviews_text:
logger.info(f"Найден текст отзывов: '{reviews_text}'")
patterns = [
r'(\d+[\d\s]*)\s*(отзыв|review)',
r'(\d+[\d\s]*)',
]
for pattern in patterns:
reviews_match = re.search(pattern, reviews_text, re.IGNORECASE)
if reviews_match:
reviews_str = reviews_match.group(1).replace(' ', '').replace('\u2009', '').replace('\xa0', '')
if reviews_str.isdigit():
reviews = int(reviews_str)
logger.info(f"Найдены отзывы через селектор {selector}: {reviews}")
break
except Exception as e:
continue
В данном коде мы пробуем разные подходы:
ищем по специальным атрибутам, которые используют разработчики.
ищем в ссылках на отзывы.
ищем по классам, которые могут содержать
reviewпробуем конкретные классы, которые часто используются на Ozon.
Когда находим элемент с текстом (например, "145 отзывов"), нам нужно "вытащить" оттуда чистое число. Для этого используем регулярные выражения, которые помогают найти цифры в тексте, даже если они перемешаны со словами и пробелами(r'(\d+[\d\s]*)\s*(отзыв|review)' - паттерн для текста "125 отзывов")
Количество отзывов может содержать пробелы ("1 284 отзывов"). Удаляем их перед конвертацией в число.
4.5. Поиск отзывов через regex в HTML
Если мы не смогли найти количество отзывов в видимых элементах на странице (например, потому что сайт изменил дизайн или скрыл эту информацию), то мы будем парсить через исходный код страницы.
if reviews is None:
try:
content = await self.page.content()
reviews_patterns = [
r'"reviewCount":\s*["]?(\d+)["]?',
r'"reviewsCount":\s*["]?(\d+)["]?',
r'"review_count":\s*["]?(\d+)["]?',
r'отзыв[ов]*[^"]*?(\d+)',
r'review[s]*[^"]*?(\d+)',
]
for pattern in reviews_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
for match in matches:
if match.isdigit():
reviews_candidate = int(match)
if reviews_candidate > 0:
reviews = reviews_candidate
logger.info(f"Найдены отзывы через regex: {reviews}")
break
if reviews:
break
except Exception as e:
logger.warning(f"Ошибка при поиске отзывов в тексте: {e}")
Каждый найденный элемент проверяем. Это должно быть положительное число или ноль.
Если находим подходящее число, то записываем его. Если один шаблон не сработал, тогда пробуем следующий, пока не переберём все варианты
4.6. Поиск по структуре страницы
Последний запасной метод, это поиск элементов которые содержат только число от 1 до 5 (вероятно рейтинг).
if rating is None:
try:
possible_rating_elements = await self.page.query_selector_all("span, div, button")
for elem in possible_rating_elements:
try:
text = await elem.text_content()
if text:
rating_match = re.search(r'^\d+\.\d+, text.strip())
if rating_match:
rating_val = float(text.strip())
if 1 <= rating_val <= 5:
rating = rating_val
logger.info(f"Найден рейтинг в элементе: {rating}")
break
except:
continue
except Exception as e:
pass
return rating, reviews
Если рейтинг ещё не найден предыдущими способами, мы используем "метод тыка", а именно проверяем все подходящие элементы на странице в поисках числа, которое выглядит как рейтинг: query_selector_all("span, div, button")
Собираем все элементы, которые могут содержать рейтинг: текстовые блоки (span), контейнеры (div) и кнопки (button).
Для каждого элемента смотрим его текстовое содержимое и проверяем:
Это число с точкой (например, "4.5" или "4.8"). Число находится в разумном диапазоне для рейтинга (от 1 до 5). r'^\d+\.\d+ — паттерн для чистого числа (только рейтинг)
5. Создание Telegram-бота для демонстрации работы
Теперь создадим бота который будет использовать наш парсер. Через BotFather создайте вашего бота и скопируйте ваш токен
5.1. Настройки бота
import logging
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
import tempfile
import os
from datetime import datetime
import pandas as pd
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ozon_bot")
TELEGRAM_BOT_TOKEN = "ВАШ_ТОКЕН_БОТА"
MAX_PRODUCTS = 15
PAGES_TO_PARSE = 2
ozon_parser = OzonParser()
from telegram import Update импортирует объект обновления
from telegram.ext import Application импортирует приложение бота
TELEGRAM_BOT_TOKEN = "..." токен от BotFather.
Cоздаём экземпляр парсера: ozon_parser = OzonParser()
5.2. Команда /start
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
welcome_message = (
"Бот для парсинга товаров с Ozon.\n\n"
"Просто отправьте мне название товара или категорию, например:\n"
"• 'ноутбук'\n"
"• 'крем для лица'\n"
"• 'игровая мышь'\n"
"• 'телефон samsung'\n\n"
"Я найду товары и пришлю вам CSV файл с результатами!\n\n"
"**Теперь в результатах:**\n"
"• Название товара\n"
"• Цена в рублях (₽)\n"
"• Рейтинг товара (⭐)\n"
"• Количество отзывов\n"
"• Ссылка на товар\n\n"
"Парсинг занимает 1-3 минуты..."
)
await update.message.reply_text(welcome_message)
Функция async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE): объявляет асинхронную функцию-обработчик для команды /start в Telegram боте.
Переменная welcome_message содержит многострочное приветственное сообщение с эмодзи для лучшей визуализации.
Команда await update.message.reply_text(welcome_message) отправляет подготовленное сообщение обратно пользователю в чат.
5.3. Обработка текстовых сообщений (часть 1)
Здесь мы будем реализовывать обработку сообщений в боте
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_message = update.message.text.strip()
if len(user_message) < 2:
await update.message.reply_text(
"Пожалуйста, введите более конкретный запрос (минимум 2 символа)"
)
return
progress_message = await update.message.reply_text(
f"Ищу товары по запросу: '{user_message}'\n"
f"Это займёт 1-3 минуты..."
)
Бот будет работать по следующему алгоритму:
убирает случайные пробелы в начале и конце.
проверяет, не слишком ли короткий запрос — если меньше 2 символов, просит уточнить.
сообщает, что начал поиск и примерно сколько это займёт.
5.4. Обработка текстовых сообщений (часть 2)
После того как пользователь отправил запрос, мы запускаем основной процесс:
try:
df, stats_message = await ozon_parser.search_products(
query=user_message,
pages=PAGES_TO_PARSE,
max_products=MAX_PRODUCTS
)
if df is not None and len(df) > 0:
with tempfile.NamedTemporaryFile(
mode='w',
suffix='.csv',
delete=False,
encoding='utf-8-sig'
) as tmp_file:
df.to_csv(tmp_file.name, index=False, encoding='utf-8-sig')
tmp_filename = tmp_file.name
Здесь мы отправляем поисковый запрос в наш парсер с указанием сколько страниц и товаров нужно собрать.
Получаем результаты — парсер возвращает два значения:
df, stats_message = await ozon_parser.search_products(...) вызываем парсинг.
if df is not None and len(df) > 0: проверяем что есть результаты.
Создаём временный файл CSV, который автоматически удалится после использования: tempfile.NamedTemporaryFile(...).
df.to_csv(...) — сохраняем DataFrame в CSV.
5.5. Отправка результатов
try:
await update.message.reply_text(stats_message)
with open(tmp_filename, 'rb') as file:
await update.message.reply_document(
document=file,
filename=f"ozon_{user_message}_{datetime.now().strftime('%Y%m%d_%H%M')}.csv",
caption=f"Файл с результатами по запросу: '{user_message}'"
)
await update.message.reply_text(stats_message) отправляет сообщение в чат.
with open(tmp_filename, 'rb') as file: открывает файл в бинарном режиме.
await update.message.reply_document(...) отправляет файл.
datetime.now().strftime('%Y%m%d_%H%M') форматирует дату и время.
5.6. Превью топ-3 товаров
После того как мы собрали все товары, показываем пользователю результаты:
top_products = df.head(3)
preview_message = "Топ 3 товара:\n\n"
for i, (_, row) in enumerate(top_products.iterrows(), 1):
price_str = row['Цена'] if pd.notna(row['Цена']) else "Цена не указана"
rating_str = f" {row['Рейтинг']}" if pd.notna(row['Рейтинг']) else "Нет рейтинга"
reviews_str = f" {row['Количество отзывов']} отзывов" if pd.notna(row['Количество отзывов']) else "Нет отзывов"
preview_message += (
f"{i}. {row['Название'][:50]}...\n"
f" {price_str} | {rating_str} | {reviews_str}\n\n"
)
await update.message.reply_text(preview_message)
finally:
os.unlink(tmp_filename)
Выбираем первые три позиции из отсортированного списка: df.head(3)
После чего перебираем с нумерацией enumerate(top_products.iterrows(), 1)
Короткое название (первые 50 символов, чтобы не растягивать сообщение)
Цену, если она есть, рейтинг, количество отзывов.
Обрабатываем отсутствующие данные — если чего-то нет (например, рейтинга или цены), пишем понятное сообщение вместо числа.
Наконец, удаляем временный файл: os.unlink(tmp_filename)
5.7. Обработка ошибок
Если товары не найдены, ловим любую ошибку и записываем в лог
else:
await update.message.reply_text(stats_message)
except Exception as e:
logger.error(f"Ошибка: {e}")
await update.message.reply_text(
"Произошла непредвиденная ошибка. Попробуйте позже."
)
6. Метод search_products
Главный метод парсера который координирует всю работу.
6.1. Запуск парсинга
Напишем главную функцию парсинга, в которой будет реализован запуск браузера, сбор ссылок и проверка товаров
async def search_products(self, query, pages=2, max_products=15):
try:
await self.setup_browser()
links = await self.fetch_product_links(query, pages)
if not links:
return None, "Не найдено товаров по вашему запросу"
6.2. Парсинг товаров
Обрабатываем найденные ссылки на товары с ограничением количества и задержками между запросами.
results = []
max_products = min(max_products, len(links))
for i, url in enumerate(links[:max_products], 1):
logger.info(f"[{i}/{max_products}] Парсим товар")
product = await self.parse_product(url)
results.append(product)
if i < max_products:
await self.human_delay(1, 2)
max_products = min(max_products, len(links)) не берём больше чем нашли.
enumerate(links[:max_products], 1) перебираем с нумерацией.
results.append(product) добавляем результат в список.
6.3. Формирование статистики
Формируем итоговую таблицу с товарами и статистику по собранным данным.
df = pd.DataFrame(results)
total = len(df)
with_prices = df['Цена'].notna().sum()
with_ratings = df['Рейтинг'].notna().sum()
with_reviews = df['Количество отзывов'].notna().sum()
stats_message = (
f" Статистика парсинга:\n"
f"• Всего товаров: {total}\n"
f"• С ценами: {with_prices}\n"
f"• С рейтингами: {with_ratings}\n"
f"• С отзывами: {with_reviews}\n"
f"• Запрос: '{query}'"
)
return df, stats_message
Создаём удобную таблицу DataFrame, с которой легко работать.
Считаем статистику. Анализируем, сколько товаров имеют указанные цены, рейтинги, информацию об отзывах, формируем отчёт.
Возвращаем таблицу с данными, и текстовую сводку, чтобы пользователь мог посмотреть детали и получить общее представление о результате.
6.4. Обработка ошибок и закрытие браузера
except Exception as e:
logger.error(f"Критическая ошибка: {e}")
return None, f"ошибка при парсинге: {str(e)}"
finally:
await self.close_browser()
async def close_browser(self):
if self.browser:
await self.browser.close()
if self.playwright:
await self.playwright.stop()
Закрываем браузер с помощью: await self.close_browser()
if self.browser: проверяем, что браузер был запущен.
7. Запуск бота
Финальная часть заключается в создании точки входа в программу.
7.1. Функция main
Создадим приложение нашего бота.
def main():
application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
application.add_handler(CommandHandler("start", start_command))
application.add_handler(
MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)
)
logger.info("бот запущен")
application.run_polling()
if __name__ == "__main__":
main()
add_handler(CommandHandler("start", ...)) — регистрируем команду /start
application.run_polling() — запускаем бота в режиме опроса.
if name == "__main__": — запускается только при прямом вызове.
Запуск на удаленном сервере
Запуск мы произведем в Amvera Cloud. Это даст нам возможность быстрого и простого обновляения кода. Можно просто перетягивать файлы в интерфейсе или привязать свой Git-репозиторий из которого облако автоматически будет делать развертывание.
Помимо этого мы получим ротируемые IP, снижающие риск получения 429 ошибки.
Зависимости и файлы проекта
Требуется подготовить requirements.txt со всеми библиотеками, которые использует бот (pandas, playwright, telegram, urllib3 и др.), приложить докерфайл ниже и заполнить файл конфигрурации в интерфейсе (в нем указать docker - docker).
В нашем случае мы используем Dockerfile со следующим содержимым:
FROM joyzoursky/python-chromedriver:3.9
WORKDIR /app
COPY . /app
RUN pip install -r requirements.txt
CMD ["python", "ozon_telegram_bot.py"]
Все файлы для загрузки кроме файла amvera.yaml (его делаем в интерфейсе) уже собраны в репозитории Github.
Далее просто перетягиваем файлы в окно загрузки или делаем git push в привязанный репозиторий. После этого Amvera все соберет, настроит и запустит.
Проблемы и решения при парсинге Ozon.
Если бот не находит товары
Обновите селекторы в функции fetch_product_links():
selectors = [
"a[href*='/product/']",
"[data-widget*='searchResults'] a",
".tile-root a",
]
Если не парсятся цены
Добавьте задержку и ожидание загрузки:
await self.page.wait_for_selector("[data-widget='webPrice']", timeout=10000)
await self.human_delay(2, 3)
Бот блокируется Ozon
Увеличьте задержки между запросами:
await self.human_delay(3, 5)
Разверните проект в Amvera Cloud. Это даст вам ротируемые(изменяющиеся) IP, снижающие риск блокировки.
Заключение
Мы успешно создали автоматизированного бота для парсинга Ozon с использованием Playwright и Telegram Bot API. Скрипт может работать круглосуточно, предоставляя пользователям актуальную информацию о товарах в удобном формате CSV.
Ссылка на GitHub проекта с полным кодом парсера Ozon.
При реальной работе с Ozon я рекомендую придерживаться их правил использования ресурса.
Комментарии (5)

olegkusov
26.10.2025 10:59Парсинг в РФ запрещен так то

ovchinnikovproger Автор
26.10.2025 10:59В РФ много что запрещено, но точно не парсинг. Яндекс же парсит все сайты своим краулером аж с 1997-ого года, и это абсолютно законно. Как и еще тысячи других компаний автоматизируют извлечение информации. По моим данным даже маркетплейсы парсят другие сайты для дозаполнения карточек товаров и подобного.
LazyTalent
Через браузер много не спарсишь, в идеале реверс инжинирить апишку, а если много защиты, то надо снифить трафик с мобильного приложения - как правило, там мало защиты или её нет совсем