Многие сервисы предоставляют возможность взаимодействовать с ними не только обычным пользователям через отточенные и оптимизированные графические интерфейсы, но и внешним разработчикам из своих программ через API. При этом сервисам важно контролировать нагрузку на свою инфраструктуру. В ситуации с обычными пользователями большинство проблем с нагрузкой не возникнет из-за контроля кода приложения, отправляющего запросы к сервису, со стороны разработчиков сервиса (пользователей, пытающихся что-то делать в приложении за рамками предложенных разработчиками интерфейсов и задокументированных возможностей, мы в данной статье не рассматриваем). В случае же со внешними разработчиками простор для создания нагрузки на сервис ограничен только фантазией этих самых внешних разработчиков. Чтобы немного ограничить этот простор, стала распространена практика введения ограничений на количество запросов в единицу времени к API сервиса. 

Мы уже рассказывали о том, как можно реализовать эти ограничения, если вы сами разрабатываете API сервиса, сегодня хотим рассказать о том, как жить с «клиентской» стороны и удобно пользоваться ограниченным по количеству запросов API.

Вводные данные

Меня зовут Юрий Гаврилов, я работаю в команде Data Platfrom в ManyChat. У нас в компании есть маркетинговый отдел, который, среди всего прочего, любит общаться с нашими клиентами через сервис Intercom, позволяющий отправлять удобные In-App сообщения пользователю прямо в нашем веб-приложении. Чтобы эти коммуникации были осмысленными, Intercom должен получать некоторую информацию о наших клиентах (имя, дату регистрации, различные простые метрики их аккаунтов и т.д.). За предоставление этих данных в Intercom отвечает наш довольно-таки монолитный бекенд-компонент, хранящий информацию о наших пользователях. А ещё, совсем недавно, мы построили классный аналитический контур (о котором обязательно расскажем в следующих статьях), также хранящий кучу информации о наших пользователях, и довольно изолированный от упомянутого выше бекенд-компонента. В этом контуре аналитики обсчитывают более сложные пользовательские метрики, гоняют ML-алгоритмы и хранят витрины с результатами всех этих вычислений. Для ещё более крутых коммуникаций с клиентами, часть из этих результатов также хотелось бы иметь в Intercom.

И тут мы сталкиваемся с проблемой: есть разные, изолированные друг от друга компоненты, желающие делать вызовы внешнего API с довольно сильным ограничением в 1000 запросов в минуту. При этом, ограничение применяется в рамках всего аккаунта Intercom, а не рассчитывается индивидуально для каждого компонента.

Мысли о решении проблемы вслух

Понятно, что хаотичный вызов методов внешнего ограниченного API из разных компонентов когда хочется, может быстро привести к неприятным последствиям. Прогрузка «продвинутых» данных из аналитического контура рано или поздно перебьет и не даст загрузиться важным «базовым» данным из бекенд-компонента, каким бы настойчивым и терпеливым мы его не настроили.

Из-за этого сразу же приходят мысли, что нам нужна «единая точка общения» с этим API, которая будет непосредственно обращаться к API, контролировать исчерпание лимитов и управлять логикой по приоритизации взаимодействия с API. К этой точке, в свою очередь, должны обращаться все желающие взаимодействовать с API компоненты.

Реализация «единой точки общения»

Мы уже говорили, что в ManyChat любят Redis — он же нам помог для решения и этой задачи. Для создания такой «единой точки» нужно где-то собирать информацию о том, какие именно методы хотят вызвать во внешнем API наши компоненты. Если внешние компоненты захотят вызвать методов больше, чем позволено ограничениями API, на момент «передышки», пока не обновятся лимиты, эту информацию нужно где-то хранить. А ещё, нам бы очень хотелось ввести систему приоритетов, чтобы «базовые» данные, которые бекенд-компонент хочет отправить в Intercom, не ждали, пока прогрузится много «продвинутых» данных из аналитического контура.

Все эти проблемы позволяет решить Redis, а точнее структура данных List и реализованные на ней очереди.

На каждый приоритет нам нужно создать по своей очереди, в которую компоненты будут записывать свои намерения вызвать тот или иной метод в API, а один общий consumer будет в порядке приоритетности эти очереди вычитывать и непосредственно вызывать методы API. Если при вызове очереди он сталкивается с достижением rate-limit, он подождет, пока лимиты сбросятся, и продолжит работу.

В нашем случае очереди нужно две — для «базовых» данных из бекенд-компонента (давайте назовем её BackendQueue), и менее приоритетных «продвинутых» данных из аналитического контура (AnalyticsQueue). Прелесть такого подхода заключается также и в том, что совершенно не важно, на каких языках программирования написаны компоненты и consumer, все они смогут выполнять свою работу, нужно только определиться с форматом хранящихся в очереди данных.

Давайте для определенности и простоты примем в этой статье такой формат (JSON):

{
    "method_name": "users_update", // название метода, который нужно вызвать
    "parameters": {"user_id": 123} // параметры, с которыми должен быть вызван этот метод
}

Тогда MVP нашего consumer'a может выглядеть так (PHP)
class APICaller
{
    private const RETRIES_LIMIT = 5;
    private const RATE_LIMIT_TIMEFRAME = 10;
    
    ...
    
    public function callMethod(array $payload): void
    {
        switch ($payload['method_name']) {
            case 'users_update':
                $this->getIntercomAPI()->users->update($payload['parameters']);
                break;
            default:
                throw new \RuntimeException('Unknown method in API call');
        }
    }

    public function actionProcessQueue(): void
    {
        while (true) {
            $payload = $this->getRedis()->rawCommand('LPOP', 'BackendQueue');
            if ($payload === null) {
                $payload = $this->getRedis()->rawCommand('LPOP', 'AnalyticsQueue');
            }

            if ($payload) {
                $retries = 0;
                $processed = false;
                while ($processed === false && $retries < self::RETRIES_LIMIT)
                {
                    try {
                        $this->callMethod(json_decode($payload));
                        $processed = true;
                    } catch (IntercomRateLimitException $e) {
                        $retries++;
                        sleep(self::RATE_LIMIT_TIMEFRAME);
                    }
                }
            } else {
                sleep(1);
            }
        }
    }
}

Здесь мы разбираем очереди в порядке приоритетов, при получении информации о методе пытаемся его выполнить, если сталкиваемся с лимитами — ждем и пытаемся снова ограниченное количество раз.

А наши компоненты на разных языках программирования могут отправлять запрос на вызов интересующего их метода:

Backend (PHP):

...
$payload = [
    'method_name' => 'users_update',
    'parameters' => ['user_id' => 123, 'registration_date' => '2020-10-01'],
];
$this->getRedis()->rawCommand('RPUSH', 'BackendQueue', json_encode($payload));
...

Аналитический контур (Python):

...
payload = {
    'method_name': 'users_update',
    'parameters': {'user_id': 123, 'advanced_metric': 42},
}
redis_client.rpush('AnalyticsQueue', json.dumps(payload))
...

Проблемы метода и их решения

Однопоточность > немасштабируемость

Когда мы попытались перейти на такую систему — всё заработало, данные доходили до Intercom, лимиты не исчерпывались. Но возникла другая проблема — каждое обращение к внешнему сервису занимает какое-то время, и когда все вызовы API «сместились» в один поток, мы совсем перестали доходить до rate-limit, перформанс customer'a был в несколько раз меньше rate-limit'ов, и стало понятно, что нужно всю эту радость как-то распараллеливать. Redis вполне безопасно (в смысле параллельности) позволяет разбирать свои очереди нескольким consumer'ам. В целом, нет никакой проблемы в том, чтобы запустить несколько consumer'ов, описанных выше, на одни и те же очереди и проблемы не будет. Каждый из них будет точно так же с соблюдением приоритетности разбирать очередь и, при превышении лимитов, ждать, пока внешний сервис сбросит лимит, и далее работать снова до исчерпания лимита.

Но мы решили немного ограничить вызовы с нашей стороны, потерять несколько десятков возможных вызовов в пределах лимитов, и не доводить consumer'ов до ситуации, когда внешний сервис сообщает им о достижении лимита вызовов. Для этого мы рассчитали для каждого consumer'а его собственный лимит на количество вызовов в единицу времени, при достижении которого он перестает разбирать очередь и делать вызовы к API до перехода к следующему временному интервалу.

Пример самоконтролирующего лимиты consumer'a (PHP)
class APICaller
{
    private const RETRIES_LIMIT = 5;
    private const RATE_LIMIT_TIMEFRAME = 10;
    private const INTERCOM_RATE_LIMIT = 150;
    private const INTERCOM_API_WORKERS = 5;

    ...

    public function callMethod(array $payload): void
    {
        switch ($payload['method_name']) {
            case 'users_update':
                $this->getIntercomAPI()->users->update($payload['parameters']);
                break;
            default:
                throw new \RuntimeException('Unknown method in API call');
        }
    }

    public function actionProcessQueue(): void
    {
        $currentTimeframe = $this->getCurrentTimeframe();
        $currentRequestCount = 0;
        
        while (true) {
            if ($currentTimeframe !== $this->getCurrentTimeframe()) {
                $currentTimeframe = $this->getCurrentTimeframe();
                $currentRequestCount = 0;
            } elseif ($currentRequestCount > $this->getProcessRateLimit()) {
                usleep(100 * 1000);
                continue;
            }
            
            $payload = $this->getRedis()->rawCommand('LPOP', 'BackendQueue');
            if ($payload === null) {
                $payload = $this->getRedis()->rawCommand('LPOP', 'AnalyticsQueue');
            }

            if ($payload) {
                $retries = 0;
                $processed = false;
                while ($processed === false && $retries < self::RETRIES_LIMIT)
                {
                    try {
                        $this->callMethod(json_decode($payload));
                        $processed = true;
                    } catch (IntercomRateLimitException $e) {
                        $retries++;
                        sleep(self::RATE_LIMIT_TIMEFRAME);
                    }
                }
            } else {
                sleep(1);
            }
        }
    }

    private function getProcessRateLimit(): int
    {
        return (int) floor(self::INTERCOM_RATE_LIMIT / self::INTERCOM_API_WORKERS);
    }

    private function getCurrentTimeframe(): int
    {
        return (int) ceil(time() / self::RATE_LIMIT_TIMEFRAME);
    }
}

Одностороннее взаимодействие с API

Иногда бывает нужно не только вызвать какой-то метод во внешнем API, но и получить и обработать его ответ. Мы же перевели наше взаимодействие компонентов с API в асинхронный формат. Проблема получения ответа от сервера и доведения его до исходного компонента — решаемая. Достаточно дополнить данные о методе, который мы хотим выполнить, данными о callback'e, который consumer'у необходимо выполнить при получении ответа от внешнего сервиса. Мы написали несколько стандартных callback'ов, которые складывают полученные ответы в определенные очереди, из которых компоненты могут их прочитать и обработать самостоятельно.

Внедрение такой структуры взаимодействия, безусловно, менее удобное, чем стандартные блокирующие вызовы методов и получение ответа сразу, но обойтись без такой жертвы нам не удалось.

Что делать, когда запросов становится настолько много, что очереди разгребаются часами/днями/неделями?

Здесь мы точно не волшебники, и запрограммировать свое взаимодействие с внешним API так, чтобы превышать его rate-limit, мы точно не можем. По задачам загрузки данных наших клиентов мы внедрили у себя систему по учету тех данных, что уже были отправлены во внешний сервис. Когда наступает очередная выгрузка данных о пользователях, отправляются только данные о тех пользователях, которые обновились, и, тем самым, уменьшается необходимое количество запросов.

В остальном, в данной ситуации не остаётся ничего иного, кроме как разговаривать со внешним сервисом на тему увеличения лимита запросов.

Заключение

В этой статье мы рассмотрели довольно простой подход к контролю количества запросов к внешнему сервису через API, а также их упорядочиванию, в условиях, когда обращаться к API желают различные несвязанные друг с другом компоненты, а пропускная способность API ограничена.

Надеюсь, что кому-нибудь такой подход окажется полезным и интересным. Буду рад ответить на вопросы в комментариях и узнать, какие подходы для взаимодействия с ограниченными по количеству запросов API используете вы.