Сегодня решил собрать воедино то, что я знаю о разработке высокочастотных систем в связке с Rust. Из кусков кода, по сусекам, что называется, по репозиториям наскреб и склеил с помощью готопоты достойную к вашему вниманию либу. Библа позволит сохранить время всем, кто стремиться одновременно и к скорости, и гибкости. Планирую сам активно юзать, чтобы перейти со старой асинхронной торговой инфры в истинные треды с закосом под ультра. И развивать либу под брендом TitanRt. А если сообществу зайдет, так вообще мотивации прибавиться.
Итак, TitanRt - typed reactive runtime для построения реактивных, низколатентных систем на Rust.
Если упростить: это минималистичная основа для приложений, которые живут в цикле событий, где важны:
миллисекунды (а иногда и наносекунды)
предсказуемая обратная нагрузка (back-pressure)
чёткий контроль жизненного цикла модели и её воркеров
максимальная гибкость в области более высокоуровневой разработки
возможность прибить всё красиво и быстро, а не висеть на zombie-тредах
Зачем ещё один рантайм?
Rust уже имеет Tokio, Actix и прочих асинхронных гигантов. Но они решают другую задачу — высокоуровневый async/await, HTTP-сервисы, очереди.
TitanRt сделан для систем реального времени:
торговые движки (HFT/market making)
анализ рыночных потоков
телеметрия и алертинг
системы с жёстким контролем задержки
То есть там, где:
нельзя позволить себе лишний аллокатор
хочется контролировать ядро CPU, на котором работает поток
нужны строго типизированные каналы между моделью и воркерами
Архитектура
Вместо гигантского фреймворка TitanRt — это всего пара простых идей:
Model-first:
Ваша бизнес-логика =BaseModel
.
Она сама создаёт коннекторы и стримы, сама ими управляет, хотя не детерминирует это поведение. Вы можете управлять извне через Control Plane слой.Connector / Stream layer:
Коннектор = фабрика стримов.
Стрим = воркер-тред с типизированными каналами и вкуснятиной в виду StateCell<T>, где под капотом arc-swap. Со стримом можно общаться разными способами: через ваш типизированный Action, который вы отправляете через ваш типизированный Tx<Action>, а так же через типизированный hook, в который приходит сырое событие стрима, ваш Rx<Event> и StateCell<T> - здесь вы выбираете, что делать с данными, чтобы оркестрировать этим цикле модели.Runtime:
Небольшой управляющий поток, который гоняет команды:Start
,Stop
,Restart
,HotReload
,Shutdown
. Ну и саму модель, на которой вызывается ваш импл execute() или event() - если вы вдруг гоняете события извне рантайма.
В общем и целом, рантайм не знает про ваши протоколы и бизнес-логику — только управляет жизненным циклом модели.
Не совсем детальная, но какая-никакая схема:
┌──────────────────────────┐
│ Runtime │
│ Start/Stop/Restart/... │
└───────────▲─────────────┘
│
┌─────────┴─────────┐
│ Model │
│ owns connectors │
└─────────┬─────────┘
│
┌───────────▼───────────┐
│ Connector(s) │
│ spawn Stream(s) │
└───────────┬───────────┘
│
Actions ───►│ │◄─── Events
▼
Stream
Как это выглядит в коде
Минимальная модель:
use titanrt::model::{BaseModel, ExecutionResult, StopKind, StopState};
use titanrt::utils::CancelToken;
use anyhow::Result;
#[derive(Clone, Debug)]
struct MyConfig {
greeting: String,
}
#[derive(Clone, Debug)]
struct MyEvent(String);
#[derive(Clone)]
struct MyOutputTx; // no-op
impl titanrt::io::base::BaseTx for MyOutputTx {
type EventType = String;
fn try_send(&mut self, v: String) -> Result<(), titanrt::error::SendError<String>> {
println!("OUT: {v}");
Ok(())
}
fn send(
&mut self,
v: String,
_: &CancelToken,
_: Option<std::time::Duration>,
) -> Result<(), titanrt::error::SendError<String>> {
self.try_send(v)
}
}
struct MyModel {
cfg: MyConfig,
out: MyOutputTx,
}
impl BaseModel for MyModel {
type Config = MyConfig;
type OutputTx = MyOutputTx;
type Event = MyEvent;
type Ctx = ();
fn initialize(
_ctx: (),
config: MyConfig,
_core_id: Option<usize>,
output_tx: MyOutputTx,
_cancel: CancelToken,
) -> Result<Self> {
Ok(Self { cfg: config, out: output_tx })
}
fn execute(&mut self) -> ExecutionResult {
let _ = self.out.try_send(format!("{}!", self.cfg.greeting));
ExecutionResult::Relax
}
fn on_event(&mut self, event: MyEvent) {
let _ = self.out.try_send(format!("event: {}", event.0));
}
fn stop(&mut self, _kind: StopKind) -> StopState {
StopState::Done
}
}
А потом — просто запускаем:
fn main() -> Result<()> {
let cfg = titanrt::config::RuntimeConfig {
init_model_on_start: true,
core_id: None,
max_inputs_pending: Some(1024),
max_inputs_drain: Some(64),
stop_model_timeout: Some(5),
};
let rt = titanrt::runtime::Runtime::<MyModel>::spawn(
cfg,
NullModelCtx,
MyConfig { greeting: "Hello, TitanRt".into() },
MyOutputTx,
)?;
rt.run_blocking()?;
Ok(())
}
Чем это отличается от Tokio?
Без async/await: здесь честные треды, без скрытого планировщика. Тем не менее, планируется добавить опциональный токио рантайм из коробки.
CPU pinning: можно закрепить поток за ядром. Хочется в будущих версиях добавить настройку планировщика и приоритетность.
Typed I/O: никакого
Any
, всё компилируется статически. Строго.Минимум зависимостей:
crossbeam
,arc-swap
,ahash, ringbuf,
немного serde.
Когда использовать TitanRt
Нужно максимально предсказуемое поведение под нагрузкой.
Ваш код живёт в бесконечном цикле: обработка стакана, телеметрии, сигналов.
Вы пишете HFT-движок, рынок данных, RT-алертинг или даже какую-нибудь игровую сетевую петлю.
А если нужен HTTP, gRPC, база и веб — берите Tokio/Actix, TitanRt не про это.
Репозиторий и документация
Итог
TitanRt — это скелет для реактивных систем, где важна латентность и контроль.
Вы пишете модель и коннекторы → модель создаёт коннекторы и стримы → стримы гоняют события → рантайм следит за жизненным циклом.
Просто, предсказуемо и типобезопасно.
Плюшка в виде простого yellowstone-grpc стрима:
pub struct CompositeConnector {
pub(crate) config: CompositeConfig,
pub(crate) cancel_token: CancelToken,
pub(crate) core_stats: Option<Arc<CoreStats>>,
}
impl BaseConnector for CompositeConnector {
type Config = CompositeConfig;
fn init(
config: Self::Config,
cancel_token: CancelToken,
reserved_core_ids: Option<Vec<usize>>,
) -> anyhow::Result<Self> {
let core_stats = if config.with_core_stats {
Some(CoreStats::new(
config.default_max_cores,
config.specific_cores.clone(),
reserved_core_ids.unwrap_or_default(),
)?)
} else {
None
};
Ok(Self {
config,
cancel_token,
core_stats,
})
}
fn name(&self) -> impl AsRef<str> + Display {
"CompositeConnector"
}
fn config(&self) -> &Self::Config {
&self.config
}
fn cancel_token(&self) -> &CancelToken {
&self.cancel_token
}
fn cores_stats(&self) -> Option<Arc<CoreStats>> {
self.core_stats.clone()
}
}
#[derive(Clone)]
pub enum GeyserAction {
Subscribe(SubscribeRequest),
UnsubscribeAll,
}
#[derive(Clone, Debug)]
pub struct GeyserGrpcDescriptor {
pub endpoint: String,
pub auth_token: Option<String>,
pub max_pending_actions: Option<usize>,
pub max_pending_events: Option<usize>,
pub core_pick_policy: CorePickPolicy,
pub subscription: Option<SubscribeRequest>,
}
impl GeyserGrpcDescriptor {
pub fn new(
endpoint: String,
auth_token: Option<String>,
max_pending_actions: Option<usize>,
max_pending_events: Option<usize>,
core_pick_policy: CorePickPolicy,
) -> Self {
Self {
endpoint,
auth_token,
max_pending_actions,
max_pending_events,
core_pick_policy,
subscription: None,
}
}
pub fn with_subscription(mut self, sub: SubscribeRequest) -> Self {
self.subscription = Some(sub);
self
}
}
impl StreamDescriptor for GeyserGrpcDescriptor {
fn venue(&self) -> impl Venue {
Venues::Solana
}
fn kind(&self) -> impl Kind {
"YellowstoneGrpc"
}
fn max_pending_actions(&self) -> Option<usize> {
self.max_pending_actions
}
fn max_pending_events(&self) -> Option<usize> {
self.max_pending_events
}
fn core_pick_policy(&self) -> Option<CorePickPolicy> {
Some(self.core_pick_policy)
}
fn health_at_start(&self) -> bool {
false
}
}
#[derive(Clone, Debug)]
pub enum GeyserEvent {
Raw(UpdateOneof),
}
impl<E, S> StreamSpawner<GeyserGrpcDescriptor, E, S> for CompositeConector
where
S: StateMarker,
E: BaseTx + TxPairExt,
{
}
impl<E, S> StreamRunner<GeyserGrpcDescriptor, E, S> for CompositeConector
where
S: StateMarker,
E: BaseTx,
{
type Config = ();
type ActionTx = RingSender<GeyserAction>;
type RawEvent = GeyserEvent;
type Hook = fn(&Self::RawEvent, &mut E, &StateCell<S>);
fn build_config(&mut self, _desc: &GeyserGrpcDescriptor) -> anyhow::Result<Self::Config> {
Ok(())
}
fn run(
mut ctx: RuntimeCtx<GeyserGrpcDescriptor, Self, E, S>,
hook: Self::Hook,
) -> StreamResult<()> {
// Однопоточный рантайм ТОЛЬКО внутри run (внешний API остаётся синхронным)
let rt = Builder::new_current_thread()
.enable_time()
.enable_io()
.build()
.map_err(|e| StreamError::Unknown(anyhow!(e)))?;
let mut rng = SmallRng::from_os_rng();
let mut backoff_ms: u64 = 50;
let backoff_max_ms: u64 = 5_000;
let backoff_mul: f64 = 1.7;
if !ctx.desc.health_at_start() {
ctx.health.set(true);
}
rt.block_on(async move {
'reconnect: loop {
if ctx.cancel.is_cancelled() { break Ok(()); }
if backoff_ms > 50 {
let j = rng.random_range(0..(backoff_ms / 5 + 1));
tokio::time::sleep(Duration::from_millis(j)).await;
}
let mut client = match GeyserGrpcClient::build_from_shared(ctx.desc.endpoint.clone())
.map_err(|e| StreamError::Unknown(anyhow!(e)))?
.x_token(ctx.desc.auth_token.clone())
.map_err(|e| StreamError::Unknown(anyhow!(e)))?
.tls_config(ClientTlsConfig::new().with_native_roots())
.map_err(|e| StreamError::Unknown(anyhow!(e)))?
.connect().await
{
Ok(c) => c,
Err(e) => {
tracing::warn!("grpc connect error: {e}");
let sleep = backoff_ms.min(backoff_max_ms);
tokio::time::sleep(Duration::from_millis(sleep)).await;
backoff_ms = ((backoff_ms as f64) * backoff_mul) as u64;
continue 'reconnect;
}
};
let (mut tx, mut rx) = match client.subscribe().await {
Ok(x) => x,
Err(e) => {
tracing::warn!("subscribe call error: {e}");
let sleep = backoff_ms.min(backoff_max_ms);
tokio::time::sleep(Duration::from_millis(sleep)).await;
backoff_ms = ((backoff_ms as f64) * backoff_mul) as u64;
continue 'reconnect;
}
};
if let Some(req) = ctx.desc.subscription.as_ref() {
if let Err(e) = tx.send(req.clone()).await {
tracing::warn!("send initial subreq: {e}");
}
}
ctx.health.set(true);
backoff_ms = 50;
let mut tick = tokio::time::interval(Duration::from_millis(1));
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
'session: loop {
tokio::select! {
biased;
m = rx.next() => {
match m {
Some(Ok(update)) => {
match update.update_oneof {
Some(UpdateOneof::Ping(_)) => {
tracing::debug!("ping update");
let _ = tx.send(SubscribeRequest {
ping: Some(SubscribeRequestPing {id: 0}),
..Default::default()
}).await;
}
Some(u) => {
// А вот и хук из модели: модель сама решает, что делать
hook(&GeyserEvent::Raw(u), &mut ctx.event_tx, &ctx.state);
}
None => {}
}
}
Some(Err(status)) => {
tracing::warn!("grpc stream error: {status}");
ctx.health.set(false);
break 'session;
}
None => {
tracing::warn!("grpc stream ended");
ctx.health.set(false);
break 'session;
}
}
}
_ = tick.tick() => {
let mut sent = 0usize;
Проверим команды из модели
while let Ok(a) = ctx.action_rx.try_recv() {
match a {
GeyserAction::Subscribe(req) => {
match tx.send(req).await {
Ok(_) => {
sent += 1;
if sent >= 4096 { break; }
}
Err(e) => {
tracing::warn!("subreq send failed: {e}");
break 'session;
}
}
},
GeyserAction::UnsubscribeAll => {}
}
}
}
}
if ctx.cancel.is_cancelled() { break 'reconnect Ok(()); }
}
let sleep = backoff_ms.min(backoff_max_ms);
tokio::time::sleep(Duration::from_millis(sleep)).await;
backoff_ms = ((backoff_ms as f64) * backoff_mul) as u64;
}
})?;
Ok(())
}
}
Ну что, делаем SolanaTxStream для отправки лидерам?