Последнее время достаточно популярна "async/await" концепция в Rust. Безусловно для подавляющего большинства задач это лучший вариант. Так как она ориентирована на то что система, которая обрабатывает сетевые запросы, например сталкивается с блокировкой ввода/вывода к базе данных. В результате лучшим решением будет использование "async/await", так как он позволяет использовать один поток для обработки множества запросов. Если функция "async" не может быть завершена например из-за ожидания ввода-вывода, она может отдать управление в точке ее вызова "await". И "executor" например "Tokio" может переключиться на другую задачу.
Однако, есть случаи, когда "async/await" не подходит. Например, когда вам нужно обрабатывать запрос с минимальным временем ожидания. По этой причине вам необходимо использовать системные потоки. Что бы контролировать свою задачу в рамках одного потока, а не надеяться на "executor" от Tokio который решит за вас какую задачу выполнить следующей.
Для того что бы продемонстрировать разницу между "async/await" и системными потоками, я выбрал задачу из области Fintech для матчинга ордеров. Подразумевается матчинга ордеров на бирже, будет производиться в рамках одного системного потока, который будет прикреплен к ядру процессора. Ордера будут поступать в матчинг систему асинхронно из других потоков.
Первая проблема с которой я столкнулся это каким образом из одного потока послать ордера в матчинг систему работающей в другом потоке. В Tokio есть "mpsc" каналы, но они не подходят для моей задачи, так как они асинхронные. Сначала я обратился к стандартной библиотеке "std::sync::mpsc", но она блокирует поток, как при отправке так и при получении сообщений. В результате, я решил использовать "crossbeam" библиотеку, которая позволяет отправлять и получать сообщения без блокировки потока. Вот пример кода:
use crossbeam_queue::ArrayQueue;
let order_queue:Arc<ArrayQueue<Order>> = Arc::new(ArrayQueue::new(100));
// send from one thread
order_queue.push(order);
// receive from another thread
while let Some(order) = order_queue.pop() {
// process order
}
Вторая проблема это запуск моей матчинг системы в отдельном потоке без помощи "executor"-ов типа Tokio. Для этого я использовал стандартную библиотеку "std::thread". Вот пример кода:
let match_system_thread_handle = std::thread::spawn(move || {
let mut matcher_system = OrderMatcher::new(crypto_currency_id, currency_id);
loop {
while let Some(order) = order_queue.pop() {
matcher_system.add_order(order);
};
let order_matches = matcher_system.match_orders();
for order_match in order_matches {
let _ = order_match_queue.push(order_match);
}
std::thread::sleep(std::time::Duration::from_secs(1));
}
});
Как видно из примера выше, я создал поток, который будет выполняться вечно. Внутри потока я получаю ордера из очереди "order_queue", произвожу матчинг и отправляю результат матчинга в другую очередь "order_match_queue".
Третья проблема это прикрепление потока к ядру процессора. Для этого я использовал библиотеку "core_affinity". Вот пример кода:
let core_ids = core_affinity::get_core_ids().unwrap();
let core_id = core_ids[0];
let matcher_system = MatcherSystem::start(crypto_currency_id, currency_id, core_id);
impl MatcherSystem {
pub fn start(crypto_currency_id: u64, currency_id: u64, core_id: CoreId) -> MatcherSystem {
let _match_system_thread_handle = std::thread::spawn(move || {
let ok = core_affinity::set_for_current(core_id);
if ok {
let mut matcher_system = OrderMatcher::new(crypto_currency_id, currency_id);
loop {
// process orders
}
} else {
panic!("Failed to set core affinity");
}
});
}
}
В результате, у меня получилась Open Source библиотека для матчинга ордеров на бирже, которая работает в рамках одного системного потока. Это позволяет контролировать задачу матчинга в рамках одного потока, а не надеяться на "executor" от Tokio. Для наглядной демонстрации этого примера я реализовал минимальную версию библиотеки для быстрого прототипирования биржи криптовалют. Вот пример кода использования этой библиотеки:
fn main() {
let storage_system = Arc::new(StorageSystem::new());
let mut assets_system = AssetSystem::new(storage_system.clone());
if assets_system.get_currencies().len() == 0 {
let _ = assets_system.create_currency(Currency { id: 0, symbol: "USD".to_string() });
}
if assets_system.get_crypto_currencies().len() == 0 {
let _ = assets_system.create_crypto_currency(CryptoCurrency { id: 0, symbol: "BTC".to_string() });
}
let assets_system = Arc::new(assets_system);
let mut accounts_system = AccountSystem::new(storage_system.clone(), assets_system.clone());
let currency_id = assets_system.get_currencies()[0].id;
let crypto_currency_id = assets_system.get_crypto_currencies()[0].id;
if storage_system.load_accounts().len() == 0 {
let account1_id = accounts_system.create_account(Account { id: 0, name: "Alice".to_string(), timestamp: SystemTime::now() });
let account2_id = accounts_system.create_account(Account { id: 0, name: "Bob".to_string(), timestamp: SystemTime::now() });
accounts_system.add_currency_to_account(account1_id, currency_id, 100000.0);
accounts_system.add_crypto_currency_to_account(account2_id, crypto_currency_id, 1.0);
}
let accounts = storage_system.load_accounts();
let account1_id = accounts[0].id;
let account2_id = accounts[1].id;
let mut order_system = OrderSystem::new(storage_system.clone(), assets_system.clone());
let core_ids = core_affinity::get_core_ids().unwrap();
let core_id = core_ids[0];
let matcher_system = MatcherSystem::start(crypto_currency_id, currency_id, core_id);
let order1 = order_system.create_order(Order { id: 0, account_id: account1_id, trade_type: TradeType::Buy, price_type: PriceType::Market, execution_type: ExecutionType::Full, crypto_currency_id: crypto_currency_id, currency_id, quantity: 0.5, status: OrderStatus::Open, timestamp: SystemTime::now()});
let order2 = order_system.create_order(Order { id: 0, account_id: account2_id, trade_type: TradeType::Sell, price_type: PriceType::Limit(50000.00), execution_type: ExecutionType::Partial, crypto_currency_id: crypto_currency_id, currency_id, quantity: 1.0, status: OrderStatus::Open, timestamp: SystemTime::now()});
let _ = matcher_system.add_order(order1);
let _ = matcher_system.add_order(order2);
print_accounts(storage_system.clone());
loop {
while let Some(order_match) = matcher_system.get_order_match() {
tracing::info!("OrderMatch: Buy Order Id: {} Sell Order Id: {} Quantity: {} Price: {}", order_match.buy_order_id, order_match.sell_order_id, order_match.quantity, order_match.price);
order_system.create_order_history(&order_match, &mut accounts_system);
print_accounts(storage_system.clone());
if storage_system.get_account_currency(account1_id, currency_id).unwrap().balance > 0.0 {
let order = order_system.create_order(Order { id: 0, account_id: account1_id, trade_type: TradeType::Buy, price_type: PriceType::Market, execution_type: ExecutionType::Full, crypto_currency_id: crypto_currency_id, currency_id, quantity: 0.5, status: OrderStatus::Open, timestamp: SystemTime::now()});
matcher_system.add_order(order);
}
}
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
fn print_accounts(storage_system: Arc<StorageSystem>) {
for account in storage_system.load_accounts() {
let datetime: DateTime<Local> = account.timestamp.into();
tracing::info! {
"AccountId: {} Name: {} Timestamp: {}",account.id, account.name, datetime.format("%Y-%m-%d %H:%M:%S").to_string()
};
for account_currency in storage_system.get_account_currency_by_account_id(account.id) {
tracing::info! {
"CurrencyId: {} Symbol: {} Balance: {:.2}", account_currency.id, storage_system.get_currency(account_currency.currency_id).unwrap().symbol, account_currency.balance
};
for account_currency_history in storage_system.get_currency_history_by_account_id_account_currency_id(account.id, account_currency.id) {
let datetime: DateTime<Local> = account_currency_history.timestamp.into();
tracing::info! {
"CurrencyHistoryId: {} Balance: {:.2} Timestamp: {}", account_currency_history.id, account_currency_history.balance, datetime.format("%Y-%m-%d %H:%M:%S").to_string()
};
}
}
for account_crypto_currency in storage_system.get_account_crypto_currencies_by_account_id(account.id) {
tracing::info! {
"CryptoCurrencyId: {} {} Amount: {}", account_crypto_currency.id, storage_system.get_crypto_currency(account_crypto_currency.crypto_currency_id).unwrap().symbol, account_crypto_currency.quantity
};
for account_crypto_currency_history in storage_system.get_crypto_currency_history_by_account_id_crypto_currency_id(account.id, account_crypto_currency.crypto_currency_id) {
let datetime: DateTime<Local> = account_crypto_currency_history.timestamp.into();
tracing::info! {
"CryptoCurrencyHistoryId: {} Quantity: {} Timestamp: {}", account_crypto_currency_history.id, account_crypto_currency_history.quantity, datetime.format("%Y-%m-%d %H:%M:%S").to_string()
};
}
}
}
}
Пример работы:
2024-07-08T14:55:28.291721Z INFO ThreadId(01) examples\trade-engine.rs:81: AccountId: 1 Name: Alice Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:28.291953Z INFO ThreadId(01) examples\trade-engine.rs:86: CurrencyId: 1 Symbol: USD Balance: 100000.00
2024-07-08T14:55:28.292068Z INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 1 Balance: 100000.00 Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:28.292168Z INFO ThreadId(01) examples\trade-engine.rs:81: AccountId: 2 Name: Bob Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:28.292261Z INFO ThreadId(01) examples\trade-engine.rs:86: CurrencyId: 2 Symbol: USD Balance: 0.00
2024-07-08T14:55:28.292380Z INFO ThreadId(01) examples\trade-engine.rs:98: CryptoCurrencyId: 1 BTC Amount: 1
2024-07-08T14:55:28.292476Z INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 1 Quantity: 1 Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:29.288109Z INFO ThreadId(02) src\matcher.rs:157: Before Matching
2024-07-08T14:55:29.288289Z INFO ThreadId(02) src\matcher.rs:160: Buy Order: Order { id: 1, account_id: 1, trade_type: Buy, price_type: Market, execution_type: Full, crypto_currency_id: 1, currency_id: 1, quantity: 0.5, timestamp: SystemTime { intervals: 133649241282877806 }, status: Open }
2024-07-08T14:55:29.288362Z INFO ThreadId(02) src\matcher.rs:163: Sell Order: Order { id: 2, account_id: 2, trade_type: Sell, price_type: Limit(50000.0), execution_type: Partial, crypto_currency_id: 1, currency_id: 1, quantity: 1.0, timestamp: SystemTime { intervals: 133649241282892932 }, status: Open }
2024-07-08T14:55:29.288436Z INFO ThreadId(02) src\matcher.rs:157: After Matching
2024-07-08T14:55:29.288492Z INFO ThreadId(02) src\matcher.rs:163: Sell Order: Order { id: 2, account_id: 2, trade_type: Sell, price_type: Limit(50000.0), execution_type: Partial, crypto_currency_id: 1, currency_id: 1, quantity: 0.5, timestamp: SystemTime { intervals: 133649241282892932 }, status: Open }
2024-07-08T14:55:29.292973Z INFO ThreadId(01) examples\trade-engine.rs:64: OrderMatch: Buy Order Id: 1 Sell Order Id: 2 Quantity: 0.5 Price: 50000
2024-07-08T14:55:29.312581Z INFO ThreadId(01) examples\trade-engine.rs:81: AccountId: 1 Name: Alice Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:29.312779Z INFO ThreadId(01) examples\trade-engine.rs:86: CurrencyId: 1 Symbol: USD Balance: 75000.00
2024-07-08T14:55:29.312920Z INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 1 Balance: 100000.00 Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:29.313002Z INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 2 Balance: 75000.00 Timestamp: 2024-07-08 19:55:29
2024-07-08T14:55:29.313127Z INFO ThreadId(01) examples\trade-engine.rs:98: CryptoCurrencyId: 2 BTC Amount: 0.5
2024-07-08T14:55:29.313255Z INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 2 Quantity: 0.5 Timestamp: 2024-07-08 19:55:29
2024-07-08T14:55:29.313335Z INFO ThreadId(01) examples\trade-engine.rs:81: AccountId: 2 Name: Bob Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:29.313454Z INFO ThreadId(01) examples\trade-engine.rs:86: CurrencyId: 2 Symbol: USD Balance: 25000.00
2024-07-08T14:55:29.313579Z INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 3 Balance: 25000.00 Timestamp: 2024-07-08 19:55:29
2024-07-08T14:55:29.313707Z INFO ThreadId(01) examples\trade-engine.rs:98: CryptoCurrencyId: 1 BTC Amount: 0.5
2024-07-08T14:55:29.313814Z INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 1 Quantity: 1 Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:29.313881Z INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 3 Quantity: 0.5 Timestamp: 2024-07-08 19:55:29
2024-07-08T14:55:30.288721Z INFO ThreadId(02) src\matcher.rs:157: Before Matching
2024-07-08T14:55:30.288946Z INFO ThreadId(02) src\matcher.rs:160: Buy Order: Order { id: 3, account_id: 1, trade_type: Buy, price_type: Market, execution_type: Full, crypto_currency_id: 1, currency_id: 1, quantity: 0.5, timestamp: SystemTime { intervals: 133649241293139637 }, status: Open }
2024-07-08T14:55:30.289023Z INFO ThreadId(02) src\matcher.rs:163: Sell Order: Order { id: 2, account_id: 2, trade_type: Sell, price_type: Limit(50000.0), execution_type: Partial, crypto_currency_id: 1, currency_id: 1, quantity: 0.5, timestamp: SystemTime { intervals: 133649241282892932 }, status: Open }
2024-07-08T14:55:30.315944Z INFO ThreadId(01) examples\trade-engine.rs:64: OrderMatch: Buy Order Id: 3 Sell Order Id: 2 Quantity: 0.5 Price: 50000
2024-07-08T14:55:30.334073Z INFO ThreadId(01) examples\trade-engine.rs:81: AccountId: 1 Name: Alice Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:30.334267Z INFO ThreadId(01) examples\trade-engine.rs:86: CurrencyId: 1 Symbol: USD Balance: 50000.00
2024-07-08T14:55:30.334448Z INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 1 Balance: 100000.00 Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:30.334571Z INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 2 Balance: 75000.00 Timestamp: 2024-07-08 19:55:29
2024-07-08T14:55:30.334686Z INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 4 Balance: 50000.00 Timestamp: 2024-07-08 19:55:30
2024-07-08T14:55:30.334863Z INFO ThreadId(01) examples\trade-engine.rs:98: CryptoCurrencyId: 2 BTC Amount: 1
2024-07-08T14:55:30.335030Z INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 2 Quantity: 0.5 Timestamp: 2024-07-08 19:55:29
2024-07-08T14:55:30.335130Z INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 4 Quantity: 1 Timestamp: 2024-07-08 19:55:30
2024-07-08T14:55:30.335217Z INFO ThreadId(01) examples\trade-engine.rs:81: AccountId: 2 Name: Bob Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:30.335340Z INFO ThreadId(01) examples\trade-engine.rs:86: CurrencyId: 2 Symbol: USD Balance: 50000.00
2024-07-08T14:55:30.335495Z INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 3 Balance: 25000.00 Timestamp: 2024-07-08 19:55:29
2024-07-08T14:55:30.335588Z INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 5 Balance: 50000.00 Timestamp: 2024-07-08 19:55:30
2024-07-08T14:55:30.335720Z INFO ThreadId(01) examples\trade-engine.rs:98: CryptoCurrencyId: 1 BTC Amount: 0
2024-07-08T14:55:30.335846Z INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 1 Quantity: 1 Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:30.335923Z INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 3 Quantity: 0.5 Timestamp: 2024-07-08 19:55:29
2024-07-08T14:55:30.335993Z INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 5 Quantity: 0 Timestamp: 2024-07-08 19:55:30
2024-07-08T14:55:31.289822Z INFO ThreadId(02) src\matcher.rs:157: Before Matching
2024-07-08T14:55:31.290319Z INFO ThreadId(02) src\matcher.rs:160: Buy Order: Order { id: 4, account_id: 1, trade_type: Buy, price_type: Market, execution_type: Full, crypto_currency_id: 1, currency_id: 1, quantity: 0.5, timestamp: SystemTime { intervals: 133649241303360887 }, status: Open }
2024-07-08T14:55:31.290590Z INFO ThreadId(02) src\matcher.rs:157: After Matching
Возможности библиотеки:
AccountSystem: аккаунты, валюты, криптовалюты, история
AssetSystem: валюты, криптовалюты
OrderSystem: ордера, история
MatcherSystem: матчинг ордеров full or partial (только для покупке Market and Sell limit)
StorageSystem: redb быстрая, ACID, встроенный key-value сторедж
TODO
MatcherSystem: матчинг full or partial (для всех типов ордеров)
StorageSystem: sharding, distributed transactions, distributed storage
Исходный код проекта находится на GitHub. Если у вас есть желание помочь мне в разработке, то пишите мне в Telegram @ievkz или Discord @igumnovnsk.
Комментарии (3)
benjik
08.07.2024 18:43+1Круто, спасибо!
Есть несколько моментов:
Гипотезы/заявления о производительности стоит проверять/подтверждать бенчмарками. Блокирующий mpsc может оказаться быстрее не блокирующего array_queue
В кейсе где валют >= ядер все может стать печально из-за планировщика
Для этих целей традиционно используют ringbuffer/disruptor
Mnwamnowich
Вы не избавились от executor, а заиспользовали оный от операционной системы.
Вы точно также локаете свой поток и он уходит в долгое ожидание = отдаёт руль ОСи.
Вашу биржу вы тоже наверняка по tcp опрашиваете, а значит, у вас тоже есть блокирование потока.
ИМХО тут оптимизация во вред произошла, вы делали замеры?
Мне кажется single thread токио будет быстрее и удобнее, если там нету cpu bound задачек.
Gorthauer87
О ещё есть вот такая либа:
https://github.com/DataDog/glommio
Per thread executor и плюс io_uring.