Привет, Хабр!

Если бы мне сказали, что я однажды заменю привычный Python или Scala для работы с данными на Rust, я бы, пожалуй, ухмыльнулся и продолжил привычное дело. Но времена меняются, и Rust вполне уверенно пробивает себе дорогу в мир больших данных. Сегодня я расскажу вам о трех фреймворках, которые делают Rust конкурентом в обработке данных.

И первый фреймворк – Polars.

Polars

В отличие от pandas, где нагрузка на процессор и память может быть ощутима, Polars использует фичи Rust для параллельных вычислений и оптимизированного использования ресурсов.

Главные преимущества Polars:

  • Быстрая обработка больших данных: Polars написан на Rust и использует многопоточность.

  • Поддержка многомерных данных: Polars поддерживает работу с многомерными массивами.

  • Ленивая и физическая вычислительная модель: Polars поддерживает ленивые вычисления

  • Работа с различными форматами данных: Поддержка CSV, JSON, Parquet, IPC, Avro и других форматов данных.

Примеры использования

Чтение и обработка CSV-файла

Загрузим данные из CSV-файла, отфильтруем строки и выполним групповую агрегацию:

use polars::prelude::*;
use std::fs::File;

fn main() -> Result<()> {
    // чтение CSV-файла
    let file = File::open("data.csv")?;
    let df = CsvReader::new(file)
        .infer_schema(None)
        .has_header(true)
        .finish()?;

    // фильтрация данных по условию
    let filtered_df = df
        .lazy()
        .filter(col("column_name").gt(lit(10)))
        .collect()?;

    // группировка и агрегация
    let result_df = filtered_df
        .lazy()
        .groupby([col("group_column")])
        .agg([col("value_column").mean()])
        .collect()?;

    println!("{:?}", result_df);

    Ok(())
}

Создание DataFrame и выполнение операций

Создадим DF и выполним операции сложения и фильтрации.

use polars::prelude::*;

fn main() -> Result<()> {
    // создание DataFrame
    let df = df![
        "column1" => &[1, 2, 3, 4, 5],
        "column2" => &[10, 20, 30, 40, 50]
    ]?;

    // добавление нового столбца с результатом сложения
    let df = df.lazy()
        .with_column((col("column1") + col("column2")).alias("sum"))
        .collect()?;

    // фильтрация строк, где сумма больше 30
    let filtered_df = df.lazy()
        .filter(col("sum").gt(lit(30)))
        .collect()?;

    println!("{:?}", filtered_df);

    Ok(())
}

Ленивые вычисления и работа с JSON

use polars::prelude::*;

fn main() -> Result<()> {
    let json_data = r#"
        [
            {"name": "Alice", "age": 25},
            {"name": "Bob", "age": 30},
            {"name": "Charlie", "age": 35}
        ]
    "#;

    // чтение JSON данных
    let df = JsonReader::new(json_data.as_bytes()).finish()?;

    // ленивые вычисления: фильтрация и вычисление среднего возраста
    let result = df.lazy()
        .filter(col("age").gt(lit(25)))
        .select([col("age").mean()])
        .collect()?;

    println!("{:?}", result);

    Ok(())
}

Подробне с Polars можно ознакомиться здесь.

Arroyo

Arroyo — это распределенный движок потоковой обработки, ориентированный на stateful вычисления с поддержкой как ограниченных, так и неограниченных потоков данных.

Arroyo разработан с использованием модели Dataflow, что позволяет управлять состояниями потоков данных, делая возможными различные сложные вычисления, такие как оконные агрегации, join-операции и многое другое. Весь этот функционал так же реализован на основе Rust.

Примеры использования Arroyo

Реализуем базовую настройку для обработки потока событий и подсчет количества событий в окне времени:

use arroyo::pipeline::Pipeline;
use arroyo::window::TumblingWindow;

fn main() {
    // инициализируем конвейер обработки данных
    let mut pipeline = Pipeline::new();

    // источник данных
    let source = pipeline.add_source("source_name");

    // применяем оконную функцию с временем окна в 5 минут
    let windowed = source.window(TumblingWindow::minutes(5))
                         .count();

    // выводим результат в консоль
    pipeline.add_sink(windowed, |result| println!("Количество событий: {:?}", result));

    // запуск конвейера
    pipeline.run();
}

Теперь реализуем stateful обработку, где нужно будет отслеживать состояние между событиями, например, вычисление среднего значения на основе предыдущих данных:

use arroyo::state::StatefulOperator;
use arroyo::pipeline::Pipeline;

struct AverageState {
    sum: f64,
    count: u64,
}

impl StatefulOperator for AverageState {
    type Input = f64;
    type Output = f64;

    fn process(&mut self, value: Self::Input) -> Option<Self::Output> {
        self.sum += value;
        self.count += 1;
        Some(self.sum / self.count as f64)
    }
}

fn main() {
    let mut pipeline = Pipeline::new();

    // инициализируем источник данных
    let source = pipeline.add_source("numeric_data");

    // применяем stateful операцию для вычисления среднего значения
    let averaged = source.stateful_operator(AverageState { sum: 0.0, count: 0 });

    // отправляем результат в консоль
    pipeline.add_sink(averaged, |avg| println!("Среднее значение: {:?}", avg));

    // запуск конвейера
    pipeline.run();
}

Теперь рассмотрим как Arroyo может использоваться для более сложных задач, например объединение нескольких потоков данных и выполнение оконных агрегаций:

use arroyo::pipeline::Pipeline;
use arroyo::window::SlidingWindow;

fn main() {
    let mut pipeline = Pipeline::new();

    // инициализация двух источников данных
    let source1 = pipeline.add_source("source1");
    let source2 = pipeline.add_source("source2");

    // оконные операции на двух потоках данных
    let windowed1 = source1.window(SlidingWindow::minutes(10)).sum();
    let windowed2 = source2.window(SlidingWindow::minutes(10)).sum();

    // join двух потоков данных по ключу
    let joined = windowed1.join(windowed2, |key1, key2| key1 == key2);

    // обработка результата
    pipeline.add_sink(joined, |result| println!("Join result: {:?}", result));

    // запуск конвейера
    pipeline.run();
}

Подробнее с Arryo можно ознакомиться здесь.

Timber

Timber — это простой логгер, разработанный для приложений, работающих в условиях многопоточности. Его основное назначение — упрощение процесса логирования в параллельных задачах. Timber позволяет вести логи как в стандартный вывод, так и в указанный файл.

Основные фичи Timber:

  1. Timber поддерживает уровни логирования, которые можно настроить через макросы.

  2. По умолчанию Timber выводит логи в stdout, но может быть легко перенастроен на запись в файл.

Пример использования Timber в приложении, которое может переключаться между режимами отладки и релиза, изменяя уровни логирования:

#[macro_use(timber)]
use timber;

#[cfg(debug)]
pub mod level {
    pub const ERR: i32 = 1;
    pub const DEB: i32 = 2;
    pub const INF: i32 = 7;
}

#[cfg(not(debug))]
pub mod level {
    pub const ERR: i32 = 1;
    pub const DEB: i32 = 0;
    pub const INF: i32 = 3;
}

// макросы для упрощения логирования
macro_rules! log_err{($($arg:tt)*) => {timber!($crate::level::ERR, "ERR", $($arg)*)}}
macro_rules! log_deb{($($arg:tt)*) => {timber!($crate::level::DEB, "DEB", $($arg)*)}}
macro_rules! log_inf{($($arg:tt)*) => {timber!($crate::level::INF, "INF", $($arg)*)}}

fn main() {
    timber::init("log.txt").unwrap(); // инициализация логирования в файл

    log_err!("Ошибка! Этот лог будет виден всегда.");
    log_deb!("Отладка. Этот лог виден только в режиме отладки.");
    log_inf!("Информация. Этот лог будет виден и в релизе, и в отладке.");
}

Можно определить константы уровней логов ERR, DEB, INF, и компилятор будет игнорировать ненужные строки в релизной сборке.

Больше практических инструментов коллеги из OTUS рассматривают в рамках практических онлайн-курсов от экспертов рынка. Подробнее ознакомиться с курсами можно в каталоге.

Комментарии (0)