? В этой статье я покажу как решить одну из проблем, возникающих при использовании распределенных очередей задач — регулирование пропускной способности очереди, или же, более простым языком, настройка ее rate limit'a. В качестве примера я возьму python и свою любимую связку Celery+RabbitMQ, хотя алгоритм, который я использую, никак не зависит от этих инструментов и может быть реализован на любом другом стэке.


Celery+RabbitMQ


So what's the problem?


? Для начала пара слов о том, какую проблему я вообще пытаюсь решить. Дело в том, что 99.9% сервисов в интернете запрещают бесконтрольно закидывать их сотнями/тысячами запросов в секунду, угрожая дать в ответ какой-нибудь 403 или 500. Нет, ну правда, жалко им чтоле? Иногда таким сервисом может выступать даже своя собственная БД… Вобщем, доверять нынче нельзя никому, поэтому приходится себя как-то сдерживать.


? Конечно, если вся работа ведется внутри 1го процесса, то никакой проблемы нет, но т.к мы работаем с Celery, то у нас может быть не только N процессов (далее воркеров), но и M машин, и задача все это дело синхронизировать уже не кажется столь тривиальной.


What's in the box


? Первое, на что натыкаешься, когда ищешь, как же настроить throttling в celery, это встроенный параметр rate_limit класса Task. Звучит как то, что надо, но, копнув чуть глубже, замечаем, что:


Нельзя задать rate limit на группу задач.
Это неудобно, т.к зачастую доступ к какому-то лимитированому ресурсу размазан между разными тасками.

   # представим что у нас лимит на вызовы API гитхаба 60 req/min
   # придется поделить вызовы поровну
   @app.task(rate_limit='30/m')
   def get_github_api1():
       ...

   @app.task(rate_limit='30/m')
   def get_github_api2():
       ...

Этот лимит работает только внутри воркера, то есть он локальный и у каждого воркера свой.

? Конечно, можно еще раз поделить лимит, теперь взяв в расчет еще и количество воркеров. Но все это начнет работать дико неэффективно, если таски будут прилетать неравномерно, например в какую-то минуту мы получим 60 вызовов get_github_api1() и 0 вызовов get_github_api2() — будут выполнены только 30 вызовов первого типа, хотя могли бы быть все 60. К тому же каждый раз, как появится новая таска, которой нужен доступ к этому ресурсу, придется снова везде пересчитывать все лимиты. Вобщем фича конечно полезная, но только для самых простых вариантов.


Bringing decision


Token Bucket


? Решением проблемы для меня стал Token Bucket — алгоритм, использующийся для контроля полосы пропускания канала в компьютерных и телекомуникационных сетях. Опишу его в 2ух словах: пакет данных, чтобы пройти проверку канала на лимит, должен иметь при себе токен, который он взял из хранилища; в то же время в хранилище токены поступают с некоторой частотой. То есть пропускная способоность канала ограничивается скоростью выпуска токенов, которую нам и надо регулировать.
? В нашем же случае вместо пакета данных мы имеем таску, а хранилищем токенов будут выступать очереди RabbitMQ.


Token Bucket


Wrting some code


Чтож, приступим к написанию кода. Создадим файл main.py и зададим базовые настройки:


from celery import Celery
from kombu import Queue

app = Celery('Test app', broker='amqp://guest@localhost//')

# 1 очередь под сами таски и 1 очередь под токены для них
app.conf.task_queues = [
    Queue('github'),
    # я ограничил длину очереди до 2ух, чтобы токены не скапливались
    # иначе это может привести к пробою нашего rate limit'a
    Queue('github_tokens', max_length=2)
]

# это таска будет играть роль нашего токена
# она никогда не будет запущена, мы просто будем забирать ее как сообщение из очереди
@app.task
def token():
    return 1

# настраиваем постоянный выпуск нашего токена
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # мы будем выпускать по 1му токену в секунду
    # это значит что rate limit для очереди github - 60 задач в минуту
    sender.add_periodic_task(1.0, token.signature(queue='github_tokens'))

Не забудьте развернуть Rabbit, я предпочитаю делать это 1ой строчкой докера:


docker run -d --rm --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

Теперь запустим celery beat — это специальный воркер celery, запускаемый всегда в единственном экземпляре и отвечающий за запуск периодических задач.


celery -A main beat --loglevel=info

После этого в консоли раз в секунду начнут появляться сообщения:


[2020-03-22 22:49:00,992: INFO/MainProcess] Scheduler: Sending due task main.token() (main.token)

Отлично, мы наладили выпуск токенов для нашего 'ведра'. Осталось только научить наших воркеров из него брать. Попробуем оптимизировать код, который мы написали ранее для запросов в github. Добавим эти строчки к main.py:


# Напишем функцию для взятия токена из очереди
def rate_limit(task, task_group):
    # берем соединение с брокером из пула
    with task.app.connection_for_read() as conn:
        # забираем токен
        msg = conn.default_channel.basic_get(task_group+'_tokens', no_ack=True)
        # получили None - очередь пуста, токенов нет
        if msg is None:
            # повторить таску через 1 сек
            task.retry(countdown=1)

# Добавим print в таски для логирования
# Здесь я поставил max_retries=None, так что таски будут
# повторяться, пока не будут выполнены
@app.task(bind=True)
def get_github_api1(self, max_retries=None):
    rate_limit(self, 'github')
    print ('Called Api 1')

@app.task(bind=True)
def get_github_api2(self, max_retries=None):
    rate_limit(self, 'github')
    print ('Called Api 2')

А теперь проверим, как это все работает. В дополнение к уже запущенному beat добавим 8 воркеров:


celery -A main worker -с 8 -Q github

И создадим отдельный маленький скрипт для запуска этих задач, назовем его producer.py:


from main import get_github_api1, get_github_api2

tasks = [get_github_api1, get_github_api2]

for  i in range(100):
    # запускаю таски в перемешку
    tasks[i % 2].apply_async(queue='github')

Запускаем — python producer.py, и смотрим в логи воркеров:


[2020-03-23 13:04:15,017: WARNING/ForkPoolWorker-3] Called Api 2
[2020-03-23 13:04:16,053: WARNING/ForkPoolWorker-8] Called Api 2
[2020-03-23 13:04:17,112: WARNING/ForkPoolWorker-1] Called Api 2
[2020-03-23 13:04:18,187: WARNING/ForkPoolWorker-1] Called Api 1
... (96 more lines)

Несмотря на то, что у нас целых 8 рабочих процессов, таски выполняются примерно раз в секунду, отправляясь в конец очереди, если на момент их выполнения не оказалось токена. Также, я думаю, вы уже заметили, что на самом деле мы накладываем rate limit не совсем на очередь, а скорее на какую-то логически связанную группу задач, которые на самом деле могут находится как в разных очередях, так и в одной. Таким образом наш контроль становится даже более детальным и гранулированным.


Putting it all together


? Конечно, количество таких групп задач не ограничено (разве что возможностями брокера). Соберем весь код в кучку, расширим и причешим его:


from celery import Celery
from kombu import Queue
from queue import Empty
from functools import wraps

app = Celery('hello', broker='amqp://guest@localhost//')

task_queues = [
    Queue('github'),
    Queue('google')
]

# количество запусков в минуту
rate_limits = {
    'github': 60,
    'google': 100
}

# автоматически сгенерируем очереди с токенами под все группы, на которые нужен лимит
task_queues += [Queue(name+'_tokens', max_length=2) for name, limit in rate_limits.items()]

app.conf.task_queues = task_queues

@app.task
def token():
    return 1

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # автоматически настроим выпуск токенов с нужной скоростью
    for name, limit in rate_limits.items():
        sender.add_periodic_task(60 / limit, token.signature(queue=name+'_tokens'))

# Как можно не любить декораторы?
def rate_limit(task_group):
    def decorator_func(func):
        @wraps(func)
        def function(self, *args, **kwargs):
            with self.app.connection_for_read() as conn:
                # тут я для примера использовал другой более высокоуровневый подход:
                # в замен на получение полноценного интерфейса очереди
                # мы немного теряем в перфомансе, т.к под капотом происходит обмен
                # несколькими сообщениями с брокером
                with conn.SimpleQueue(task_group+'_tokens', no_ack=True, queue_opts={'max_length':2}) as queue:
                    try:
                        # из плюсов также - наличие вот такого блокирующего вызова
                        # это может быть удобнее, чем постоянная ротация с retry()
                        # впрочем, это нужно подбирать под кейс
                        queue.get(block=True, timeout=5)
                        return func(self, *args, **kwargs)
                    except Empty:
                        self.retry(countdown=1)
        return function
    return decorator_func

# с декораторами все-таки намного красивее и читабельнее, согласитесь? ;)
@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api1(self):
    print ('Called github Api 1')

@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api2(self):
    print ('Called github Api 2')

@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
    print ('Called Google Api 1')

@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
    print ('Called Google Api 2')

Таким образом суммарные вызовы задач группы google не превысят 100/мин, а группы github — 60/мин. Заметьте, что для того, чтобы настроить такой throttling, понадобилось меньше 50 строк. Как по мне, достаточно просто.


Moving further


? Ну, вот все и работает как надо, причем без каких-либо сторонних примочек, средствами только самого брокера. Но зачем останавливаться на достигнутом ;)? Грамотно используя данный алгоритм, можно пойти дальше и создать намного более сложные и гибкие стратегии. Например, некоторые таски могут брать не 1, а несколько токенов (возможно даже из разных очередей, если обращение идет к нескольким сервисам), таким образом у нас появится понятие 'веса' задачи, или же расширить размер нашего 'ведра' токенов, позволив им накапливаться, тем самым компенсируя периоды простоя. Вобщем, пространство для маневра просто огромное и ограничено только вашим воображением и инженерными навыками) Всем спасибо, всем удачи!


P.s. Поделитесь кто как решал подобную проблему, будет интересно услышать ;)