Сегодня решил собрать воедино то, что я знаю о разработке высокочастотных систем в связке с Rust. Из кусков кода, по сусекам, что называется, по репозиториям наскреб и склеил с помощью готопоты достойную к вашему вниманию либу. Библа позволит сохранить время всем, кто стремиться одновременно и к скорости, и гибкости. Планирую сам активно юзать, чтобы перейти со старой асинхронной торговой инфры в истинные треды с закосом под ультра. И развивать либу под брендом TitanRt. А если сообществу зайдет, так вообще мотивации прибавиться.

Итак, TitanRt - typed reactive runtime для построения реактивных, низколатентных систем на Rust.

Если упростить: это минималистичная основа для приложений, которые живут в цикле событий, где важны:

  • миллисекунды (а иногда и наносекунды)

  • предсказуемая обратная нагрузка (back-pressure)

  • чёткий контроль жизненного цикла модели и её воркеров

  • максимальная гибкость в области более высокоуровневой разработки

  • возможность прибить всё красиво и быстро, а не висеть на zombie-тредах


Зачем ещё один рантайм?

Rust уже имеет Tokio, Actix и прочих асинхронных гигантов. Но они решают другую задачу — высокоуровневый async/await, HTTP-сервисы, очереди.

TitanRt сделан для систем реального времени:

  • торговые движки (HFT/market making)

  • анализ рыночных потоков

  • телеметрия и алертинг

  • системы с жёстким контролем задержки

То есть там, где:

  • нельзя позволить себе лишний аллокатор

  • хочется контролировать ядро CPU, на котором работает поток

  • нужны строго типизированные каналы между моделью и воркерами


Архитектура

Вместо гигантского фреймворка TitanRt — это всего пара простых идей:

  1. Model-first:
    Ваша бизнес-логика = BaseModel.
    Она сама создаёт коннекторы и стримы, сама ими управляет, хотя не детерминирует это поведение. Вы можете управлять извне через Control Plane слой.

  2. Connector / Stream layer:
    Коннектор = фабрика стримов.
    Стрим = воркер-тред с типизированными каналами и вкуснятиной в виду StateCell<T>, где под капотом arc-swap. Со стримом можно общаться разными способами: через ваш типизированный Action, который вы отправляете через ваш типизированный Tx<Action>, а так же через типизированный hook, в который приходит сырое событие стрима, ваш Rx<Event> и StateCell<T> - здесь вы выбираете, что делать с данными, чтобы оркестрировать этим цикле модели.

  3. Runtime:
    Небольшой управляющий поток, который гоняет команды: Start, Stop,Restart,HotReload, Shutdown. Ну и саму модель, на которой вызывается ваш импл execute() или event() - если вы вдруг гоняете события извне рантайма.
    В общем и целом, рантайм не знает про ваши протоколы и бизнес-логику — только управляет жизненным циклом модели.

Не совсем детальная, но какая-никакая схема:

                   ┌──────────────────────────┐
                   │         Runtime          │
                   │ Start/Stop/Restart/...   │
                   └───────────▲─────────────┘
                               │
                     ┌─────────┴─────────┐
                     │       Model        │
                     │ owns connectors    │
                     └─────────┬─────────┘
                               │
                   ┌───────────▼───────────┐
                   │     Connector(s)      │
                   │    spawn Stream(s)    │
                   └───────────┬───────────┘
                               │
                    Actions ───►│   │◄─── Events
                               ▼
                            Stream

Как это выглядит в коде

Минимальная модель:

use titanrt::model::{BaseModel, ExecutionResult, StopKind, StopState};
use titanrt::utils::CancelToken;
use anyhow::Result;

#[derive(Clone, Debug)]
struct MyConfig {
    greeting: String,
}

#[derive(Clone, Debug)]
struct MyEvent(String);

#[derive(Clone)]
struct MyOutputTx; // no-op

impl titanrt::io::base::BaseTx for MyOutputTx {
    type EventType = String;
    fn try_send(&mut self, v: String) -> Result<(), titanrt::error::SendError<String>> {
        println!("OUT: {v}");
        Ok(())
    }
    fn send(
        &mut self,
        v: String,
        _: &CancelToken,
        _: Option<std::time::Duration>,
    ) -> Result<(), titanrt::error::SendError<String>> {
        self.try_send(v)
    }
}

struct MyModel {
    cfg: MyConfig,
    out: MyOutputTx,
}

impl BaseModel for MyModel {
    type Config = MyConfig;
    type OutputTx = MyOutputTx;
    type Event = MyEvent;
    type Ctx = ();

    fn initialize(
        _ctx: (),
        config: MyConfig,
        _core_id: Option<usize>,
        output_tx: MyOutputTx,
        _cancel: CancelToken,
    ) -> Result<Self> {
        Ok(Self { cfg: config, out: output_tx })
    }

    fn execute(&mut self) -> ExecutionResult {
        let _ = self.out.try_send(format!("{}!", self.cfg.greeting));
        ExecutionResult::Relax
    }

    fn on_event(&mut self, event: MyEvent) {
        let _ = self.out.try_send(format!("event: {}", event.0));
    }

    fn stop(&mut self, _kind: StopKind) -> StopState {
        StopState::Done
    }
}

А потом — просто запускаем:

fn main() -> Result<()> {
    let cfg = titanrt::config::RuntimeConfig {
        init_model_on_start: true,
        core_id: None,
        max_inputs_pending: Some(1024),
        max_inputs_drain: Some(64),
        stop_model_timeout: Some(5),
    };

    let rt = titanrt::runtime::Runtime::<MyModel>::spawn(
        cfg,
        NullModelCtx, 
        MyConfig { greeting: "Hello, TitanRt".into() },
        MyOutputTx,
    )?;

    rt.run_blocking()?;
    Ok(())
}

Чем это отличается от Tokio?

  • Без async/await: здесь честные треды, без скрытого планировщика. Тем не менее, планируется добавить опциональный токио рантайм из коробки.

  • CPU pinning: можно закрепить поток за ядром. Хочется в будущих версиях добавить настройку планировщика и приоритетность.

  • Typed I/O: никакого Any, всё компилируется статически. Строго.

  • Минимум зависимостей: crossbeam, arc-swap,ahash, ringbuf, немного serde.


Когда использовать TitanRt

  • Нужно максимально предсказуемое поведение под нагрузкой.

  • Ваш код живёт в бесконечном цикле: обработка стакана, телеметрии, сигналов.

  • Вы пишете HFT-движок, рынок данных, RT-алертинг или даже какую-нибудь игровую сетевую петлю.

А если нужен HTTP, gRPC, база и веб — берите Tokio/Actix, TitanRt не про это.


Репозиторий и документация


Итог

TitanRt — это скелет для реактивных систем, где важна латентность и контроль.
Вы пишете модель и коннекторы → модель создаёт коннекторы и стримы → стримы гоняют события → рантайм следит за жизненным циклом.

Просто, предсказуемо и типобезопасно.


Плюшка в виде простого yellowstone-grpc стрима:


pub struct CompositeConnector {
    pub(crate) config: CompositeConfig,
    pub(crate) cancel_token: CancelToken,
    pub(crate) core_stats: Option<Arc<CoreStats>>,
}

impl BaseConnector for CompositeConnector {
    type Config = CompositeConfig;

    fn init(
        config: Self::Config,
        cancel_token: CancelToken,
        reserved_core_ids: Option<Vec<usize>>,
    ) -> anyhow::Result<Self> {
        let core_stats = if config.with_core_stats {
            Some(CoreStats::new(
                config.default_max_cores,
                config.specific_cores.clone(),
                reserved_core_ids.unwrap_or_default(),
            )?)
        } else {
            None
        };

        Ok(Self {
            config,
            cancel_token,
            core_stats,
        })
    }

    fn name(&self) -> impl AsRef<str> + Display {
        "CompositeConnector"
    }

    fn config(&self) -> &Self::Config {
        &self.config
    }

    fn cancel_token(&self) -> &CancelToken {
        &self.cancel_token
    }

    fn cores_stats(&self) -> Option<Arc<CoreStats>> {
        self.core_stats.clone()
    }
}

#[derive(Clone)]
pub enum GeyserAction { 
  Subscribe(SubscribeRequest),    
  UnsubscribeAll,
}

#[derive(Clone, Debug)]
pub struct GeyserGrpcDescriptor {
    pub endpoint: String,
    pub auth_token: Option<String>,
    pub max_pending_actions: Option<usize>,
    pub max_pending_events: Option<usize>,
    pub core_pick_policy: CorePickPolicy,
    pub subscription: Option<SubscribeRequest>,
}

impl GeyserGrpcDescriptor {
    pub fn new(
        endpoint: String,
        auth_token: Option<String>,
        max_pending_actions: Option<usize>,
        max_pending_events: Option<usize>,
        core_pick_policy: CorePickPolicy,
    ) -> Self {
        Self {
            endpoint,
            auth_token,
            max_pending_actions,
            max_pending_events,
            core_pick_policy,
            subscription: None,
        }
    }

    pub fn with_subscription(mut self, sub: SubscribeRequest) -> Self {
        self.subscription = Some(sub);
        self
    }
}

impl StreamDescriptor for GeyserGrpcDescriptor {
    fn venue(&self) -> impl Venue {
        Venues::Solana
    }

    fn kind(&self) -> impl Kind {
        "YellowstoneGrpc"
    }

    fn max_pending_actions(&self) -> Option<usize> {
        self.max_pending_actions
    }

    fn max_pending_events(&self) -> Option<usize> {
        self.max_pending_events
    }

    fn core_pick_policy(&self) -> Option<CorePickPolicy> {
        Some(self.core_pick_policy)
    }

    fn health_at_start(&self) -> bool {
        false
    }
}

#[derive(Clone, Debug)]
pub enum GeyserEvent {
    Raw(UpdateOneof),
}

impl<E, S> StreamSpawner<GeyserGrpcDescriptor, E, S> for CompositeConector
where
    S: StateMarker,
    E: BaseTx + TxPairExt,
{
}

impl<E, S> StreamRunner<GeyserGrpcDescriptor, E, S> for CompositeConector
where
    S: StateMarker,
    E: BaseTx,
{
    type Config = ();
    type ActionTx = RingSender<GeyserAction>;
    type RawEvent = GeyserEvent;
    type Hook = fn(&Self::RawEvent, &mut E, &StateCell<S>);

    fn build_config(&mut self, _desc: &GeyserGrpcDescriptor) -> anyhow::Result<Self::Config> {
        Ok(())
    }

    fn run(
        mut ctx: RuntimeCtx<GeyserGrpcDescriptor, Self, E, S>,
        hook: Self::Hook,
    ) -> StreamResult<()> {
        // Однопоточный рантайм ТОЛЬКО внутри run (внешний API остаётся синхронным)
        let rt = Builder::new_current_thread()
            .enable_time()
            .enable_io()
            .build()
            .map_err(|e| StreamError::Unknown(anyhow!(e)))?;

        let mut rng = SmallRng::from_os_rng();
        let mut backoff_ms: u64 = 50;
        let backoff_max_ms: u64 = 5_000;
        let backoff_mul: f64 = 1.7;

        if !ctx.desc.health_at_start() {
            ctx.health.set(true);
        }

        rt.block_on(async move {
            'reconnect: loop {
                if ctx.cancel.is_cancelled() { break Ok(()); }

                if backoff_ms > 50 {
                    let j = rng.random_range(0..(backoff_ms / 5 + 1));
                    tokio::time::sleep(Duration::from_millis(j)).await;
                }

                let mut client = match GeyserGrpcClient::build_from_shared(ctx.desc.endpoint.clone())
                    .map_err(|e| StreamError::Unknown(anyhow!(e)))?
                    .x_token(ctx.desc.auth_token.clone())
                    .map_err(|e| StreamError::Unknown(anyhow!(e)))?
                    .tls_config(ClientTlsConfig::new().with_native_roots())
                    .map_err(|e| StreamError::Unknown(anyhow!(e)))?
                    .connect().await
                {
                    Ok(c) => c,
                    Err(e) => {
                        tracing::warn!("grpc connect error: {e}");
                        let sleep = backoff_ms.min(backoff_max_ms);
                        tokio::time::sleep(Duration::from_millis(sleep)).await;
                        backoff_ms = ((backoff_ms as f64) * backoff_mul) as u64;
                        continue 'reconnect;
                    }
                };

                let (mut tx, mut rx) = match client.subscribe().await {
                    Ok(x) => x,
                    Err(e) => {
                        tracing::warn!("subscribe call error: {e}");
                        let sleep = backoff_ms.min(backoff_max_ms);
                        tokio::time::sleep(Duration::from_millis(sleep)).await;
                        backoff_ms = ((backoff_ms as f64) * backoff_mul) as u64;
                        continue 'reconnect;
                    }
                };

                if let Some(req) = ctx.desc.subscription.as_ref() {
                    if let Err(e) = tx.send(req.clone()).await {
                        tracing::warn!("send initial subreq: {e}");
                    }
                }

                ctx.health.set(true);

                backoff_ms = 50;

                let mut tick = tokio::time::interval(Duration::from_millis(1));
                tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

                'session: loop {
                    tokio::select! {
                        biased;

                        m = rx.next() => {
                            match m {
                                Some(Ok(update)) => {
                                    match update.update_oneof {

                                        Some(UpdateOneof::Ping(_)) => {
                                            tracing::debug!("ping update");
                                            let _ = tx.send(SubscribeRequest { 
                                                              ping: Some(SubscribeRequestPing {id: 0}),
                                                              ..Default::default() 
                                                            }).await;
                                        }
                                        Some(u) => {
                                                // А вот и хук из модели: модель сама решает, что делать
                                                hook(&GeyserEvent::Raw(u), &mut ctx.event_tx, &ctx.state);
                                        }
                                        None => {}
                                    }
                                }
                                Some(Err(status)) => {
                                    tracing::warn!("grpc stream error: {status}");
                                    ctx.health.set(false);
                                    break 'session;
                                }
                                None => {
                                    tracing::warn!("grpc stream ended");
                                    ctx.health.set(false);
                                    break 'session;
                                }
                            }
                        }

                        _ = tick.tick() => {
                            let mut sent = 0usize;

                            Проверим команды из модели
                            while let Ok(a) = ctx.action_rx.try_recv() {
                                match a {
                                    GeyserAction::Subscribe(req) => {
                                        match tx.send(req).await {
                                            Ok(_) => {
                                                sent += 1;
                                                if sent >= 4096 { break; }
                                            }
                                            Err(e) => {
                                                tracing::warn!("subreq send failed: {e}");
                                                break 'session;
                                            }
                                        }
                                    },
                                    GeyserAction::UnsubscribeAll => {}
                                }
                            }
                        }
                    }
                    if ctx.cancel.is_cancelled() { break 'reconnect Ok(()); }
                }

                let sleep = backoff_ms.min(backoff_max_ms);
                tokio::time::sleep(Duration::from_millis(sleep)).await;
                backoff_ms = ((backoff_ms as f64) * backoff_mul) as u64;
            }
        })?;

        Ok(())
    }
}

Ну что, делаем SolanaTxStream для отправки лидерам?

Комментарии (0)