Привет!
Меня зовут Александр, я руковожу backend-разработкой в КТS. Сегодня расскажу, как написать асинхронный краулер.
Такая задача часто встречается на практике, когда нужно реализовать периодическую синхронизацию/обкачку между сервисами.
Статья написана по мотивам вебинара, который мы провели в рамках нового курса «Асинхронное программирование на Python для начинающих». Курс стартует 18 октября, поэтому, если вам интересно — загляните посмотреть.
Что будет в статье:
Цель
У нас есть краулер, который обкачивает страницы. Это может быть поисковый бот Google, который ходит по сайтам, скачивает данные, кладет в базу и индексирует, или какой-нибудь агрегатор: аптек, маркетплейсов и т.д.
Задача в том, что краулер должен работать и не положить сервис, который он обкачивает.
Код для начала работы:
import asyncio
from dataclasses import dataclass
from typing import Optional
class Pool:
def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
self.max_rate = max_rate
self.interval = interval
self.concurrent_level = concurrent_level
async def start(pool):
await asyncio.sleep(5)
def main():
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start())
except KeyboardInterrupt:
loop.close()
if __name__ == '__main__':
main()
Краулеру нужно посетить и скачать много страниц, следовательно, много раз обратиться к ресурсу. Мы можем позволить себе отправлять много запросов, но сервис, на который мы приходим, может не выдержать большой нагрузки. Поэтому к источнику данных нужно ходить управляемо — сделать rate-limit.
Если в какой-то момент задача прервалась, или мы сами решили остановить краулер, нужно сделать корректную и аккуратную остановку работы. Для этого начатые задачи должны завершиться, а новые задачи из очереди должны прекратить поступать.
Исходный код
У нас есть сущность Pool. Эта сущность умеет управлять количеством запросов в единицу времени. Pool принимает:
max_rate — максимальное количество запросов
interval — интервал. Если мы передаем значения max_rate = 5 и interval = 1, в секунду может исполняться 5 запросов
concurrent_level — обозначает допустимое количество параллельных запросов
max_rate и concurrent_level могут не совпадать, когда время выполнения запроса больше, чем interval. Например, мы делаем 5 запросов в секунду, как заявлено в переменных, но API все равно отвечает медленнее. Чтобы не положить сервис, мы вводим переменную concurrent_level.
Планировщик
Для начала нужно написать что-то, что позволит делать ровно 5 запросов в секунду, не обращая внимание на время запроса. Для этого мы запустим планировщик, который назовем scheduler. Он будет просыпаться раз в секунду и ставить количество задач, равное max_rate. Планировщик не ждет их исполнения, просто создает 5 задач каждую секунду.
Дополним class Pool и напишем функцию scheduler:
from task import Task
class Pool:
def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
self.max_rate = max_rate
self.interval = interval
self.concurrent_level = concurrent_level
self.is_running = False
async def _scheduler(self):
while self.is_running:
for _ in range(self.max_rate):
pass
Обратите внимание на две вещи:
функция бесконечная, пока работает наш краулер
раз в период функция выполняет max_rate раз какое-то действие
Задача для краулера
Scheduler должен откуда-то взять задачи, которые нужно запланировать. Для этого нам нужно сделать очередь, которую мы возьмем из библиотеки asyncio. Примитив называется asyncio.Queue(). В class Pool дописываем:
class Pool:
def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
self.max_rate = max_rate
self.interval = interval
self.concurrent_level = concurrent_level
self.is_running = False
self._queue = asyncio.Queue()
Теперь мы просыпаемся раз в интервал и получаем количество задач, равное max_rate. Но нужно что-то сделать, чтобы они исполнялись.
Для этого в asyncio есть функция create_task. Она запускает выполнение корутины, но при этом не дожидается ее исполнения, а создает фоновую задачу. В create_task передадим метод perform.
async def _scheduler(self):
while self.is_running:
for _ in range(self.max_rate):
task = await self._queue.get()
asyncio.create_task(task.perform))
await asyncio.sleep(self.interval)
Пробный запуск
Давайте попробуем все это запустить. Сделаем функцию start и таким же образом запустим scheduler. Нам нужно не ждать его, а просто запустить в фоне корутину с помощью create_task:
async def _scheduler(self):
while self.is_running:
for _ in range(self.max_rate):
task = await self._queue.get()
asyncio.create_task(self._worker(task))
await asyncio.sleep(self.interval)
def start(self):
self.is_running = True
asyncio.create_task(self._scheduler())
В будущем для корректного завершения работы краулера нужно завершить работу scheduler. Для этого нужно вызвать cancel у задачи, поэтому возвращаемое значение из create_task мы сохраняем в переменную scheduler_task:
class Pool:
def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
self.max_rate = max_rate
self.interval = interval
self.concurrent_level = concurrent_level
self.is_running = False
self._queue = asyncio.Queue()
self._scheduler_task: Optional[asyncio.Task] = None
Выставим rate-limit на 3 и внутри start запустим наш Pool:
def start(self):
self.is_running = True
self._scheduler_task = asyncio.create_task(self._scheduler())
async def start(pool):
pool = Pool(3)
pool.start()
await asyncio.sleep(5)
Запускаем и видим, что ничего не произошло:
Это потому, что внутри очереди ничего нет. Мы сделали старт и поспали 5 секунд, а на момент окончания задачи у нас осталась фоновая задача scheduler.
Промежуточный итог
У нас есть Pool с параметрами:
— ограничение количества запросов max_rate
— интервал активизации планировщика interval
— максимальное количество параллельных запросов concurrent_levelМы написали планировщик scheduler, который работает постоянно, просыпается раз в объявленный интервал, достает из очереди max_rate задач и запускает их исполнение.
Задача task — просто дата-класс с функцией perform. Для описания поведения задачи нужно создать класс-наследник и в нем переопределить perform.
Еще мы написали функцию start, в которой выставили признак работы is_running и в фоне запустили наш планировщик.
Функции put и join
Перед тем, как запустить Pool, попробуем положить туда задачку. Для этого напишем функцию put, которая принимает задачу и кладет ее в нашу внутреннюю очередь.
Дополнительно добавим tid (task_id) и print в код задачи:
import asyncio
from dataclasses import dataclass
@dataclass
class Task:
tid: int
async def perform(self, pool):
print('start perform', self.tid)
await asyncio.sleep(3)
print('complete perform', self.tid)
И добавим 10 задач перед стартом pool:
async def start(pool):
pool = Pool(3)
for tid in range(10):
await pool.put(Task(tid))
pool.start()
await asyncio.sleep(5)
Добавим еще кое-что. У стандартной библиотеки queue есть метод join. Тогда краулер будет ждать не 5 секунд, как мы указали в начале, а до тех пор, пока очередь не опустеет:
async def start(pool):
pool = Pool(3)
for tid in range(10):
await pool.put(Task(tid))
pool.start()
await pool.join()
Запустим и посмотрим, что произойдет:
Хотя все зависло, планировщик работал.
Вы можете увидеть, что задача выполняется 3 секунды. И, несмотря на то, что предыдущие задачи еще не завершились, планировщик все равно создает новые. Это плохо, потому что если API отвечает медленнее, чем мы шлем к нему запросы, есть вероятность «положить» сервис. Эту проблему мы решим чуть позже.
Чтобы join отработал, нужно помечать задачи выполненными. Не будем усложнять код scheduler и сделаем отдельную функцию _worker. В нее перенесем perform и ниже добавим self._queue.task_done(). Это означает, что задачу мы выполнили:
async def _worker(self, task: Task):
await task.perform(self)
self._queue.task_done()
Обратите внимание, что _worker вызывается без await, потому что scheduler не должен ждать его завершения. Иначе он не успеет запланировать задачи.
В scheduler вместо perform нужно передать _worker и task:
async def _scheduler(self):
while self.is_running:
for _ in range(self.max_rate):
task = await self._queue.get()
asyncio.create_task(self._worker(task))
await asyncio.sleep(self.interval)
Снова попробуем запустить:
Программа завершилась, но осталось предупреждение о том, что scheduler остался работать в фоне. Функцию stop напишем чуть позже.
Semaphore
На этом этапе видим, что:
метод start запускает наш Pool и планировщик scheduler
планировщик раз в секунду ставит новые задачи и запускает _worker
_worker эти задачи выполняет
метод join ждет, пока очередь не станет пустой
Если время выполнения задач больше интервала активизации планировщика (interval), он накидывает дополнительные задачи сверху тех, которые еще не выполнились.
В таком случае количество параллельных запросов к сервису за interval будет больше rate_limit. Поэтому нужно ограничить количество параллельных запросов. Для этого нам потребуется переменная concurrent_level, которая по умолчанию равна rate_limit.
В asyncio есть примитив синхронизации Semaphore. С его помощью можно ограничить количество параллельных исполняемых worker. Если количество запланированных задач больше заданного значения, мы ждем их исполнения. В нашем примере задач 3.
Объявим Semaphore и передадим в него либо concurrent_level, либо max_rate.
Когда worker начинает исполняться, нам нужно занять Semaphore. Для этого используем «асинхронный контекстный менеджер»: async with self._sem. Мы занимаем Semaphore, пока не закончатся операции ниже — await task.perform(self) и self._queue.task_done().
async def _worker(self, task: Task):
async with self._sem:
await task.perform(self)
self._queue.task_done()
Добавим Semaphore внутрь scheduler, чтобы scheduler не запускал новые worker'ы, если количество параллельных worker'ов уже достигло максимума:
async def _scheduler(self):
while self.is_running:
for _ in range(self.max_rate):
async with self._sem:
task = await self._queue.get()
asyncio.create_task(self._worker(task))
await asyncio.sleep(self.interval)
Запускаем:
Мы добавили 3 задачи и ждем, пока они исполнятся. Таким образом мы соблюдаем максимальное параллельное количество запросов.
Остановка фонового планировщика
У нас осталась проблема с корректным завершением планировщика. После завершения остановки краулера появляется предупреждение о незавершенной корутине.
Чтобы этого не было, напишем функцию stop:
async def stop(self):
self.is_running = False
self._scheduler_task.cancel()
Теперь после того, как внутри пула закончатся задачи, его нужно корректно остановить. Добавим метод stop в конце функции start:
async def start():
pool = Pool(3)
for tid in range(10):
await pool.put(Task(tid))
pool.start()
await pool.join()
await pool.stop()
Попробуем:
Теперь все работает корректно.
Мы остановили планировщик, когда задачи в очереди закончились. Но если мы остановим краулер в процессе работы, начнут появляться предупреждения о том, что какая-то задача не завершилась:
А чем больше время выполнения perform, тем больше будет таких уведомлений.
Поэтому нам нужно ожидать, когда все worker завершатся. Для этого введем дополнительную переменную, обозначающую количество параллельно работающих worker: concurrent_workers. Изначально она равна 0. При запуске воркера мы увеличиваем concurrent_workers на 1. При выходе, наоборот, уменьшаем на 1:
async def _worker(self, task: FetchTask):
async with self._sem:
self._cuncurrent_workers += 1
await task.perform(self)
self._queue.task_done()
self._cuncurrent_workers -= 1
Теперь нужно как-то сказать функции stop, что все параллельные worker завершились. Это произойдет, когда is_running будет false и concurrent_workers станет равной 0.
Для этого есть примитив синхронизации Event. В нашем коде мы добавим его в Pool и назовем stop_event. Это переменная, на которой можно ждать await self._stop_event.wait() до тех пор, пока кто-то не вызовет self._stop_event.set():
class Pool:
def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
self.max_rate = max_rate
self.interval = interval
self.concurrent_level = concurrent_level
self.is_running = False
self._queue = asyncio.Queue()
self._scheduler_task: Optional[asyncio.Task] = None
self._sem = asyncio.Semaphore(concurrent_level or max_rate)
self._cuncurrent_workers = 0
self._stop_event = asyncio.Event()
Если равна, то все worker завершили свою работу, планировщик отменен и не создает новые задачи. В таком случае все компоненты Pool остановлены или завершили свою работу — программу можно завершать.
Но если concurrent_workers не равна 0, нам нужно внутри метода stop подождать событие stop_event:
async def stop(self):
self.is_running = False
self._scheduler_task.cancel()
if self._cuncurrent_workers != 0:
await self._stop_event.wait()
Когда Pool остановлен, последний работающий worker должен отправить уведомление:
async def _worker(self, task: FetchTask):
async with self._sem:
self._cuncurrent_workers += 1
await task.perform(self)
self._queue.task_done()
self._cuncurrent_workers -= 1
if not self.is_running and self._cuncurrent_workers == 0:
self._stop_event.set()
Обновим функцию main, чтобы все корректно работало:
def main():
loop = asyncio.get_event_loop()
pool = Pool(3)
try:
loop.run_until_complete(start(pool))
except KeyboardInterrupt:
loop.run_until_complete(pool.stop())
loop.close()
Теперь все работает. После нажатия Ctrl + C выполняются оставшиеся задачи, и программа завершается:
Работа краулера на примере обкачки нашего блога на Хабре
Мы реализовали механику пула на нашей абстрактной задачке task.
Для следующего этапа я подготовил задачу FetchTask.
fetch_task
MAX_DEPTH = 2
PARSED_URLS = set()
@dataclass
class FetchTask(Task):
url: URL
depth: int
def parser(self, data: str) -> List['FetchTask']:
if self.depth + 1 > MAX_DEPTH:
return []
soup = BeautifulSoup(data, 'lxml')
res = []
for link in soup.find_all('a', href=True):
new_url = URL(link['href'])
if new_url.host is None and new_url.path.startswith('/'):
new_url = URL.build(
scheme=self.url.scheme,
host=self.url.host,
path=new_url.path,
query_string=new_url.query_string
)
if new_url in PARSED_URLS:
continue
PARSED_URLS.add(new_url)
res.append(FetchTask(
tid=self.tid,
url=new_url,
depth=self.depth + 1
))
return res
async def perform(self, pool):
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as resp:
print(self.url, resp.status)
data = await resp.text()
res: List[FetchTask] = await asyncio.get_running_loop().run_in_executor(
None, self.parser, data
)
for task in res:
await pool.put(task)
Внутри функции parcer есть переменная soup, которая объявлена как soup = BeautifulSoup(data, ’lxml’). Дам небольшие пояснения.
BeautifulSoup — парсер для анализа HTML/XML.
lxml — реализация HTML/XML парсера. Из-за GIL мы специально запускаем res внутри функции perform через executor:
async def perform(self, pool):
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as resp:
print(self.url, resp.status)
data = await resp.text()
res: List[FetchTask] = await asyncio.get_running_loop().run_in_executor(
None, self.parser, data
)
for task in res:
await pool.put(task)
GIL — блокировка, которая запрещает параллельные потоки в Python. Но если вы пишите расширение на С, есть возможность «отпустить» GIL.
Парсер lxml написан на С. У себя под капотом он умеет отпускать GIL и выполняться в отдельном потоке. Это относится и к некоторым другим расширениям: https://lxml.de/2.0/FAQ.html#id1
В fetch_task также переопределяем функцию perform, в которой нужно сходить в сеть. Для этого я взял aiohttp client.
В задаче FetchTask мы идем на указанный URL, оттуда получаем данные и запускаем executor для их обработки. Нужно взять все ссылки в документе, перейти на них и тоже обкачать:
def parser(self, data: str) -> List['FetchTask']:
if self.depth + 1 > MAX_DEPTH:
return []
soup = BeautifulSoup(data, 'lxml')
res = []
for link in soup.find_all('a', href=True):
new_url = URL(link['href'])
if new_url.host is None and new_url.path.startswith('/'):
new_url = URL.build(
scheme=self.url.scheme,
host=self.url.host,
path=new_url.path,
query_string=new_url.query_string
)
if new_url in PARSED_URLS:
continue
PARSED_URLS.add(new_url)
res.append(FetchTask(
tid=self.tid,
url=new_url,
depth=self.depth + 1
))
return res
В конце мы добавляем в результат новую задачу и увеличиваем на 1 глубину depth.
Например, когда мы поставили задачку habr.com, глубина была равна 1. Мы скачали этот документ, в котором есть и другие ссылки: блоги Mail.ru, Yandex или KTS. Когда мы стали обкачивать следующие страницы, глубина увеличилась до 2. Этот параметр нужен для ограничения количества обкачиваемых ресурсов, фактически — глубины.
Обратите внимание, что у нас есть список посещенных страничек PARSED_URLS. Так мы не будем дважды посещать одни и те же страницы.
Теперь импортируем задачи в краулер из fetch_task и изменяем start:
async def start(pool):
await pool.put(
FetchTask(URL('https://habr.com/ru/company/kts/blog/'), 1)
)
pool.start()
await pool.join()
await pool.stop()
Выставляем 3 запроса в секунду и смотрим, как наш краулер потихоньку обкачивает Хабр:
Спасибо за внимание
На этом все! Спасибо всем, кто дочитал статью.
Если сталкивались с подобными задачами, пожалуйста, поделитесь своим опытом в комментариях.
flint
Практически все фреймворки для скрапинга асинхронные. От Scrapy до менее известных вроде Grab. Чем отличается ваш подход?
mercifulcarnifex
Думаю, тут задача была продемонстрировать возможности и api asyncio в первую очередь, а задача для этого хорошо подходит
alexopryshko Автор
Код написан в рамках учебного вебинара по asyncio. Его целью было показать как пользоваться некоторыми компонентами библиотеки, а задача "краулер" хорошо для этого подходит.