Введение

Представим следующую задачу: у нас есть микросервисная архитектура, в которой сервисы взаимодействуют через брокер сообщений, или через gRPC. Так или иначе, оба варианта предоставляют полнодуплексный канал связи, через который один сервис может отправлять множество сообщений другому сервису, так и в обратную сторону - сервис, исполняющий запрос, может отправлять несколько ответов (например в случае потоковой обработки данных). Такой вариант реализации ответа можно в некотором смысле называть стримингом.

В числе прочих задач, решаемых при реализации возможности стриминга, существует задача определения ситуации, в которой сервис, исполняющий запрос, упал с ошибкой, и больше не может продолжать стриминг ответов. В таком случае мы даже не можем понять что именно произошло - обработка и отдача очередной порции ответа будет, но задерживается, либо же передача прервалась, и нужно сообщить об ошибке “наверх”. В протоколе HTTP, например, для детерминирования корректной вычитки ответа может быть использован заголовок Content-Length. Достаточно посчитать количество вычитанных из сокета байт тела запроса\ответа, и сравнить со значением заголовка. Сходится - мы все получили, не сошлось и сокет закрыт - ошибка. Однако вариант решения с заранее заданным количеством данных в первой порции ответов не является универсальным, поскольку не во всех случаях можно точно понимать, сколько именно данных будет передано. Да и архитектура с использованием брокеров сообщений предполагает постоянное поддержание соединения, поэтому мы можем только знать, что из такой-то очереди поступают ответы на ранее сделанный запрос, и в каком-то из ответов будет метка окончания, как маркер того, что запрос обработан и ответ выслан и получен полностью, а если такого маркера еще не получено - остается продолжать ждать. Но ждать можно бесконечно.

В случае, когда нужно определить, что ответ никак не поступает, или поступила лишь его часть, но остальное все никак не “доедет”, можно использовать таймауты, и это похоже на единственное, достаточно простое и вменяемое решение, не требующее существенного изменения инфраструктуры кода.

Инфраструктура отложенных вычислений в Python

Python для любого типа реализации многозадачности (процессы, потоки, asyncio) предоставляет разработчикам набор объектов и методов с поддержкой таймаутов. Например модуль concurrent.futures предоставляет класс Future, являющийся результатом отложенных вычислений. Метод result() у Future поддерживает аргумент timeout, который, будучи выставленным в значение, отличное от None, выдаст TimeoutError, если за отведенное время результат так и не был установлен. То же самое касается класса Queue, предназначенного для организации очереди. Но, как и Queue, так и Future из concurrent относится только к межпотоковому и межпроцессному взаимодействию. Если же мы будем говорить об асинхронном взаимодействии, то увидим, что аналогичными объектами таймауты из коробки уже не поддерживаются. Но asyncio предоставляет метод wait_for, в котором можно указывать таймаут ожидания, по истечении которого будет выдан asyncio.TimeoutError.

Но wait_for можно навесить только на корутину или future, а вот на async for таймаут уже просто так не повесить. Особенно эта ситуация будет актуальна при использовании, например, модуля aio_pika, реализующего асинхронное взаимодействие с RabbitMQ. Документация предлагает для организации консьюмера как-раз таки пользоваться async for`ом.

async with queue.iterator() as queue_iter:
  async for message in queue_iter:

Любой итератор предоставляет метод __next__(), а асинхронный соответственно корутину __anext__(), за которые и можно дергать для получения следующего значения из итератора, правда тогда код превратится во что-то неудобоваримое.

async with queue.iterator() as queue_iter:
    try:
      message = await asyncio.wait_for(queue_iter.__ anext __(), timeout=1.0)
    except TimeoutError:
      print('timeout!')

Как-то не очень красиво. Хотелось бы иметь более универсальную обертку, которая у себя внутри поддерживает wait_for для __anext__, и может корректно определять ситуацию, в которой последняя порция выдана итератором (например по очереди, как в примере из документации aio_pika).

Чтобы собрать асинхронный итератор с поддержкой таймаута, нужно проделать следующие шаги:

  1. Создать класс, реализующий метод __aiter__

  2. __aiter__ будет возвращать объект, поддерживающий вызов __anext__ для получения следующего значения из итератора

  3. Так как наш итератор будет служить оберткой, то метод __anext__ будет делегировать вызов вложенному итератору, накладывая wait_for на каждый вызов. Таким образом мы будем применять таймаут к каждой итерации, а не к всему циклу.

  4. Реализовать условие завершения работы итератора-обертки. В Python итераторы выбрасывают специальный тип исключения, для того чтобы уведомить конструкци for или async for о том, что итерирование завершено, последовательность пройдена. Так как реализация итератора-обертки с поддержкой таймаута должна понимать, в каком случае завершаться штатно, нам потребуется добавить логику проверки и выброса исключения в реализацию.

Итоговый код:

class AsyncTimedIterable:
    def __init__(self, iterable, timeout=0):

        class AsyncTimedIterator:
            def __init__(self):
                # Сохраним внутри итератор, вызовы которого требуется делать с использованием таймаутов
                self._iterator = iterable.__aiter__()

            async def __anext__(self):
                try:
                    # Здесь мы и будем вызвать __anext__ обернутого итератора, устанавливая таймаут в wait_for
                    result = await asyncio.wait_for(self._iterator.__anext__(), int(timeout))
					# Если в каком-то из объектов, полученных на очередной итерации, есть признак того, что этот ответ последний - проверяем наличие этого признака и выбрасываем StopAsyncIteration чтобы выйти из async for
					if not result:
						raise StopAsyncIteration

					return result

                except asyncio.TimeoutError as e:
                    # Если wait_for выдаст исключение - тут можно его обработать, либо передать наверх, также прекратив работу async for
                    raise e

        self._factory = AsyncTimedIterator

    def __aiter__(self):
        return self._factory()

Теперь мы можем обернуть queue_iter (итератор сообщений по очереди в aio_pika) и указать таймаут:

timed_iter = AsyncTimedIterable(queue_iter, 30)

А далее для получения сообщений использовать async for с нашей оберткой:

async for r in timed_iter:
	pass

Если интервал времени между сообщениями превысит заданный порог - будет выброшено исключение asyncio.TimeoutError, в случае корректного завершения работы - исключение StopAsyncIteration завершит работу цикла async for.

Про практические инструменты разработки мы с коллегами рассказываем в рамках онлайн-курсов. Заглядывайте в каталог и выбирайте подходящее направление.

Комментарии (3)


  1. me21
    24.04.2024 15:29

    https://pypi.org/project/async-timeout/

    /thread


  1. danilovmy
    24.04.2024 15:29

    а может просто таск и таймаут таймер завернуть в asyncio.wait(mytask, mytimeouttimer, return_when=asyncio.FIRST_COMPLETED)

    мне кажется проще получится.


    1. Dmitry89 Автор
      24.04.2024 15:29

      Смотря чего хотим добиться. Если весь цикл - то ок, если замер между итерациями то нет.