image

В Python существует библиотека Trio – библиотека асинхронного программирования.
Знакомство с Trio в основном будет интересно тем, кто работает на Asyncio, потому что это хорошая альтернатива, позволяющая решать часть проблем, с которыми не может справиться Asyncio. В этом обзоре рассмотрим, что из себя представляет Trio и какие фичи она нам дает.

Для тех, кто только начинает работу в асинхронном программировании, предлагаю прочитать небольшую вводную о том, что такое асинхронность и синхронность.

Синхронность и асинхронность


В синхронном программировании все операции выполняются последовательно, и к новой задаче можно приступить только после завершения предыдущей. Однако одна из его «болевых» точек в том, что, если один из потоков работает над задачей очень долго, может зависнуть вся программа. Существуют задачи, не требующие вычислительных мощностей, но занимающие процессорное время, которое можно использовать более рационально, отдав управление другому потоку. В синхронном программировании нет возможности приостановить текущую задачу, чтобы в ее промежутке выполнить следующую.

Для чего нужна асинхронность? Нужно различать настоящую асинхронность и асинхронность ввода-вывода. В нашем случае речь идет именно об асинхронности ввода-вывода. Глобально — для того, чтобы экономить время и более рационально использовать производственные мощности. Асинхронность позволяет обойти проблемные места потоков. В асинхронности ввода-вывода текущий поток не будет ждать выполнения какого-то внешнего события, а отдаст управление другому потоку. Таким образом, по факту, одномоментно выполняется только один поток. Отдавший управление поток уходит в очередь и ждет, когда ему вернется управление. Возможно, к тому моменту произойдет ожидаемое внешнее событие и можно будет продолжить работу. Это позволит переключаться между задачами для того, чтобы минимизировать трату времени.

И вот теперь мы можем снова вернуться к тому, что такое Asyncio. В основе работы этой библиотеки цикл событий (event loop), который включает в себя очередь задач и сам цикл. Цикл управляет выполнением задач, а именно вытягивает задачи из очереди и определяет, что будет происходить с ней. Например, это может быть обработка задач ввода/вывода. То есть цикл событий выбирает задачу, регистрирует и в нужный момент запускает ее обработку.

Корутины (coroutines) — специальные функции, которые возвращают управление этой задачей обратно циклу событий, то есть возвращают их в очередь. Необходимо, чтобы эти сопрограммы были запущены именно через цикл событий.

Также есть футуры — объекты, в которых хранится текущий результат выполнения какой-либо задачи. Это может быть информация о том, что задача ещё не обработана или уже полученный результат; а может быть вообще исключение.

В целом библиотека Asyncio на слуху, однако, у нее есть ряд недостатков, которые как раз способна закрыть Trio.

Trio


Как рассказывает сам автор библиотеки Натаниял Смит, разрабатывая Trio, он стремился создать легковесный и легко используемый инструмент для разработчика, который обеспечил бы максимально простой асинхронный ввод/выход и обработку ошибок.

Важная фишка Trio – асинхронный менеджмент контекста, которого нет в Asyncio. Для этого автор создал в Trio так называемую «детскую»(nursery) — область отмены, которая берет на себя ответственность за атомарность (непрерывность) выполнения группы потоков. Ключевая идея в том, что, если в «детской» одна из корутин завершается ошибкой, то все потоки в «детской» будут или успешно завершены или отменены. В любом случае результат работы будет корректен. И только когда все корутины будут завершены, после выхода из функции разработчик уже сам принимает решение как действовать дальше.

То есть «детская» позволяет предотвратить продолжение обработки ошибки, которая может привести к тому, что либо все «упадет», либо на выходе будет неверный результат.
Это именно то, что может произойти с Asyncio, потому что в Asyncio процесс работы не останавливается, несмотря на то что случилась ошибка. И в данном случае, во-первых, разработчик не будет знать, что именно произошло в момент ошибки, во-вторых, обработка продолжится.

Примеры


Рассмотрим простейший пример из двух конкурирующих функций:

Asyncio

import asyncio

async def foo1():
    print('  foo1: выполняется')
    await asyncio.sleep(2)
    print('  foo1: выполнен')

async def foo2():
    print('  foo2: выполняется')
    await asyncio.sleep(1)
    print('  foo2: выполнен')

loop = asyncio.get_event_loop()
bundle = asyncio.wait([
    loop.create_task(foo1()),
    loop.create_task(foo2()),
])
try:
    loop.run_until_complete(bundle)
finally:
    loop.close()

Trio

import trio

async def foo1():
    print('  foo1: выполняется')
    await trio.sleep(2)
    print('  foo1: выполнен')

async def foo2():
    print('  foo2: выполняется')
    await trio.sleep(1)
    print('  foo2: выполнен')

async def root():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(foo1)
        nursery.start_soon(foo2)

trio.run(root)

в обоих случаях результат будет одинаков:

foo1: выполняется
foo2: выполняется
foo2: выполнен
foo1: выполнен

Структурно код Asyncio и Trio в этом примере похож.

Явное различие только в том, что Trio не требует явного завершения цикла событий.

Рассмотрим чуть более живой пример. Сделаем обращение к web-сервису для получения timestamp.

Для Asyncio воспользуемся дополнительно aiohttp:

import time
import asyncio
import aiohttp

URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5

async def foo(session, i):
    start = time.time()
    async with session.get(URL) as response:
        content = await response.json()
        print(f'{i} | {content.get("time")} (получено за {time.time() - start})')

async def root():
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.ensure_future(foo(session, i))
            for i in range(MAX_CLIENTS)
        ]
        await asyncio.wait(tasks)
    print(f'завершено за {time.time() - start}')

ioloop = asyncio.get_event_loop()
try:
    ioloop.run_until_complete(root())
finally:
    ioloop.close()

Для Trio воспользуемся asks:

import trio
import time
import asks
URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5

asks.init('trio')

async def foo(i):
    start = time.time()
    response = await asks.get(URL)
    content = response.json()
    print(f'{i} | {content.get("time")} (получено за {time.time() - start})')

async def root():
    start = time.time()
    async with trio.open_nursery() as nursery:
        for i in range(MAX_CLIENTS):
            nursery.start_soon(foo, i)

    print(f'завершено за {time.time() - start}')

trio.run(root)

В обоих случаях получим что-то вроде

0 | 1543837647522 (получено за 0.11855053901672363)
2 | 1543837647535 (получено за 0.1389765739440918)
3 | 1543837647527 (получено за 0.13904547691345215)
4 | 1543837647557 (получено за 0.1591191291809082)
1 | 1543837647607 (получено за 0.2100353240966797)
завершено за 0.2102828025817871

Хорошо. Представим, что в процессе выполнения одной из корутин произошла ошибка
для Asyncio.

async def foo(session, i):
    start = time.time()
    if i == 3:
        raise Exception
    async with session.get(URL) as response:
        content = await response.json()
        print(f'{i} | {content.get("time")} (получено за {time.time() - start})')

1 | 1543839060815 (получено за 0.10857725143432617)
2 | 1543839060844 (получено за 0.10372781753540039)
5 | 1543839060843 (получено за 0.10734415054321289)
4 | 1543839060874 (получено за 0.13985681533813477)
завершено за 0.15044045448303223
Traceback (most recent call last):
  File "...py", line 12, in foo
    raise Exception
Exception

для Trio

async def foo(i):
    start = time.time()
    response = await asks.get(URL)
    content = response.json()
    if i == 3:
        raise Exception
    print(f'{i} | {content.get("time")} (получено за {time.time() - start})')


4 | 1543839223372 (получено за 0.13524699211120605)
2 | 1543839223379 (получено за 0.13848185539245605)
Traceback (most recent call last):
  File "...py", line 28, in <module>
    trio.run(root)
  File "/lib64/python3.6/site-packages/trio/_core/_run.py", line 1337, in run
    raise runner.main_task_outcome.error
  File "...py", line 23, in root
    nursery.start_soon(foo, i)
  File "/lib64/python3.6/site-packages/trio/_core/_run.py", line 397, in __aexit__
    raise combined_error_from_nursery
  File "...py", line 15, in foo
    raise Exception
Exception

Наглядно видно, что в Trio сразу по возникновению ошибки сработала «область отмены», и две из четырех задач, не содержащих ошибки, были аварийно завершены.

В Asyncio же были выполнены все задачи, и только потом появился трэйсбэк.

В приведенном примере это не важно, но представим, что задачи так или иначе зависят друг от друга, и набор задач должен обладать свойством атомарности. В таком случае своевременное реагирование на ошибку становится куда важнее. Конечно, можно использовать await asyncio.wait(tasks, return_when=FIRST_EXCEPTION), но надо не забывать корректно завершать открытые задачи.

А вот еще один пример:

Допустим, что корутины одновременно обращаются к нескольким аналогичным web-сервисам, и важен первый полученный ответ.

import asyncio
from asyncio import FIRST_COMPLETED
import aiohttp

URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5

async def foo(session):
    async with session.get(URL) as response:
        content = await response.json()
        return content.get("time")

async def root():
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.ensure_future(foo(session))
            for i in range(1, MAX_CLIENTS + 1)
        ]
        done, pending = await asyncio.wait(tasks, return_when=FIRST_COMPLETED)
        print(done.pop().result())
        for future in pending:
            future.cancel()

ioloop = asyncio.get_event_loop()
try:
    ioloop.run_until_complete(root())
except:
    ioloop.close()

Все довольно просто. Единственное требование — не забыть завершить задачи, которые не были завершены.

В Trio провернуть аналогичный маневр несколько сложнее, зато практически невозможно оставить незаметные сразу «хвосты»:

import trio
import asks
URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5
asks.init('trio')

async def foo(session, send_channel, nursery):
    response = await session.request('GET', url=URL)
    content = response.json()
    async with send_channel:
        send_channel.send_nowait(content.get("time"))
    nursery.cancel_scope.cancel()

async def root():
    send_channel, receive_channel = trio.open_memory_channel(1)
    async with send_channel, receive_channel:
        async with trio.open_nursery() as nursery:
            async with asks.Session() as session:
                for i in range(MAX_CLIENTS):
                    nursery.start_soon(foo, session, send_channel.clone(), nursery)

        async with receive_channel:
            x = await receive_channel.receive()
            print(x)

trio.run(root)

nursery.cancel_scope.cancel() — первая завершившая корутина вызовет в области отмены функцию, которая отменит все остальные задачи, так что не надо заботиться об этом отдельно.
Правда, чтобы передать результат выполнения корутины в вызвавшую ее функцию, придется инициировать канал связи.

Надеюсь, этот сравнительный обзор дал понимание основных особенностей Trio. Всем спасибо!