И для чего он используется в фреймворке для приватных блокчейнов Exonum
Tokio — это фреймворк для разработки сетевых масштабируемых приложений на Rust, использующий компоненты для работы с асинхронным вводом/выводом. Tokio часто служит основой для других библиотек и реализаций высокопроизводительных протоколов. Несмотря на то что он является довольно молодым фреймворком, ему уже удалось стать частью экосистемы межплатформенного программного обеспечения.
И хотя Tokio критикуют за излишнюю сложность в освоении, он уже используется в продакшн-средах, поскольку код, написанный на Tokio, легче поддерживать. Например, его уже интегрировали в hyper, tower-grpc и сonduit. Мы тоже обратились к этому решению при разработке нашей платформы Exonum.
Работа над Exonum началась в 2016 году, когда Tokio еще не существовал, поэтому сперва нами использовалась библиотека Mio v0.5. С появлением Tokio стало ясно, что используемая библиотека Mio устарела, более того, с её помощью было сложно организовывать событийную модель Exonum. Модель включала несколько типов событий (сетевые сообщения, таймауты, сообщения из REST API и др.), а также их сортировки по степени приоритетности.
Каждое событие влечет за собой изменение состояния узла, а значит их необходимо обрабатывать в одном потоке, в определенном порядке и по одному принципу. На Mio схему обработки каждого события приходилось описывать вручную, что при поддержании кода (добавлении/изменении параметров) могло оборачиваться большим количеством ошибок. Tokio позволил упростить этот процесс за счет встроенных функций.
Ниже мы расскажем о компонентах стека Tokio, которые позволяют эффективно организовывать обработку асинхронных задач.
/ изображение Kevin Dooley CC
Архитектура Tokio
По своей сути Tokio представляет собой «обертку» над Mio. Mio — это крэйт Rust, который предоставляет API для низкоуровневого ввода/вывода и не зависит от платформы — он работает с несколькими инструментами: epoll в Linux, kqueue в Mac OS или IOCP в Windows. Таким образом, архитектура Tokio может быть представлена следующим образом:
Futures
Как видно из схемы выше, главным функциональным компонентом Tokio, является futures — это crate Rust, который позволяет работать с асинхронным кодом в синхронной манере. Иными словами, библиотека дает возможность оперировать с кодом, который реализует еще не выполненные задачи, как будто они уже завершились.
По сути, futures — это значения, которые будут подсчитаны в будущем, но пока неизвестны. В формате futures можно представлять разного рода события: запросы к базам данных, таймауты, длительные задачи для CPU, чтение информации с сокета и т. д., и синхронизировать их.
Примером future в реальной жизни может служить уведомление о доставке заказного письма почтой: по завершении доставки отправителю направляется уведомление об успешном получении адресатом письма. Получив уведомление, отправитель определяет, какие действия ему предпринимать дальше.
Разработчик Дэвид Симмонс (David Simmons), сотрудничавший с компаниями Intel, Genuity и Sparco Media, в качестве примера организации асинхронного ввода/вывода с помощью futures приводит обмен сообщениями с HTTP-сервером.
Представьте, что сервер каждый раз порождает новую нить (thread) для установленного соединения. При синхронном I/O система сперва считает байты по порядку, затем обработает информацию и запишет результат обратно. При этом в момент чтения/записи нить не сможет продолжать выполнение (она блокируется), пока операция не будет завершена. Это приводит к тому, что при большом числе соединений возникают трудности при масштабировании (так называемая проблема C10k).
В случае асинхронной обработки, нить ставит в очередь запрос на I/O и продолжает выполнение (то есть не блокируется). Система осуществляет чтение/запись через какое-то время, а нить, прежде чем использовать результаты, спрашивает, был ли выполнен запрос. Таким образом, futures способны выполнять разные задачи, например, один может считывать запрос, второй — его обрабатывать, а третий — формировать ответ.
В крэйте futures определен типаж Future, который является ядром всей библиотеки. Этот типаж определяется для объектов, которые выполняются не сразу, а спустя некоторое время. Его основная часть выражена в коде следующим образом:
trait Future {
type Item;
type Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
fn wait(self) -> Result<Self::Item, Self::Error> { ... }
fn map<F, U>(self, f: F) -> Map<Self, F>
where F: FnOnce(Self::Item) -> U { ... }
}
«Сердцем» типажа Future является метод poll(). Он отвечает за пересылку индикатора завершения работы, ожидания вызова или посчитанного значения. При этом futures запускаются в контексте задачи (task). Задача ассоциируется только с одним future, однако последний может быть составным, то есть содержать внутри себя несколько других futures, объединенных командами join_all() или and_then(). Например:
let client_to_server = copy(client_reader, server_writer)
.and_then(|(n, _, server_writer)| {
shutdown(server_writer).map(move |_| n)
});
За координацию task/future отвечает исполнитель (executor). Если есть несколько задач, запущенных одновременно, и часть из них ожидает результатов внешних асинхронных событий (например, чтение данных из сети/сокета), исполнитель должен эффективно распределить ресурсы процессора для оптимального их выполнения. На практике это происходит за счет «перебрасывания» мощностей процессора на задачи, которые могут быть выполнены, пока другие задачи заблокированы из-за отсутствия внешних данных.
В случае отложенной задачи, executor получает информацию о том, что ее можно выполнять, при помощи метода notify(). Примером может служить исполнитель крэйта futures, который «просыпается» при вызове wait() — исходный код примера представлен в официальном репозитории Rust на GitHub:
pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
ThreadNotify::with_current(|notify| {
loop {
match self.poll_future_notify(notify, 0)? {
Async::NotReady => notify.park(),
Async::Ready(e) => return Ok(e),
}
}
})
}
Streams
Кроме futures, Tokio работает и с другими компонентами для асинхронного I/O — потоками (streams). В то время как future возвращает лишь один финальный результат, stream работает с серией событий и способен вернуть несколько результатов.
Снова пример из реальной жизни: периодические оповещения от датчика измерения температуры могут быть представлены в виде stream. Датчик будет регулярно отправлять значение измерения температуры пользователю через некоторые промежутки времени.
Типаж stream может выглядеть следующим образом:
trait Stream {
type Item;
type Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}
Механика работы со stream идентична той, что применяется к futures: используются похожие комбинаторы для преобразования и изменения деталей потока. Более того, stream легко может быть преобразован в future при помощи адаптера into_future.
Ниже мы предметно рассмотрим применение futures и stream в нашем фреймворке Exonum.
Tokio в Exonum
Как уже было сказано, разработчиками Exonum было принято решение использовать библиотеку Tokio для реализации цикла событий (event loop) во фреймворке.?
Упрощенная схема организации событийной модели в Exonum может быть представлена следующим образом:
Каждый узел сети обменивается сообщениями с другими узлами. Все входящие сообщения попадают в очередь сетевых событий, куда кроме них также попадают внутренние события (тайм-ауты и события внутреннего API). Каждый тип события формирует отдельный поток (stream). Но обработка таких событий, как было отмечено ранее, — процесс синхронный, поскольку влечет за собой изменения состояния узла. Event Agregator объединяет несколько цепочек событий в одну и отправляет их с помощью канала в event loop, где они обрабатываются в порядке установленного приоритета.
При коммуникации между узлами Exonum выполняет следующие связанные операции на каждом из них:
Подключение к узлу N (открытие сокета, настройка сокета) —> Получение сообщений узла N (получение байтов из сокета, разбиение байтов на сообщения) —> Пересылка сообщений в канал текущего узла
let connect_handle = Retry::spawn(handle.clone(), strategy, action)
.map_err(into_other)
// Configure socket
.and_then(move |sock| {
sock.set_nodelay(network_config.tcp_nodelay)?;
let duration =
network_config.tcp_keep_alive.map(Duration::from_millis);
sock.set_keepalive(duration)?;
Ok(sock)
})
// Connect socket with the outgoing channel
.and_then(move |sock| {
trace!("Established connection with peer={}", peer);
let stream = sock.framed(MessagesCodec::new(max_message_len));
let (sink, stream) = stream.split();
let writer = conn_rx
.map_err(|_| other_error("Can't send data into socket"))
.forward(sink);
let reader = stream.for_each(result_ok);
reader
.select2(writer)
.map_err(|_| other_error("Socket error"))
.and_then(|res| match res {
Either::A((_, _reader)) => Ok("by reader"),
Either::B((_, _writer)) => Ok("by writer"),
})
})
Ввиду того, что futures дает возможность комбинировать различные абстракции в цепочку без потери производительности системы, программный код получается разбит на маленькие функциональные блоки, а, следовательно, его становится легче поддерживать.
Использование сети является нетривиальной задачей. Для работы с узлом необходимо к нему подключиться, а также обеспечить логику переподключений в случае разрыва:
.map_err(into_other)
Помимо этого, необходимо произвести настройку сокета:
.and_then(move |sock| {
sock.set_nodelay(network_config.tcp_nodelay)?;
let duration =
network_config.tcp_keep_alive.map(Duration::from_millis);
sock.set_keepalive(duration)?;
Ok(sock)
})
И парсить входящие байты как сообщения:
let (sink, stream) = stream.split();
Каждый из приведенных блоков кода, в свою очередь, состоит из более мелких участков. Однако благодаря тому, что futures позволяет свободно комбинировать эти блоки, нам нет необходимости задумываться о внутреннем строении каждого из них.
В завершение хотелось бы отметить, что на данный момент Exonum в качестве API использует несколько устаревшую версию iron на базе библиотеки hyper. Однако сейчас мы рассматриваем вариант перехода на чистый hyper, который использует Tokio.
Предлагаем вам еще несколько материалов по теме из нашего блога на Хабре:
- Краткая история Rust: от хобби до самого популярного ЯП по данным StackOverflow
- Как создать блокчейн-проект на Exonum: краткое руководство
- Наши корпоративные странички в Vk и Fb
Комментарии (28)
humbug
23.03.2018 23:15+2С появлением Tokio стало ясно, что используемая библиотека Mio устарела
mio
не может устареть, так как авторtokio
по совместительству является авторомmio
, под капотомtokio
— обертка надmio
, которая активно обновляется. Уж вы-то должны это знать.
Устарели вы, а не
tokio
. Мало того, что вы до сих пор сидите наtokio-core
, который, вообще-то уже deprecated, у вас какое-то самописное threadpool говнище, которое жрет мои ресурсы, вы создаете кучу тредов, хотя вас об этом не просили, так у вас обработка сокетов вообще однопоточная.
Почитайте ветку, вам многое станет ясно.
fafhrd91
24.03.2018 03:31+1Единственно tokio медленней на 20% чем tokio-core, а так все хорошо
humbug
24.03.2018 08:07Медленнее в однопоточном режиме. В многопоточной обработке событий мое приложение стало на 5% быстрее. Мы будем улучшать производительность. Закрыто много багов, улучшили архитектуру, вот что главное.
fafhrd91
24.03.2018 10:46+1Моё многопоточное приложение стало на 20% медленнее. Если что tokio-core не ограничивает количество потоков. К тому же забавно смотреть как hyper откатывает tokio на старую версию tokio-core чтобы в бенчмарках выглядеть лучше.
humbug
24.03.2018 17:04Тем забавнее наблюдать за бенчмарками, ведь tokio-core сейчас является фасадом для tokio.
fafhrd91
24.03.2018 18:06В этом то и прикол что hyper даунгрейдеулся на версию tokio-core="=0.1.12" в которой tokio не используется. Я сделал тоже самое, и в своих проектах сделал также. Каждому своё конечно, но как-то терять 20% не очень
humbug
25.03.2018 00:09Почему-то все забывают о починенных в
tokio
багах в погоне за скоростью.fafhrd91
25.03.2018 05:27Я что-то не уловил про баги? Какие баги? Я довольно близко знаком с tokio, не могу припомнить никаких, по крайней мере никаких фиксов в комит логе не вижу. если для вас потеря 20% это нормальн для не меня это не приемлемо.
humbug
25.03.2018 09:10Вот фикс бага. Без него нет возможности возвращать ошибку из UdpCodec::encode, соответственно единственный способ обработать нестандартную ситуацию — упасть. В дальнейшем этот фикс позволил отказаться от разделения кодеков
tcp
иudp
, используя одни и те же типажи: Encoder и Decoder.
Я помню этот фикс, потому что он был сделан мной.
Я довольно близко знаком с tokio
Вы спорите с контрибьютером с вкладом в 3к строк кода и 11 закрытых PR. Но у вас-то наверняка больше)))
mwizard
Да это же один в один asyncio из Python 3. Теперь осталось обернуть futures в полноценный псевдосинхронный async/await.
Gorthauer87
Ты не поверишь.
https://crates.io/crates/futures-await
Работает, правда, только на nightly.
Fedcomp
В nigthly rust есть github.com/alexcrichton/futures-await
Hixon10
Ровно, как и «один в один» Netty. Да и, наверное, любой NIO-фреймворк. Все мы пляшем вокруг poll.
potan
Лучше бы сделали аналог do в Haskell и for в Scala. Можно было бы писать на полноценных фьючах чистый и понятный код, не засоряя его async/await.
Googolplex
async/await существенно удобнее for/do. Только лишь с помощью комбинаторов очень многие паттерны выражать безумно геморройно. Год назад в одном проекте на скале большие асинхронные методы сначала мы пытались писать с помощью комбинаторов и for. Получалось совершенно нечитаемое месиво. После перехода на scala-async код стал чище и понятнее на порядок.
С async-await в Rust писать асинхронный код очень легко и приятно. А с недавними разработками в области immovable generators станет все совсем хорошо.
Gorthauer87
Интересно, а если оператор? Перегрузить в качестве комбинатора? Должно сейчас же на найтли сработать вместо await
potan
В Haskell это сделано оператором (>>=)
То есть
можно записать как
Но практика показывает, что do обычно читабельнее.
В Scala вместо оператора (>>=) почему то сделали метод flatMap. for вызлядит симпатичнее.
против
mayorovp
А теперь добавьте в код несколько циклов…
potan
Вот в циклах с async/await я бы работать не рискнул, поскольку не очень понимаю его семантику в этом случае. Использовавшие его коллеги в этих случаях сталкивались с неожиданностями.
Я обычно использую sequence, которая меняет местами Seq и монаду (в данном случае Future).
mayorovp
А что не так с семантикой?
await приостанавливает выполнение асинхронной функции (освобождая поток) пока не произойдет события. Его семантика никак не меняется в цикле.
potan
То есть не гарантируется, что он не будет удерживать нить во время ожидания?
mayorovp
Гарантируется.
Ну, на самом деле зависит от реализации, но реализация без таких гарантий будет ошибкой.
potan
Тогда я не понимаю в какой код преобразуется
mayorovp
Ну так скопмпилируйте а потом декомпилируйте обратно...
Если это Scala — то скорее всего там конечный автомат с тремя состояниями будет.
humbug
Для таких гарантий должны соблюдаться несколько правил:
poll_read
для fs (а иногда и для для stdin) является блокирующим вызовом.potan
Я работал с кодом на Scala, активно использующим async/await. Как правило, простой способ в нем разоблаться был переписать на for — код сразу получался компактнее и структурированиее.
Явное использование flatMap может выглядеть громозко и страшно, но с for такой проблемы нет.
fafhrd91
Да нету там ничего общего, разве что epoll and kqueue