Что ж, да, вы не ослышались — прямо сейчас мы с вами подготовим свой движок для поддержания такого бэкенда, как у CoinMarketCup (кмк). И писать будем на моем любимом Rust. Использовать под капотом будем мою либу TitanRt, которую я лениво и скомкано презентовал в предыдущем посте. Постараюсь быть полезным и последовательным.

Осторожно: много кода (с комментариями)...


Зависимости

[package]
name = "token_scanner"
version = "0.1.0"
edition = "2024"

[dependencies]
titanrt = { version = "0.3.0", features = ["reqwest_conn"] } #
serde = { version = "1.0.219", features = ["derive"] }
anyhow = "1.0.99"
tracing-subscriber = "0.3.20"
tracing = "0.1.41"
config = "0.15.15"

Заготовка main.rs

fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt().with_max_level(Level::INFO).init(); // подключаем банальный трейсинг

    let cfg = RuntimeConfig {
        init_model_on_start: true, // запускаем модель сразу до начала лупа (не посылаем команду Start)
        core_id: Some(1),          // пиним поток рантайма к ядру
        max_inputs_pending: Some(128), // до скольких элементов можно забить очередь, если не успеваем обработать (в рантайм можно отправлять входные события)
        max_inputs_drain: None, // сколько максимум дрейним на итерации входных событий (если None, то max_inputs_pending)
        stop_model_timeout: Some(5), // максимальное время ожидания модели для остановки внутри лупа
    };

    // вычисляем путь к файлу с конфигом
    let model_cfg_path = if let Some(path) = env::args().nth(1) {
        path
    } else {
        "./model_cfg".to_string()
    };

    // загружаем конфиг
    tracing::info!("Loading model config from: {}", model_cfg_path);
    let model_cfg = load_model_cfg(model_cfg_path)?;

    // модель будет отсылать события, которые мы будем прослушивать и сами предпринимать действия
    //  создаем канал взаимодействия между моделью и текущим потоком
    let (output_tx, mut output_rx) = MpmcChannel::unbounded::<Output<OuputEvent>>();

    // пока нам не нужен никакой контекст, все берем из конфига
    // юзаем удобный маркер из либы
    let model_ctx = NullModelCtx;

    // запускаем отдельный поток с рантаймом, где будет крутиться обслуживающий busy-loop и дергаться модель
    let rt = Runtime::<TokenScannerModel>::spawn(cfg, model_ctx, model_cfg, output_tx)?;

    let mut outputs_count = 0;

    // принимаем от модели события и просто принтим их (здесь может быть сложная логика, например, для взаимодействия с бэком)
    loop {
        if outputs_count > 1000 {
            break Ok(());
        }

        while let Ok(output) = output_rx.try_recv() {
            tracing::info!("{:?}", output);
            outputs_count += 1;
        }

        // просто держим поток в течение 100 мс
        std::thread::sleep(std::time::Duration::from_millis(100));
    }
}

// вспомогательная функция для загрузки конфига
fn load_model_cfg(path: String) -> anyhow::Result<TokenScannerConfig> {
    let cfg = Config::builder()
        .add_source(config::File::from(PathBuf::from(&path)))
        .build()
        .with_context(|| format!("failed to read model config from {path}"))?;

    let model: TokenScannerConfig = cfg
        .try_deserialize()
        .with_context(|| format!("failed to deserialize model config from {path}"))?;

    Ok(model)
}

Предварительно реализуем саму модель

pub struct TokenScannerModel {}

impl BaseModel for TokenScannerModel {
    type Config = TokenScannerConfig; // тип конфига
    type OutputTx = MpmcSender<Output<OuputEvent>>; // указываем имплементацию трансмиссии
    type OutputEvent = OuputEvent; // событие трансмиссии
    type Event = NullEvent; // не шлем события в модель
    type Ctx = NullModelCtx; // не используем контекст

    fn initialize(
        ctx: Self::Ctx,
        config: Self::Config,
        reserved_core_id: Option<usize>,
        output_tx: Self::OutputTx,
        cancel_token: CancelToken,
    ) -> anyhow::Result<Self> {
        todo!()
    }

    fn execute(&mut self) -> ExecutionResult {
        todo!()
    }

    // не шлем в модель извне никаких событий (могли бы через объект рантайма)
    fn on_event(&mut self, event: Self::Event, meta: Option<InputMeta>) {
        unimplemented!()
    }

    // останавливаемся сразу
    fn stop(&mut self, kind: StopKind) -> StopState {
        // StopState::InProgress
        StopState::Done
    }

    // не релоадим конфиги на лету
    fn hot_reload(&mut self, config: &Self::Config) -> anyhow::Result<()> {
        unimplemented!()
    }
}

Что делаем во время инициализации модели

Нам нужен стрим, который будет в отдельном потоке принимать команды от модели и асинхронно отправлять запросы на биржи, чтобы фетчить данные.

В версии 0.3.0 я добавил такой универсальный http-коннектор с подобным стримом на базе reqwest. Работает в одном треде в цикле в такой схеме:

  1. В начале итерации проверяет очередь команд, если не пустая, то создает фоновые токио таски через local_spawn, внутри которой идет проверка встроенного рейт лимит менеджера, затем уже отправка запроса на сервер. После ожидания ответа отправляем через mcmp канал ответ в наш основной цикл.

  2. Идет проверка на готовые ответы. Если ответы имеются, то вызываем модельный хук и передаем все нужное, в том числе сырой ответ.

  3. Даем токио рантайму немного продышаться и выполнить таски.

  4. Конец итерации

Просто возьмем из titanrt уже готовый коннектор и заспавним стрим прямо в модели во время инита:

pub struct TokenScannerModel {
    reqwest_stream: Stream<RingSender<ReqwestAction>, RingReceiver<CryptoEvent>, NullState>, // держим стрим для отправки http запросов и вызова хука
    output_tx: MpmcSender<Output<OuputEvent>>, // держим отправителя для отправки внутренний событий в вызывающий код
}

impl BaseModel for TokenScannerModel {
...

fn initialize(
        _ctx: Self::Ctx,
        config: Self::Config,
        reserved_core_id: Option<usize>,
        output_tx: Self::OutputTx,
        cancel_token: CancelToken,
    ) -> anyhow::Result<Self> {
        // инитим коннектор (могли бы и закрепить его в структуре, чтобы можно было всегда заново создать стрим или еще один)
        let mut conn = ReqwestConnector::init(
            config.reqwest_conn,      // вынесем конфиг коннектора в модельный конфиг
            cancel_token.new_child(), // модельный токен будет родителем, если отменим стрим, то модель останется
            reserved_core_id.map(|c| vec![c]), // укажем зарезервированное ядро
        )?;

        // для ясности:
        // let mut desc = ReqwestStreamDescriptor {
        //     max_hook_calls_at_once: 10, // максимальное количество вызовов хуков в одной итерации (по сути, сколько дреним ответов за раз)
        //     wait_async_tasks_us: 0, // сколько даем поспать токио рантайма - основное место для джиттера
        //     max_pending_actions: None, // максимальное количество команд, которые стрим держит необработанными в очереди
        //     max_pending_events: None, // максимальное количество событий, которые модель держит необработанными в очереди
        //     core_pick_policy: Some(CorePickPolicy::Specific(2)), // привяжем поток стрима на отдельное ядро процессора
        //     rate_limits: config.reqwest_rate_limits.clone(),
        // };

        // короче:
        let mut desc = ReqwestStreamDescriptor::low_latency();

        for rl_cfg in config.reqwest_rate_limits.iter() {
            desc.add_rate_limit(rl_cfg.clone());
        }

        // наконец, спавним стрим в отдельном потоке через коннектор
        // передаем объем дескриптора и кастомный модельный hook, реализуемый в другом месте
        let reqwest_stream = conn.spawn_stream(desc, crypto_hook)?;

        // ждем пока стрим не запуститься
        // например, могли накосячить с конфигом и стрим не запустился
        let deadline = Instant::now() + Duration::from_secs(10);

        while !reqwest_stream.is_healthy() {
            if Instant::now() >= deadline {
                return Err(anyhow::anyhow!(
                    "stream didn't become healthy within 10 seconds",
                ));
            }
            sleep(Duration::from_secs(1));
        }

        Ok(Self {
            output_tx,
            reqwest_stream,
        })
    }
}

Конфигурацию мы оставим на потом, пока просто примем во внимание, что она есть.

Как сейчас выглядит crypto_hook

#[derive(Debug, Clone)]
pub struct CryptoEvent {}

pub fn crypto_hook(
    args: HookArgs<
        ReqwestEvent, // сырое событие, которое детерминируется на уровне реализации стрима
        RingSender<CryptoEvent>, // будем отправлять из хука спарсенные события
        NullReducer, // можете реализовать на случай, если нужен стейт внутри стрима
        NullState, // это если нужен стейт, который будете лоадить в модели
        ReqwestStreamDescriptor, // из-за этого имеем доступ к нашему дескриптору внутри хука
    >,
) {
    if args.raw.is_success() {
    } else {
    }
}

Скоро мы это исправим, но прежде мы должны понять, что мы вообще делаем.

Бизнес логика — execute()

А что мы вообще делаем, собственно? Титан райнтайм запустит модель и будет шарашить model.execute() миллионы раз в секунду. Чем толще и массивнее execute, тем меньше итераций в лупе рантайме в секунду. И это основное место, где реализуется бизнес логика модели (или торговая, если хотите).

Очевидно, на экзекьюте нам нужно реализовать логику отправки команд в наш стрим, а также дренить (высушивать) наш ресивер, через который мы отправляем спарсенные ответы в хуке.

Давайте подготовим минимальный плацдарм просто для наглядности:

impl BaseModel for TokenScannerModel {
... 
fn execute(&mut self) -> ExecutionResult {
        let events = self.reqwest_stream.drain_max(); // полностью осушаем ресивер

        for event in events {
            tracing::debug!("got event: {:?}", event);
        }

        if self.reqwest_stream.is_healthy() {
            // экшены детерминируются реализацией стрима, поэтому юзаем то, что нам дают (или сами реализуем стрим)
            
            // здесь имеется в виду, что мы билдим GET запрос, передаем рл контекст, чтобы можно было отыскать конфигурацию рейт лимитера
            let action = ReqwestAction::get(
                Url::parse(
                    "https://api.bybit.com/v5/market/orderbook?category=spot&symbol=BTCUSDT",
                )
                .unwrap(),
            )
            .rl_ctx(RateLimitContext::new("bybit"))
            .build();
            
            // и просто отправляем экшен в стрим, чтобы в ответ на него получить событие в нашем хуке
            match self.reqwest_stream.try_send(action) {
                Ok(()) => {}
                Err(e) => {
                    tracing::error!("error sending reqwest action: {}", e);
                }
            }
        }

        ExecutionResult::Relax // чтобы не жечь ядро и чутка тормозить busy-loop
    }
}

Естественно, это не просто минимальный плацдарм, а скорее пустышка в демонстрационных целях, как можно за минуту реализовать получение снэпшотов ордербука с bybit внутри execute. Если мы это запустим, то даже событий никаких не будет, так как у нас пустой хук. Тем не менее, коллеги, это точка роста и начало программистской мысли.

В следующей части

  • Реализуем OutputEvent — событие, которое шлет модель время от времени в вызывающий код, например, для бэкенда. Это может быть снимок состояния токенов, которые мы сканируем. Или дельты по конкретному токену для сохранения в базу.

  • Реализуем CryptoEvent — событие, в которое парсим сырой ответ из хука. Это будет событие с req_id, а так же внутренним enum, чтобы не ограничиваться только запросами снэпшотов стакана, а использовать разные данные с бирж, в том числе Ticker Info. И сделаем его унифицированным, чтобы не ограничиваться только биржей байбит или рынком спот.

  • Реализуем хук и парсинг сырых событий с разных источников. Возьмем 2: bybit + binance.

  • Реализуем тайминги внутри execute и дополнительные структуры для отправки экшенов. Так же поймем, что будем делать с объектами CryptoEvent и в какой редьюсер их собирать, чтобы затем высылать output_tx.

  • Расширим конфигурацию модели и обсудим ее подробнее.

Код

Комментарии (0)