Введение
Представим следующую задачу: у нас есть микросервисная архитектура, в которой сервисы взаимодействуют через брокер сообщений, или через gRPC. Так или иначе, оба варианта предоставляют полнодуплексный канал связи, через который один сервис может отправлять множество сообщений другому сервису, так и в обратную сторону - сервис, исполняющий запрос, может отправлять несколько ответов (например в случае потоковой обработки данных). Такой вариант реализации ответа можно в некотором смысле называть стримингом.
В числе прочих задач, решаемых при реализации возможности стриминга, существует задача определения ситуации, в которой сервис, исполняющий запрос, упал с ошибкой, и больше не может продолжать стриминг ответов. В таком случае мы даже не можем понять что именно произошло - обработка и отдача очередной порции ответа будет, но задерживается, либо же передача прервалась, и нужно сообщить об ошибке “наверх”. В протоколе HTTP, например, для детерминирования корректной вычитки ответа может быть использован заголовок Content-Length
. Достаточно посчитать количество вычитанных из сокета байт тела запроса\ответа, и сравнить со значением заголовка. Сходится - мы все получили, не сошлось и сокет закрыт - ошибка. Однако вариант решения с заранее заданным количеством данных в первой порции ответов не является универсальным, поскольку не во всех случаях можно точно понимать, сколько именно данных будет передано. Да и архитектура с использованием брокеров сообщений предполагает постоянное поддержание соединения, поэтому мы можем только знать, что из такой-то очереди поступают ответы на ранее сделанный запрос, и в каком-то из ответов будет метка окончания, как маркер того, что запрос обработан и ответ выслан и получен полностью, а если такого маркера еще не получено - остается продолжать ждать. Но ждать можно бесконечно.
В случае, когда нужно определить, что ответ никак не поступает, или поступила лишь его часть, но остальное все никак не “доедет”, можно использовать таймауты, и это похоже на единственное, достаточно простое и вменяемое решение, не требующее существенного изменения инфраструктуры кода.
Инфраструктура отложенных вычислений в Python
Python для любого типа реализации многозадачности (процессы, потоки, asyncio) предоставляет разработчикам набор объектов и методов с поддержкой таймаутов. Например модуль concurrent.futures
предоставляет класс Future, являющийся результатом отложенных вычислений. Метод result()
у Future поддерживает аргумент timeout, который, будучи выставленным в значение, отличное от None
, выдаст TimeoutError
, если за отведенное время результат так и не был установлен. То же самое касается класса Queue, предназначенного для организации очереди. Но, как и Queue
, так и Future
из concurrent относится только к межпотоковому и межпроцессному взаимодействию. Если же мы будем говорить об асинхронном взаимодействии, то увидим, что аналогичными объектами таймауты из коробки уже не поддерживаются. Но asyncio
предоставляет метод wait_for
, в котором можно указывать таймаут ожидания, по истечении которого будет выдан asyncio.TimeoutError
.
Но wait_for
можно навесить только на корутину или future
, а вот на async for
таймаут уже просто так не повесить. Особенно эта ситуация будет актуальна при использовании, например, модуля aio_pika
, реализующего асинхронное взаимодействие с RabbitMQ. Документация предлагает для организации консьюмера как-раз таки пользоваться async for`ом.
async with queue.iterator() as queue_iter:
async for message in queue_iter:
Любой итератор предоставляет метод __next__()
, а асинхронный соответственно корутину __anext__()
, за которые и можно дергать для получения следующего значения из итератора, правда тогда код превратится во что-то неудобоваримое.
async with queue.iterator() as queue_iter:
try:
message = await asyncio.wait_for(queue_iter.__ anext __(), timeout=1.0)
except TimeoutError:
print('timeout!')
Как-то не очень красиво. Хотелось бы иметь более универсальную обертку, которая у себя внутри поддерживает wait_for
для __anext__
, и может корректно определять ситуацию, в которой последняя порция выдана итератором (например по очереди, как в примере из документации aio_pika
).
Чтобы собрать асинхронный итератор с поддержкой таймаута, нужно проделать следующие шаги:
Создать класс, реализующий метод
__aiter__
__aiter__
будет возвращать объект, поддерживающий вызов__anext__
для получения следующего значения из итератораТак как наш итератор будет служить оберткой, то метод
__anext__
будет делегировать вызов вложенному итератору, накладываяwait_for
на каждый вызов. Таким образом мы будем применять таймаут к каждой итерации, а не к всему циклу.Реализовать условие завершения работы итератора-обертки. В Python итераторы выбрасывают специальный тип исключения, для того чтобы уведомить конструкци for или async for о том, что итерирование завершено, последовательность пройдена. Так как реализация итератора-обертки с поддержкой таймаута должна понимать, в каком случае завершаться штатно, нам потребуется добавить логику проверки и выброса исключения в реализацию.
Итоговый код:
class AsyncTimedIterable:
def __init__(self, iterable, timeout=0):
class AsyncTimedIterator:
def __init__(self):
# Сохраним внутри итератор, вызовы которого требуется делать с использованием таймаутов
self._iterator = iterable.__aiter__()
async def __anext__(self):
try:
# Здесь мы и будем вызвать __anext__ обернутого итератора, устанавливая таймаут в wait_for
result = await asyncio.wait_for(self._iterator.__anext__(), int(timeout))
# Если в каком-то из объектов, полученных на очередной итерации, есть признак того, что этот ответ последний - проверяем наличие этого признака и выбрасываем StopAsyncIteration чтобы выйти из async for
if not result:
raise StopAsyncIteration
return result
except asyncio.TimeoutError as e:
# Если wait_for выдаст исключение - тут можно его обработать, либо передать наверх, также прекратив работу async for
raise e
self._factory = AsyncTimedIterator
def __aiter__(self):
return self._factory()
Теперь мы можем обернуть queue_iter (итератор сообщений по очереди в aio_pika) и указать таймаут:
timed_iter = AsyncTimedIterable(queue_iter, 30)
А далее для получения сообщений использовать async for с нашей оберткой:
async for r in timed_iter:
pass
Если интервал времени между сообщениями превысит заданный порог - будет выброшено исключение asyncio.TimeoutError, в случае корректного завершения работы - исключение StopAsyncIteration завершит работу цикла async for.
Про практические инструменты разработки мы с коллегами рассказываем в рамках онлайн-курсов. Заглядывайте в каталог и выбирайте подходящее направление.
me21
https://pypi.org/project/async-timeout/
/thread