Привет, Хабр!

Сегодня мы рассмотрим, как реализовать так называемую event-driven архитектуру с использованием Rust.

Архитектура на основе событий (event-driven architecture, EDA) — это подход к созданию систем, где взаимодействие между компонентами системы происходит с помощью событий. Все это позволяет развязывать компоненты друг от друга и повышать их независимость, что, в свою очередь, увеличивает масштабируемость и гибкость системы.

Как работает EDA?

  1. События: Основные данные или действия, которые происходят в системе, например, нажатие кнопки или завершение загрузки файла.

  2. Производители: Компоненты, которые создают события и отправляют их в систему.

  3. Потребители: Компоненты, которые подписаны на события и реагируют на них.

  4. Брокеры событий: Инструменты или системы, которые управляют передачей событий между производителями и потребителями.

С развитием экосистемы Rust появились хорошие инструменты для работы с архитектурой на основе событий, такие как:

  • Tokio: асинхронная платформа для работы с сетями.

  • Actix: высокопроизводительный фреймворк для создания акторных систем.

  • async-std: асинхронный стандарт для работы с Rust.

Установка и настройка среды

  1. Установим Rust:

    curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
  2. Установим необходимые библиотеки:

    • tokio

    • async-std

    • actix

Простая система на основе событий с использованием Tokio

Создадим простое приложение, которое будет отправлять и обрабатывать события с помощью Tokio.

cargo new event_driven_example
cd event_driven_example

Добавим зависимости в Cargo.toml:

[dependencies]
tokio = { version = "1.0", features = ["full"] }

А теперь напишем сам код:

use tokio::sync::mpsc;
use tokio::task;

#[tokio::main]
async fn main() {
    // Создаем канал для передачи сообщений
    let (tx, mut rx) = mpsc::channel(32);

    // Запускаем асинхронную задачу для обработки событий
    task::spawn(async move {
        while let Some(event) = rx.recv().await {
            println!("Обработано событие: {}", event);
        }
    });

    // Генерация событий
    for i in 1..=10 {
        tx.send(format!("Событие {}", i)).await.unwrap();
    }
}

Здесь мы:

  1. Создали канал для передачи сообщений между производителем и потребителем.

  2. Запустили асинхронную задачу для обработки событий с помощью tokio::task::spawn.

  3. Генерируем события и отправляем их в канал.

Использование Actix для акторной модели

Теперь рассмотрим использование Actix, чтобы создать более сложную систему на основе акторов.

Добавим зависимости в Cargo.toml:

[dependencies]
actix = "0.12"
actix-web = "4.0.0-beta.8"
serde = { version = "1.0", features = ["derive"] }

Создадим акторы:

use actix::prelude::*;
use serde::{Serialize, Deserialize};

#[derive(Message, Serialize, Deserialize)]
#[rtype(result = "()")]
struct Event {
    id: u32,
    message: String,
}

struct EventProducer;

impl Actor for EventProducer {
    type Context = Context<Self>;
}

impl Handler<Event> for EventProducer {
    type Result = ();

    fn handle(&mut self, event: Event, _: &mut Context<Self>) {
        println!("Произведено событие: {} - {}", event.id, event.message);
    }
}

struct EventConsumer;

impl Actor for EventConsumer {
    type Context = Context<Self>;
}

impl Handler<Event> for EventConsumer {
    type Result = ();

    fn handle(&mut self, event: Event, _: &mut Context<Self>) {
        println!("Получено событие: {} - {}", event.id, event.message);
    }
}

#[actix::main]
async fn main() {
    let producer = EventProducer.start();
    let consumer = EventConsumer.start();

    for i in 1..=10 {
        let event = Event {
            id: i,
            message: format!("Сообщение {}", i),
        };

        producer.do_send(event.clone());
        consumer.do_send(event);
    }
}

Объяснение:

  1. Создаем акторов EventProducer и EventConsumer, которые обрабатывают события.

  2. Определяем структуру Event и реализуем для нее сообщение Message.

  3. Запускаем акторов и передаем им события с помощью do_send.

Реализация брокера событий

Брокеры событий могут быть хороши для управления маршрутизацией событий между различными компонентами системы. Можно создать простой брокер с использованием библиотеки tokio.

Добавлим зависимости в Cargo.toml:

[dependencies]
tokio = { version = "1.0", features = ["full"] }

Создадим брокер:

use tokio::sync::mpsc;
use tokio::task;

struct EventBroker {
    sender: mpsc::Sender<String>,
    receiver: mpsc::Receiver<String>,
}

impl EventBroker {
    fn new(buffer_size: usize) -> Self {
        let (sender, receiver) = mpsc::channel(buffer_size);
        EventBroker { sender, receiver }
    }

    async fn start(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            println!("Брокер обработал событие: {}", event);
        }
    }

    async fn send_event(&self, event: String) {
        self.sender.send(event).await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    let mut broker = EventBroker::new(32);

    task::spawn(async move {
        broker.start().await;
    });

    for i in 1..=10 {
        broker.send_event(format!("Событие {}", i)).await;
    }
}

Здесь мы:

  1. Создали EventBroker с каналом для передачи событий.

  2. Запутили брокер в асинхронной задаче и обрабатываем входящие события.

  3. Отправляем события в брокер с помощью send_event.

Оптимизация и масштабируемость

Kafka и NATS

Apache Kafka - распределенная система для обработки потоков данных в реальном времени.

NATS - высокопроизводительная система обмена сообщениями с поддержкой pub/sub и request/reply.

[dependencies]
tokio = { version = "1.0", features = ["full"] }
rdkafka = "0.29"
nats = "0.19"
// пример подключения к Kafka
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};

async fn produce_kafka_event() {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .unwrap();

    producer.send(
        FutureRecord::to("my-topic")
            .payload("Это сообщение для Kafka")
            .key("ключ"),
        0,
    ).await.unwrap();
}
// пример подключения к NATS
use nats::asynk::Connection;

async fn publish_nats_event() {
    let nc = Connection::connect("localhost:4222").await.unwrap();

    nc.publish("events", "Это сообщение для NATS").await.unwrap();
}

Примеры

Допустим, нужно обрабатывать большое количество событий в реальном времени и сохранять результаты в базе данных.

Для этого мы будем использовать Kafka для приема и обработки событий и Rust для обработки данных и записи результатов в базу данных:

use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;
use tokio_postgres::{NoTls, Client};

async fn process_events() {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "my-group")
        .create()
        .unwrap();

    let (client, connection) = tokio_postgres::connect("host=localhost user=postgres", NoTls).await.unwrap();

    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("Ошибка соединения: {}", e);
        }
    });

    consumer.subscribe(&["my-topic"]).unwrap();

    while let Some(message) = consumer.recv().await.unwrap() {
        let payload = match message.payload_view::<str>() {
            Some(Ok(text)) => text,
            Some(Err(e)) => {
                eprintln!("Ошибка декодирования сообщения: {:?}", e);
                continue;
            }
            None => continue,
        };

        println!("Получено сообщение: {}", payload);

        client.execute("INSERT INTO events (data) VALUES ($1)", &[&payload]).await.unwrap();
    }
}

Подробнее с применяемыми библиотеками можно ознакомиться по гиперссылкам:

  1. Tokio — асинхронная платформа для работы с сетями.

  2. Actix — высокопроизводительный фреймворк для создания акторных систем.

  3. async-std — асинхронный стандарт для работы с Rust.


Больше практических навыков по архитектуре приложений вы можете получить в рамках практических онлайн-курсов от экспертов отрасли.

Комментарии (0)