Очередной чат, и к тому же на rust?! Да, yet another. И да, в этой статье не будет каких-то новых откровений системного программирования с написанием своего фреймворка для работы со сетью на уровне драйверов или других испытаний. Этот альманах про мой первый опыт в веб-разработке, который может быть полезен для других новичков, ведь тут мы затронем помимо злосчастного rust такие вещи, как devcontainer, REST API, идентификацию-аутентификацию-авторизацию, WebSockets, SSE, юнит и интеграционные тесты, некоторые паттерны, логирование и прочее.

Начало начал

Редактор кода

Выберем каким редактором кода будем пользоваться. Это довольно холиварная тема, но я остановлюсь на VS Code по таким причинам, как он лёгок в настройке, довольно шустрый, есть гибкие профили и дебаг-конфиги, синхронизация настроек, куча плагинов на любой вкус и цвет, кроссплатформенный. А ещё он умеет в devcontainer, но об этом чуть позже.

Рабочее пространство

Весь проект будет находиться в монорепозитории на github. Это довольно удобно, особенно когда изменения вносятся сразу в нескольких местах, не приходится делать несколько пулл реквестов, и не забываешь подгрузить сабмодули. Это всё, конечно, хорошо, но усложняет последующий CI/CD. Чтобы как-то облегчить себе работу по слежке за версиями, будем использовать conventional commit.

Инструменты

Так как сервер будет написан на rust, то нам нужен и соответствующий тулчейн, который требует, помимо прочего, ещё и установить зависимости в виде Visual Studio в Windows. А потом ещё и потребуется развернуть базу данных и/или Redis. И ладно, если вы один работаете над своим pet-проектом, но представьте, что работаете сразу на нескольких тачках, как и я, или с вами работают другие люди, то проблем с тем, что развернуть набор инструментов с теми же настройками, не избежать. Или просто не хотите захламлять систему лишним. Для целей развёртки инфраструктуры разработки силами Microsoft был придуман devcontainer, который базируется на технологиях контейнеризации Docker.

Сильными сторонами devcontainer можно считать абсолютную простоту в использовании, если хотя бы пару раз трогали траву Dockerfile и docker-compose. Из минусов очевидный оверхед из-за виртуализации, который даёт достаточно сильное влияние на скорость рабочих процессов, включая анализ и компиляцию кода.

Давайте начнём, создав папку server, в которую положим файл .devcontainer.json, со следующим содержимым:

{
  "name": "rust-devcontainer",
  "image": "mcr.microsoft.com/devcontainers/rust:latest"
}

Всё, на этом вся настройка devcontainer завершена, можно смело открывать эту папку в VS Code, после чего он сам увидит файл и предложит установить соответствующий плагин, который, в свою очередь, скачает образ и запустит контейнер. Если вам нужен не rust-тулчейн, а какой-нибудь, например, .NET, то можете посмотреть либо на странице Templates, либо в регистре Microsoft. Помимо прочего, есть базовый образ, который позволит легко создать своё окружение.

Помимо готовых шаблонов, devcontainer башляет настройками редактора, можно установить единые для всех плагины, которые будут работать внутри контейнера, и настройки редактора. Как пример, давайте добавим плагины для работы с базой и переопределим для работы nightly версии rustfmt. Для этого нужно указать поле customizations в файле .devcontainer.json.

{
  "name": "rust-devcontainer",
  "image": "mcr.microsoft.com/devcontainers/rust:latest",
  "customizations": {
        "vscode": {
            "extensions": [
                "ms-ossdata.vscode-pgsql",
            ],
            "settings": {
                "rust-analyzer.rustfmt.overrideCommand": ["rustfmt", "+nightly", "--edition", "2024"]
            }
        }
    }
}

Если речь зашла об nightly версии rustfmt, то давайте покажу, как добавить эту версию в окружение. Для этого нам придётся создать папку devcontainer (хотя можете и без неё) и переместить в неё файл .devcontainer.json, после чего создать файл Dockerfile со следующим содержимым:

FROM mcr.microsoft.com/devcontainers/rust:latest
RUN rustup component add --toolchain nightly-x86_64-unknown-linux-gnu rustfmt

Первая строчка говорит о том, что мы будем использовать готовый образ как отправную точку, после чего добавляем нужный нам компонент rustfmt. Чтобы devcontainer начал использовать наш Dockerfile, нужно заменить поле image в .devcontainer.json на build со следующими полями:

{
  "name": "rust-devcontainer",
  "build": {
        "dockerfile": "Dockerfile",
        "context": ".."
  },
  "customizations": {
        "vscode": {
            "extensions": [
                "ms-ossdata.vscode-pgsql",
            ],
            "settings": {
                "rust-analyzer.rustfmt.overrideCommand": ["rustfmt", "+nightly", "--edition", "2024"]
            }
        }
    }
}

Оба этих поля указываются относительно файла .devcontainer.json, так как Dockerfile лежит в той же директории, то мы просто указываем его название. Поле context указывает, что будет копироваться внутрь контейнера.

Помимо прочего, можно использовать docker-compose. Для примера создим в той же директории файл docker-compose.yml со следующим содержимым:

services:
  devcontainer:
    build:
      context: ..
      dockerfile: .devcontainer/Dockerfile
    command: sleep infinity
    ports:
      - 3000:3000
    volumes:
      - ../../:/workspace
    environment:
      - DATABASE_URL=postgresql://postgres:password@db/justice
    depends_on: [db]

  db:
    image: postgres
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password

Убираем из .devcontainer.json поле build и описываем то же самое в docker-compose.yml. Почти одно и то же, однако теперь пути указываются относительно context, который, в свою очередь, указывается относительно docker-compose.yml, а не .devcontainer.json, как это было ранее.

Из необычного здесь поле command: sleep infinity, которая заставляет контейнер не завершать свою работу сразу после сборки образа и даёт VS Code возможность подключиться к контейнеру.

Так как мы будем работать с базами, неплохо было бы иметь инструмент для миграций. В качестве такого инструмента я буду использовать sqlx-cli, которую можно установить командой:

cargo install sqlx-cli

Изначально использовал макрос migrate! из sqlx, чтобы не забывать про миграции и автоматически их применять при запуске приложения, но быстро от этого отказался из-за того, что я использовал макросы query! для запросов, которые делают проверки во время компиляции, а так как миграции ещё не применены, то и сборка падает с ошибкой.

Обнаружил, что cargo build и rust-analyzer могут падать из-за недостающих прав на папку /usr/local/cargo/registry/. Решил это тем, что даю всем доступ к папке следующей командой (в Dockerfile):

sudo chmod -R a+rw /usr/local/cargo/registry/

Финальный Dockerfile у нас получился вот такой:

FROM mcr.microsoft.com/devcontainers/rust:latest
RUN cargo install sqlx-cli
RUN rustup component add --toolchain nightly-x86_64-unknown-linux-gnu rustfmt
RUN sudo chmod -R a+rw /usr/local/cargo/registry/

Инициализируем проект

Мы наконец-таки запустили наш devcontainer и теперь можем приступить к написанию кода. Давайте инициализируем наш rust-проект, прописав следующую команду в терминале:

cargo init --vcs none

Пишем код

Логирование

Давайте начнём с логов, ведь что может быть прекрасней кружки кофе порции логов с утреца? Нам потребуются такие крейты, как:

cargo add tracing tracing-appender
cargo add tracing-subscriber -F env-filter -F fmt -F local-time
cargo add time -F formatting -F macros

Если не хотите изменять формат времени, то можете опустить такие зависимости, как time и фьючи local-time у tracing-subscriber.

Логирование всего и вся, конечно, крутое занятие, но видеть в логах кучу ненужного мусора от других крейтов - такое себе, поэтому давайте отфильтруем их, добавив фильтр в нашу функцию init_logs.

pub fn init_logs() {
    let filter = EnvFilter::default()
        .add_directive("server=trace".parse().unwrap())
        .add_directive("axum=info".parse().unwrap());
}

Логи имеют разные уровни логирования, варьируемые от меньшего к большему по уровню важности: trace, debug, info, warn и error. В данном случае мы будем получать от нашего приложения абсолютно все логи, а от axum (о котором чуть позже) будем получать логи лишь уровня info.

Фильтр мы добавили, но теперь давайте немного поменяем формат времени с помощью time и его макроса format_description.

pub fn init_logs() {
    let filter = EnvFilter::default()
        .add_directive("server=trace".parse().unwrap())
        .add_directive("axum=info".parse().unwrap());

    let timer = LocalTime::new(format_description!(
        "[day]-[month]-[year] [hour]:[minute]:[second]"
    ));
}

Давайте создадим регистр tracing, который работает по следующей схеме, и зададим слой логгирования в консоль, применим к нему наш формат времени и инициализируем систему логирования методом init на регистре. Формат времени задаётся с помощью метода with_timer на слое.

pub fn init_logs() {
    let filter = EnvFilter::default()
        .add_directive("server=trace".parse().unwrap())
        .add_directive("axum=info".parse().unwrap());

    let timer = LocalTime::new(format_description!(
        "[day]-[month]-[year] [hour]:[minute]:[second]"
    ));

    tracing_subscriber::registry()
          .with(filter)
          .with(fmt::layer().with_timer(timer))
          .init();
}

Но мы хотим, чтобы логи помимо консоли писались в файл. Для этого есть крейт tracing-appender, который мы добавили ранее. Давайте и его добавим как слой. Для начала получим путь до нашего исполняемого файла, после чего получим папку, в которой он находится. Это нужно для того, что если использовать просто относительный путь, то tracing-appender будет создавать папку в директории с Cargo.toml во время дебага.

let exe_path = std::env::current_exe().expect("failed to get executable path");
let exe_folder = exe_path.parent().expect("failed to get executable folder");

Теперь настроим ротацию логов. Если кратко, то ротация — это как часто будут создаваться новые файлы логов. Я буду использовать ежедневную ротацию.

pub fn init_logs() -> tracing_appender::non_blocking::WorkerGuard {
    let filter = EnvFilter::default()
        .add_directive("server=trace".parse().unwrap())
        .add_directive("axum=info".parse().unwrap());

    let timer = LocalTime::new(format_description!(
        "[day]-[month]-[year] [hour]:[minute]:[second]"
    ));

    let exe_path = std::env::current_exe().expect("failed to get executable path");
    let exe_folder = exe_path.parent().expect("failed to get executable folder");
    let (non_blocking, _guard) = tracing_appender::non_blocking(tracing_appender::rolling::daily(
        exe_folder.join("logs"),
        "server",
    ));

    tracing_subscriber::registry()
        .with(filter)
        .with(fmt::layer().with_timer(timer))
        .with(fmt::layer().with_writer(non_blocking).with_ansi(false))
        .init();

    _guard
}

Могли заметить, что возвращается кортеж с неблокирующим писателем и гуардом. Есть также возможность настроить блокирующего писателя, он будет блокировать поток, чтобы записать логи. В свою очередь, неблокирующий писатель помещает логи в очередь, из которой берёт выделенный поток и записывает их в файл, но при таком подходе есть шанс потерять логи при внезапном завершении работы программы, если верить документации.

Помимо прочего, мы отключили запись ANSI у файлового писателя с помощью метода with_ansi(false), чтобы не было спецкодов, которые используются для цветного вывода в консоли, в файле нам они не нужны.

Теперь мы можем использовать в коде макросы логирования. В них передавать, кроме текста, ничего не надо, они сами подхватят проинициализированный контекст tracing.

trace!("Hello, trace!");
debug!("Hello, debug!");
info!("Hello, info!");
warn!("Hello, warn!");
error!("Hello, error!");

Сервер

Hello, World

Давайте опишем обработчик HTTP запросов, который будет отправлять на GET запрос строку "Hello, World!". Для этого будем использовать axum. Каких-то определённых причин в его выборе не было, просто тыкнул пальцем в небо. Помимо axum нам потребуется tokio, так как axum работает в асинхронном контексте. Поэтому добавим их в проект, прописав следующие строки в терминале:

cargo add axum
cargo tokio -F macros -F net -F rt-multi-thread

Фьючи macros нужно для макроса tokio::main, net для TcpListener, который используется axum для прослушивания порта, и rt-multi-thread нужен для того, чтобы tokio работал в многопоточном режиме.

В файле main.rs применим атрибутный макрос tokio::main к функции main, которую сделаем асинхронной, добавив ключевое слово async, и создадим объект типа Router, который, как уже можно было догадаться по названию, будет маршрутизировать запросы, и добавим обработчик GET-запросов к корню адресного пути, то есть к /.

#[tokio::main]
async fn main() {
    let _guard = init_logs();

    let app = Router::new().route("/", get(|| async { "Hello, World!" }));
}

Чтобы обрабатывать запросы других типов, например, POST или PUT, то для них есть соответствующие функции post и put. Со списком всех поддерживаемых типов запросов можете ознакомиться в документации.

К сожалению, это не всё, пока наш сервер не будет принимать запросы. Чтобы он начал это делать, нужно инициализировать ранее упомянутый TcpListener и передать его с app как аргументы в функцию axum::serve.

#[tokio::main]
async fn main() {
    let _guard = init_logs();

    let app = Router::new().route("/", get(|| async { "Hello, World!" }));

    let port = 4000;
    let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, port))
        .await
        .expect("failed to bind port");

    tracing::info!("Listening on port {port}");

    axum::serve(listener, app)
        .await
        .expect("failed to start server");
}

Теперь если запустить сервер командой cargo run и перейти в браузер по адресу localhost:4000, то мы увидим следующее:

image.png
hello, world

Первая мирация

Для того, чтобы продолжить, нам потребуется где-то хранить информацию, базу мы уже развернули, осталось написать миграции. Для того, чтобы создать новую миграцию, которая будет называться init, используем команду:

sqlx migrate add init

Команда создаст папку migrations с двумя файлами 20250902082759_init.up.sql и 20250902082759_init.down.sql. Внутри up файла нужно написать SQL скрипт, который будет делать новые изменения в структуре базы данных, внутри down файла описывают скрипты, которые позволяют вернуть базу к исходному виду.

Набросаем схемку нашей базы данных, от которой будем отталкиваться при создании скрипта SQL для первоначальной миграции. У нас будет на данный момент пока что лишь пять таблиц.

image (1).png
схема базы данных

Опишем в файле 20250902082759_init.up.sql создание таблиц и некоторых индексов для более быстрого поиска данных. Чтобы сохранить идемпотентность скрипта, добавим в начало скрипта дроп таблиц, если они уже существуют.

-- Add up migration script here

DROP TABLE IF EXISTS Messages;
DROP TABLE IF EXISTS ChatMembers;
DROP TABLE IF EXISTS Chats;
DROP TABLE IF EXISTS Sessions;
DROP TABLE IF EXISTS Users;

CREATE TABLE Users (
    Id SERIAL PRIMARY KEY,
    Name VARCHAR(30) NOT NULL UNIQUE,
    Password CHAR(64) NOT NULL,
    CreatedAt TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE Sessions (
    Uid CHAR(11) PRIMARY KEY NOT NULL,
    UserId INTEGER NOT NULL,
    LoggedAt TIMESTAMPTZ DEFAULT NOW(),
    FOREIGN KEY (UserId) REFERENCES Users(Id) ON DELETE CASCADE
);

CREATE TABLE Chats (
    Id SERIAL PRIMARY KEY,
    Title VARCHAR(50) NOT NULL,
    CreatedAt TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE ChatMembers (
    UserId INTEGER,
    ChatId INTEGER NOT NULL,
    PRIMARY KEY (ChatId, UserId),
    FOREIGN KEY (ChatId) REFERENCES Chats(Id) ON DELETE CASCADE,
    FOREIGN KEY (UserId) REFERENCES Users(Id) ON DELETE SET NULL
);

CREATE TABLE Messages (
    Id BIGSERIAL NOT NULL,
    UserId INTEGER,
    ChatId INTEGER NOT NULL,
    Content TEXT NOT NULL,
    CreatedAt TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (Id),
    FOREIGN KEY (UserId) REFERENCES Users(Id) ON DELETE SET NULL,
    FOREIGN KEY (ChatId) REFERENCES Chats(Id) ON DELETE CASCADE
);

CREATE INDEX IdxSessionsUserId ON Sessions(UserId);
CREATE INDEX IdxChatMembersUserId ON ChatMembers(UserId);
CREATE INDEX IdxChatMembersChatId ON ChatMembers(ChatId);
CREATE INDEX IdxMessagesUserId ON Messages(UserId);
CREATE INDEX IdxMessagesChatId ON Messages(ChatId);

Скрипт отката в файле 20250902082759_init.down.sql будет очень простым: просто удаляем созданные таблицы.

-- Add down migration script here

DROP TABLE Messages;
DROP TABLE ChatMembers;
DROP TABLE Chats;
DROP TABLE Sessions;
DROP TABLE Users;

Теперь применим нашу миграцию, для это воспользуемся командой:

sqlx migrate run

Я создам отдельный файлик db.rs, где буду конфигурировать базу данных, чтобы не захламлять наш красивый main.rs. Хотя я на самом деле не особо понимаю, зачем, например, ограничивают количество активных подключений к базе данных, просьба осведомлённых просветить и меня в комментариях; поэтому инициализация пула подключений будет супер примитивной: получаем строку подключения из переменной среды и сразу создаём пул.

use sqlx::PgPool;

pub async fn init_db() -> PgPool {
    let connection_string = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    PgPool::connect_lazy(&connection_string).expect("failed to connect to database")
}

Отлично, теперь нужно как-то прокидывать пул в обработчики запросов, для этого создадим структуру AppState, внутри которой будем передавать пул.

struct AppState {
  db: sqlx::PgPool,
}

#[tokio::main]
async fn main() {
    let _guard = init_logs();

    let db = init_db().await;

    let state = Arc::new(AppState { db });

    let app = Router::new()
        .route("/", get(|| async { "Hello, World!" }))
        .with_state(state);

    let port = 4000;
    let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, port))
        .await
        .expect("failed to bind port");

    tracing::info!("Listening on port {port}");

    axum::serve(listener, app)
        .await
        .expect("failed to start server");
}

Регистрация

Прекрасно, но давайте теперь создадим что-то посложнее. Например, регистрацию новых пользователей. Мы будем использовать для аутентификации систему сессий, которые будем хранить в куки.

Из вариантов ещё есть JWT, но, на мой взгляд, они излишне объёмные. В теории можно просто проверять подпись токена, не лазая в базу, плюс токен несёт в себе какую-то полезную нагрузку, но на практике всё равно токен проверяется в базе на актуальность при каждом запросе.

Вариантов аутентификации куда больше, например, OAuth2/OIDC, Basic Auth или mTLS, и т.

Создам модуль controllers/mod.rs, в котором создам дополнительный модуль users.rs, в котором будут находиться обработчики запросов по пути /users.

Дополнительно создам модуль models/mod.rs, в котором создам ещё один модуль users.rs. В этом файле опишем базовые структуры запроса и ответа. Начнём с запроса к серверу, создадим структуру LoginUserRequest с полями username и password. Пусть вас не смущает, что делаем регистрацию, а модель данных называется как на вход, просто мне было лень описывать две одинаковые структуры.

#[derive(Deserialize)]
pub struct LoginUserRequest {
    pub username: Username,
    pub password: Password,
}

Но постой, что за типы Username и Password?! Это value object из DDD (или Domain-Driven Design). Вся их задача — это инкапсулировать некоторые значение со своей логикой, которая не изменяет значение. В нашем случае всё, что будет делать этот объект, — проверять, валидное ли значение передал нам пользователь, и если оно не валидное, то будет возвращать массив с описанием проблемы.

#[derive(Deserialize)]
pub struct Username(pub String);

impl Username {
    pub fn validate(&self) -> Vec {
        let mut errors = Vec::new();
        let trim = self.0.trim();

        if trim.is_empty() {
            errors.push("Empty username".to_owned());
            return errors;
        }

        if trim.len() > 30 {
            errors.push("Username must be less than 30 characters".to_owned());
        }

        if trim.len() < 3 {
            errors.push("Username must be more than 3 characters".to_owned());
        }

        for ch in trim.chars() {
            if !(ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' || ch == '.') {
                errors.push("Username must contain only latin letters or digits, underscores, dashes and dots".to_owned());
                return errors;
            }
        }

        errors
    }
}

impl Deref for Username {
    type Target = String;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[derive(Deserialize)]
pub struct Password(pub String);

impl Password {
    pub fn validate(&self) -> Vec {
        let mut errors = Vec::new();
        let trim = self.0.trim();

        if trim.is_empty() {
            errors.push("Empty password".to_owned());
            return errors;
        }

        if trim.len() < 6 {
            errors.push("Password must be more than 6 characters".to_owned());
        }

        errors
    }
}

impl Deref for Password {
    type Target = String;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

Отличненько, но наш контроллер должен будет что-то отправлять в ответ. Для этого давайте реализуем структуру LoginUserResponse. Он будет реализовывать трейт IntoResponse, чтобы мы могли красиво написать Result вместо не очень красивого и информативного impl IntoResponse, как меня пытается убедить ChatGPT, что это стандарт для axum. Также мы скрываем от пользователя поле session, потому что оно ему не особо нужно, так как всё равно улетит в куки.

use crate::services::auth::SESSION_COOKIE_NAME;

pub const SESSION_LIFETIME: i64 = 60 * 60 * 24 * 7;

#[derive(Serialize)]
pub struct LoginUserResponse {
    pub user_id: i32,
    #[serde(skip)]
    pub session: String,
}

impl LoginUserResponse {
    pub fn new(user_id: i32, session: String) -> Self {
        Self { user_id, session }
    }
}

impl IntoResponse for LoginUserResponse {
    fn into_response(self) -> axum::response::Response {
        (
            StatusCode::CREATED,
            [(
                "Set-Cookie",
                format!(
                    "{SESSION_COOKIE_NAME}={}, Max-Age={SESSION_LIFETIME}",
                    self.session
                ),
            )],
            Json(self),
        )
            .into_response()
    }
}

Раз упомянули ApiError, то давайте создадим файл error.rs, в котором опишем нашу ошибку. Члены перечисления будут использоваться как коды, что мы и настраиваем атрибутом serde(tag = "type"). Помимо этого реализуем IntoResponse для ошибки.

#[derive(Serialize, Debug)]
#[serde(tag = "type")]
pub enum ApiError {
    Unknown {
        trace_id: TraceId,
    },
    Internal,
    Validation {
        fields: HashMap>,
        trace_id: TraceId,
    },
    Conflict {
        trace_id: TraceId,
    },
    NotFound {
        trace_id: TraceId,
    },
    Unauthorized {
        trace_id: TraceId,
    },
}

impl IntoResponse for ApiError {
    fn into_response(self) -> axum::response::Response {
        let status = match &self {
            ApiError::Unknown { .. } => StatusCode::INTERNAL_SERVER_ERROR,
            ApiError::Validation { .. } => StatusCode::BAD_REQUEST,
            ApiError::Conflict { .. } => StatusCode::CONFLICT,
            ApiError::NotFound { .. } => StatusCode::NOT_FOUND,
            ApiError::Unauthorized { .. } => StatusCode::UNAUTHORIZED,
            ApiError::Internal => StatusCode::INTERNAL_SERVER_ERROR,
        };
        (status, Json(self)).into_response()
    }
}

Уже сейчас понимаю, что проще можно было сделать через структуру с полями trace_id и error_type.

Трассировка

Что же за поле с типом TraceId? Я к этому не сразу пришёл, из практического опыта разбирания логов я подумал: а как в потоке однообразных строчек разбирать, к какому клиенту относится лог? Поэтому давайте реализуем сервис, который будет открывать tracing::span и отдавать в последующие обработчики наш TraceId.

Ошибка тут заключается в том, что если пришло сразу два запроса, то span-ы будут накладываться друг на друга, раздувая строку лога в несколько раз.

Создадим модуль services/mod.rs, в котором создадим ещё один модуль trace.rs. TraceId будет просто обёрткой для SmallUid. Честно, я не смог разобраться, как передавать TraceId без обёртки в виде Extension. Поэтому пока просто вставляем в req.extensions_mut наше расширение.

#[derive(Serialize, Clone, Debug)]
pub struct TraceId(Arc);

impl TraceId {
    pub fn new() -> TraceId {
        TraceId(Arc::new(small_uid::SmallUid::new().to_string()))
    }
}

impl Display for TraceId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl Deref for TraceId {
    type Target = String;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

pub async fn trace(mut req: Request, next: Next) -> Response {
    let trace_id = TraceId::new();

    let span = tracing::info_span!(
        "request",
        %trace_id,
        method = %req.method(),
        path = %req.uri().path(),
    );
    let _enter = span.enter();

    tracing::trace!("request started");

    req.extensions_mut().insert(trace_id);

    tracing::trace!("trace id added to request extensions");

    let response = next.run(req).await;

    tracing::trace!("request completed");

    response
}

После этого можем в main.rs обернуть функцию-сервис в axum::middleware::from_fn, чтобы превратить в сервис. Axum использует слоистую архитектуру, то есть запрос будет обрабатываться от самого последнего к первому обработчику. Исходя из этого, чтобы наш сервис-трейсер обрабатывал все наши запросы, нужно поместить его самым последним.

#[tokio::main]
async fn main() {
    let _guard = init_logs();

    let db = init_db().await;

    let state = Arc::new(AppState { db });

    let app = Router::new()
        .route("/", get(|| async { "Hello, World!" }))
        .with_state(state)
        .layer(middleware::from_fn(trace));

    let port = 4000;
    let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, port))
        .await
        .expect("failed to bind port");

    tracing::info!("Listening on port {port}");

    axum::serve(listener, app)
        .await
        .expect("failed to start server");
}

Возможно, я зря писал всё это, ибо в tower_http есть TraceLayer и SetRequestIdLayer, к которые позволяют реализовать схожую по функционалу вещь.

Первый эндпоинт

Пора ваять наше API! Я не буду надолго останавливаться на том, что такое RESTful API, порекомендую лишь вот это, это и вот это как краткую выжимку. Я даже не гарантирую, что мы будем придерживаться принципов, описанных в источниках.

Предлагаю начать с регистрации новых пользователей. Для это опишем функцию new_user, которая будет принимать на вход наш TraceId, стейт и экстрактор формы, он должен идти последним, так как потребляет body запроса. Честно, я не знаю, что мной двигало, когда я решил делать через экстрактор формы, а не json, но, когда дойдём до фронтенда, возможно, ещё поменяем это.

И начнём работу с того, что проверим запрос на ошибки. Если встречается ошибка, то она заносится в HashMap с названием поля, чтобы пользователь понимал, о чём идёт речь.

pub async fn new_user(
    Extension(trace_id): Extension,
    State(state): State>,
    Form(user): Form,
) -> Result {
    tracing::trace!("validationg user credentials");

    let mut errors = HashMap::new();
    let username_errors = user.username.validate();

    if !username_errors.is_empty() {
        tracing::trace!("invalid username: {}", username_errors.join(", "));
        errors.insert("username".to_owned(), username_errors);
    }

    let password_errors = user.password.validate();

    if !password_errors.is_empty() {
        tracing::trace!("invalid password: {}", password_errors.join(", "));
        errors.insert("password".to_owned(), password_errors);
    }

    if !errors.is_empty() {
        tracing::trace!("invalid user credentials, returning error");
        return Err(ApiError::Validation {
            fields: errors,
            trace_id,
        });
    }

    todo!()
}

Но как-то это выглядит неаккуратно, давайте вынесем в отдельную функцию validate_user. Сразу выглядит чище, не правда ли?

pub async fn new_user(
    Extension(trace_id): Extension,
    State(state): State>,
    Form(user): Form,
) -> Result {
    tracing::trace!("validationg user credentials");
    let errors = validate_user(&user);

    if !errors.is_empty() {
        tracing::trace!("invalid user credentials, returning error");
        return Err(ApiError::Validation {
            fields: errors,
            trace_id,
        });
    }

    todo!()
}

fn validate_user(user: &LoginUserRequest) -> HashMap> {
    let mut errors = HashMap::new();
    let username_errors = user.username.validate();

    if !username_errors.is_empty() {
        tracing::trace!("invalid username: {}", username_errors.join(", "));
        errors.insert("username".to_owned(), username_errors);
    }

    let password_errors = user.password.validate();

    if !password_errors.is_empty() {
        tracing::trace!("invalid password: {}", password_errors.join(", "));
        errors.insert("password".to_owned(), password_errors);
    }

    errors
}

Хешируем и солим

Теперь можем приступать к созданию нового пользователя! Хорошим тоном будет трансформация пароля алгоритмом хеширования sha256 в строку фиксированной длины. Данный алгоритм даёт баланс между безопасностью и скоростью.

@subzey отметил, что это устаревшая рекомендация, актуальные можно найти на сайте OWASP (Open Web Application Security Project), где они рекомендуют использовать такие алгоритмы, как Argon2id, scrypt, bcrypt или PBKDF2. Помимо этого вводят такое понятие, как pepper, что подразумевает под собой дополнительную глобальную соль.

Помимо этого рекомендуют добавлять соль к паролю, чтобы пароль просто не сопоставили по таблице соответствия. Обычно для каждого пользователя генерируют свою соль и хранят прямо вместе с другой информацией, но у нас нет столбца под соль, поэтому давайте создадим новую миграцию!

sqlx migrate add salt

Миграция будет супер маленькой, мы просто добавляем один столбец, который будет заполнен пробелами, если верить документации Postgres. Размер 32 символа взяли из того, что мы будем генерировать соль размером в 16 байт, которые будем кодировать в HEX, где один байт равен двум символам.

-- Add up migration script here

ALTER TABLE Users ADD COLUMN salt CHAR(32);

Down-скрипт будет следующим:

-- Add down migration script here

ALTER TABLE Users DROP COLUMN salt;

Чтобы сгенерировать случайную последовательность байт, нам потребуется крейт rand. добавим его в проект. Нам нужна скорость и не нужна криптографическая стойкость, поэтому воспользуемся стандартным ГПСЧ, которым рекомендуют пользоваться сами авторы крейта для массовой генерации - SmallRng.

cargo add rand --no-default-features -F small_rng
cargo add hex

Предлагаю написать трейт RandomGenerator с методом get_salt и инкапсулируем ГСПЧ в структуру для лёгкой подмены в тестах, если нам потребуется. Для этого создам модуль rand.rs. Реализация метода get_salt будет супер простой: создаём буфер, который заполняем «случайными» битами, и кодируем в HEX.

#[cfg_attr(test, mockall::automock)]
pub trait RandomGenerator: Sync + Send {
    fn get_salt(&mut self) -> String;
}

pub struct SmallRandom(SmallRng);

impl SmallRandom {
    pub fn new(seed: u64) -> Self {
        SmallRandom(SmallRng::seed_from_u64(seed))
    }
}

impl RandomGenerator for SmallRandom {
    fn get_salt(&mut self) -> String {
        let mut result = [0u8; 16];
        self.0.fill(&mut result);
        hex::encode(result)
    }
}

Чтобы не получать каждый раз похожие результаты, инициализируем ГПСЧ единожды и будем передавать внутри AppState.

// state.rs

pub struct AppState {
    pub random: Arc>,
    pub db: PgPool,
}

impl AppState {
    pub fn new(random: Arc>, db: sqlx::PgPool) -> Self {
        Self {
            db,
            random,
        }
    }
}

// main.rs

async fn main() {
    // ...

    let rng = Arc::new(Mutex::new(SmallRandom::new(807234275934919497)));

    let state = Arc::new(AppState { db, rng });

    // ...
}

Я думал, что таким образом SmallRng поведёт себя как функция rand в C++, но на практике оказалось, что этот ГПСЧ выдаёт одно детерминированное значение.

Добавим в проект крейт для хеширования sha2.

cargo add sha2

Предлагаю вынести работу с базой в другую функцию create_user и типизировать наш пароль структурой PasswordHash.

// models/users.rs

pub struct PasswordHash {
    password: String,
    salt: String,
}

impl PasswordHash {
    pub fn new(password: &str, salt: &str) -> Self {
        let hash = sha2::Sha256::digest(format!("{password}{salt}").as_bytes());
        Self {
            password: hex::encode(hash),
            salt: salt.to_owned(),
        }
    }
}

impl Deref for PasswordHash {
    type Target = String;

    fn deref(&self) -> &Self::Target {
        &self.password
    }
}

// controllers/users.rs

pub async fn new_user(
    Extension(trace_id): Extension,
    State(state): State>,
    Form(user): Form,
) -> Result {
    tracing::trace!("validationg user credentials");
    let errors = validate_user(&user);

    if !errors.is_empty() {
        tracing::trace!("invalid user credentials, returning error");
        return Err(ApiError::Validation {
            fields: errors,
            trace_id,
        });
    }

    let salt = state.random.lock().await.get_salt();
    let password = PasswordHash::new(user.password, &salt);
    let user_id = create_user(&state.random, &*state.userss, &user.username, &trace_id).await?;

    todo!()
}

async fn create_user(
    rand: &tokio::sync::Mutex,
    db: &PgPoool,
    salt: &str,
    user: &LoginUserRequest,
    trace_id: &TraceId,
) -> Result {

    tracing::trace!("saving user credintials in database...");
    let result = sqlx::query_scalar!(
        "INSERT INTO Users (Name, Password, Salt) VALUES ($1, $2, $3) RETURNING Id",
        username,
        *password_hash,
        salt
    )
    .fetch_one(&db)
    .await;

    match result {
        Ok(id) => {
            tracing::info!("user {} created", *user.username);
            Ok(id)
        }
        Err(err) => {
            tracing::error!("failed to create user: {}", err);
            Err(ApiError::Unknown {
                trace_id: trace_id.clone(),
            })
        }
    }
}

Дальше нужно создать сессию. Шанс, конечно, мизерный, но у нас может случиться коллизия идентификаторов сессий, поэтому у нас будет пять, целых пять попыток создать сессию, после чего отправляем ошибку.

pub async fn new_user(
    Extension(trace_id): Extension,
    State(state): State>,
    Form(user): Form,
) -> Result {
    tracing::trace!("validationg user credentials");
    let errors = validate_user(&user);

    if !errors.is_empty() {
        tracing::trace!("invalid user credentials, returning error");
        return Err(ApiError::Validation {
            fields: errors,
            trace_id,
        });
    }

    let salt = get_salt(&state).await;
    let password = PasswordHash::new(user.password, salt);
    let user_id = create_user(&state, &user.username, &trace_id).await?;
    let session = create_session(&state, user_id, &trace_id).await?;

   Ok(LoginUserResponse::new(user_id, session))
}

async fn create_session(
    state: &AppState,
    user_id: i32,
    trace_id: &TraceId,
) -> Result {
    for _ in 0..5 {
        tracing::trace!("generating session UID...");
        let uid = small_uid::SmallUid::new().to_string();

        let expires_at =
 OffsetDateTime::now_utc().saturating_add(Duration::seconds(SESSION_LIFETIME));

        let result = sqlx::query!(
            "INSERT INTO Sessions (Uid, UserId, ExpiresAt) VALUES ($1, $2, $3)",
            uid,
            user_id,
            expires_at
        )
        .execute(&state.db)
        .await?;

        tracing::trace!("trying to save session UID {uid} for user id {user_id} in database...");
        match result
        {
            Ok(_) => {
                tracing::info!("session {} created for user {}", uid, user_id);
                return Ok(uid);
            }
            Err(err) => {
                tracing::error!("failed to create {} session: {}", uid, err);
                return Err(ApiError::Unknown {
                    trace_id: trace_id.clone(),
                });
            }
        };
    }

    tracing::error!("failed to create session");
    Err(ApiError::Conflict {
        trace_id: trace_id.clone(),
    })
}

Тесты

Ну вроде всё, теперь можно приступать к тестированию. Мы будем писать юнит-тесты и интеграционные. Первые тестируют функции по отдельности, а вот вторые проверяют работу всей системы в целом, как в боевых условиях.

Юнит-тесты

Давайте посмотрим на некоторый пример юнит-тестов к функции validate_user. На счёт философии тестирования в rust рекомендую почитать документацию.

#[test]
async fn test_validate_user_ok() {
  let user = LoginUserRequest {
      username: Username::new("valid_user".into()),
      password: Password::new("ValidPass123".into()),
  };

  let errors = validate_user(&user);
  assert!(errors.is_empty());
}

#[test]
async fn test_validate_user_invalid_username_and_password() {
  let user = LoginUserRequest {
      username: Username::new("".into()),
      password: Password::new("".into()),
  };

  let errors = validate_user(&user);

  assert!(errors.contains_key("username"));
  assert!(errors.contains_key("password"));
}

#[test]
async fn test_validate_user_invalid_username() {
  let user = LoginUserRequest {
      username: Username::new("вууу".into()),
      password: Password::new("ValidPass123".into()),
  };

  let errors = validate_user(&user);

  assert!(errors.contains_key("username"));
}

#[test]
async fn test_validate_user_invalid_username_length() {
  let user = LoginUserRequest {
      username: Username::new("0123456789012345678901234567890123456789".into()),
      password: Password::new("123".into()),
  };

  let errors = validate_user(&user);

  assert!(errors.contains_key("username"));
}

Ничего сложного, не правда ли? Но если мы подумаем о том, как протестировать функцию create_user, то столкнёмся с тем, что мы не знаем, ведь она использует под капотом запросы к базе данных. Казалось бы, разверни просто тестовую базу, подумаешь, всё равно в devcontainer работаешь, но нет! Мы воспользуемся паттерном Repository, чтобы можно было легко подменять настоящую реализацию на имитирующую, и крейтом mockall, чтобы он автоматически делал нам имитацию.

Ещё нам потребуется крейт async_trait, иначе код просто не скомпилируется с ошибкой о том, что трейт не dyn-совместимый. Почитать про dyn-совместимость можете на странице документации. Добавим эти крейты.

cargo add async_trait
cargo add --dev mockall

Мы опишем соответствующие трейты UsersRepository и SessionsRepository в подмодулях нового модуля repositories и накинем атрибутный макрос automock, который будет работать только при тестировании.

// repositories/users.rs

#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub trait UsersRepository: Send + Sync {
    async fn create_user(&self, username: &str, password: PasswordHash) -> Result;
    async fn get_user(&self, username: &str, password: PasswordHash) -> Result;
    async fn get_user_by_session(&self, session_id: &str) -> Result;
}

// repositories/sessions.rs

#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub trait SessionsRepository: Send + Sync {
    async fn create_session(
        &self,
        uid: &str,
        user_id: i32,
        expires_at: OffsetDateTime,
    ) -> Result<(), RepositoryError>;
    async fn get_session_by_user_id(&self, user_id: i32) -> Result;
    async fn get_session(&self, uid: &str) -> Result;
    async fn remove_session(&self, uid: &str) -> Result<(), RepositoryError>;
}

Теперь в AppState можем добавить следующие поля. И сразу напишем конструктор, который будет инициализировать репозитории, чтобы не засорять функцию main.

pub struct AppState {
    pub random: Arc>,

    pub users: Arc,
    pub sessions: Arc,
}

impl AppState {
    pub fn new(random: Arc>, pool: sqlx::PgPool) -> Self {
        Self {
            users: Arc::new(PgUsersRepository::new(pool.clone())),
            sessions: Arc::new(PgSessionsRepository::new(pool)),
            random,
        }
    }
}

Давайте сразу добавим тип ошибок в файле error.rs, чтобы можно было удобнее их обрабатывать в контроллерах.

#[derive(Debug)]
pub enum RepositoryError {
    Unknown(sqlx::Error),
    Conflict,
    NotFound,
}

impl Display for RepositoryError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RepositoryError::Unknown(err) => write!(f, "unknown repository error: {err}"),
            RepositoryError::Conflict => write!(f, "conflict"),
            RepositoryError::NotFound => write!(f, "not found"),
        }
    }
}

impl From for RepositoryError {
    fn from(err: sqlx::Error) -> Self {
        match err {
            sqlx::Error::Database(ref db)
                if db.kind() == sqlx::error::ErrorKind::UniqueViolation =>
            {
                RepositoryError::Conflict
            }
            sqlx::Error::RowNotFound => RepositoryError::NotFound,
            _ => RepositoryError::Unknown(err),
        }
    }
}

Теперь можем описать логику репозиториев. Не буду на этом останавливаться, ибо реализация до ужаса простая.

// repositories/sessions.rs

pub struct PgSessionsRepository(sqlx::PgPool);

impl PgSessionsRepository {
    pub fn new(db: sqlx::PgPool) -> Self {
        Self(db)
    }
}

#[async_trait::async_trait]
impl SessionsRepository for PgSessionsRepository {
    async fn create_session(
        &self,
        uid: &str,
        user_id: i32,
        expires_at: OffsetDateTime,
    ) -> Result<(), RepositoryError> {
        sqlx::query!(
            "INSERT INTO Sessions (Uid, UserId, ExpiresAt) VALUES ($1, $2, $3)",
            uid,
            user_id,
            expires_at
        )
        .execute(&self.0)
        .await?;

        Ok(())
    }

    async fn get_session_by_user_id(&self, user_id: i32) -> Result {
        let uid = sqlx::query_scalar!("SELECT Uid FROM Sessions WHERE UserId = $1", user_id)
            .fetch_one(&self.0)
            .await?;

        Ok(uid)
    }

    async fn get_session(&self, uid: &str) -> Result {
        let userid = sqlx::query_scalar!("SELECT UserId FROM Sessions WHERE Uid = $1", uid)
            .fetch_one(&self.0)
            .await?;

        Ok(userid)
    }

    async fn remove_session(&self, uid: &str) -> Result<(), RepositoryError> {
        sqlx::query!("DELETE FROM Sessions WHERE Uid = $1", uid)
            .execute(&self.0)
            .await?;
        Ok(())
    }
}

// repositories/users.rs

pub struct PgUsersRepository(sqlx::PgPool);

impl PgUsersRepository {
    pub fn new(db: sqlx::PgPool) -> Self {
        Self(db)
    }
}

#[async_trait::async_trait]
impl UsersRepository for PgUsersRepository {
    async fn create_user(&self, username: &str, password: PasswordHash) -> Result {
        let result = sqlx::query!(
            "INSERT INTO Users (Name, Password) VALUES ($1, $2) RETURNING Id",
            username,
            *password
        )
        .fetch_one(&self.0)
        .await?;

        Ok(result.id)
    }

    async fn get_user(&self, username: &str, password: PasswordHash) -> Result {
        let result = sqlx::query_as!(
            User,
            "SELECT Id, Name as username, Password, CreatedAt as created_at FROM Users WHERE Name = $1 AND Password = $2",
            username,
            *password
        )
        .fetch_one(&self.0)
        .await?;

        Ok(result)
    }

    async fn get_user_by_session(&self, session_id: &str) -> Result {
        let result = sqlx::query_as!(User,
            "SELECT u.Id, u.Name as username, u.Password, u.CreatedAt as created_at FROM Users u LEFT JOIN Sessions s ON u.Id = s.UserId WHERE s.Uid = $1",
            session_id
        )
        .fetch_one(&self.0)
        .await?;

        Ok(result)
    }
}

Давайте напишем тест, проверяющий работу нашей функции create_user. Для начала инициализируем мок репозитория, который будет ожидать, что пользователь воспользуется методом create_user у UsersRepository, который, в свою очередь, вернёт положительный результат с идентификатором пользователя «1». Точно так же поступаем с моком ГПСЧ - MockRandomGenerator.

#[test]
async fn test_create_user_ok() {
    let mut rand = MockRandomGenerator::new();
    rand.expect_get_salt().returning(|| "salt".to_string());
    let mut users = MockUsersRepository::new();
    users.expect_create_user().returning(|_, _| Ok(1));

    let user = LoginUserRequest {
        username: Username::new("valid_user"),
        password: Password::new("ValidPass123"),
    };

    let result = create_user(&Mutex::new(rand), &users, &user, &TraceId::new())
        .await
        .expect("failed to create user");
    assert_eq!(result, 1);
}

Помимо ожидаемого поведения можно протестировать поведение нашей функции при конфликте данных: она должна вернуть ошибку более высоко уровня ApiError::Conflict вместо RepositoryError::Conflict.

#[test]
async fn test_create_user_conflict() {
    let mut users = MockUsersRepository::new();
    users
        .expect_create_user()
        .returning(|_, _| Err(RepositoryError::Conflict));

    let user = LoginUserRequest {
        username: Username::new("valid_user"),
        password: Password::new("ValidPass123"),
    };

    let result = create_user(&users, &user, &TraceId::new()).await;
    assert!(matches!(result, Err(ApiError::Conflict { .. })));
}

Интеграционные тесты

Предлагаю теперь посмотреть на то, как это будет работать вместе с create_session в интеграционном тесте. Однако не всё так просто, так как интеграционные тесты хранятся в папке tests рядом с src, и чтобы наши тесты могли увидеть наши функции, нам нужно превратить крейт в тип lib. Для этого достаточно создать файлик lib.rs в src и переместить все объявления модулей туда, как показано ниже. Все pub модули будут видны тестам.

pub mod controllers;
mod db;
pub mod error;
mod logs;
pub mod models;
pub mod repositories;
mod state;
pub mod services;

Создам файлик tests/users.rs, в котором напишу функцию test_new_user, которая будет тестировать контроллер new_user при штатных входных данных. Вроде ничего сложного. Ну, кроме того, что зачем я в конце теста чищу добавленные данные. Это нужно, чтобы при повторном запуске тестов они не упали с ошибкой. Вообще, по-хорошему, делать ещё и при неудачном выполнении new_user, который просто обёрнут методом expect, придётся ручками чистить таблицы в случае упавших тестов. Но мне лень.

#[test]
async fn test_new_user() {
    const USERNAME: &str = "valid_user1";

    let trace_id = TraceId::new();
    let trace_ext = Extension(trace_id);

    let db = init_db().await;

    let state = State(Arc::new(AppState::new(
        Arc::new(Mutex::new(SmallRandom::new(1))),
        db.clone(),
    )));

    let data = Form(LoginUserRequest {
        username: Username::new(USERNAME),
        password: Password::new("ValidPass123"),
    });

    let result = server::controllers::users::new_user(trace_ext, state, data)
        .await
        .expect("failed to create user");

    let response = result.into_response();
    let cookies = response.headers();
    assert!(cookies.contains_key("Set-Cookie"));

    let result = sqlx::query!("SELECT * FROM Users WHERE Name = $1", USERNAME)
        .fetch_one(&db)
        .await
        .expect("failed to fetch user");

    let _ = sqlx::query!("SELECT * FROM Sessions WHERE UserId = $1", result.id)
        .fetch_one(&db)
        .await
        .expect("failed to fetch session");

    query!("DELETE FROM Users WHERE Name = $1", USERNAME)
        .execute(&db)
        .await
        .unwrap();
    query!("DELETE FROM Sessions WHERE userid = $1", result.id)
        .execute(&db)
        .await
        .unwrap();
}

Теперь можем запустить тесты командой cargo test и любоваться прекрасным зрелищем того, как тесты горят проходят.

Аутентификация

Вход

Создание нового пользователя мы реализовали и даже протестировали, но существующие пользователи должны как-то входить, поэтому давайте реализуем ручку для входя уже имеющихся в системе пользователей.

Наш трейт UsersRepository ищет пользователя по его имени и хешу пароля, поэтому сначала нам надо получить соль пользователя. Для этого в трейт добавим метод get_user_salt, вынесем получение идентификатора пользователя в отдельную функцию get_user_id и после получения соли можем сразу получить хеш пароля и затем самого пользователя. Я позволил себе вынести получение соли в отдельную функцию get_user_salt.

// repositories/users.rs

pub trait UsersRepository: Send + Sync {
		// ...    
    async fn get_user_salt(&self, username: &str) -> Result<String, RepositoryError>;
}

impl UsersRepository for PgUsersRepository {
		// ...
    async fn get_user_salt(&self, username: &str) -> Result<String, RepositoryError> {
        let salt = sqlx::query_scalar!("SELECT Salt FROM Users WHERE Name = $1", username)
            .fetch_one(&self.0)
            .await?
            .ok_or(RepositoryError::NotFound)?;

        Ok(salt)
    }
}

// controllers/users.rs

pub async fn login_user(
    Extension(trace_id): Extension<TraceId>,
    State(state): State<Arc<AppState>>,
    Form(user): Form<LoginUserRequest>,
) -> Result<LoginUserResponse, ApiError> {
    tracing::trace!("getting user id by name {}...", *user.username);
    let user_id = get_user_id(&*state.users, &user, &trace_id).await?;
    
    todo!()
}

async fn get_user_id(
    users: &dyn UsersRepository,
    user: &LoginUserRequest,
    trace_id: &TraceId,
) -> Result<i32, ApiError> {
    tracing::trace!("getting user salt...");

    let salt = get_user_salt(users, &user.username, trace_id).await?;
    let password_hash = PasswordHash::new(&user.password, &salt);

    tracing::trace!("getting user id...");
    match users.get_user(&user.username, password_hash).await {
        Ok(user) => {
            tracing::info!("user {} found", user.username);
            Ok(user.id)
        }
        Err(RepositoryError::NotFound) => {
            tracing::warn!("user {} not found", *user.username);
            Err(ApiError::NotFound {
                trace_id: trace_id.clone(),
            })
        }
        Err(err) => {
            tracing::error!("failed to get user: {}", err);
            Err(ApiError::Unknown {
                trace_id: trace_id.clone(),
            })
        }
    }
}

async fn get_user_salt(
    users: &dyn UsersRepository,
    username: &Username,
    trace_id: &TraceId,
) -> Result<String, ApiError> {
    match users.get_user_salt(username).await {
        Ok(salt) => {
            tracing::info!("user {} salt found", **username);
            Ok(salt)
        }
        Err(RepositoryError::NotFound) => {
            tracing::warn!("user {} not found", **username);
            Err(ApiError::NotFound {
                trace_id: trace_id.clone(),
            })
        }
        Err(err) => {
            tracing::error!("failed to get user salt: {}", err);
            Err(ApiError::Unknown {
                trace_id: trace_id.clone(),
            })
        }
    }
}

Не будем ничего придумывать, просто находим уже существующую или создаём новую сессию.

pub async fn login_user(
    Extension(trace_id): Extension<TraceId>,
    State(state): State<Arc<AppState>>,
    Form(user): Form<LoginUserRequest>,
) -> Result<LoginUserResponse, ApiError> {
    tracing::trace!("getting user id by name {}...", *user.username);
    let user_id = get_user_id(&*state.users, &user, &trace_id).await?;
    let session = get_or_create_session(&*state.sessions, user_id, &trace_id).await?;

    Ok(LoginUserResponse::new(user_id, session))
}

async fn get_or_create_session(
    sessions: &dyn SessionsRepository,
    user_id: i32,
    trace_id: &TraceId,
) -> Result<String, ApiError> {
    match sessions.get_session_by_user_id(user_id).await {
        Ok(session) => {
            tracing::info!("session {session} found");
            Ok(session)
        }
        Err(err) => {
            tracing::error!("failed to get session: {err}");

            tracing::trace!("trying to create new session");
            create_session(sessions, user_id, trace_id).await
        }
    }
}

Идентификация

А что если пользователь захочет выйти из аккаунта? Нам надо как-то идентифицировать пользователя, чтобы удалить его сессию, поэтому давайте реализуем сервис аутентификации. Для этого создам подмодуль auth в модуле services и распишем одноименный сервис, который будет:

  1. получать TraceId из extensions,

  2. парсить куки с помощью крейта cookie,

  3. искать пользователя в базе,

  4. вставлять в расширения информацию о пользователе.

pub const SESSION_COOKIE_NAME: &str = "session";

pub struct Auth {
    pub session: String,
    pub user: User,
}

pub async fn auth(
    State(state): State<Arc<AppState>>,
    mut req: Request<Body>,
    next: Next,
) -> Response {
    tracing::info!("authenticating user...");

    let Some(trace_id) = req.extensions().get::<TraceId>() else {
        tracing::warn!("trace id not found");
        return ApiError::Internal.into_response();
    };

    tracing::trace!("parsing cookie...");
    let cookie = req
        .headers()
        .get_all("Cookie")
        .into_iter()
        .filter_map(|c| c.to_str().ok())
        .filter_map(|c| c.parse::<cookie::Cookie>().ok())
        .find(|c| c.name() == SESSION_COOKIE_NAME);

    tracing::trace!("getting session uid...");
    let session_uid = match cookie {
        Some(c) => c.value().to_owned(),
        None => {
            tracing::warn!("session cookie not found");
            return unauthorized(trace_id.clone());
        }
    };

    let user = match state.users.get_user_by_session(&session_uid).await {
        Ok(user) => user,
        Err(err) => {
            tracing::error!("failed to get user: {}", err);
            return unauthorized(trace_id.clone());
        }
    };

    let auth_state = Arc::new(Auth {
        session: session_uid.to_owned(),
        user,
    });

    req.extensions_mut().insert(auth_state);

    next.run(req).await
}

fn unauthorized(trace_id: TraceId) -> Response {
    ApiError::Unauthorized { trace_id }.into_response()
}

Тестирование

Теперь нужно протестировать сервис, но эта задача не так очевидна, как было до этого, ведь для того, чтобы проверить работу сервиса, в него нужно передать объекты типов Request и Next. Если Request не вызывает лишних вопросов, то вот Next не имеет публичных конструкторов, следовательно, протестировать привычным способом не представляется возможным.

На выручку нам спешит крейт tower, который имеет на своём борту трейт ServiceExt и метод oneshot, позволяющий прогнать по всем контроллерам запрос. Так как он нужен нам исключительно для тестов, то добавить нужно его в dev-зависимости.

cargo add --dev tower

Импортируем ServiceExt в tests/auth.rs и у объекта типа Router появляется метод oneshot. Будем проверять работу через то, что добавим контроллер, который будет возвращать имя пользователя из объекта Auth в ответ.

Но в силу того, что объект типа Body не имеет никаких публичных методов для получения его содержимого, будем использовать функцию to_bytes из axum и потом формируем строку из полученного массива байт.

use tower::ServiceExt;

#[test]
async fn test_auth_ok() {
    const USERNAME: &str = "user1222";

    let db = init_db().await;
    let state = Arc::new(AppState::new(
        Arc::new(Mutex::new(SmallRandom::new(1))),
        db.clone(),
    ));

    let user_id = state
        .users
        .create_user(USERNAME, PasswordHash::new("ValidPass123", "salt"))
        .await
        .expect("failed to create user");

    let expires = OffsetDateTime::now_utc().saturating_add(Duration::seconds(SESSION_LIFETIME));
    let session = small_uid::SmallUid::new();
    state
        .sessions
        .create_session(&session.to_string(), user_id, expires)
        .await
        .expect("failed to create session");

    let req = Request::builder()
        .uri("/")
        .header("Cookie", format!("{SESSION_COOKIE_NAME}={session}"))
        .extension(Extension(session))
        .body(Body::empty())
        .unwrap();

    let app = Router::new()
        .route(
            "/",
            get(|Extension(auth): Extension<Arc<Auth>>| async move { auth.user.username.clone() }),
        )
        .layer(middleware::from_fn_with_state(
            state,
            server::services::auth::auth,
        ))
        .layer(Extension(TraceId::new()));

    let res = app.oneshot(req).await.unwrap();
    assert_eq!(res.status(), 200);

    let body = String::from_utf8(
        axum::body::to_bytes(res.into_body(), usize::MAX)
            .await
            .unwrap()
            .to_vec(),
    )
    .unwrap();
    assert_eq!(body, USERNAME);

    sqlx::query!("DELETE FROM users WHERE Name = $1", USERNAME)
        .execute(&db)
        .await
        .unwrap();
    sqlx::query!("DELETE FROM sessions WHERE userid = $1", user_id)
        .execute(&db)
        .await
        .unwrap();
}

#[test]
async fn test_auth_not_found_session() {
    let db = init_db().await;
    let state = Arc::new(AppState::new(
        Arc::new(Mutex::new(SmallRandom::new(1))),
        db.clone(),
    ));

    let session = small_uid::SmallUid::new();
    let req = Request::builder()
        .uri("/")
        .header("Cookie", format!("{SESSION_COOKIE_NAME}={session}"))
        .extension(Extension(session))
        .body(Body::empty())
        .unwrap();

    let app = Router::new()
        .route(
            "/",
            get(|Extension(auth): Extension<Arc<Auth>>| async move { auth.user.username.clone() }),
        )
        .layer(middleware::from_fn_with_state(
            state,
            server::services::auth::auth,
        ))
        .layer(Extension(TraceId::new()));

    let res = app.oneshot(req).await.unwrap();
    assert_eq!(res.status(), 401);
}

Выход

Перед тем, как мы напишем контроллер logout_user, нужно создать модель ответа LogoutUserResponse. Отвечать будем заголовком, который установит время жизни куки сессии равный нулю.

pub struct LogoutUserResponse;

impl IntoResponse for LogoutUserResponse {
    fn into_response(self) -> axum::response::Response {
        (
            StatusCode::OK,
            [("Set-Cookie", format!("{SESSION_COOKIE_NAME}=_; Max-Age=0"))],
            (),
        )
            .into_response()
    }
}

Сам контроллер logout_user будет супер примитивным: просто удаляем из базы сессию.

pub async fn logout_user(
    Extension(auth): Extension<Arc<Auth>>,
    Extension(trace_id): Extension<TraceId>,
    State(state): State<Arc<AppState>>,
) -> Result<LogoutUserResponse, ApiError> {
    tracing::trace!("logging out user {}...", auth.user.id);

    match state.sessions.remove_session(&auth.session).await {
        Ok(_) => {
            tracing::info!("session {} deleted", auth.session);
            Ok(LogoutUserResponse)
        }
        Err(RepositoryError::NotFound) => {
            tracing::warn!("session {} not found", auth.session);
            Err(ApiError::NotFound {
                trace_id: trace_id.clone(),
            })
        }
        Err(err) => {
            tracing::error!("failed to delete session: {err}");
            Err(ApiError::Unknown {
                trace_id: trace_id.clone(),
            })
        }
    }
}

Чистим сессии

Хотя можно и оставлять просроченные сессии в базе, сверяя актуальную дату, но мы не ищем лёгких путей! Поэтому будем чистить базу от просроченных сессий. Для этого запустим в отдельном потоке задачу, которая будет раз в N времени выполнять запрос к базе. Функция, выполняющая запуск такой задачи будет у меня находиться по пути services/session.rs.

const CLEANUP_INTERVAL: u64 = 60 * 60;

pub fn start_cleanup_task(db: sqlx::PgPool) {
    tokio::spawn(async move {
        loop {
            match sqlx::query!(
                "DELETE FROM Sessions WHERE ExpiresAt < $1",
                time::OffsetDateTime::now_utc()
            )
            .execute(&db)
            .await
            {
                Ok(count) => tracing::info!("deleted {} expired sessions", count.rows_affected()),
                Err(err) => tracing::error!("failed to delete expired sessions: {}", err),
            }

            tokio::time::sleep(std::time::Duration::from_secs(CLEANUP_INTERVAL)).await;
        }
    });
}

Чаты

Напишем контроллер, который будет создавать новые чаты. И вот тут у меня возникли проблемы с тем, как это реализовать. Нет, это довольно очевидно, просто создаёшь новую запись в базе, но вот в каком виде принимать список членов чата: по имени или по идентификаторам, если то и другое уникально? Попробуем по идентификаторам!

Помимо этого возникла довольно неочевидная проблема использования репозиториев: если мне надо внести изменения сразу в несколько таблиц, то тут неплохо подходит транзакция, но как её прокинуть в репозиторий, как аргумент или создать какой-нибудь менеджер, который будет инициализировать мини-репозитории и прокидывать в них транзакцию? Второе звучит довольно круто, но слишком запарено тестировать, поэтому будем использовать первый подход.

Традиционно начнём с того, что определим модели данных, с которыми будем работать. Я немного упоролся и начал вообще всё типизировать, хе-хе.

#[derive(Deserialize, sqlx::Type)]
#[sqlx(transparent)]
pub struct ChatTitle(String);

impl ChatTitle {
    pub fn new(title: String) -> Self {
        Self(title)
    }

    pub fn validate(&self) -> Vec<String> {
        let mut errors = Vec::new();

        if self.0.is_empty() {
            errors.push("Title is empty".to_string());
        }

        if self.0.len() > 50 {
            errors.push("Title is too long".to_string());
        }

        errors
    }
}

impl Deref for ChatTitle {
    type Target = String;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[derive(Debug, sqlx::Type, Serialize, Deserialize)]
#[sqlx(transparent)]
pub struct ChatId(i32);

impl ChatId {
    pub fn new(id: i32) -> Self {
        Self(id)
    }
}

impl std::fmt::Display for ChatId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl PartialEq<i32> for ChatId {
    fn eq(&self, other: &i32) -> bool {
        self.0 == *other
    }
}

impl Deref for ChatId {
    type Target = i32;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[derive(Deserialize)]
pub struct NewChatRequest {
    pub title: ChatTitle,
    pub users_ids: Vec<UserId>,
}

#[derive(Serialize)]
pub struct NewChatResponse {
    pub chat_id: ChatId,
}

impl NewChatResponse {
    pub fn new(chat_id: ChatId) -> Self {
        Self { chat_id }
    }
}

impl IntoResponse for NewChatResponse {
    fn into_response(self) -> axum::response::Response {
        (axum::http::StatusCode::CREATED, axum::Json(self)).into_response()
    }
}
> &Self::Target {
        &self.0
    }
}

#[derive(Deserialize)]
pub struct NewChatRequest {
    pub title: ChatTitle,
    pub users_ids: Vec<UserId>,
}

#[derive(Serialize)]
pub struct NewChatResponse {
    pub chat_id: ChatId,
}

impl NewChatResponse {
    pub fn new(chat_id: ChatId) -> Self {
        Self { chat_id }
    }
}

impl IntoResponse for NewChatResponse {
    fn into_response(self) -> axum::response::Response {
        (axum::http::StatusCode::CREATED, axum::Json(self)).into_response()
    }
}

Напишем трейт ChatsRepository, который будет рулить сразу двумя таблицами: Chats и ChatMembers (надеюсь, меня не загрызут заживо в комментария за это…).

#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub trait ChatsRepository {
    async fn create_chat(
        &self,
        title: ChatTitle,
        users: Vec<UserId>,
    ) -> Result<ChatId, RepositoryError>;
}

pub struct PgChatsRepository(PgPool);

impl PgChatsRepository {
    pub fn new(pool: PgPool) -> Self {
        Self(pool)
    }
}

#[async_trait::async_trait]
impl ChatsRepository for PgChatsRepository {
    async fn create_chat(
        &self,
        title: ChatTitle,
        users: Vec<UserId>,
    ) -> Result<ChatId, RepositoryError> {
        let mut tn = self.0.begin().await?;

        let chat_id = ChatId::new(
            sqlx::query_scalar!(
                "INSERT INTO Chats (Title) VALUES ($1) RETURNING Id",
                title as _
            )
            .fetch_one(&mut *tn)
            .await?,
        );

        query!(
            "INSERT INTO ChatMembers (ChatId, UserId) SELECT $1, UNNEST($2::int[])",
            chat_id as _,
            users as _,
        )
        .execute(&mut *tn)
        .await?;

        tn.commit().await?;

        Ok(chat_id)
    }
}

И теперь можем сделать так.

pub async fn new_chat(
    Extension(auth): Extension<Arc<Auth>>,
    Extension(trace_id): Extension<TraceId>,
    State(state): State<Arc<AppState>>,
    Json(chat): Json<NewChatRequest>,
) -> Result<NewChatResponse, ApiError> {
    let errors = validate_chat(&chat.title);

    if !errors.is_empty() {
        return Err(ApiError::Validation {
            fields: errors,
            trace_id,
        });
    }

    let users_ids = merge_ids(auth.user.id, chat.users_ids);

    let chat_id = match state.chats.create_chat(chat.title, users_ids).await {
        Ok(id) => {
            tracing::trace!("chat {id} created");
            id
        }
        Err(err) => {
            tracing::error!("failed to create chat: {err}");
            return Err(ApiError::Unknown { trace_id });
        }
    };

    Ok(NewChatResponse::new(chat_id))
}

fn validate_chat(title: &ChatTitle) -> HashMap<String, Vec<String>> {
    let mut errors = HashMap::new();

    let title_errors = title.validate();
    if !title_errors.is_empty() {
        errors.insert("title".to_owned(), title_errors);
    }

    errors
}

fn merge_ids(user_id: UserId, ids: Vec<UserId>) -> Vec<UserId> {
    let mut users = Vec::with_capacity(ids.len() + 1);
    users.push(user_id);
    users.extend(ids);
    users
}

Ничего нового и интересного, поэтому опущу часть, где я создаю однообразные контроллеры создания и удаления чатов и сообщений. Но покажу проблему, с которой столкнулся.

Так как в базе столбец UserId может быть NULL, так как пользователь мог удалить свой профиль, но его сообщения остаются, в свою очередь, sqlx возвращает Option<i32> вместо Option<UserId>, даже не пытается применить какой-нибудь FromRow, который ты реализовал для структуры, хотя документация говорит об обратном, что структура для использования query_as должна реализовывать этот трейт.

В общем, я решил проблему тем, что в самом запросе закастил тип с помощью UserId as “sender_id: _”.

#[async_trait::async_trait]
impl MessagesRepository for PgMessagesRepository {
    async fn get_messages(
        &self,
        chat_id: ChatId,
        limit: i64,
    ) -> Result<Vec<Message>, RepositoryError> {
        let result = query_as!(
            Message,
            "SELECT Id, UserId as \\"sender_id: _\\", ChatId as chat_id, Content, CreatedAt as created_at FROM messages WHERE chatid = $1 ORDER BY CreatedAt DESC LIMIT $2",
            chat_id as _,
            limit
        ).fetch_all(&self.0)
        .await?;

        Ok(result)
    }
}

Авторизация

Помимо этого я столкнулся с такой проблемой, как проверять права пользователя на доступ к сообщениям чата. Было два варианта:

  1. запрашивать каждый раз чаты пользователя и в них искать нужный;

  2. делать один запрос и добавлять флаг о разрешении доступа в ответ от базы.

Я остановился на первом, ибо он проще, как мне кажется, чем писать километровый SQL-запрос, тем паче позже мы добавим кэширование всего и вся.

Поэтому реализуем метод get_user_chats в репозитории ChatsRepository, который формирует HashSet. Не то, чтобы значения могут повторяться, просто будет проще проверить наличие нужного чата методом contains. Затем я вынес логику авторизации в отдельный функцию check_chat_access, который запрашивает чаты и проверяет наличие чата в коллекции.

// repositories/chats.rs

#[async_trait::async_trait]
impl ChatsRepository for PgChatsRepository {
    // ...
    async fn get_user_chats_ids(
        &self,
        user_id: UserId,
    ) -> Result<HashSet<ChatId>, RepositoryError> {
        let chat_ids = query_scalar!(
            "SELECT ChatId FROM ChatMembers WHERE UserId = $1",
            user_id as _
        )
        .fetch(&self.0)
        .filter_map(|id| async { id.map(ChatId::from).ok() })
        .collect::<HashSet<ChatId>>()
        .await;

        Ok(chat_ids)
    }
}

// controllers/messages.rs

pub async fn get_messages(
    Extension(auth): Extension<Arc<Auth>>,
    Extension(trace_id): Extension<TraceId>,
    State(state): State<Arc<AppState>>,
    Query(params): Query<GetMessagesParams>,
) -> Result<GetMessagesResponse, ApiError> {
    if !check_chat_access(&*state.chats, auth.user.id, params.chat_id).await {
        tracing::error!("forbidden");
        return Err(ApiError::Forbidden { trace_id });
    }
    
    // ...
}

#[tracing::instrument(skip(chats), ret)]
async fn check_chat_access(chats: &dyn ChatsRepository, user_id: UserId, chat_id: ChatId) -> bool {
    let Ok(chats) = chats.get_user_chats_ids(user_id).await else {
        return false;
    };

    chats.contains(&chat_id)
}

Сообщения

Нам нужно получить список сообщений из чата. Мы можем загрузить все сообщения сразу, но если их тысячи, это неэффективно. Давайте загружать сообщения по мере необходимости. Для этого установим лимит на количество сообщений, которое можно получить за один раз.

Я решил задать максимальное количество сообщений константой. Надеюсь, это приемлемое решение.

// repositories/messages.rs

#[async_trait::async_trait]
impl MessagesRepository for PgMessagesRepository {
    async fn get_messages(
        &self,
        chat_id: ChatId,
        limit: i64,
        last_message_id: Option<MessageId>,
    ) -> Result<Vec<Message>, RepositoryError> {
        let result = query_as!(
                Message,
                "SELECT Id, UserId as \\"sender_id: _\\", ChatId as chat_id, Content, CreatedAt as created_at
                FROM messages
                WHERE chatid = $1 AND ($2::BIGINT IS NULL OR Id < $2)
                ORDER BY CreatedAt DESC
                LIMIT $3",
                chat_id as _,
                last_message_id as _,
                limit,
            ).fetch_all(&self.0)
            .await?;

        Ok(result)
    }
}

// controllers/messages.rs

const MAX_MESSAGES: i64 = 100;

pub async fn get_messages(
    Extension(auth): Extension<Arc<Auth>>,
    Extension(trace_id): Extension<TraceId>,
    State(state): State<Arc<AppState>>,
    Query(params): Query<GetMessagesParams>,
) -> Result<GetMessagesResponse, ApiError> {
    if !check_chat_access(&*state.chats, auth.user.id, chat_id).await {
        tracing::error!("forbidden");
        return Err(ApiError::Forbidden { trace_id });
    }

    let limit = if params.limit > MAX_MESSAGES {
        MAX_MESSAGES
    } else {
        params.limit
    };

    let mut messages = state
        .messages
        .get_messages(chat_id, limit + 1, params.last_message_id)
        .await
        .map_err(|e| {
            tracing::error!("failed to get messages: {e}");
            ApiError::Unknown { trace_id }
        })?;

    let has_more = messages.len() > limit as usize;

    messages.truncate(limit as usize);

    Ok(GetMessagesResponse { messages, has_more })
}

Документирование

Предлагаю теперь задокументировать наше API. Это можно сделать и в обычном markdown-файлике всё описать и захостить на каком-нибудь Github Pages, но мы пойдём дальше: наша документация будет собираться в красивую HTML-страничку прямо из кода с минимум усилий. Для этого воспользуемся спецификацией OpenAPI, которую поможет нам описать крейт utoipa, а для визуализации на помощь спешит Rapidoc (ещё есть SwaggerUI, Redoc).

Выбор на utoipa пал по причине того, что это довольно популярное решение, а Rapidoc выбрал потому, что Swagger ведёт, мягко говоря, недружелюбную политику, а Redoc по какой-то причине не захотел у меня работать.

Так как мы используем axum, нам потребуется соответствующая реализация, которой служит крейт utoipa_axum, поэтому добавляем и его в проект. И мостиком с Rapidoc выступит крейт utoipa-rapidoc.

cargo add utoipa utoipa_axum
cargo add utoipa-redoc -F axum

Далее нужно на контроллеры навесить специальный атрибутный макрос path. С помощью этого макроса мы описываем путь, задаём описание и тестовые данные для демонстрации работы API. Как пример я уже навесил этот атрибут на контроллер new_user. За большей информацией и примерами загляните в документацию, там довольно популярно показывают и объясняют работу макроса.

/// Create new user
#[utoipa::path(post,
    path = "/users",
    tag = "users", 
    request_body(
        content = LoginUserRequest, 
        description = "User credentials", 
        content_type = "application/x-www-form-urlencoded"),
    responses(
        (status = OK, description = "User created", body = LoginUserResponse),
        (status = BAD_REQUEST, description = "Invalid user credentials", body = ApiError),
        (status = INTERNAL_SERVER_ERROR, description = "Internal server error", body = ApiError)
    )
)]
pub async fn new_user(
    Extension(trace_id): Extension<TraceId>,
    State(state): State<Arc<AppState>>,
    Form(user): Form<LoginUserRequest>,
) -> Result<LoginUserResponse, ApiError> {
    // ...
}

Чтобы utoipa мог автоматически извлекать информацию о моделях данных, нужно навесить на наши модели атрибут ToSchema. А если у вас, как у меня, имеется где-то в структурах умные указатели Arc или Rc, то вам придётся включить поддержку сериализации этих типов в serde и в самом utoipa посредством фич rc и rc_schema, соответственно.

У меня также для представления времени используется крейт time, поэтому добавлю, помимо прочего, фичу time.

cargo add serde -F rc
cargo add utoipa -F rc_schema -F time

Теперь в main.rs можем заменить Router из axum на OpenApiRouter из utoipa_axum и зарегистрировать наши контроллеры с помощью метода routes. Но не спешите волноваться, наш Router вернётся к нам, если применить метод split_for_parts, вместе с описанием OpenAPI.

Я не знаю, баг это или фича, но при попытке добавить в макрос routes! контроллер без полного пути до модуля — выдаёт ошибку о том, что не может найти функцию.

А ещё при попытке добавить за один раз контроллеры GET- и DELETE-запросов, то в рантайме сервер падает с паникой о том, что была предпринята попытка добавить два одинаковых обработчика.

// let messages =
//     Router::new().route("/", get(messages::get_messages).post(messages::new_message));

// let chats = Router::new()
//     .route("/", get(chats::get_chats).post(chats::new_chat))
//     .route("/{chat_id}", delete(chats::remove_chat));

// let app = axum::Router::new()
//     .nest("/chats", chats)
//     .nest("/messages", messages)
//     .route("/logout", get(users::logout_user))
//     .layer(middleware::from_fn_with_state(
//         state.clone(),
//         server::services::auth::auth,
//     ))
//     .route("/", get(|| async { "Hello, World!" }))
//     .route("/users", post(users::new_user))
//     .route("/login", post(users::login_user))
//     .with_state(state)
//     .route_layer(middleware::from_fn(trace));

let (app, api) = OpenApiRouter::new()
        .routes(routes!(messages::new_message, messages::get_messages))
        .routes(routes!(chats::remove_chat))
        .routes(routes!(chats::new_chat, chats::get_chats))
        .routes(routes!(users::logout_user))
        .layer(middleware::from_fn_with_state(
            state.clone(),
            server::services::auth::auth,
        ))
        .routes(routes!(users::login_user))
        .routes(routes!(users::new_user))
        .route("/", get(|| async { "Hello, World!" }))
        .with_state(state)
        .layer(middleware::from_fn(trace))
        .split_for_parts();

В силу того, что некоторые наши контроллеры используют аутентификацию, например, как logout_user, в объект OpenAPI, выдаваемый нам из OpenApiRouter, нужно добавить схему аутентификации, так как utoipa ничего не знает о нашем сервисе аутентификации. Поддерживаются разные схемы аутентификации, о которых можно больше узнать в документации.

У нас аутентификация происходит посредством куки, это не самый удобный способ для OpenAPI, так как JavaScript ограничен в доступе к куки, поэтому придётся руками дёргать ручку входа, чтобы получить токен сессии.

Давайте создадим модификатор SecurityAddon, который будет добавлять схему к нашему объекту ApiDoc. Создаём мы потому, что готовый содержит информацию об авторе utoipa либо он полностью пустой. Добавим немного информации, которая будет отображаться на главной странице документации. Для этого создам отдельный модуль docs.rs.

#[derive(OpenApi)]
#[openapi(info(
   title = "Justice API",
   version = env!("CARGO_PKG_VERSION"),
   description = "Documentation for the Justice API."
), modifiers(&SecurityAddon))]
pub struct ApiDoc;

struct SecurityAddon;

impl Modify for SecurityAddon {
    fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) {
        let mut components = openapi
            .components
            .take()
            .unwrap_or_else(|| ComponentsBuilder::new().build());

        components.add_security_scheme(
            "auth",
            SecurityScheme::ApiKey(ApiKey::Cookie(ApiKeyValue::new("session"))),
        );

        openapi.components = Some(components);
    }
}

Теперь в main.rs можем исправить инициализацию с нашим OpenApi объектом.

// let (app, api) = OpenApiRouter::new()
let (app, api) = OpenApiRouter::with_openapi(ApiDoc::openapi())

Настало время добавить Rapidoc. Для этого проинициализируем объект RapiDoc, передав ему пути, по которым он и OpenAPI спецификация будет хоститься. И затем мержим его в Router.

Метод merge потребляет объект и возвращает его же, поэтому получаеся такая странная строчка let app = app.merge.

#[tokio::main]
async fn main() {
    let _guard = init_logs();

    let db = init_db().await;

    session::start_cleanup_task(db.clone());

    let rng = Arc::new(Mutex::new(SmallRandom::new(807234275934919497)));
    let state = Arc::new(AppState::new(rng, db));

    let (app, api) = OpenApiRouter::with_openapi(ApiDoc::openapi())
        // ...
        .split_for_parts();

    let app = app.merge(utoipa_rapidoc::RapiDoc::with_url(
        "/docs",
        "/openapi.json",
        api,
    ));

    let port = 4000;
    let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, port))
        .await
        .expect("failed to bind port");

    tracing::info!("Listening on port {port}");

    axum::serve(listener, app)
        .await
        .expect("failed to start server");
}

Отлично, теперь запустив сервер и перейдя в браузере по адресу localhost:4000/docs, мы попадём на страницу Rapidoc, который позволит нам прочитать документацию и протестировать наше API.

image.png
rapidoc

События

Клиенту необходимо каким-то образом узнавать о появлении новых чатов или сообщений. Сделать это можно разными способами.

Один из самых простых вариантов — это периодические запросы к API с целью проверить, не появились ли новые данные. Такой подход называется regular polling (обычное опрашивание) или short polling (короткое опрашивание). В этом случае клиент через определённые интервалы времени (например, каждые 5–10 секунд) обращается к серверу и получает обновления. Метод прост в реализации, но имеет ряд недостатков:

  • высокая нагрузка на сервер из-за постоянных повторяющихся запросов;

  • задержки между отправкой нового сообщения и его доставкой клиенту (клиент увидит изменения только при следующем запросе);

  • неэффективное использование сетевых ресурсов.

Более продвинутый механизм — long polling. При нём клиент отправляет запрос на сервер, а сервер удерживает соединение открытым, пока не появятся новые данные. Как только данные приходят, соединение закрывается, и клиент сразу же открывает новое. Таким образом создаётся иллюзия постоянного соединения без регулярного «холостого» опрашивания.

А что, если не закрывать соединение после ответа, а постоянно его удерживать? Именно на базе этого подхода реализованы технологии вроде Server-Sent Events (SSE). SSE позволяет серверу отправлять обновления в одностороннем порядке (от сервера к клиенту) по протоколу HTTP, что упрощает реализацию и хорошо подходит для задач вроде обновления чатов. А самое главное, в том же Axum есть нативная поддержка этого метода.

Отдельно стоит упомянуть WebSockets — это более универсальная технология, обеспечивающая постоянное двустороннее соединение между клиентом и сервером. Она позволяет серверу мгновенно отправлять уведомления клиенту, а клиенту — передавать данные на сервер без необходимости открывать новое соединение для каждого запроса. WebSockets поддерживают передачу как текстовых, так и бинарных данных, что делает их подходящими для различных сценариев. Однако настройка и поддержка WebSocket-соединений сложнее по сравнению с SSE или polling, а также не все прокси и балансировщики корректно работают с ними, что может усложнить инфраструктуру. В простых сценариях, где требуется только односторонняя передача данных от сервера к клиенту, использование WebSockets может оказаться избыточным.

Взято с dzone.com
Взято с dzone.com

Мы же будем использовать SSE по нескольким причинам:

  1. нам не требуется двусторонняя связь в силу того, что клиент у нас взаимодействует с сервером посредством недавно реализованного REST API;

  2. он умеет самостоятельно восстанавливать соединение;

  3. имеет встроенную поддержку в axum.

Надо подумать, как будем организовывать хранение SSE подключений. Я вижу это как HashMap<UserId, broadcast::Sender<SseEvent>>, где у нас будет храниться подключение по идентификатору пользователя, который мы будем получать автоматом из сессии, то есть сначала пользователю нужно пройти авторизацию; а SseEvent будет помогать нам идентифицировать и распределять события по типам при его отправке пользователю.

Мы можем написать что-то типа RwLock<HashMap<..>>, чтобы работать со словарём, но при каждом новом подключении слушателя событий будут блокироваться все потоки, которые пытаются разослать новые события. Для таких случаев существует неблокирующая реализация хэш-карты - DashMap из одноимённого крейта.

В силу того, что Sse из axum принимает поток из Result<Event, axum::Error>, нам потребуется крейт для работы с ними, будем использовать tokio_stream, который имеет в своём арсенале под фичей sync обёртку BroadcastStream для broadcast::Sender<T>, которая позволит избавить от бойлерплейта и трансформировать бродкаст в поток.

Помимо этого, нам надо как-то передать объекты событий пользователю, для этого будем использовать JSON, в который сериализовать объекты нам поможет serde_json.

В ранее упомянутом SeeEvent будет поле event с типом SeeEventType, и, чтобы руками не прописывать сериализацию имён членов этого перечисления, добавим крейт strum.

cargo add dashmap serde_json
cargo add strum -F derive
cargo add tokio_stream -F sync

Создам модуль models/events.rs, в котором опишу несколько моделей. Сходу у нас будет пока что два типа событий: новые сообщения и новые чаты.

#[derive(Clone, strum::AsRefStr)]
pub enum SseEventType {
    Message,
    Chat,
}

#[derive(Clone)]
pub struct SseEvent {
    pub event_type: SseEventType,
    pub data: String,
}

impl SseEvent {
    pub fn new(event_type: SseEventType, data: impl serde::Serialize) -> Self {
        let data = serde_json::to_string(&data).expect("failed to serialize event data");
        Self { event_type, data }
    }
}

Создам ещё один модуль controllers/events.rs и напишу контроллер, который будет подписывать клиентов на новые события. Сначала мы ищем по UserId пользовательское соединение, а если не нашлось, то создаём новое и сразу же подписываемся на прослушку бродкаста. Затем трансформируем бродкаст в поток, опуская любые ошибки или задержанные сообщения.

pub async fn events(
    Extension(auth): Extension<Arc<Auth>>,
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, axum::Error>>> {
    let rx = state
        .events
        .entry(auth.user.id)
        .or_insert_with(|| broadcast::channel(16).0)
        .value()
        .subscribe();

    let stream = BroadcastStream::new(rx).filter_map(|msg| async move {
        match msg {
            Ok(SseEvent { event_type, data }) => {
                tracing::trace!("SSE event {} emitted", event_type.as_ref());
                Some(Ok(Event::default().event(event_type.as_ref()).data(data)))
            }
            Err(err) => {
                tracing::error!("SSE error: {err}");
                None
            }
        }
    });

    Sse::new(stream)
}

Теперь в контроллере создания новых сообщений можем отсылать уведомления всем, кто состоит в чате, а для этого нужно сделать новый запрос к базе и получить список участников чата. Поэтому давайте добавим новый метод get_chat_members в ChatsRepository.

// ...
impl ChatsRepository for PgChatsRepository {
		// ...
    async fn get_chat_members(&self, chat_id: ChatId) -> Result<Vec<UserId>, RepositoryError> {
        let members = query_scalar!(
            "SELECT UserId as \\"user_id: _\\" FROM ChatMembers WHERE ChatId = $1",
            chat_id as _
        )
        .fetch_all(&self.0)
        .await?;

        Ok(members)
    }
}

И добавим в AppState наш список подписчиков и их соединения.

pub struct AppState {
    pub random: Arc<Mutex<dyn RandomGenerator>>,
    pub events: Arc<DashMap<UserId, broadcast::Sender<SseEvent>>>,

    pub users: Arc<dyn UsersRepository>,
    pub sessions: Arc<dyn SessionsRepository>,
    pub chats: Arc<dyn ChatsRepository>,
    pub messages: Arc<dyn MessagesRepository>,
}

impl AppState {
    pub fn new(random: Arc<Mutex<dyn RandomGenerator>>, pool: sqlx::PgPool) -> Self {
        Self {
            users: Arc::new(PgUsersRepository::new(pool.clone())),
            sessions: Arc::new(PgSessionsRepository::new(pool.clone())),
            chats: Arc::new(PgChatsRepository::new(pool.clone())),
            messages: Arc::new(PgMessagesRepository::new(pool)),
            events: Arc::new(DashMap::new()),
            random,
        }
    }
}

Теперь можем получить список подписчиков (кроме самого пользователя, к нему и так возвращается идентификатор сообщения) и каждому отправить сообщение.

И вот тут у меня появились сомнения, какого вида событие должно быть: нести в себе только идентификатор нового сообщения или сообщение целиком. Я решил, что целое сообщение будет более приемлемо отправлять, чтобы не было дополнительного запроса для получения сообщения. Поэтому я немного переписал метод create_message и теперь он возвращает Message вместо MessageId.

pub async fn new_message(
    Extension(auth): Extension<Arc<Auth>>,
    Extension(trace_id): Extension<TraceId>,
    State(state): State<Arc<AppState>>,
    Path(chat_id): Path<ChatId>,
    Json(req): Json<NewMessageRequest>,
) -> Result<NewMessageResponse, ApiError> {
		// ...

let message = match state
        .messages
        .create_message(chat_id, auth.user.id, req.content.as_ref())
        .await
    {
		    //  ...
    };

    let chat_members = state.chats.get_chat_members(chat_id).await.map_err(|e| {
        tracing::error!("failed to get chat members: {e}");
        ApiError::Unknown { trace_id }
    })?;

    for member in chat_members {
        if member == auth.user.id {
            continue;
        }

        if let Some(member) = state.events.get(&member) {
            if let Err(e) = member.send(SseEvent::new(
                SseEventType::Message,
                MessageEvent {
                    user_id: auth.user.id,
                    message: message.clone(),
                    chat_id,
                },
            )) {
                tracing::error!("failed to send message event: {e}");
            }
        }
    }

    Ok(NewMessageResponse {
        message_id: message.id,
    })
}

Похожие манипуляции проделываем в new_chat, только в этот раз заменяем тип события на SseEventType::Chat.

pub async fn new_chat(
    Extension(auth): Extension<Arc<Auth>>,
    Extension(trace_id): Extension<TraceId>,
    State(state): State<Arc<AppState>>,
    Json(chat): Json<NewChatRequest>,
) -> Result<NewChatResponse, ApiError> {
    // ...

    let users_ids = if let Some(users_ids) = chat.users_ids
        && !users_ids.is_empty()
    {
        merge_ids(auth.user.id, users_ids)
    } else {
        vec![auth.user.id]
    };

    let chat_id = match state.chats.create_chat(&chat.title, &users_ids).await {
					// ...
    };

    for member in &users_ids {
        if *member == auth.user.id {
            continue;
        }

        if let Some(member) = state.events.get(member) {
            if let Err(err) = member.send(SseEvent::new(
                SseEventType::Chat,
                ChatEvent {
                    title: chat.title.clone(),
                    users_ids: users_ids.clone(),
                    chat_id,
                },
            )) {
                tracing::error!("failed to send event: {err}");
            }
        }
    }

    Ok(NewChatResponse::new(chat_id))
}

К сожалению, Rapidoc плохо поддерживает SSE, но если запустить поток, то в консоли разработчика можно посмотреть, что нам присылает сервер.

image.png
консоль разработчика

Фронтенд

Docker

И вот тут я столкнулся с тем, что при попытке наладить сборку образа нашего сервера, то она падает с ошибкой недоступности базы данных, которую выкидывают макросы sqlx. Я пробовал по разному прокинуть базу, но у меня ничего не завелось. Поэтому будем использовать offline mode у sqlx. Чтобы перейти в него, нужно запустить, в моём случае, соответствующий devcontainer и прописать команду:

cargo sqlx prepare

Данная команда просканирует проект на наличие запросов и закэширует результат проверки через базу в виде json-файлов в папке .sqlx, которая будет в корне папки server.

Создадим привычную нам папку .devcontainer в корне проекта web, и в ней создадим файл devcontainer.json с ниже приведённым содержимым. Думаю, пояснять ничего не надо, всё достаточно просто.

{
    "name": "web-devcontainer",
    "dockerComposeFile": "docker-compose.yml",
    "service": "devcontainer",
    "workspaceFolder": "/workspace/web"
}

Давайте напишем и разберем файл docker-compose.yml. В нем мы определяем три сервиса:

  1. devcontainer – почти такой же, как в начале.

  2. backend – сервер, который мы создали в предыдущей главе. Обратите внимание: контекстом выступает корень проекта, чтобы подхватить миграции сразу после запуска контейнера. Мы также добавили синхронизацию выходной папки фронтенда через volumes для некого подобия hotreload файлов.

  3. database – не будем тут останавливаться, ничего нового.

services:
  devcontainer:
    build:
      context: ..
      dockerfile: .devcontainer/Dockerfile
    command: sleep infinity
    volumes:
      - ../../:/workspace
    depends_on: [ backend ]

  backend:
    build:
      context: ../../
      dockerfile: server/Dockerfile
    command: sqlx migrate run --source ../migrations
    volumes:
      - ../dist:/workspace/server/target/debug/public
    environment:
      - DATABASE_URL=postgres://postgres:password@db/justice
    ports:
      - 4000:4000
    depends_on: [ db ]

  db:
    image: postgres
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=justice

Нашим JavaScript-рантаймом будет выступать Bun, а собирать проект будет Vite. Первый выбрал чисто из интереса потрогать животинку, а вот Vite мне давно полюбился из-за минимальной настройки.

В свою очередь, фреймворком для построения пользовательского интерфейса будем использовать solid.js. Почему не React, Vue, Angular или какой-нибудь Svelte? Тут так же нет какой-то определённой причины, кроме как я уже работал с React, и Solid похож на него.

В этом случае у нас будет SPA-приложение. Оно отличается от привычного MPA тем, что изначально загружается одна HTML-страница, а всё остальное подгружается уже по ходу дела. Первая загрузка страницы занимает некоторое время, но последующие загрузки будут куда быстрее благодаря кэшированию. Помимо скорости, SPA дают пользователям положительный опыт использования веб-приложения. Из минусов SPA стоит выделить слабую SEO-оптимизацию.

К сожалению, я не нашёл нормального devcontainer с Bun, поэтому соберём его сами! Для этого я создам Dockerfile в нашей папке .devcontainer и напишу нижеизложенное. Крайне простой скрипт сборки, используем базовый образ для devcontainer-ов, задаю пользователя vscode, чтобы Bun не установился в папку рута, после чего грузим скрипт установки Bun и задаём путь до места установки.

FROM mcr.microsoft.com/devcontainers/base:alpine
USER vscode
RUN curl -fsSL <https://bun.sh/install> | bash
ENV PATH="$PATH:/root/.bun/bin/bun"

Создадим ещё один Dockerfile в корне проекта сервера, где определим следующее. Используем официальный образ rust с осью debian, иначе в alpine нужно несколько больше действий проделать, чтобы скомпилировать проект, так как там отсутствуют некоторые зависимости. После этого устанавливаем sqlx-cli, чтобы после сборки можно было провести миграцию базы данных.

Теперь следите за руками, легко запутаться в относительных путях: мы задаём папку workspace как рабочую папку, и копируем папку с миграциями, так как мы в качестве контекста в docker-compose.yml задали корневую папку всего проекта, то есть мы на одном уровне с папками web, server и migrations; после этого задаём рабочую папку нашего сервера и копируем всё также относительно корневой папки всего проекта.

Дальше просто запускаем компиляцию сервера, далее указываем путь до исполняемого файла, который Docker запустит при разворачивании контейнера.

FROM rust:bookworm
RUN cargo install sqlx-cli --no-default-features --features postgres

WORKDIR /workspace
ADD migrations migrations
WORKDIR /workspace/server
ADD server .

RUN cargo build
ENTRYPOINT ["/workspace/server/target/debug/server"]

Отлично, всё работает, это ли не величие? Теперь можем проициализировать наш проект с помощью команды:

bun create solid

Команда создаст костяк проекта в новой папке web, всё её содержимое перетащу в корень проекта. Я думал, что можно как-то проициализировать проект в уже существующей папке, но команда bun init имеет в шаблонах только React-приложения.

А ещё команда сгенерировала кучу мусора. Например, создатели шаблона засунули целую вики Bootstrap в App, которая выбрасывает целую кучу ошибок при запуске, валяются файлы pnpm-lock.yaml и README. Сносим всё.

Запуск

Кстати об запуске, давайте для начала добавим в сервер tower_http с фьючей fs, чтобы стали доступны такие сервисы, как ServeFile и ServeDir.

cargo add tower_http -F fs

В main.rs добавим новые маршруты. nest_service будет переадресовывать пути, начинающиеся на /assets, в сервис, который будет искать нужный нам файл. Если такой не нашёлся, то отсылаем главную страничку пользователю, и уже там наш фронтенд сам будет маршрутизировать запрос, но об этом позже.

#[tokio::main]
async fn main() {
    // ...
    
    let root_path = std::env::current_exe()
        .expect("failed to get executable path");
    let root_path = root_path
        .parent()
        .expect("failed to get parent directory");

    let (app, api) = OpenApiRouter::with_openapi(ApiDoc::openapi())
		// ...
		.nest_service("/assets", ServeDir::new(root_path.join("public").join("assets")))
        .fallback_service(ServeFile::new(root_path.join("public").join("index.html")))
        .split_for_parts();
		
		// ...
}

Проделав такие небольшие манипуляции, можем теперь запустить web-devcontainer, в котором прописать команду, которая будет следить за изменениями в файлах и компилировать их в выходную папку dist в корне проекта. А так как у нас эта папка расшарена с контейнером, то все обновления, произведённые с файлами, будут немедленно доступны на нашем бэкенде.

bun run build -- --watch

Дизайним

Главная страница

Я не буду особо акцентировать внимание на вёрстке страниц, так как сам бубен в этом деле. Но буду показывать результаты. Вот, например, главная страничка. Пустая? Ещё как! Но не в этом суть.

image.png
главная страница

А суть в том, как сделать так, чтобы неавторизированный пользователь видел эту страницу, а пользователь нашего сервиса видел чат? Для этого создам компонент routes/MainPageRouter.tsx, который будет парсить печеньки и возвращать соответствующий компонент.

import { RouteSectionProps } from "@solidjs/router";
import { CookieJar } from "../utils/cookie";
import MainPage from "../pages/MainPage";
import { Component } from "solid-js";

const MainPageRouter: Component<RouteSectionProps> = (
	props: RouteSectionProps,
) => {
	const cookies = new CookieJar(document.cookie);
	const session = cookies.getCookie("session");

	if (!session) {
		return <MainPage />;
	} else {
		return props.children;
	}
};

export default MainPageRouter;

Теперь в App.tsx можем использовать данный компонент, обернув им корневой маршрут до ChatPage, как сделано ниже.

import type { Component } from "solid-js";
import { Route, Router } from "@solidjs/router";
import MainPageRouter from "./routes/MainPageRouter";
import LoginPage from "./pages/LoginPage";
import RegisterPage from "./pages/RegisterPage";
import { UserProvider } from "./contexts/UserContext";
import ChatPage from "./pages/ChatPage";

const App: Component = () => {
	return (
		<UserProvider>
			<Router>
				<Route path="/" component={MainPageRouter}>
					<Route path="/" component={ChatPage} />
				</Route>
				<Route path="/register" component={RegisterPage} />
				<Route path="/login" component={LoginPage} />
			</Router>
		</UserProvider>
	);
};

export default App;

Вообще, по-хорошему, нужно скрыть от клиентского кода нашу сессию в куки, используя флаг httpOnly, и делать запрос к серверу, но мне показалось это излишним (сознайся, что просто лень было лезть в код сервера, чтобы устанавливать этот флаг -_-).

Чатики

Начнём от малого, то есть реализуем подгрузку чатов пользователя и создания новых. Но для начала реализуем выборку пользователей как участников чата. Для этого реализую поиск по пользователям. Чтобы легко и быстро искать пользователей, воспользуемся модулем pg_trgm и создадим специальный индекс из этого модуля.

-- Add up migration script here

CREATE EXTENSION IF NOT EXISTS pg_trgm;

CREATE INDEX idx_users_name_trgm ON users USING gin (name gin_trgm_ops);

Теперь можем реализовать интерфейс поиска в базе, который будет получать все данные о пользователе и сортировать их по схожести со строкой поиска.

#[async_trait::async_trait]
pub trait UsersRepository: Send + Sync {
    // ...
    async fn search_users_by_username(&self, username: &str) -> Result<Vec<User>, RepositoryError>;
}

pub struct PgUsersRepository(sqlx::PgPool);

#[async_trait::async_trait]
impl UsersRepository for PgUsersRepository {
		// ...
    async fn search_users_by_username(&self, username: &str) -> Result<Vec<User>, RepositoryError> {
        let result = sqlx::query!( 
                "SELECT *, similarity(name, $1) AS sim 
                FROM Users 
                WHERE Name % $1
                ORDER BY sim DESC
                LIMIT 5", username)
            .fetch(&self.0)
            .filter_map(|row| {
                match row {
                    Ok(row) => Some(User {
                        id: UserId::new(row.id),
                        username: row.name,
                        password: row.password,
                        created_at: row.createdat,
                    }),
                    Err(err) => {
                        tracing::error!("failed to search users: {}", err);
                        None
                    }
                }
            })
            .collect::<Vec<_>>()
            .await;

        Ok(result)
    }
}

Отлично, реализуем контроллер, который будет взаимодействовать с этим интерфейсом.

/// Search users
#[utoipa::path(
    get,
    path = "/search/users",
    tags = ["users", "search"],
    params(
        ("username" = String, Query, description = "Username to search for")
    ),
    responses(
        (status = OK, description = "Users found", body = SearchUsersResponse),
        (status = INTERNAL_SERVER_ERROR, description = "Internal server error", body = ApiError)
    )
)]
pub async fn search_users(
    Extension(trace_id): Extension<TraceId>,
    State(state): State<Arc<AppState>>,
    Query(params): Query<SearchUsersQuery>,
) -> Result<SearchUsersResponse, ApiError> {
    let errors = params.username.validate();

    if !errors.is_empty() {
        return Err(ApiError::Validation {
            fields: HashMap::from([("username".to_string(), errors)]),
            trace_id,
        });
    }

    let users = state.users.search_users_by_username(&params.username).await;
    match users {
        Ok(users) => Ok(SearchUsersResponse(
            users.into_iter().map(|user| user.into()).collect(),
        )),
        Err(err) => {
            tracing::error!("failed to search users: {}", err);
            Err(ApiError::Unknown {
                trace_id: trace_id.clone(),
            })
        }
    }
}

Теперь можем сверстать компонент, который будет выводить список чатов пользователя. Он будет также содержать часть логики, ответственной за создание нового чат.

// MemberInput.tsx

import { Button, Form } from "solid-bootstrap";
import { User } from "../models/users";
import { createSignal, For, onCleanup, Show } from "solid-js";

interface MemberInputProps {
	index: number;
	value: string;
	removeMember: (index: number) => void;
	setId: (index: number, id: number) => void;
}

export default function MemberInput(props: MemberInputProps) {
	const [users, setUsers] = createSignal<User[]>([]);
	const [success, setSuccess] = createSignal(false);

	let timer: number;

	const onInputMember = async (e: InputEvent) => {
		const input = e.currentTarget as HTMLInputElement;
		setSuccess(false);

		clearTimeout(timer);
		timer = window.setTimeout(() => runQuery(input.value), 500);
	};

	const runQuery = async (value: string) => {
		if (value.length < 3 || value.length > 30) return;

		const res = await fetch(
			"/search/users?" + new URLSearchParams({ username: value }),
		);
		if (!res.ok) return;

		const users: User[] = await res.json();
		setUsers(users);

		const u = users.find((u) => u.username === value);
		if (u !== undefined) {
			props.setId(props.index, u.id);
			setSuccess(true);
		}
	};

	onCleanup(() => clearTimeout(timer));

	return (
		<Form.Group class="d-flex flex-row mb-1 mt-1">
			<Form.Control
				type="text"
				value={props.value}
				onInput={onInputMember}
				list={"user_search_" + props.index}
				isValid={success()}
			/>
			<Show when={users()}>
				<datalist id={"user_search_" + props.index}>
					<For each={users()}>{(user) => <option value={user.username} />}</For>
				</datalist>
			</Show>
			<Button
				variant="danger"
				class="ms-1"
				onClick={() => props.removeMember(props.index)}
			>
				X
			</Button>
		</Form.Group>
	);
}

// ChatList.tsx
import "./ChatList.tsx.css";
import { createSignal, For, onMount } from "solid-js";
import { createStore, SetStoreFunction } from "solid-js/store";
import { Chat, ChatCreationResponse, Member } from "../models/chats";
import { Button, Form, ListGroup, Modal } from "solid-bootstrap";
import MemberInput from "./MemberInput";
import { useLocation } from "@solidjs/router";

export interface ChatsListProps {
	chats: Chat[];
	setChats: SetStoreFunction<Chat[]>;
}

export default function ChatsList({ chats, setChats }: ChatsListProps) {
	const params = useLocation();

	const [members, setMembers] = createStore<Member[]>([]);
	const [title, setTitle] = createSignal("");
	const [show, setShow] = createSignal(false);

	const handleClose = () => {
		setShow(false);
		setTitle("");
		setMembers([]);
	};

	const handleCreate = async (e: SubmitEvent) => {
		e.preventDefault();

		// remove repetitions or unspecified users without id
		const usersMap = new Map(
			members.filter((m) => m.id !== undefined).map((m) => [m.value, m.id]),
		);

		const users_ids: number[] = Array.from(
			usersMap
				.keys()
				.map((u) => usersMap.get(u))
				.filter((u) => u !== undefined),
		);

		const data = {
			title: title(),
			users_ids,
		};

		const res = await fetch("/chats", {
			method: "POST",
			headers: {
				"Content-Type": "application/json",
			},
			body: JSON.stringify(data),
		});

		if (!res.ok) {
			console.error(await res.json());
			return;
		}

		const chat_id: ChatCreationResponse = await res.json();

		setChats(chats.length, { id: chat_id.chat_id, title: title() });
		handleClose();
	};

	onMount(async () => {
		const res = await fetch("/chats");
		if (res.ok) {
			const chats: Chat[] = await res.json();
			setChats(chats);
		} else {
			console.error(res.status);
			console.error(await res.json());
		}
	});

	return (
		<div class="border-end chats-list-min-width h-100">
			<div class="border-bottom p-3 cursor" onClick={() => setShow(true)}>
				<span>New chat</span>
			</div>

			<ListGroup defaultActiveKey={params.hash}>
				<For each={chats.reverse()}>
					{(chat) => (
						<ListGroup.Item action href={"#" + chat.id}>
							{chat.title}
						</ListGroup.Item>
					)}
				</For>
			</ListGroup>

			<Modal
				show={show()}
				onHide={handleClose}
				aria-labelledby="contained-modal-title-vcenter"
				centered
			>
				<Modal.Header closeButton>
					<Modal.Title id="contained-modal-title-vcenter">New chat</Modal.Title>
				</Modal.Header>
				<Form onSubmit={handleCreate}>
					<Modal.Body>
						<Form.Control
							type="text"
							onInput={(e) => setTitle(e.currentTarget.value)}
						/>
						<For each={members}>
							{(m, i) => (
								<MemberInput
									index={i()}
									value={m.value}
									setId={(idx, id) => setMembers(idx, "id", id)}
									removeMember={(idx) =>
										setMembers([...members.filter((_, index) => index !== idx)])
									}
								/>
							)}
						</For>
						<Button
							variant="link"
							onClick={() => setMembers(members.length, { value: "" })}
						>
							Add member
						</Button>
					</Modal.Body>
					<Modal.Footer>
						<Button onClick={handleClose} type="reset">
							Close
						</Button>
						<Button type="submit">Create</Button>
					</Modal.Footer>
				</Form>
			</Modal>
		</div>
	);
}

Настала очередь чата и отправки сообщений! Не уверен, нужны ли тут какие-либо пояснения, довольно шаблонный код. Хотя стоит добавить, для чего нужна функция refCallback. Она нужна для того, чтобы передавать в родительский компонент ссылку на контейнер и для прокрутки чата до низа, когда приходит новое сообщение, ведь именно родительский компонент будет слушать наш SSE-поток и передавать обновлённое состояние в дочерние компоненты.

import { Button, Form } from "solid-bootstrap";
import {
	GetChatMessagesResponse,
	Message,
	SendMessageResponse,
} from "../models/chats";
import {
	Accessor,
	createEffect,
	createSignal,
	For,
	onCleanup,
	Setter,
	Show,
} from "solid-js";
import { useUsers } from "../contexts/UserContext";
import ChatMessage from "./Message";

export interface ChatProps {
	messages: Accessor<Message[]>;
	setMessages: Setter<Message[]>;
	hasMore: Accessor<boolean>;
	setHasMore: Setter<boolean>;
	chatId: Accessor<string>;
	refCallback: (el: HTMLDivElement) => void;
}

export default function ChatView({
	messages,
	setMessages,
	hasMore,
	setHasMore,
	chatId,
	refCallback,
}: ChatProps) {
	let container!: HTMLDivElement;
	let sentinel!: HTMLDivElement;
	let observer: IntersectionObserver;

	createEffect(async () => {
		refCallback(container);
		if (!chatId()) return;

		const res = await fetch(
			`/chats/${chatId()}?${new URLSearchParams({ limit: "10" })}`,
		);

		if (!res.ok) {
			console.error(await res.json());
			return { has_more: false, messages: [] };
		}

		const body: GetChatMessagesResponse = await res.json();
		setMessages(body.messages.reverse());
		setHasMore(body.has_more);

		requestAnimationFrame(() => {
			if (container) container.scrollTop = container.scrollHeight;
		});

		if (body.has_more) {
			observer = new IntersectionObserver(async (entries) => {
				if (entries[0].isIntersecting && hasMore()) {
					const last_message = messages().at(0);
					if (!last_message) return;

					const urlParams = {
						limit: "5",
						last_message_id: last_message.id.toString(),
					};
					const res = await fetch(
						`/chats/${chatId()}?${new URLSearchParams(urlParams)}`,
					);
					if (!res.ok) {
						console.error(await res.json());
						return;
					}

					const body: GetChatMessagesResponse = await res.json();
					setMessages([...body.messages.reverse(), ...messages()]);
					setHasMore(body.has_more);
				}
			});
			observer.observe(sentinel);
		}
	});

	onCleanup(() => observer.disconnect());

	const { users } = useUsers();
	const [content, setContent] = createSignal("");

	const sendMessage = async () => {
		const trim_content = content().trim();

		if (!trim_content || !users.currentUser) {
			return;
		}

		const data = {
			content: trim_content,
		};

		const res = await fetch(`/chats/${chatId()}`, {
			method: "POST",
			headers: {
				"Content-Type": "application/json",
			},
			body: JSON.stringify(data),
		});

		if (!res.ok) {
			console.error(res.status);
			console.error(await res.json());
			return;
		}

		const body: SendMessageResponse = await res.json();
		const message: Message = {
			content: content(),
			id: body.message_id,
			created_at: new Date().toISOString(),
			sender_id: users.currentUser.id,
			chat_id: Number.parseInt(chatId()),
		};

		setMessages([...messages(), message]);
		setContent("");

		requestAnimationFrame(() => {
			if (container) container.scrollTop = container.scrollHeight;
		});
	};

	const onSendMessage = async (e: SubmitEvent) => {
		e.preventDefault();
		sendMessage();
	};

	const onKeyDown = (e: KeyboardEvent) => {
		if (e.ctrlKey && e.key === "Enter") {
			e.preventDefault();
			sendMessage();
		}
	};

	return (
		<div class="w-100 d-flex flex-column">
			<div
				class="overflow-auto flex-grow-1"
				ref={container}
			>
				<Show when={hasMore()}>
					<div ref={sentinel} />
				</Show>

				<For each={messages()}>{(msg) => <ChatMessage {...msg} />}</For>
			</div>
			<Show when={chatId()}>
				<Form class="d-flex flex-row" onSubmit={onSendMessage}>
					<Form.Control
						as="textarea"
						class="flex-grow-1"
						style={{ height: "100px" }}
						value={content()}
						onInput={(e) => setContent(e.currentTarget.value)}
						onKeyDown={onKeyDown}
					/>
					<Button type="submit">Send</Button>
				</Form>
			</Show>
		</div>
	);
}

Объединим эти компоненты и подпишемся на SSE.

import { A, useLocation } from "@solidjs/router";
import { Button } from "solid-bootstrap";
import ChatsList from "../components/ChatsList";
import { createMemo, createSignal, onCleanup, onMount } from "solid-js";
import { Chat, Message } from "../models/chats";
import { NewChatEvent, NewMessageEvent } from "../models/events";
import ChatView from "../components/Chat";
import { createStore } from "solid-js/store";

export default function ChatPage() {
	const params = useLocation();
	const chatId = createMemo(() => params.hash.slice(1));
	const [hasMore, setHasMore] = createSignal(false);
	const [messages, setMessages] = createSignal<Message[]>([]);
	const [chats, setChats] = createStore<Chat[]>([]);
	let chatContainer!: HTMLDivElement;

	onMount(() => {
		const events = new EventSource("/events");

		events.addEventListener("Message", (event) => {
			const eventData: NewMessageEvent = JSON.parse(event.data);
			if (chatId() === eventData.chat_id.toString()) {
				setMessages([...messages(), eventData.message]);
				requestAnimationFrame(() => {
					if (chatContainer) chatContainer.scrollTop = chatContainer.scrollHeight;
				});
			}
		});

		events.addEventListener("Chat", (event) => {
			const eventData: NewChatEvent = JSON.parse(event.data);
			const chat: Chat = {
				id: eventData.chat_id,
				title: eventData.title,
			};
			setChats(chats.length, chat);
		});

		onCleanup(() => {
			events.close();
		});
	});

	return (
		<div class="w-100 h-100 d-flex flex-row">
			<aside class="d-flex h-100 align-items-end fit p-3 border-end">
				<A href="/logout" target="_self">
					<Button variant="danger">Exit</Button>
				</A>
			</aside>
			<main class="d-flex flex-row w-100">
				<ChatsList {...{ chats, setChats }} />
				<ChatView {...{ hasMore, setHasMore, messages, setMessages, chatId }} refCallback={el => chatContainer = el} />
			</main>
		</div>
	);
}

Итоги

image.png

У нас получился минимально рабочий чат, ему ещё многого не хватает, например, SSE держит только одно подключение, отсутствует статус прочитанного у сообщений, локализации или конфигурационного файла, но можем продолжать улучшать его в других статьях, где добавим CI/CD, подумаем над HTTPS и безопасностью в целом, загрузкой файлов и прочее, включая вышеописанное. Если вам будет интересно, конечно же.

Не знаю, как для вас, но лично для меня, пока готовил материал, искал источники, переписывал код и прочее, статья оказалась очень полезная, помогла расставить некоторую информацию по полочкам или заметить, может быть, для вас крайне очевидные моменты и исправить их (или, по крайней мере, в будущем исправлю).

Я не ручаюсь за работоспособность кода из статьи, ибо он адаптировался под ход статьи, чтобы вам было проще воспринимать его, так как писался он совершенно в другой последовательности. Поэтому советую сначала заглянуть в репозиторий.

Комментарии (5)


  1. a3w3r
    22.10.2025 17:32

    Прикольно. Недавно сам попробовал сделать мессенджер на Rust. Делал фокус на личке и приватности.


  1. rail-ka
    22.10.2025 17:32

    Здорово! Это мой любимый стэк (Rust + SolidJS)! Неужели кто-то еще использует SolidJS, нужно больше продвигать его в русскоязычной среде (как и Rust).

    Добавлю что у @solidjs/router есть мощные и удобные хуки createAsync и query, которые позволяют кешировать запросы и делать предварительные запросы, пока грузится асинхронный компонент (аналог react query или tanstack query).

    Вообще SolidJS нравится тем, что у него все есть из коробки, что избавляет от выбора react query vs tanstack query, выбор router-а, state manager и других библиотек...


  1. subzey
    22.10.2025 17:32

    Как только данные приходят, соединение закрывается, и клиент сразу же открывает новое. <…> Именно на базе этого подхода реализованы технологии вроде Server-Sent Events (SSE).

    Нет, это неправда, не вводите людей в заблуждение!
    SSE не закрывает соединение после отправки сообщения, у вас же на диаграммах это изображено


    1. Ertanic Автор
      22.10.2025 17:32

      Да, моя оплошность, спасибо за внимательность, поправил.


  1. subzey
    22.10.2025 17:32

    Хорошим тоном будет трансформация пароля алгоритмом хеширования sha256…

    Похоже, это из какой-то древней методички, просто sha с солью уже давно недостаточно.

    NIST рекомендует хотя бы PBKDF2 с хотя бы 600 000 итераций. (Грубо говоря, это sha256 выполненный 600 000 раз).

    А ещё лучше, что-нибудь более специализированное и защищённое от перебора на видеокарте.

    Пардон за душнения, я понимаю, что это пет-проект. Но не дай боже кто-нибудь это прочитает и в прод утащит)