Привет, Хабр! Ведущий системный программист компании "Криптонит" Михаил Доронин поделился опытом написания промежуточного ПО — middleware. Оно часто используется в веб-разработке. Например, веб-сервер может использовать middleware для обработки запросов до того, как они будут переданы основному приложению. Так удобно выполнять аутентификацию, логирование, сжатие данных и другую обработку запросов. Наша команда разработчиков на Rust использует для создания middleware библиотеку Tower. О ней и пойдёт речь в этой статье.

Ключевым понятием Tower является типаж Service, суть которого на псевдо-Rust некоторого (надеюсь, не столь отдалённого) будущего, сводится к следующей записи:

rust
trait Service<Req, Rep, E> = AsyncFnMut(Req) -> Result<Rep, E>

; где Req — обобщённый тип запроса, Rep — тип ответа, а E — тип ошибки.

То есть Service — это просто асинхронная функция, трансформирующая запрос в ответ. Она принимает запрос и возвращает ответ или ошибку.

Эта абстракция может быть использована для моделирования как клиентов, так и серверов. Таймаут, ограничение скорости, балансировка нагрузки и т.п. могут быть смоделированы как Services, которые оборачивают некоторый внутренний сервис и применяют дополнительное поведение до или после вызова внутреннего сервиса.

Вот как выглядит определение типажа Service сейчас на текущей версии Rust v.1.80.

rust
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future
    where
        <Self::Future as Future>::Output == Result<Self::Response, Self::Error>;

    fn poll_ready(
        &mut self, 
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}

При имплементации этого типажа нам необходимо задаться типом запроса и ответа, ошибки, и конкретным типом Future, который и будет выдавать этот ответ.

Практические примеры

Представим, что у нас есть HTTP-сервис, который, в свою очередь, от имени клиента посылает запросы другим сервисам, чтобы предоставить ответ клиенту. Неплохо было бы, чтобы токен пользователя автоматически попадал во все запросы во внешние сервисы в рамках одного запроса пользователя.

Вот несколько сокращённая имплементация сервиса ReuseClientAuthServiceService, который является обёрткой над другим сервисом — S. Первый сервис будет получать запрос, брать токен из заголовков и класть его в task local.

Обратите внимание, что для объединения промежуточного ПО с сервисами используется дополнительная абстракция — типаж Layer. Это функция, принимающая сервис одного типа и возвращающая сервис другого типа.

rust
use std::task::{Context, Poll};

use http::{self, header::AUTHORIZATION, HeaderValue, Request, Response};
use tokio::task::futures::TaskLocalFuture;
use tower::{Layer, Service};

use crate::reuse_auth::TOKEN;

impl<ReqBody, ResBody, S> Service<Request<ReqBody>>
    for ReuseClientAuthServiceService<S>
where
    S: Service<Request<ReqBody>, Response = Response<ResBody>>,
{
    // Мы не меняем ни тип ответа, ни тип ошибки, поэтому в качестве них используем соответствующие типы из оборачиваемого сервиса.
    type Response = S::Response;
    type Error = S::Error;
    // Возвращаем Future, который возвращает [scope](https://docs.rs/tokio/latest/tokio/task/struct.LocalKey.html#method.scope)
    type Future = TaskLocalFuture<Option<HeaderValue>, S::Future>;

    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx) // Мы готовы отвечать, когда оборачиваемый сервис готов
    }

    // Достаём авторизационный хедер и кладём его в task local
    fn call(&mut self, request: http::Request<ReqBody>) -> Self::Future {
        let headers = request.headers();
        let token = headers.get(AUTHORIZATION).cloned();
        TOKEN.scope(token, self.inner.call(request))
    }
}

Алгоритм работает следующим образом:

  1. сначала импортируются необходимые модули и библиотеки;

  2. затем определяется общая структура сервиса ReuseClientAuthServiceService с параметрами типов ReqBody (тело запроса), ResBody (тело ответа) и S (оборачиваемый сервис);

  3. для данной структуры определяются связанные типы Response (тип ответа) и Error (тип ошибки), которые берутся из оборачиваемого сервиса S;

  4. далее определяется тип Future как TaskLocalFuture, который возвращает Option<HeaderValue> и оборачивает будущий результат вызова для сервиса S;

  5. потом реализуется метод poll_ready, который делегирует выполнение методу poll_ready оборачиваемого сервиса S.

  6. на последнем шаге реализуется метод call, который извлекает авторизационный хедер из запроса, кладёт его в task local и затем вызывает оборачиваемый сервис S, передавая модифицированный запрос.

Как можно видеть, этот код показывает способ добавления авторизационного хедера в task local перед выполнением запроса через обёрнутый сервис.

Теперь посмотрим, как может выглядеть  Service для оборачивания. Он будет делать обратную операцию, доставая токен из task local и помещая его в заголовок запроса. То есть, он обеспечивает добавление токена авторизации в заголовок запроса перед передачей этого запроса внутреннему сервису inner.

rust
use std::task::{Context, Poll};

use http::{self, header::AUTHORIZATION};
use tower::Service;

use crate::reuse_auth::TOKEN;

impl<Body, S> Service<http::Request<Body>> for ReuseClientAuthClientService<S>
where
    S: Service<http::Request<Body>> + Send + 'static,
    S::Response: Send,
    S::Future: Send,
    Body: Send + 'static,
{

    // Не меняем ни тип ответа, ни тип ошибки, ни Future
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    // Для клиента нужна обратная операция.
    // Достаём из task local токен, который положили туда до этого и кладём его 
    // в соответствующий заголовок
    fn call(&mut self, mut request: http::Request<Body>) -> Self::Future {
        let _ = TOKEN.try_with(|token| {
            if let Some(token) = token {
                request.headers_mut().insert(AUTHORIZATION, token.clone());
            }
        });
        self.inner.call(request)
    }
}

Метод poll_ready вызывается для проверки готовности сервиса к обработке запросов. Метод call добавляет токен авторизации к запросу, если он доступен в task local, затем передаёт запрос во внутренний сервис inner.

Теперь давайте рассмотрим чуть более сложный пример со своей собственной Future. Давайте создадим grpc middleware для аутентификации пользователя, который будет выдавать соответствующий статус, если пользователь не аутентифицирован.

rust
#[derive(Clone)]
pub struct AuthenticatedService<S, A> {
    pub inner: S,
    pub auth: A,
}

impl<S, A, ReqBody, ResBody> Service<http::Request<ReqBody>>
    for AuthenticatedService<S, A>
where
    A: FnMut(&str) -> jsonwebtoken::errors::Result<UserInfo>,
    ResBody: Default,
    S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
{
    type Response = http::Response<ResBody>;
    type Error = S::Error;
    type Future = TaskLocalFuture<UserInfo, ResponseFuture<S::Future>>;

    #[inline]
    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, request: http::Request<ReqBody>) -> Self::Future {
        let headers = request.headers();
        let user_info = headers
            .get(AUTHORIZATION)
            .ok_or("no authorization field in header".into())
            .and_then(|value| value.to_str().map_err(|err| format!("error while converting authorization field to string: {err}")))
            .and_then(|token| (self.auth)(token).map_err(|err| format!("error while authenticating with token: {err}")));
        let (user_info, fut) = match user_info {
            Ok(user_info) => {
                (user_info, ResponseFuture::future(self.inner.call(request)))
            }
            Err(msg) => (
                UserInfo::default(),
                ResponseFuture::status(Status::unauthenticated(msg)),
            ),
        };
        USER_INFO.scope(user_info, fut)
    }
}

#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<F> {
    #[pin]
    kind: Kind<F>,
}

impl<F> ResponseFuture<F> {
    fn future(future: F) -> Self {
        Self {
            kind: Kind::Future(future),
        }
    }

    fn status(status: Status) -> Self {
        Self {
            kind: Kind::Status(Some(status)),
        }
    }
}

#[pin_project(project = KindProj)]
#[derive(Debug)]
enum Kind<F> {
    Future(#[pin] F),
    Status(Option<Status>),
}

impl<F, E, B> Future for ResponseFuture<F>
where
    F: Future<Output = Result<http::Response<B>, E>>,
    B: Default,
{
    type Output = Result<http::Response<B>, E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.project().kind.project() {
            KindProj::Future(future) => future.poll(cx),
            KindProj::Status(status) => {
                let status = status.take().unwrap();
                let (parts, _) = status.to_http().into_parts();
                let response = Response::from_parts(parts, B::default());
                Poll::Ready(Ok(response))
            }
        }
    }
}

Здесь структура AuthenticatedService имеет два обобщённых параметра S и A, представляющие соответственно внутреннюю службу и аутентификацию. Метод call извлекает информацию из заголовков запроса, а затем аутентифицирует пользователя с использованием функции auth. В зависимости от результата аутентификации создаётся ResponseFuture, который обрабатывает либо будущий ответ службы, либо статус "Не аутентифицирован".

Структура ResponseFuture содержит внутренний тип Kind, который может представлять будущий ответ или статус. Реализация метода poll для ResponseFuture возвращает результат в соответствии с типом Kind, который был задан. 

Обратите внимание, что в примере используются макрос pin-project и трейты, которые были введены в Rust, начиная с версии 1.0. 

Хотя в этом примере мы достаём заголовок и аутентифицируем пользователя с помощью предоставленного нам замыкания, эта часть нам не сильно интересна. Больше интересует нас структура ResponseFuture и имплементация типажа Future для неё. 

Хотелось бы вместо всего этого указывать в качестве ассоциированного типажа Future в Service следующее:

rust
    type Future: impl Future<Output = S::Future::Output>

Но, к сожалению, этот функционал в язык не могут добавить уже в течение 5 лет. Поэтому пока приходится писать собственные реализации типажей Future.

Здесь ResponseFuture обёрнута макросом pin_project, который позволяет упростить работу с Pin, и представляет собой суммарный тип, который может содержать или ошибку аутентификации, представленную типом tonic::Status, или Future, который определён обёрнутым сервисом.

При отсутствии ошибки мы просто ожидаем Future, а вот в случае возникновения ошибки всё становится немного интереснее.

Поскольку тело HTTP-ответа в виде типа B у нас может быть чем угодно, то мы не имеем возможности его создать, а без тела нельзя создать ответ. Я вышел из этой ситуации ограничением типа B типажом Default. Можно также было ограничить его типажом http_body::Body, как это сделано в реализации Interceptor в tonic, но тогда это ещё сильнее сузило бы количество сервисов, которые можно обернуть этим middleware.

В этом я вижу существенное ограничение экосистемы Rust в виде crate http, в котором запрос и ответ представлены произвольными generic-типами, дабы имплементации клиентов и серверов могли её использовать с любыми своими типами, представляющими контент, вроде Vec<u8> или Stream<Item = Bytes> и так далее.

С одной стороны, это решение понятно, а с другой — оно не позволяет подменять тело ответа в middleware для произвольного HTTP-сервера. Даже в случае, если это известный типаж, вроде http_body::Body, его придётся класть в Box, что повлияет на производительность.

Вместо заключения

В общем случае Tower предоставляет слой абстракции для различного промежуточного ПО. С её помощью можно создать точку интеграции между кодом приложения и библиотеками,  реализующими серверную или клиентскую часть для различных сетевых протоколов. Tower используется всё чаще и поддерживается в ряде библиотек сторонних разработчиков: hyper: (низкоуровневая реализация HTTP), tonic (реализация gRPC-over-HTTP/2), warp (составной фреймворк), tower-lsp (реализация протокола языкового сервера) и т. д.

Другие примеры реализации промежуточного ПО с помощью Tower смотрите здесь и здесь.

Если вам интересен Rust, прочтите интервью с Михаилом Дорониным об этом прекрасном языке.

Прямо сейчас у нас в команде открыта вакансия Rust Developer. Откликайтесь сами или передавайте друзьям! ?

Комментарии (0)