
Изначально, как оно всегда и бывает, была задача (не курица и не яйцо, а именно задача)! Задача вполне себе нетривиальная - подключиться к партнерской сети и начать продавать на сайте аренду автомобиле. Но у этой ниши есть одна особенность, в лоб, никто ничего не покупает, и нужна тонкая настройка с тонкой геопривязкой, вплоть до координат. Имеющиеся на рынке автопартнерки - либо сильно перемудренные, либо не имеют нормальной (читай простой) системы входа, а те которые упрощены - не имеют нормальной базы, ну или не хотят ее давать партнерам. Мне нужна была база городов и точек выдачи автомобилей и их координатная привязка.
Про координаты я сделаю еще один материал, чтобы не перегружать имеющийся, а в этой разберу кейс, как я получил данные всех точек выдачи практически даром, но с небольшим временным лагом.
Небольшой спойлер - в итоге мне удалось вытянуть примерно 18000 точек (но тут необходимо подключать чистку, определенное количество точек собраны на другом языке). Основная проблема, которая выявилась в результате проб и ошибок - сервер не отдает более 10 объектов, а на одном потоке парсер работал как стахановец, можно сказать на износ, поэтому я дописал к нему многопоток и подключение прокси.
Идея сделать статью родилась спонтанно, и уже по мотивам готового решения. Это скорее нишевый кейс, а не многофункциональный парсер, но если есть желание то этот парсер на Python можно вполне переделать под свою задачу, слегка его модифицировав и поменяв немного логику.
Итак, задача ясна, приступаем к реализации. Нужен парсер на Python. Сперва я вообще подумал, что на сайте можно просто зайти, сформировать урл и перебором далее получить все необходимые данные, но не тут то было. На том конце тоже не дурачки сидят.
Парсер на Python, а также боль и разочарование самоуверенного автоматизатора
Все мои попытки как то упростить задачу, не нагромождать ее - разбивались о суровую реальность. Казалось бы - дико простой дизайн у партнерки, ничего экстраординарного нет - но блин. Каждая новая попытка - провал. А это время… И растянулось оно почти на сутки работы.
Ошибки закаляют
Сперва я пошел простым путем - зашёл на страницу - генератор урлов /landing-page-generator.
В HTML сразу лежит выпадашка <select>
со странами. Вот и решение, прямо на самом видном месте! Просто вытащу все <option>
и сохраню в countries.json
.
Собственно со странами это сработало, но есть одно но - в список затесалась первая строка-заглушка “Select a country”
с id = 0
. Уже позже я пойму, что этот нулевой id будет портить запросы, но по-началу не заметил.
Собственно получив все страны пытаемся забрать вторым проходом все города, немного модифицировав парсер
Делаю запрос …autocomplete?type=city&countryId=<ID>&query=&lang=en
(обратите внимание — query пустой).
При таком подходе в консоли на каждый countryId
появляется: ! ошибка Albania 404 Client Error …&countryId=2&query=
Собственно, какое первое впечатление на подобное? Правильно - Сайт меня банит, ничего не отдаёт! Но, как говориться у дизайнеров - это мы еще посмотрим…

Что я осознал на этом этапе - для city
нельзя запрашивать пустой query, нужна хотя бы одна буква; У 80 % стран есть промежуточный слой region
, но я это не учел.
Добавляю перебор букв, но оставляю прямой путь country → city
Вместо пустого query перебираю
a, b, c … z
.В результате 404 ошибка ушла, но почти все ответы в массиве пустые [].
В чем причина? Сервер ждёт, что сначала спрошу
type=region
. Для стран без регионов (например, Исландия) способ даже вроде сработал, поэтому я долго не мог понять, что не так.
Наконец переходим к этапу, с которого и нужно было начать изначально, но так как “все же просто”, перешел я к нему только с 4 попытки. Включаем инструменты разработчика, и вводим "ca"
в поле City — появился XHR-запрос type=region&countryId=187&query=ca
.
Меняем логику:
country → region (query из двух букв);
region → city (тоже две буквы);
city → location (здесь query можно оставить пустым).
Снова проблемка - минимальная длина query оказалась две буквы, а я продолжил перебирать по одной — сервер снова отдавал пустоту.
И подобные качели туда-сюда продолжались еще какое то время. Дальше описывать все неудачи не вижу смысла, уверен, уже и так стало понятно, что в лоб не получилось…
В результате вот к чему я пришел (и почему сразу не подумал об этом):
Я попробовал сделать обычный GET запрос https://www.discovercars.com/
, и нашел в HTML мета-тег:
<meta name="csrf-token" content="...">
Далее, сделал еще один GET запрос - /en/search/autocomplete/a
с заголовками в которых прописан токен, полученный на противоположном шаге. Собственно, браузер бы отправил такой же заголовок, если в поисковой строке вписать букву а.
Прочитал что вернул сервер (а вернул он массив из 10 объектов, что и стало отправным лимитом для дальнейшего парсера).
Таким образом я понял, что больше 10 объектов за раз сервер не возвращает и нужно подключать метод перебора (назовем его так) букв.
Как устроен мой многопоточный парсер на Python для парсинга данных с сайта автопартнерки
Архитектура «двух очередей» парсера на Python

В парсере центральной фигурой является связка из двух безопасных, как мне показалось, очередей:
Очередь |
Что хранит |
Кто пишет / читает |
prefix_q |
префиксы для обхода (a…z, aa…zz) |
пишутся в несколько потоков, как и читается |
rows_q |
готовые JSON-объекты точек выдачи |
пишется в несколько потоков, читается одним потоком |
Такое разделение избавляет от блокировок и падений: все I/O операции забирает на себя один изолированный поток, в целом такой парсер будет относительно неплохо работать и параллелится даже на Raspberry Pi.
Правда я не пробовал запускать парсер на Raspberry Pi, но ощущение такое есть.
Многопоточность парсера на Python + личный прокси под каждый поток
make_session()
собирает requests.Session
с трёхэтапной страховкой:
User-Agent последней Chrome-ветки — так реже прилетает 403 ошибка (не уверен что конкретно discoverycars сильно защищаются от парсинга - так как сильно нишевая тема, но на других ресурсах такой подход вполне оправдан).
Retry-обёртка (backoff 0.5, 1, 2, 4 с) переживает 502/504 ошибки от Cloudflare.
Прокси из proxies.txt закрепляется за потоком по индексу
(thread_id % len(PROXY_LIST)
). Если прокси меньше, чем тредов, идёт круговое использование, то есть один прокси может использоваться на нескольких потоках).
BFS с «освещением» слепых зон

Алгоритм обходит автодополнение по уровням:
Запросили
auto/a
— DiscoverCars вернул 10 строк? Значит, где-то глубже ещё есть данные. Кладём в очередьaa…az
. Таким подходом на конкретном ресурсе я ушел на уровень 4 букв.Вернуло < 10 строк — все, больше данных нет, префикс закрыт.
Такое освещение гарантирует, что мы не провалимся в бесконечную рекурсию и не оставим тёмными редкие префиксы типа qx.
Дедупликация «на лету»
Каждый JSON имеет уникальную пару location + placeID
. Парсер склеивает два полученных параметра в один и складывает их в оперативку на некоторое время, чтобы проверять дубли, раз в 2 минуты эти данные записываются на диск, что добавляет устойчивости при внезапном падении (их было у меня и было немало).
Многопоток без боли, перезаписи и подобных неприятных моментов
Первый пакет данных формирует список колонок
(fieldnames_ref)
, заодно нормализует порядок (сначала география, потом вся экзотика из API).Запись ведётся под
csv_lock
. Запись на диск как раз и есть это узкое горлышко, но так как запись идет в один поток, он срабатывает реже, чем поступают пакеты данных, поэтомуrows_q
ставится cmaxsize=1000
— очередь разглаживает пики. Если бы запись происходила на многопотоке, то это бы было неэффективно в данной реализации, как раз из-за медленной записи. Я так даже один раз уперся в квоту и первоначальный парсер упал. Но сейчас все гуд и он отрабатывает без падений.
Live-телеметрия прямо в терминале
Наличие этой функции, скорее вкусовщина (и моя гордость... на самом деле нет, но как звучит), а не острая необходимость, ну не люблю я, когда парсер вроде запустился, но по факту будто завис. Вот и запилил подобную штуку тут.
tqdm
показывает общее число выполненных запросов и ETA.rich.Live
параллельно рисует таблицу:
✓ processed, ★ unique, ⏳ queue.
Если вы цепляетесь поssh
и прерываете сеанс ‒ всё, что нужно, останется в логах и вы сможете продолжить с того места, где закончили.
Бекапы и продолжение оттуда, откуда закончил
Каждый BACKUP_EVERY
(по умолчанию 2 мин) производится snapshot:
pickle.dump({"seen_ids": seen_ids,
"queue": list(prefix_q.queue),
"processed": processed_counter[0]},
STATE_PATH.open("wb"))
На новом старте скрипт проверяет, есть ли queue_state.pkl; если да, подхватывает seen_ids и докачивает ровно с того места, где умер.
Производительность
8 потоков + 8 прокси → ≈
40 RPS
на API.Глубина
MAX_DEPTH = 4
покрывает планету (речь идет про парсинг всего мира, если что) с запасом: полный объём (15-18 к префиксов) собирается за 35-40 минут на средне-бюджетномVPS (2 vCPU)
.
Если удалить PROXY_LIST
— парсер превратится в однопоточный, и будет стучатся раз в 0.12 с. Правда я спарсил все нужные мне данные именно на однопоточном парсере и уже когда он отработал 18 часов, я добавил сюда прокси. Не, ну а что, могу себе позволить. И 18 часов превратились в примерно час.
Зачем нужна многопоточность и прокси в моем парсере на Python?
Как я уже писал выше - изначально ни о каком многопотоке я не думал, так как прошли сутки с момента начала решения простенькой задачи и нужно было просто уже найти решение и однопоточный парсер - который реально забирал нужные данные работал, он не падал, и складывал нужную информацию в файл. Поэтому я просто оставил это так как оно было и ушел спать, а утром разобрал результат.
Но цифра в 18 часов общей работы меня просто удивила, так как большую часть времени я вообще не вспоминал о парсере, а остальную часть я спал.
Но согласитесь - 18 часов это 18 часов, но несколько потоков, это все таки несколько потоков!
Потоки × прокси |
Время |
Ср. RPS |
CPU |
RAM |
1 × — |
18 ч 14 м |
0,28 |
5 % |
35 MB |
4 × 4 |
2 ч 11 м |
2,3 |
22 % |
60 MB |
8 × 8 |
35 мин |
9,8 |
39 % |
80 MB |
16 × 8 |
34 мин |
10,1 |
78 % |
140 MB |

И я дополнил парсер на Python этой модификацией, прокси я использовал Floppydata.com. С ними познакомился недавно, но в целом каких либо нареканий у меня к ним нет, ценник вполне себе достойный, да и качество проксей на уровне, относительно конкурентов то уж точно есть с чем сравнить.
Как я понял сервис этот сравнительно молодой, а новички всегда стараются демпинговать цены, чтобы наработать свою базу. Ну а нам это на руку, чем ниже цены - тем интереснее продукт в глазах небогатого фрилансера, правда?
Ну да ладно, отвлекся.
Итак - многопоток, что он дал? Многопоток на этом парсере существенно его бустанул, я бы так сказал, часов на 17, потому что когда я его запустил уже на 8 потоков, он отработал за 1 час полный цикл, что не может ни радовать.
Как запустить парсер на Python для сбора ГЕО данных
Чтобы парсер заработал нужно установить несколько зависимостей, которые можно поставить напрямую через команду, либо поместить в папку с парсером файл requirements.txt
со следующим текстом:
requests
tqdm
rich
И уже командой pip install -r requirements.txt
распаковать все нужные зависимости.
Ну а уже следующей командой запустить и сам парсер.
Также потребуется файл с прокси, назвать его можно как угодно, главное в самом парсере изменить название на свое. Порядок записи проксей следующий:
http://user1:pass1@1.2.3.4:3128
http://user2:pass2@5.6.7.8:3128
И так далее, пока прокси не кончатся.
Собственно сам виновник торжества - парсер на Python для сбора данных с автопартнерки:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DiscoverCars: многопоточный BFS-краулер с отдельным прокси на поток.
"""
from __future__ import annotations
import requests, csv, json, html, re, string, time, pathlib, pickle, sys, queue, threading
from collections import Counter
from datetime import datetime, timedelta
from typing import Dict, Any, Iterable, List
from requests.adapters import HTTPAdapter, Retry
from rich.console import Console
from rich.table import Table
from rich.live import Live
from tqdm import tqdm
# ───── константы ──────────────────────────────────────────────────────────
ROOT = "https://www.discovercars.com"
UA = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36")
OUT = pathlib.Path("out"); OUT.mkdir(exist_ok=True)
CACHE_DIR = pathlib.Path("cache"); CACHE_DIR.mkdir(exist_ok=True)
CSV_PATH = OUT / "discovercars_live.csv"
JSON_PATH = OUT / "discovercars_live.jsonl"
STATE_PATH = CACHE_DIR / "queue_state.pkl"
DELAY = 0.12
MAX_DEPTH = 4
BACKUP_EVERY = timedelta(minutes=2)
N_THREADS = 8 # потоков ≈ числу прокси
PROXY_LIST = [l.strip() for l in open("proxies.txt") if l.strip()]
console = Console()
stats = Counter()
# ───── Session factory ────────────────────────────────────────────────────
def make_session(proxy: str | None) -> requests.Session:
s = requests.Session()
s.headers.update({"User-Agent": UA})
retry = Retry(total=5, backoff_factor=0.5,
status_forcelist=[429,500,502,503,504],
allowed_methods=["GET"], raise_on_status=False)
s.mount("https://", HTTPAdapter(max_retries=retry))
if proxy:
s.proxies.update({"http": proxy, "https": proxy})
return s
# ───── CSV helpers (writer-поток) ─────────────────────────────────────────
csv_lock = threading.Lock() # гарантируем атомарность записи
def ensure_csv_header(fieldnames: List[str]):
with csv_lock:
if CSV_PATH.exists():
return
with CSV_PATH.open("w", newline="", encoding="utf-8") as f:
csv.DictWriter(f, fieldnames=fieldnames,
extrasaction="ignore").writeheader()
def append_rows(rows: List[Dict[str,Any]], fieldnames: List[str]):
with csv_lock:
with CSV_PATH.open("a", newline="", encoding="utf-8") as f:
csv.DictWriter(f, fieldnames=fieldnames,
extrasaction="ignore").writerows(rows)
with JSON_PATH.open("a", encoding="utf-8") as jf:
for r in rows:
jf.write(json.dumps(r, ensure_ascii=False) + "\n")
# ───── API helpers (как в соло-версии) ────────────────────────────────────
def get_csrf(sess: requests.Session) -> str:
html_text = sess.get(ROOT, timeout=30).text
return html.unescape(re.search(
r'<meta name="csrf-token"\s+content="([^"]+)"',
html_text)[1])
def api_call(sess: requests.Session, prefix: str, token: str,
retries: int = 3) -> list[Dict[str, Any]]:
url = f"{ROOT}/en/search/autocomplete/{prefix}"
hdr = {
"x-csrf-token": token,
"x-kl-ajax-request": "Ajax_Request",
"x-requested-with": "XMLHttpRequest",
"referer": ROOT + "/",
"accept": "application/json, text/plain, */*",
}
try:
r = sess.get(url, headers=hdr, timeout=60)
stats[str(r.status_code)] += 1
if r.status_code == 404:
return []
r.raise_for_status()
return r.json()
except (requests.exceptions.ReadTimeout,
requests.exceptions.ConnectionError) as e:
if retries:
time.sleep(2)
return api_call(sess, prefix, token, retries-1)
tqdm.write(f"[TIMEOUT] {prefix}: {e}")
return []
# ───── сохранение / загрузка состояния ───────────────────────────────────
def load_previous():
if not STATE_PATH.exists():
return set(), list(string.ascii_lowercase), 0
with STATE_PATH.open("rb") as fh:
state = pickle.load(fh)
console.print("[yellow]⏪ Продолжаю прошлый запуск...[/]")
return state["seen_ids"], state["queue"], state["processed"]
def save_state(seen, queue, processed):
pickle.dump({"seen_ids": seen, "queue": queue,
"processed": processed}, STATE_PATH.open("wb"))
# ───── worker + writer поток ─────────────────────────────────────────────
def writer_thread(rows_q: "queue.Queue[list[Dict]]", fieldnames_ref):
"""Забирает списки строк из очереди и пишет в файлы."""
while True:
rows = rows_q.get()
if rows is None: # сигнал остановки
break
if rows and fieldnames_ref[0] is None:
fieldnames = list({k for r in rows for k in r})
first = ["country","countryID","city","cityID","location","place",
"placeID","lat","lng"]
fieldnames = first + [k for k in fieldnames if k not in first]
fieldnames_ref[0] = fieldnames
ensure_csv_header(fieldnames)
append_rows(rows, fieldnames_ref[0])
rows_q.task_done()
def worker(thread_id: int, prefix_q: "queue.Queue[str]",
rows_q: "queue.Queue[list[Dict]]",
seen_ids: set[str], seen_lock: threading.Lock,
processed_counter, processed_lock: threading.Lock):
sess = make_session(PROXY_LIST[thread_id % len(PROXY_LIST)])
token = get_csrf(sess)
while True:
try:
prefix = prefix_q.get(timeout=3) # timeout → выходим
except queue.Empty:
return
data = api_call(sess, prefix, token)
with processed_lock:
processed_counter[0] += 1
new_rows = []
with seen_lock:
for obj in data:
uid = f"{obj['location']}:{obj['placeID']}"
if uid not in seen_ids:
seen_ids.add(uid)
new_rows.append(obj)
if new_rows:
rows_q.put(new_rows)
# углубляем префикс
if len(data) == 10 and len(prefix) < MAX_DEPTH:
for ch in string.ascii_lowercase:
prefix_q.put(prefix + ch)
prefix_q.task_done()
time.sleep(DELAY)
# ───── многопоточный crawl ───────────────────────────────────────────────
def crawl():
seen_ids, queue_list, processed = load_previous()
prefix_q: "queue.Queue[str]" = queue.Queue()
for p in queue_list:
prefix_q.put(p)
seen_lock = threading.Lock()
processed_lock = threading.Lock()
processed_counter = [processed] # обёртка-список ⇒ byref
rows_q: "queue.Queue[list[Dict]]" = queue.Queue(maxsize=1000)
fieldnames_ref = [None] # by-reference контейнер
writer = threading.Thread(target=writer_thread,
args=(rows_q, fieldnames_ref), daemon=True)
writer.start()
workers = [threading.Thread(
target=worker,
args=(i, prefix_q, rows_q, seen_ids, seen_lock,
processed_counter, processed_lock),
daemon=True)
for i in range(N_THREADS)]
for w in workers: w.start()
last_backup = datetime.utcnow()
with Live(console=console, auto_refresh=False) as live, \
tqdm(total=prefix_q.qsize(),
bar_format="{l_bar}{bar}| {n_fmt} префиксов {elapsed}") as pbar:
prev_processed = processed_counter[0]
while any(w.is_alive() for w in workers):
# обновляем прогресс-бар
new_proc = processed_counter[0] - prev_processed
if new_proc:
pbar.update(new_proc)
pbar.total = prefix_q.qsize() + processed_counter[0]
prev_processed = processed_counter[0]
live.update(Table().add_row(
f"✓ {processed_counter[0]:,}",
f"★ {len(seen_ids):,}",
f"⏳ {prefix_q.qsize():,}"), refresh=True)
# периодический backup
if datetime.utcnow() - last_backup >= BACKUP_EVERY:
save_state(seen_ids, list(prefix_q.queue),
processed_counter[0])
console.print(f"[cyan]? backup "
f"({processed_counter[0]} префиксов)[/]")
last_backup = datetime.utcnow()
time.sleep(1)
# финальные штрихи
rows_q.put(None) # стоп-сигнал writer-потоку
writer.join()
save_state(seen_ids, [], processed_counter[0])
return len(seen_ids), processed_counter[0]
# ───── ENTRY ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
start = time.time()
try:
uniq, proc = crawl()
console.print(f"\n[bold green]✔ Завершено: {uniq:,} точек, "
f"{proc:,} префиксов, {(time.time()-start)/60:.1f} мин.[/]")
console.print(f"[green]CSV[/] → {CSV_PATH}\n"
f"[green]JSONL[/]→ {JSON_PATH}")
except KeyboardInterrupt:
console.print("[red]\n⏹ Остановлено пользователем.[/]")
Когда все готово, зависимости установлены - можем запускать парсер стандартной командой:
python parser.py
Я также собрал под этот парсер репозиторий на Гитхабе - можно напрямую скачать его оттуда, ну и по традиции на своем сайте сделал чуть более рефлексирующую статью, с упором на рефлексию и разбор мыслей - как я писал парсер на Python для решения, задачи по сбору данных.