И для чего он используется в фреймворке для приватных блокчейнов Exonum


Tokio — это фреймворк для разработки сетевых масштабируемых приложений на Rust, использующий компоненты для работы с асинхронным вводом/выводом. Tokio часто служит основой для других библиотек и реализаций высокопроизводительных протоколов. Несмотря на то что он является довольно молодым фреймворком, ему уже удалось стать частью экосистемы межплатформенного программного обеспечения.

И хотя Tokio критикуют за излишнюю сложность в освоении, он уже используется в продакшн-средах, поскольку код, написанный на Tokio, легче поддерживать. Например, его уже интегрировали в hyper, tower-grpc и сonduit. Мы тоже обратились к этому решению при разработке нашей платформы Exonum.

Работа над Exonum началась в 2016 году, когда Tokio еще не существовал, поэтому сперва нами использовалась библиотека Mio v0.5. С появлением Tokio стало ясно, что используемая библиотека Mio устарела, более того, с её помощью было сложно организовывать событийную модель Exonum. Модель включала несколько типов событий (сетевые сообщения, таймауты, сообщения из REST API и др.), а также их сортировки по степени приоритетности.

Каждое событие влечет за собой изменение состояния узла, а значит их необходимо обрабатывать в одном потоке, в определенном порядке и по одному принципу. На Mio схему обработки каждого события приходилось описывать вручную, что при поддержании кода (добавлении/изменении параметров) могло оборачиваться большим количеством ошибок. Tokio позволил упростить этот процесс за счет встроенных функций.

Ниже мы расскажем о компонентах стека Tokio, которые позволяют эффективно организовывать обработку асинхронных задач.


/ изображение Kevin Dooley CC

Архитектура Tokio


По своей сути Tokio представляет собой «обертку» над Mio. Mio — это крэйт Rust, который предоставляет API для низкоуровневого ввода/вывода и не зависит от платформы — он работает с несколькими инструментами: epoll в Linux, kqueue в Mac OS или IOCP в Windows. Таким образом, архитектура Tokio может быть представлена следующим образом:



Futures

Как видно из схемы выше, главным функциональным компонентом Tokio, является futures — это crate Rust, который позволяет работать с асинхронным кодом в синхронной манере. Иными словами, библиотека дает возможность оперировать с кодом, который реализует еще не выполненные задачи, как будто они уже завершились.

По сути, futures — это значения, которые будут подсчитаны в будущем, но пока неизвестны. В формате futures можно представлять разного рода события: запросы к базам данных, таймауты, длительные задачи для CPU, чтение информации с сокета и т. д., и синхронизировать их.

Примером future в реальной жизни может служить уведомление о доставке заказного письма почтой: по завершении доставки отправителю направляется уведомление об успешном получении адресатом письма. Получив уведомление, отправитель определяет, какие действия ему предпринимать дальше.  

Разработчик Дэвид Симмонс (David Simmons), сотрудничавший с компаниями Intel, Genuity и Sparco Media, в качестве примера организации асинхронного ввода/вывода с помощью futures приводит обмен сообщениями с HTTP-сервером.

Представьте, что сервер каждый раз порождает новую нить (thread) для установленного соединения. При синхронном I/O система сперва считает байты по порядку, затем обработает информацию и запишет результат обратно. При этом в момент чтения/записи нить не сможет продолжать выполнение (она блокируется), пока операция не будет завершена. Это приводит к тому, что при большом числе соединений возникают трудности при масштабировании (так называемая проблема C10k).

В случае асинхронной обработки, нить ставит в очередь запрос на I/O и продолжает выполнение (то есть не блокируется). Система осуществляет чтение/запись через какое-то время, а нить, прежде чем использовать результаты, спрашивает, был ли выполнен запрос. Таким образом, futures способны выполнять разные задачи, например, один может считывать запрос, второй — его обрабатывать, а третий — формировать ответ.

В крэйте futures определен типаж Future, который является ядром всей библиотеки. Этот типаж определяется для объектов, которые выполняются не сразу, а спустя некоторое время. Его основная часть выражена в коде следующим образом:

trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
    fn wait(self) -> Result<Self::Item, Self::Error> { ... }

    fn map<F, U>(self, f: F) -> Map<Self, F>
        where F: FnOnce(Self::Item) -> U { ... }
}

«Сердцем» типажа Future является метод poll(). Он отвечает за пересылку индикатора завершения работы, ожидания вызова или посчитанного значения. При этом futures запускаются в контексте задачи (task). Задача ассоциируется только с одним future, однако последний может быть составным, то есть содержать внутри себя несколько других futures, объединенных командами join_all() или and_then(). Например:

let client_to_server = copy(client_reader, server_writer)
                    .and_then(|(n, _, server_writer)| {
                        shutdown(server_writer).map(move |_| n)
                    });

За координацию task/future отвечает исполнитель (executor). Если есть несколько задач, запущенных одновременно, и часть из них ожидает результатов внешних асинхронных событий (например, чтение данных из сети/сокета), исполнитель должен эффективно распределить ресурсы процессора для оптимального их выполнения. На практике это происходит за счет «перебрасывания» мощностей процессора на задачи, которые могут быть выполнены, пока другие задачи заблокированы из-за отсутствия внешних данных.

В случае отложенной задачи, executor получает информацию о том, что ее можно выполнять, при помощи метода notify(). Примером может служить исполнитель крэйта futures, который «просыпается» при вызове wait() — исходный код примера представлен в официальном репозитории Rust на GitHub:

    pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
        ThreadNotify::with_current(|notify| {

            loop {
                match self.poll_future_notify(notify, 0)? {
                    Async::NotReady => notify.park(),
                    Async::Ready(e) => return Ok(e),
                }
            }
        })
    }

Streams

Кроме futures, Tokio работает и с другими компонентами для асинхронного I/O — потоками (streams). В то время как future возвращает лишь один финальный результат, stream работает с серией событий и способен вернуть несколько результатов.

Снова пример из реальной жизни: периодические оповещения от датчика измерения температуры могут быть представлены в виде stream. Датчик будет регулярно отправлять значение измерения температуры пользователю через некоторые промежутки времени.

Типаж stream может выглядеть следующим образом:

trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

Механика работы со stream идентична той, что применяется к futures: используются похожие комбинаторы для преобразования и изменения деталей потока. Более того, stream легко может быть преобразован в future при помощи адаптера into_future.

Ниже мы предметно рассмотрим применение futures и stream в нашем фреймворке Exonum.

Tokio в Exonum


Как уже было сказано, разработчиками Exonum было принято решение использовать библиотеку Tokio для реализации цикла событий (event loop) во фреймворке.?
Упрощенная схема организации событийной модели в Exonum может быть представлена следующим образом:


Каждый узел сети обменивается сообщениями с другими узлами. Все входящие сообщения попадают в очередь сетевых событий, куда кроме них также попадают внутренние события (тайм-ауты и события внутреннего API). Каждый тип события формирует отдельный поток (stream). Но обработка таких событий, как было отмечено ранее, — процесс синхронный, поскольку влечет за собой изменения состояния узла. Event Agregator объединяет несколько цепочек событий в одну и отправляет их с помощью канала в event loop, где они обрабатываются в порядке установленного приоритета.

При коммуникации между узлами Exonum выполняет следующие связанные операции на каждом из них:
 
Подключение к узлу N (открытие сокета, настройка сокета) —> Получение сообщений узла N (получение байтов из сокета, разбиение байтов на сообщения) —> Пересылка сообщений в канал текущего узла

let connect_handle = Retry::spawn(handle.clone(), strategy, action)
            .map_err(into_other)
            // Configure socket
            .and_then(move |sock| {
                sock.set_nodelay(network_config.tcp_nodelay)?;
                let duration =
                    network_config.tcp_keep_alive.map(Duration::from_millis);
                sock.set_keepalive(duration)?;
                Ok(sock)
            })
            // Connect socket with the outgoing channel
            .and_then(move |sock| {
                trace!("Established connection with peer={}", peer);

                let stream = sock.framed(MessagesCodec::new(max_message_len));
                let (sink, stream) = stream.split();

                let writer = conn_rx
                    .map_err(|_| other_error("Can't send data into socket"))
                    .forward(sink);
                let reader = stream.for_each(result_ok);

                reader
                    .select2(writer)
                    .map_err(|_| other_error("Socket error"))
                    .and_then(|res| match res {
                        Either::A((_, _reader)) => Ok("by reader"),
                        Either::B((_, _writer)) => Ok("by writer"),
                    })
            })

Ввиду того, что futures дает возможность комбинировать различные абстракции в цепочку без потери производительности системы, программный код получается разбит на маленькие функциональные блоки, а, следовательно, его становится легче поддерживать.

Использование сети является нетривиальной задачей. Для работы с узлом необходимо к нему подключиться, а также обеспечить логику переподключений в случае разрыва:

.map_err(into_other)

Помимо этого, необходимо произвести настройку сокета:

.and_then(move |sock| {
                sock.set_nodelay(network_config.tcp_nodelay)?;
                let duration =
                    network_config.tcp_keep_alive.map(Duration::from_millis);
                sock.set_keepalive(duration)?;
                Ok(sock)
            })

И парсить входящие байты как сообщения:

let (sink, stream) = stream.split();

Каждый из приведенных блоков кода, в свою очередь, состоит из более мелких участков. Однако благодаря тому, что futures позволяет свободно комбинировать эти блоки, нам нет необходимости задумываться о внутреннем строении каждого из них.

В завершение хотелось бы отметить, что на данный момент Exonum в качестве API использует несколько устаревшую версию iron на базе библиотеки hyper. Однако сейчас мы рассматриваем вариант перехода на чистый hyper, который использует Tokio.



Предлагаем вам еще несколько материалов по теме из нашего блога на Хабре:

Комментарии (28)


  1. mwizard
    22.03.2018 19:35

    Да это же один в один asyncio из Python 3. Теперь осталось обернуть futures в полноценный псевдосинхронный async/await.


    1. Gorthauer87
      22.03.2018 20:29

      Ты не поверишь.
      https://crates.io/crates/futures-await
      Работает, правда, только на nightly.


    1. Fedcomp
      22.03.2018 20:35

      В nigthly rust есть github.com/alexcrichton/futures-await


    1. Hixon10
      22.03.2018 21:55
      +1

      Ровно, как и «один в один» Netty. Да и, наверное, любой NIO-фреймворк. Все мы пляшем вокруг poll.


    1. potan
      23.03.2018 01:26

      Лучше бы сделали аналог do в Haskell и for в Scala. Можно было бы писать на полноценных фьючах чистый и понятный код, не засоряя его async/await.


      1. Googolplex
        23.03.2018 02:04

        async/await существенно удобнее for/do. Только лишь с помощью комбинаторов очень многие паттерны выражать безумно геморройно. Год назад в одном проекте на скале большие асинхронные методы сначала мы пытались писать с помощью комбинаторов и for. Получалось совершенно нечитаемое месиво. После перехода на scala-async код стал чище и понятнее на порядок.


        С async-await в Rust писать асинхронный код очень легко и приятно. А с недавними разработками в области immovable generators станет все совсем хорошо.


        1. Gorthauer87
          23.03.2018 10:33

          Интересно, а если оператор? Перегрузить в качестве комбинатора? Должно сейчас же на найтли сработать вместо await


          1. potan
            23.03.2018 13:35
            -1

            В Haskell это сделано оператором (>>=)
            То есть

            do
               x <- future1
               y <- genFuture2 x
               pure y
            

            можно записать как
            future1 >>= genFuture2
            

            Но практика показывает, что do обычно читабельнее.
            В Scala вместо оператора (>>=) почему то сделали метод flatMap. for вызлядит симпатичнее.
            for {
              x <- future1
              y <- genFuture2(x)
            } yield {
              y
            }
            

            против
            future1.flatMap(genFuture2)
            


            1. mayorovp
              23.03.2018 17:25

              А теперь добавьте в код несколько циклов…


              1. potan
                23.03.2018 17:32
                -1

                Вот в циклах с async/await я бы работать не рискнул, поскольку не очень понимаю его семантику в этом случае. Использовавшие его коллеги в этих случаях сталкивались с неожиданностями.
                Я обычно использую sequence, которая меняет местами Seq и монаду (в данном случае Future).


                1. mayorovp
                  23.03.2018 17:50

                  А что не так с семантикой?

                  await приостанавливает выполнение асинхронной функции (освобождая поток) пока не произойдет события. Его семантика никак не меняется в цикле.


                  1. potan
                    23.03.2018 17:56

                    То есть не гарантируется, что он не будет удерживать нить во время ожидания?


                    1. mayorovp
                      23.03.2018 18:41

                      Гарантируется.

                      Ну, на самом деле зависит от реализации, но реализация без таких гарантий будет ошибкой.


                      1. potan
                        23.03.2018 19:01

                        Тогда я не понимаю в какой код преобразуется

                        async {
                           for( i <- Seq(1,2,3,4)) {
                              val x = await { f(i) }
                              println(x)
                           }
                        }
                        


                        1. mayorovp
                          23.03.2018 19:07

                          Ну так скопмпилируйте а потом декомпилируйте обратно...


                          Если это Scala — то скорее всего там конечный автомат с тремя состояниями будет.


                    1. humbug
                      24.03.2018 00:00

                      То есть не гарантируется, что он не будет удерживать нить во время ожидания?

                      Для таких гарантий должны соблюдаться несколько правил:


                      • операции должны быть неблокирующими. poll_read для fs (а иногда и для для stdin) является блокирующим вызовом.
                      • должны быть зарегистрированы другие фьючи, чтобы дать возможность executor'у переключиться на другую задачу


        1. potan
          23.03.2018 13:25
          -1

          Я работал с кодом на Scala, активно использующим async/await. Как правило, простой способ в нем разоблаться был переписать на for — код сразу получался компактнее и структурированиее.
          Явное использование flatMap может выглядеть громозко и страшно, но с for такой проблемы нет.


    1. fafhrd91
      23.03.2018 13:47

      Да нету там ничего общего, разве что epoll and kqueue


  1. humbug
    23.03.2018 23:15
    +2

    С появлением Tokio стало ясно, что используемая библиотека Mio устарела

    mio не может устареть, так как автор tokio по совместительству является автором mio, под капотом tokio — обертка над mio, которая активно обновляется. Уж вы-то должны это знать.


    Устарели вы, а не tokio. Мало того, что вы до сих пор сидите на tokio-core, который, вообще-то уже deprecated, у вас какое-то самописное threadpool говнище, которое жрет мои ресурсы, вы создаете кучу тредов, хотя вас об этом не просили, так у вас обработка сокетов вообще однопоточная.


    Почитайте ветку, вам многое станет ясно.


    1. humbug
      23.03.2018 23:24

      Более того, stream легко может быть преобразован в future при помощи адаптера into_future.

      Это слегка "заблуждение". Лучше продемонстрировать на примере futures v0.2. В новой версии авторы переименовали комбинатор .into_future() в .next(). Что сразу проясняет ситуацию.


    1. fafhrd91
      24.03.2018 03:31
      +1

      Единственно tokio медленней на 20% чем tokio-core, а так все хорошо


      1. humbug
        24.03.2018 08:07

        Медленнее в однопоточном режиме. В многопоточной обработке событий мое приложение стало на 5% быстрее. Мы будем улучшать производительность. Закрыто много багов, улучшили архитектуру, вот что главное.


        1. fafhrd91
          24.03.2018 10:46
          +1

          Моё многопоточное приложение стало на 20% медленнее. Если что tokio-core не ограничивает количество потоков. К тому же забавно смотреть как hyper откатывает tokio на старую версию tokio-core чтобы в бенчмарках выглядеть лучше.


          1. humbug
            24.03.2018 17:04

            Тем забавнее наблюдать за бенчмарками, ведь tokio-core сейчас является фасадом для tokio.


            1. fafhrd91
              24.03.2018 18:06

              В этом то и прикол что hyper даунгрейдеулся на версию tokio-core="=0.1.12" в которой tokio не используется. Я сделал тоже самое, и в своих проектах сделал также. Каждому своё конечно, но как-то терять 20% не очень


              1. humbug
                25.03.2018 00:09

                Почему-то все забывают о починенных в tokio багах в погоне за скоростью.


                1. fafhrd91
                  25.03.2018 05:27

                  Я что-то не уловил про баги? Какие баги? Я довольно близко знаком с tokio, не могу припомнить никаких, по крайней мере никаких фиксов в комит логе не вижу. если для вас потеря 20% это нормальн для не меня это не приемлемо.


                  1. humbug
                    25.03.2018 09:10

                    Вот фикс бага. Без него нет возможности возвращать ошибку из UdpCodec::encode, соответственно единственный способ обработать нестандартную ситуацию — упасть. В дальнейшем этот фикс позволил отказаться от разделения кодеков tcp и udp, используя одни и те же типажи: Encoder и Decoder.


                    Я помню этот фикс, потому что он был сделан мной.


                    Я довольно близко знаком с tokio

                    Вы спорите с контрибьютером с вкладом в 3к строк кода и 11 закрытых PR. Но у вас-то наверняка больше)))