Async в моде. Async Python, Async Rust, Go, Node, .NET и т.д. Выбирайте свою любимую экосистему, и в ней будет свой модуль для асинхронности. Насколько хорошо работает вся эта асинхронная история, во многом зависит от экосистемы и среды выполнения языка, но в целом у нее есть несколько приятных преимуществ. Что действительно становится проще – это ожидание завершения операции, выполнение которой требует некоторого времени. Теперь это настолько просто, что количество способов выстрелить себе в ногу невероятно выросло. Тот способ, о котором я хочу сегодня поговорить – это тот, в котором вы не осознаете, что стреляете себе в ногу, пока система не начнет перегружаться. Да, я про управление давлением. Родственным термином в сетевых технологиях является «контроль потока». 

Что такое «обратное давление» (Back Pressure)

Есть множество объяснений обратного давления, и одно из них звучит как «Back pressure – сопротивление потока данных проходящего через программное обеспечение», материал о котором я настоятельно рекомендую почитать. Поэтому, вместо того чтобы вдаваться в детали, я хочу дать очень краткое определение и пояснение к термину: обратное давление (backpressure) – это сопротивление, которое возникает при противостоянии потоку данных через систему. Backpressure звучит как нечто негативное и вызывает ассоциации с переполненной ванной, вода из которой не уходит из-за засоренной трубы, но я здесь, чтобы спасти ваш день.

Набор, с которым мы имеем дело, более-менее однотипен для всех случаев: у нас есть система из различных компонентов, организованная в конвейер, и этот конвейер должен принимать определенное количество входящих сообщений.

Представьте себе моделирование доставки багажа в аэропортах. Багаж прибывает, сортируется, загружается в самолет и, наконец, выгружается. В любом случае, одна единица багажа кидается вместе с другими чемоданами в контейнеры для транспортировки. Когда контейнер заполняется, его надо забрать. Теперь человек, который хочет бросить багаж в контейнер, не может этого сделать, потому что контейнера нет. Решение нужно принимать быстро. Один из вариантов – ждать: часто это называют очередью или буферизацией. Другой вариант – кидаться багажом до тех пор, пока не прибудет контейнер – это сбрасывание. Звучит плохо, но позже мы поговорим о том, почему иногда важно делать именно так. Здесь играет роль еще одна вещь. 

Представьте, что человек, которому поручено кидать багаж в контейнер, не получает контейнер в течение длительного периода времени (например, неделю). В конце концов, если багаж не будет выброшен окончательно, вокруг этого человека соберется ужасно много чемоданов. В итоге количество багажа, которое придется сортировать, будет настолько огромным, что у людей не хватит физического пространства, чтобы его хранить. В этот момент надо бы сказать аэропорту, чтобы он не принимал багаж, пока не решится проблема с контейнером. Именно этот процесс и называют контролем потока и именно он является важнейшим аспектом сетевого взаимодействия.

Все эти конвейеры обработки обычно масштабируются для определенного количества сообщений (или в данном случае – багажа) в единицу времени. Если число сообщений превысит допустимое – или, что хуже всего – трубопровод остановится, могут произойти ужасные вещи. В реальном мире примером этого явления стало открытие пятого терминала аэропорта Хитроу, где 42 000 чемоданов неправильно отправлялись в течение 10 дней из-за некорректной работы IT-инфраструктуры. Аэропорту пришлось отменить более 500 рейсов, и в течение некоторого времени авиакомпании разрешали только провоз ручной клади.

Back Pressure – это важно

Из катастрофы в Хитроу мы выяснили, что способность сообщать об обратном давлении имеет решающее значение. В реальной жизни, как и в вычислениях, время всегда ограничено. В конце концов, кто-то просто перестает чего-то ждать. В частности, если внутри системы что-то может ждать вечно, то снаружи системы такое невозможно.

Пример из реальной жизни: если ваш чемодан должен отправиться из Хитроу в Париж, но вы пробудете там всего 7 дней, то совершенно бессмысленно отправлять туда багаж с десятидневной задержкой. На самом деле, вам будет нужно, чтобы ваш багаж отправился обратно в ваш родной аэропорт.

На самом деле лучше признать поражение и сказать, что вы перегружены, чем прикидываться работоспособным и продолжать накапливать данные, поскольку в какой-то момент это только ухудшит ситуацию.

Так почему же Back Pressure стало темой обсуждения, если мы годами писали программное обеспечение, основываясь на потоках, и этого феномена, казалось, не возникало? Все дело в сочетании многих факторов, и нескольких конкретных, которые помогут вам с легкостью выстрелить себе в ногу.

Плохие значения по умолчанию

Чтобы понять, какое значение обратное давление имеет в асинхронном коде, я хочу показать вам, казалось бы, простой фрагмент кода с asyncio на Python, который демонстрирует несколько ситуаций, когда мы случайно забываем о back pressure. 

from asyncio import start_server, run

async def on_client_connected(reader, writer):
    while True:
        data = await reader.readline()
        if not data:
            break
        writer.write(data)

async def server():
    srv = await start_server(on_client_connected, '127.0.0.1', 8888)
    async with srv:
        await srv.serve_forever()

run(server())

Если вы новичок в концепции async/await, просто представьте, что в любой момент, когда вызывается await, функция приостанавливается до тех пор, пока задача не завершится. Здесь функция start_server, предоставляемая asyncio Python, запускает спрятанный цикл получения. Он прослушивает сокет и создает независимую задачу, выполняющую функцию on_client_connected для каждого подключающегося сокета.

Теперь все выглядит довольно просто. Вы можете удалить все ключевые слова await и async, и в итоге вы получите код, который выглядит очень похоже на вариант написания кода с потоками.

Однако здесь скрывается одна очень важная проблема, которая является корнем всех остальных: вызовы функций, перед которыми нет ожидания. В коде с потоками любая функция может вернуть значение (yield). А в асинхронном коде так могут сделать только асинхронные функции. Значит, например, метод writer.write не может ничего заблокировать. Как же это работает? Функция попытается записать данные прямо в буфер сокетов операционной системы, который является неблокируемым. Однако, что случится, если буфер заполнится и сокет заблокируется? В случае с потоками можно поставить блок, что было бы идеальным решением, поскольку так мы сможем оказывать некоторое обратное давление. Однако, поскольку потоков нет, так мы поступить не можем. Таким образом, мы остаемся один на один с буферизацией или удалением данных (dropping). Удаление данных – ужасная история, поэтому Python вместо этого выбирает буфер. Но что произойдет, если кто-то отправит много данных, но никто их не прочитает? В таком случае буфер будет расти и расти. Этот недостаток API является причиной того, что в документации Python говорится, что write в одиночку использовать плохо, и его обязательно следует комбинировать с drain:

writer.write(data)
await writer.drain()

Drain приведет к «сливу» некоторого избытка в буфер. Буфер не очистится целиком, но этого будет достаточно, чтобы ситуация не вышла из-под контроля. Так почему же drain не работает безусловно? Ну, это огромный недосмотр в работе с API, и я не знаю, как так вышло.

Важная вещь, о которой надо помнить, это то, что большинство сокетов основаны на TCP, а у TCP есть встроенный контроль потока. Отправитель будет писать с той скоростью, с которой читатель будет готов принять данные (с поправкой на буферизацию). От вас, как от разработчика, этот механизм скрыт, поскольку даже библиотеки сокетов BSD не показывают эту неявную обработку контроля потоков.

Итак, можно ли считать, что мы исправили ситуацию с back pressure здесь? Что ж, давайте посмотрим, как это все будет выглядеть в мире потоков. В нем наш код, скорее всего, имел бы фиксированное количество запущенных потоков, и цикл получения данных ждал бы, пока поток станет доступным, чтобы принять запрос. Однако в асинхронном примере у нас есть неограниченное количество подключений, которые мы готовы обработать. Это же значит, что мы готовы принять очень большое количество подключений, даже если потенциально система перегрузится. В случае с таким маленьким примером проблема может и небольшая, но представьте, что будет, если мы получим доступ к базе данных.

Представьте пул подключений к базе данных, который будет выдавать до 50 подключений. Что хорошего в том, чтобы принимать 10000 подключений, если большинство из них станут узким местом?

Ожидание или ожидание ради ожидания

Итак, наконец, мы приходим к тому, о чем я хотел поговорить первым делом. В большинстве асинхронных систем и, безусловно, в большинстве случаев, с которыми я сталкивался в Python, даже если вы исправите поведение буферизации на уровне сокетов, вы окажитесь в мире объединения кучи асинхронных функций и без обратного давления.

Вспомним наш пример с пулом подключений к базе данных и предположим, что нам доступно только 50 подключений. Значит с имеющимся кодом, у нас может быть не более 50 одновременных сессий базы данных. Итак, допустим, мы хотим, чтобы было обработано в 4 раза больше запросов, поскольку мы ожидаем, что большинство из того, что приложение делает, от базы данных никак не зависит. Один из способов реализовать такую механику – сделать семафор с 200 токенами и получить один в самом начале. Если токены закончатся, мы будем ждать, пока семафор выпустит новый.

Но подождите. Мы снова вернулись к очереди! Просто мы встали в очередь немного раньше. Если бы мы хотели перегрузить систему, мы бы встали в очередь с самого начала. Так что теперь все будут ждать максимальное количество времени, которое готовы ждать, а потом просто сдадутся. Хуже того: сервер все еще может обрабатывать эти запросы некоторое время, пока не поймет, что клиент исчез и ответ ему больше не нужен.

Поэтому вместо того, чтобы ждать изначально, надо бы получить какую-то обратную связь. Представьте, что вы находитесь в отделении почты и вытаскиваете из автомата талон, который говорит вам, когда ваша очередь. Этот талон дает вам хорошее представление о том, сколько придется ждать. Если время ожидания слишком велико, вы можете отказаться от талона и прийти попозже. Обратите внимание, что время, в течение которого вы будете ждать свою очередь на почте, не зависит от времени ожидания для вашей конкретной задачи (например, потому что кому-то нужно принести вашу посылку, проверить документы и получить подпись).

Вот наивная версия, в которой мы можем только заменить, что чего-то ждем:

from asyncio.sync import Semaphore

semaphore = Semaphore(200)

async def handle_request(request):
    await semaphore.acquire()
    try:
        return generate_response(request)
    finally:
        semaphore.release()

При вызове асинхронной функции handle_request мы увидим, что мы ждем и ничего не происходит. Мы не можем понять, ждем ли мы, потому что случилась перегрузка, или просто потому, что генерация ответа занимает столько времени. Мы получаем бесконечную буферизацию, пока сервер, наконец, не исчерпает память и не упадет.

Причина в том, что у нас нет канала связи для back pressure. И как же это исправить? Один из вариантов — добавить слой косвенности. Здесь, к сожалению, семафор asyncio бесполезен, поскольку он позволяет нам только ждать. Давайте представим, что мы могли бы спросить семафор, сколько токенов осталось, тогда мы можем сделать нечто подобное:

from hypothetical_asyncio.sync import Semaphore, Service

semaphore = Semaphore(200)

class RequestHandlerService(Service):
    async def handle(self, request):
        await semaphore.acquire()
        try:
            return generate_response(request)
        finally:
            semaphore.release()

    @property
    def is_ready(self):
        return semaphore.tokens_available()

Мы несколько изменили систему. Теперь у нас есть RequestHandlerService, у которого есть чуть больше информации. В частности, в нем есть понятие «готовности». У сервиса можно спросить, готов ли он. Эта операция неблокирующая и она может помочь с оценкой. Так и должно быть.

Теперь вызывающая функция вместо такой:

response = await handle_request(request)

Станет такой:

request_handler = RequestHandlerService()
if not request_handler.is_ready:
    response = Response(status_code=503)
else:
    response = await request_handler.handle(request)

Вариаций на эту тему можно придумать много, а идея одна. Перед тем, как что-то делать, надо понять, насколько вероятно, что у нас все получится, и если мы окажемся перегружены, то передадим эту информацию выше.

Определение для сервиса я так и не придумал. Архитектура аналогична tower из Rust и actix-сервиса оттуда же. У обоих очень похожее определение характеристики сервиса и оно очень похоже на то, что есть у нас. 

Теперь у нас есть шанс «скучковаться» на семафоре. Так вы можете рискнуть или все равно потерпеть неудачу, если будет вызван дескриптор.

Библиотека, которая решает эту проблему лучше, чем asyncio, — это trio, которая представляет внутренний счетчик для семафора и CapacityLimiter, который является семафором, оптимизированным для ограничения емкости, что защищает от некоторых распространенных ошибок.

Потоки и протоколы

Теперь пример выше решает ситуации в стиле RPC. При каждом вызове нас сразу информируют, если система перегружена. У многих из этих протоколов есть довольно простые способы сообщить, что сервер под нагрузкой. Например, HTTP может выдать 503, где также может быть заголовок retry-after, который сообщит клиенту, когда лучше повторить попытку. Эта новая попытка дает естественный момент для оценки того, надо ли отправить такой же запрос или как-то его изменить. Например, если вы не можете повторить попытку и через 15 секунд, то стоит сообщить об этом пользователю, а не показывать значок бесконечной загрузки.

Однако существуют не только протоколы вида запрос/ответ. У многих протоколов есть постоянные открытые соединения, через которые можно передавать большое количество данных. Традиционно многие из этих протоколов основываются на TCP, который, как упоминалось ранее, имеет встроенный контроль потоков. Однако через библиотеки сокетов контроль потока на самом деле недоступен, поэтому протоколы высокого уровня обычно добавляют к нему свой собственный контроль потока. Например, в HTTP 2 есть кастомный протокол управления потоками, поскольку HTTP 2 мультиплексирует несколько независимых потоков через одно TCP-соединение.

Ориентируясь на TCP, где контроль потока осуществляется под капотом, разработчик может пойти по опасному пути и просто считывать байты из сокета и предполагать, что это все, что нужно знать. Однако TCP API вводит в заблуждение, поскольку управление потоком — с точки зрения API — полностью скрыто от пользователя. Когда вы пишете свой собственный протокол потоковой передачи, вам нужно быть абсолютно уверенным, что существует двунаправленный канал связи и что отправитель не просто пишет, но и читает, чтобы узнать, можно ли продолжать слать данные.

С потоками дела обстоят обычно иначе. Множество потоков – это просто потоки байтов или фреймов данных, и вы не можете просто выбрасывать пакеты между ними. Хуже того: отправителю часто бывает нелегко проверить, следует ли ему замедлиться. В HTTP 2 вам нужно постоянно чередовать операции чтения и записи на уровне пользователя. Там вам просто необходимо контролировать поток. Когда вам можно будет продолжить писать, сервер отправит вам фреймы WINDOW_UPDATE.

Это значит, что потоковый код станет намного сложнее, поскольку сначала вам нужно будет написать фреймворк, который может взаимодействовать с входящей информацией контроля потока. Например, в библиотеке Python hyper-h2 есть удивительно сложный пример сервера загрузки файлов с управлением потоками на основе curio, и этот пример даже не дописан.

Новые способы выстрелить себе в ногу

async/await – это, конечно, хорошо, но они поощряют написание сервисов, которые будут вести себя ужасно при перегрузке. С одной стороны, потому что, что может быть проще, чем сделать очередь, но с другой, еще и потому, что асинхронность функции постфактум – это проблема API. Я могу лишь предположить, что именно поэтому в Python есть функция записи в поток write, которая не умеет в await.

Однако, самая большая причина заключатся в том, что async/await позволяет вам писать код, который многие люди не написали бы с потоками изначально. Я думаю, что это хорошо, потому что так снижается барьер для написания более крупных систем. Недостаток в том, что это также означает, что многие другие разработчики, у которых было мало опыта с распределенными системами, теперь сталкиваются с ворохом проблем, связанным с ними, даже если пишут всего один проект. HTTP 2 — это достаточно сложный протокол из-за мультиплексирования, поэтому единственный разумный способ его реализации основывается на примере async/await

От этих проблем страдает не только код с async/await. Например, Dask – это библиотека параллелизма для Python, которая используется разработчиками в области Data Science. Несмотря на то, что она не использует async/await, есть сообщения об ошибках, связанных с нехваткой памяти в системе из-за отсутствия back pressure. Но эти вопросы довольно фундаментальны.

Однако отсутствие обратного давления – это ружье для выстрела себе в ногу размером с базуку. Если вы слишком поздно поймете, что создали монстра, его будет практически невозможно переделать без серьезных изменений в базе кода, потому что вы, возможно, забыли сделать некоторые функции асинхронными, какими они и должны были быть изначально. И другая среда программирования здесь не поможет. У вас будут те же проблемы, с которыми сталкиваются люди во всех средах, включая последние вариации с Go и Rust. Нередко можно найти открытые issue с «обработкой контроля потоков» или «обработкой back pressure» даже в очень популярных проектах. Эти issue открыты уже достаточно давно, потому что оказывается, что починить все постфактум трудно. Например, у Go с 2014 года открыт issue с добавлением семафора во все операции ввода-вывода файловой системы, поскольку хост может перегружаться. У aiohttp есть issue от 2016 года, в котором описано, что клиенты могли взломать сервер и-за недостаточного обратного давления. Примеров еще множество.

Если вы посмотрите на документацию Python hyper-h2, то увидите шокирующее количество примеров, в которых говорится что-то вроде «не обрабатывает контроль потока», «не работает с контролем потока HTTP/2, что является недостатком, но в основном функционален» и т.д. Я считаю, что управление потоками – это сложно, когда оно появляется на горизонте, и легко прикинуться, что это не проблема, именно поэтому мы и оказались в таком бедламе. Контроль потоков также добавляет значительные накладные расходы и не очень хорошо смотрится в тестах.

Итак, для разработчиков асинхронных библиотек есть предложение – сделайте другим разработчикам подарок на новый год: придайте обратному давлению и контролю потока ту значимость, которую они заслуживают в документации и API.


Материал подготовлен в рамках курса «Python Developer. Professional». Если вам интересно узнать подробнее о формате обучения и программе, познакомиться с преподавателем курса — приглашаем на день открытых дверей онлайн. Регистрация здесь.

Комментарии (8)


  1. x8core
    09.11.2021 15:09

    Back pressure выглядит как подтип high load.


  1. boblenin
    09.11.2021 21:02

    CircuitBreaker разве не поможет в этой ситуации? И можно не усложнять интерфейс для пользователей.


    1. mayorovp
      09.11.2021 21:43

      CircuitBreaker не будет работать если вы не знаете текущую загруженность. А вы её и не узнаете если упустите давление.


      1. boblenin
        10.11.2021 00:38
        +1

        Размер очереди разве не индикатор? Время отклика? В зависимости от сервиса возможно даже доступность свободных ресурсов. Я не критикую - спрашиваю.


        1. mayorovp
          10.11.2021 10:42

          У вас не будет никакого размера очереди если вы напишете код неправильно — за отсутствием самой очереди. И время отклика вы померить тоже не сможете если код будет возвращать управление слишком рано.


          1. boblenin
            12.11.2021 17:12

            Как вы можете вернуть управление слишком рано, если у вас нет очереди? Не понимаю.


            1. mayorovp
              12.11.2021 18:46
              +1

              Где-то очередь, скорее всего, есть — но в случайном месте, и без данных о размере.


              Посмотрите примеры из статьи ещё раз — в одном случае очередь "стихийно" образовалась в буфере отправки, в другом прячется в продолжениях операции acquire семафора. И если во втором случае можно хотя бы время отклика посчитать — то в первом даже эта информация оказывается скрыта, размер же недоступен в обоих случаях.


              Чтобы иметь возможность принимать решения на основе размера очереди и времени отклика — нужно сначала "прокинуть" обратное давление непосредственно до того кода, который организует очередь и измеряет время отклика.


              1. boblenin
                13.11.2021 00:34

                Спасибо за ответ. И за бурю эмоций, которую он во мне вызвал. Отрицание, принятие, понимание. Полезное дело делаете.