Что ж, да, вы не ослышались — прямо сейчас мы с вами подготовим свой движок для поддержания такого бэкенда, как у CoinMarketCup (кмк). И писать будем на моем любимом Rust. Использовать под капотом будем мою либу TitanRt, которую я лениво и скомкано презентовал в предыдущем посте. Постараюсь быть полезным и последовательным.
Осторожно: много кода (с комментариями)...
Зависимости
[package]
name = "token_scanner"
version = "0.1.0"
edition = "2024"
[dependencies]
titanrt = { version = "0.3.0", features = ["reqwest_conn"] } #
serde = { version = "1.0.219", features = ["derive"] }
anyhow = "1.0.99"
tracing-subscriber = "0.3.20"
tracing = "0.1.41"
config = "0.15.15"
Заготовка main.rs
fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt().with_max_level(Level::INFO).init(); // подключаем банальный трейсинг
let cfg = RuntimeConfig {
init_model_on_start: true, // запускаем модель сразу до начала лупа (не посылаем команду Start)
core_id: Some(1), // пиним поток рантайма к ядру
max_inputs_pending: Some(128), // до скольких элементов можно забить очередь, если не успеваем обработать (в рантайм можно отправлять входные события)
max_inputs_drain: None, // сколько максимум дрейним на итерации входных событий (если None, то max_inputs_pending)
stop_model_timeout: Some(5), // максимальное время ожидания модели для остановки внутри лупа
};
// вычисляем путь к файлу с конфигом
let model_cfg_path = if let Some(path) = env::args().nth(1) {
path
} else {
"./model_cfg".to_string()
};
// загружаем конфиг
tracing::info!("Loading model config from: {}", model_cfg_path);
let model_cfg = load_model_cfg(model_cfg_path)?;
// модель будет отсылать события, которые мы будем прослушивать и сами предпринимать действия
// создаем канал взаимодействия между моделью и текущим потоком
let (output_tx, mut output_rx) = MpmcChannel::unbounded::<Output<OuputEvent>>();
// пока нам не нужен никакой контекст, все берем из конфига
// юзаем удобный маркер из либы
let model_ctx = NullModelCtx;
// запускаем отдельный поток с рантаймом, где будет крутиться обслуживающий busy-loop и дергаться модель
let rt = Runtime::<TokenScannerModel>::spawn(cfg, model_ctx, model_cfg, output_tx)?;
let mut outputs_count = 0;
// принимаем от модели события и просто принтим их (здесь может быть сложная логика, например, для взаимодействия с бэком)
loop {
if outputs_count > 1000 {
break Ok(());
}
while let Ok(output) = output_rx.try_recv() {
tracing::info!("{:?}", output);
outputs_count += 1;
}
// просто держим поток в течение 100 мс
std::thread::sleep(std::time::Duration::from_millis(100));
}
}
// вспомогательная функция для загрузки конфига
fn load_model_cfg(path: String) -> anyhow::Result<TokenScannerConfig> {
let cfg = Config::builder()
.add_source(config::File::from(PathBuf::from(&path)))
.build()
.with_context(|| format!("failed to read model config from {path}"))?;
let model: TokenScannerConfig = cfg
.try_deserialize()
.with_context(|| format!("failed to deserialize model config from {path}"))?;
Ok(model)
}
Предварительно реализуем саму модель
pub struct TokenScannerModel {}
impl BaseModel for TokenScannerModel {
type Config = TokenScannerConfig; // тип конфига
type OutputTx = MpmcSender<Output<OuputEvent>>; // указываем имплементацию трансмиссии
type OutputEvent = OuputEvent; // событие трансмиссии
type Event = NullEvent; // не шлем события в модель
type Ctx = NullModelCtx; // не используем контекст
fn initialize(
ctx: Self::Ctx,
config: Self::Config,
reserved_core_id: Option<usize>,
output_tx: Self::OutputTx,
cancel_token: CancelToken,
) -> anyhow::Result<Self> {
todo!()
}
fn execute(&mut self) -> ExecutionResult {
todo!()
}
// не шлем в модель извне никаких событий (могли бы через объект рантайма)
fn on_event(&mut self, event: Self::Event, meta: Option<InputMeta>) {
unimplemented!()
}
// останавливаемся сразу
fn stop(&mut self, kind: StopKind) -> StopState {
// StopState::InProgress
StopState::Done
}
// не релоадим конфиги на лету
fn hot_reload(&mut self, config: &Self::Config) -> anyhow::Result<()> {
unimplemented!()
}
}
Что делаем во время инициализации модели
Нам нужен стрим, который будет в отдельном потоке принимать команды от модели и асинхронно отправлять запросы на биржи, чтобы фетчить данные.
В версии 0.3.0 я добавил такой универсальный http-коннектор с подобным стримом на базе reqwest. Работает в одном треде в цикле в такой схеме:
В начале итерации проверяет очередь команд, если не пустая, то создает фоновые токио таски через local_spawn, внутри которой идет проверка встроенного рейт лимит менеджера, затем уже отправка запроса на сервер. После ожидания ответа отправляем через mcmp канал ответ в наш основной цикл.
Идет проверка на готовые ответы. Если ответы имеются, то вызываем модельный хук и передаем все нужное, в том числе сырой ответ.
Даем токио рантайму немного продышаться и выполнить таски.
Конец итерации
Просто возьмем из titanrt уже готовый коннектор и заспавним стрим прямо в модели во время инита:
pub struct TokenScannerModel {
reqwest_stream: Stream<RingSender<ReqwestAction>, RingReceiver<CryptoEvent>, NullState>, // держим стрим для отправки http запросов и вызова хука
output_tx: MpmcSender<Output<OuputEvent>>, // держим отправителя для отправки внутренний событий в вызывающий код
}
impl BaseModel for TokenScannerModel {
...
fn initialize(
_ctx: Self::Ctx,
config: Self::Config,
reserved_core_id: Option<usize>,
output_tx: Self::OutputTx,
cancel_token: CancelToken,
) -> anyhow::Result<Self> {
// инитим коннектор (могли бы и закрепить его в структуре, чтобы можно было всегда заново создать стрим или еще один)
let mut conn = ReqwestConnector::init(
config.reqwest_conn, // вынесем конфиг коннектора в модельный конфиг
cancel_token.new_child(), // модельный токен будет родителем, если отменим стрим, то модель останется
reserved_core_id.map(|c| vec![c]), // укажем зарезервированное ядро
)?;
// для ясности:
// let mut desc = ReqwestStreamDescriptor {
// max_hook_calls_at_once: 10, // максимальное количество вызовов хуков в одной итерации (по сути, сколько дреним ответов за раз)
// wait_async_tasks_us: 0, // сколько даем поспать токио рантайма - основное место для джиттера
// max_pending_actions: None, // максимальное количество команд, которые стрим держит необработанными в очереди
// max_pending_events: None, // максимальное количество событий, которые модель держит необработанными в очереди
// core_pick_policy: Some(CorePickPolicy::Specific(2)), // привяжем поток стрима на отдельное ядро процессора
// rate_limits: config.reqwest_rate_limits.clone(),
// };
// короче:
let mut desc = ReqwestStreamDescriptor::low_latency();
for rl_cfg in config.reqwest_rate_limits.iter() {
desc.add_rate_limit(rl_cfg.clone());
}
// наконец, спавним стрим в отдельном потоке через коннектор
// передаем объем дескриптора и кастомный модельный hook, реализуемый в другом месте
let reqwest_stream = conn.spawn_stream(desc, crypto_hook)?;
// ждем пока стрим не запуститься
// например, могли накосячить с конфигом и стрим не запустился
let deadline = Instant::now() + Duration::from_secs(10);
while !reqwest_stream.is_healthy() {
if Instant::now() >= deadline {
return Err(anyhow::anyhow!(
"stream didn't become healthy within 10 seconds",
));
}
sleep(Duration::from_secs(1));
}
Ok(Self {
output_tx,
reqwest_stream,
})
}
}
Конфигурацию мы оставим на потом, пока просто примем во внимание, что она есть.
Как сейчас выглядит crypto_hook
#[derive(Debug, Clone)]
pub struct CryptoEvent {}
pub fn crypto_hook(
args: HookArgs<
ReqwestEvent, // сырое событие, которое детерминируется на уровне реализации стрима
RingSender<CryptoEvent>, // будем отправлять из хука спарсенные события
NullReducer, // можете реализовать на случай, если нужен стейт внутри стрима
NullState, // это если нужен стейт, который будете лоадить в модели
ReqwestStreamDescriptor, // из-за этого имеем доступ к нашему дескриптору внутри хука
>,
) {
if args.raw.is_success() {
} else {
}
}
Скоро мы это исправим, но прежде мы должны понять, что мы вообще делаем.
Бизнес логика — execute()
А что мы вообще делаем, собственно? Титан райнтайм запустит модель и будет шарашить model.execute() миллионы раз в секунду. Чем толще и массивнее execute, тем меньше итераций в лупе рантайме в секунду. И это основное место, где реализуется бизнес логика модели (или торговая, если хотите).
Очевидно, на экзекьюте нам нужно реализовать логику отправки команд в наш стрим, а также дренить (высушивать) наш ресивер, через который мы отправляем спарсенные ответы в хуке.
Давайте подготовим минимальный плацдарм просто для наглядности:
impl BaseModel for TokenScannerModel {
...
fn execute(&mut self) -> ExecutionResult {
let events = self.reqwest_stream.drain_max(); // полностью осушаем ресивер
for event in events {
tracing::debug!("got event: {:?}", event);
}
if self.reqwest_stream.is_healthy() {
// экшены детерминируются реализацией стрима, поэтому юзаем то, что нам дают (или сами реализуем стрим)
// здесь имеется в виду, что мы билдим GET запрос, передаем рл контекст, чтобы можно было отыскать конфигурацию рейт лимитера
let action = ReqwestAction::get(
Url::parse(
"https://api.bybit.com/v5/market/orderbook?category=spot&symbol=BTCUSDT",
)
.unwrap(),
)
.rl_ctx(RateLimitContext::new("bybit"))
.build();
// и просто отправляем экшен в стрим, чтобы в ответ на него получить событие в нашем хуке
match self.reqwest_stream.try_send(action) {
Ok(()) => {}
Err(e) => {
tracing::error!("error sending reqwest action: {}", e);
}
}
}
ExecutionResult::Relax // чтобы не жечь ядро и чутка тормозить busy-loop
}
}
Естественно, это не просто минимальный плацдарм, а скорее пустышка в демонстрационных целях, как можно за минуту реализовать получение снэпшотов ордербука с bybit внутри execute. Если мы это запустим, то даже событий никаких не будет, так как у нас пустой хук. Тем не менее, коллеги, это точка роста и начало программистской мысли.
В следующей части
Реализуем OutputEvent — событие, которое шлет модель время от времени в вызывающий код, например, для бэкенда. Это может быть снимок состояния токенов, которые мы сканируем. Или дельты по конкретному токену для сохранения в базу.
Реализуем CryptoEvent — событие, в которое парсим сырой ответ из хука. Это будет событие с req_id, а так же внутренним enum, чтобы не ограничиваться только запросами снэпшотов стакана, а использовать разные данные с бирж, в том числе Ticker Info. И сделаем его унифицированным, чтобы не ограничиваться только биржей байбит или рынком спот.
Реализуем хук и парсинг сырых событий с разных источников. Возьмем 2: bybit + binance.
Реализуем тайминги внутри execute и дополнительные структуры для отправки экшенов. Так же поймем, что будем делать с объектами CryptoEvent и в какой редьюсер их собирать, чтобы затем высылать output_tx.
Расширим конфигурацию модели и обсудим ее подробнее.