Недавно столкнулся с проблемой подружить веб-фреймворк Axum и библиотеку rust-s3. Собственно, задача сделать 2 эндпойнта:
Загрузка файла в хранилище и получение ссылки
Скачка файла из хранилища по ссылке
Разумеется, без временных файлов и без удержания целиком всех данных файла в памяти.
Так как для работы с S3 нужны некоторые служебные объекты (настройки доступа к конкретному bucket), вынесем непосредственную работу в структуру UploadService
:
#[derive(Clone)]
pub struct UploadService {
bucket: Arc<s3::Bucket>
}
Для внедрения зависимости (DI) в обработчик эндпойнта, нам необходимо, чтобы наша структура реализовывала трейт Clone
. Так как сервис будет клонироваться на каждый запрос, обернём s3::Bucket
в Arc
, чтобы клонирование было максимально дешёвым.
Теперь реализуем конструктор экземпляра сервиса:
use s3::{Bucket, Region};
use s3::creds::Credentials;
...
impl UploadService {
pub fn new() -> Self {
let bucket_name = std::env::var("UPLOAD_BUCKET_NAME")
.expect("Expected UPLOAD_BUCKET_NAME environment variable");
let region = Region::Custom {
region: std::env::var("UPLOAD_BUCKET_REGION")
.expect("Expected UPLOAD_BUCKET_REGION environment variable"),
endpoint: std::env::var("UPLOAD_BUCKET_ENDPOINT")
.expect("Expected UPLOAD_BUCKET_ENDPOINT environment variable")
};
let credentials = Credentials::new(
Some(
&std::env::var("UPLOAD_BUCKET_ACCESS_KEY")
.expect("Expected UPLOAD_BUCKET_ACCESS_KEY environment variable")
),
Some(
&std::env::var("UPLOAD_BUCKET_SECRET_KEY")
.expect("Expected UPLOAD_BUCKET_SECRET_KEY environment variable")
),
None,
None,
None
).unwrap();
let bucket = Bucket::new(&bucket_name, region, credentials).unwrap()
.with_path_style();
Self {
bucket: Arc::new(bucket)
}
}
...
Сервис конфигурируется с помощью переменных окружения UPLOAD_BUCKET_NAME
, UPLOAD_BUCKET_REGION
, UPLOAD_BUCKET_ACCESS_KEY
, UPLOAD_BUCKET_SECRET_KEY
и UPLOAD_BUCKET_ENDPOINT
. Последний параметр необходим так как я использую не Amazon S3, а другого S3-совместимого провайдера (Scaleway). При использовании Amazon S3 можно явно задать нужный регион с помощью одного из значений из перечисления s3::Region
(например, s3::Region::UsWest1
), либо воспользоваться s3::Region::from_str
для парсинга региона из строки типа us-west-1
. Кстати, в наборе перечислений региона помимо стандартных регионов Amazon есть ещё Digital Ocean, Wasabi и Yandex.
Теперь самое сложное – функция загрузки файла в хранилище:
use std::sync::{Arc, Mutex};
use std::path::Path;
use std::ffi::OsStr;
use axum::http::StatusCode;
use axum::extract::multipart::Field;
use async_hash::{Sha256, Digest};
use async_compat::CompatExt;
use futures::TryStreamExt;
use uuid::Uuid;
...
pub async fn upload<'a>(&self, field: Field<'a>) -> Result<String, StatusCode> {
let orig_filename = field.file_name()
.unwrap_or("file")
.to_owned();
let mimetype = field.content_type()
.unwrap_or("application/octet-stream")
.to_owned();
let digest = Arc::new(Mutex::new(Sha256::new()));
let mut reader = field
.map_ok(|chunk| {
if let Ok(mut digest) = digest.lock() {
digest.update(&chunk);
}
chunk
})
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
.into_async_read()
.compat();
let tmp_filename = format!("tmp/{}.bin", Uuid::new_v4());
self.bucket.put_object_stream_with_content_type(
&mut reader,
&tmp_filename,
&mimetype
)
.await
.map_err(|err| {
log::error!("S3 upload error: {:?}", err);
StatusCode::INTERNAL_SERVER_ERROR
})?;
drop(reader); // Release digest borrow
let mut result = Err(StatusCode::INTERNAL_SERVER_ERROR);
if let Some(digest) = Arc::into_inner(digest).and_then(|m| m.into_inner().ok()) {
let digest = hex::encode(digest.finalize());
let ext = Path::new(&orig_filename).extension().and_then(OsStr::to_str);
let mut filename = if let Some(ext) = ext {
format!("{}.{}", digest, ext)
} else {
digest
};
filename.make_ascii_lowercase();
match self.bucket.copy_object_internal(&tmp_filename, &filename).await {
Ok(_) => result = Ok(format!("/uploads/{}", &filename)),
Err(err) => log::error!("S3 copy error: {:?}", err)
}
}
if let Err(err) = self.bucket.delete_object(&tmp_filename).await {
log::error!("S3 delete error: {:?}", err);
}
result
}
...
Функция принимает одно поле из multipart/form-data
запроса (обработка запроса будет рассмотрена ниже), определяет исходное имя файла и mime-тип (если эти данные отсутствуют, используется “file
” и “application/octet-stream
” в качестве значений по умолчанию). Затем данные поля превращаются в AsyncRead
с помощью библиотеки async-compat. При этом наш читатель потока по мере чтения потока вычисляет его SHA256 хеш (пригодится в будущем).
Теперь мы можем загрузить файл в S3-хранилище под временным именем “tmp/<UUID>.bin
” (UUID генерируется случайным образом). Если в этот момент возникает ошибка, функция возвращает код Internal Server Error.
Мы имеем файл в S3-хранилище и посчитанный SHA256 от его данных. Теперь можно переименовать файл в его окончательное имя (я хочу использовать SHA256 в качестве имени файла, чтобы одинаковые файлы не дублировались в хранилище). Для этого я беру HEX-представление SHA256 и приписываю расширение файла взятое из оригинального имени (если оно там было). Результат приводится к нижнему регистру (на случай если расширение файла было не в нижнем регистре) и далее мы выполняем копирование S3-объекта (так как API S3 не имеет функции переименования). Если копирование успешно, то у нас получается результирующий URL-файла.
Наконец, можно удалить временный объект из S3. Это происходит в любом случае – и если копирование было успешным, и если нет.
Последняя функция нашего сервиса – отдача файла по ссылке (теоретически это можно делегировать веб-серверу, но как минимум удобно иметь эту функцию при локальной разработке, как максимум нам может требоваться реализовать какую-нибудь дополнительную бизнес-логику вроде проверки прав доступа к файлу):
use axum::response::IntoResponse;
use axum::body::StreamBody;
use s3::error::S3Error;
...
pub async fn download(
&self,
filename: &str
) -> Result<impl IntoResponse, StatusCode> {
let stream = self.bucket.get_object_stream(filename)
.await
.map_err(|err| match err {
S3Error::HttpFailWithBody(status_code, body) => match status_code {
404 => StatusCode::NOT_FOUND,
_ => {
log::error!(
"S3 download HTTP error with code={} and body={:?}",
status_code,
body
);
StatusCode::INTERNAL_SERVER_ERROR
}
}
err => {
log::error!("S3 download error: {:?}", err);
StatusCode::INTERNAL_SERVER_ERROR
}
})?;
Ok(StreamBody::from(stream.bytes))
}
}
Здесь всё просто – получаем стрим S3-объекта, маппим ошибку отсутствия файла на 404-ую ошибку Axum, а остальные ошибки на 500-ую, возвращаем StreamBody
.
Остаётся реализовать обработчики самих эндпойнтов:
use axum::{Extension, Json};
use axum::extract::{Multipart, Path};
use axum::http::{HeaderMap, HeaderValue, StatusCode};
use axum::http::header::CACHE_CONTROL;
use axum::response::IntoResponse;
#[derive(Debug, serde::Serialize)]
pub struct UploadResponse {
pub url: String
}
pub async fn upload_file(
Extension(upload_service): Extension<UploadService>,
mut multipart: Multipart
) -> Result<impl IntoResponse, StatusCode> {
while let Some(field) = multipart.next_field().await.map_err(|_|
StatusCode::INTERNAL_SERVER_ERROR
)? {
if let Some("upload") = field.name() {
let url = upload_service.upload(field).await?;
return Ok(Json(UploadResponse { url }));
}
}
Err(StatusCode::BAD_REQUEST)
}
pub async fn download_file(
Path(path): Path<String>,
Extension(upload_service): Extension<UploadService>
) -> Result<impl IntoResponse, StatusCode> {
let body = upload_service.download(&path).await?;
let headers = HeaderMap::from_iter([
(CACHE_CONTROL, HeaderValue::from_str("max-age=31536000").unwrap()) // One year
]);
Ok((headers, body))
}
Обработчик загрузки загружает по одному файлу за раз, при этом имя поля файла в отправленной форме ожидается “upload”. Обработчик скачки файла выставляет срок жизни файла в кеше один год, потому что изменения файла не предполагаются (если файл изменится, он будет иметь другой SHA256 и другое имя).
Последнее, что нам остаётся – создать роутер и запустить сервер:
use std::str::FromStr;
use axum::extract::DefaultBodyLimit;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
let upload_service = UploadService::new();
let router = axum::Router::new()
.route("/uploads", axum::routing::post(upload_file))
.route("/uploads/*path", axum::routing::get(download_file))
.layer(Extension(upload_service))
.layer(DefaultBodyLimit::max(8 * 1024 * 1024));
let address = std::env::var("HOST").expect("Expected HOST environment variable");
let port = std::env::var("PORT").expect("Expected PORT environment variable")
.parse::<u16>().expect("PORT environment variable must be an integer");
log::info!("Listening on http://{}:{}/", address, port);
axum::Server::bind(
&std::net::SocketAddr::new(
std::net::IpAddr::from_str(&address).unwrap(),
port
)
).serve(router.into_make_service()).await?;
Ok(())
}
Экземляр UploadService
передаётся через Extension
(механизм DI в Axum), также может быть полезно задать DefaultBodyLimit
, потому что стандартное значение 1 МБ может подходить не для всех ситуаций. Хост и порт для прослушивания получаются из соответствующих переменных окружения.
Вероятно, нам также может требоваться добавить какую-нибудь проверку авторизации в эндпойнт загрузки (а, возможно, и скачки), но это зависит от конкретной функции конкретного сервиса.
Зависимости в Cargo.toml:
[package]
name = "uploader"
version = "0.1.0"
edition = "2021"
[dependencies]
log = "0.4.20"
env_logger = "0.10.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
axum = { version = "0.6.20", features = ["multipart"] }
serde = "1.0.188"
uuid = { version = "1.4.1", features = ["v4"] }
rust-s3 = "0.34.0-rc1"
futures = "0.3.28"
async-compat = "0.2.2"
async-hash = "0.5.1"
hex = "0.4.3"
В качестве бонуса пример браузерного кода на TypeScript выполняющего загрузку файла:
interface UploadResponse {
url: string;
}
async function uploadFile(file: Blob, filename?: string): Promise<UploadResponse | "error"> {
const data = new FormData();
data.append("upload", file, filename);
const response = await fetch("/uploads", {
method: "post",
body: data
});
if (response.status >= 200 && response.status <= 299) {
return await response.json();
} else {
return "error";
}
}
Наш сервис готов.
Heggi
Почему бы upload'ом и download'ом самих файлов не озадачить сам S3 и не гонять через сервис лишние мегабайты?
Обращаемся к S3, формируем presigned url для upload, отдаем ее фронту и ждем пока фронт сам загрузит файл.
Аналогично и с download, обращаемся к S3, формируем presigned url для download и отдаем ссылку фронту.
KivApple Автор
С download не совсем согласен. Например, мы можем хранить в S3-картинки и хотеть их отдавать прямо браузеру, который встречает тэги типа img, css-стили и т. д. И там мы не контролируем поведение "фронта" (по сути встроенных алгоритмов браузера).
Хотя в идеале надо грузить с ACL public и проксировать напрямую веб-сервером в S3 (либо настроить веб-сервер, чтобы добавлял нужные заголовки авторизации, срезал лишние заголовки и запрещал методы отличные от GET). Но всё равно приятно иметь download в бекэнде как минимум для локальной разработки, чтобы не было необходимости настраивать веб-сервер.
Крч многоступенчатый download неудобен в данном сценарии.
С upload в целом, действительно, может быть хорошая идея, но с другой стороны у нас течёт абстракция. Теперь фронт должен быть в курсе, что у нас S3-хранилище и как с ним работать. А так мы полностью абстрагируем хранилище. Можно грузить файлы хоть HTML формой без JS.
К тому же бекэнд на Rust должен иметь очень высокую производительность в пересылке бинарных потоков, вполне возможно на уровне всяких nginx. И зачем нам тогда усложнять систему.
Heggi
С download решается просто, если не хочется усложнять фронт - бек отдает redirect 301 с ссылкой на объект в S3, при этом все проверки прав доступа и т.п. делается на беке, а ссылка формируется с ограниченным сроком существования (срок можно выставить любой, от секунд до месяцев, тут зависит от задачи).
С upload тоже все не сильно сложно, загрузка в s3 по presigned url тоже решается достаточно просто (обычным axios или fetch), но да, логики придется добавить и с голой html формой фокус не прокатит.
KivApple Автор
А ещё я генерирую имена файлов с помощью SHA256 (борьба с дубликатами, плюс борьба с одноимёнными файлами с разным содержимым). Как это сделать c presigned request? Считать хеш файла на фронте? Ненадёжно, фронт может быть свободно модифицирован пользователем.
А ещё может захотеться всяких других валидаций типа ограничения mime типа файла, а может даже содержимого (в моём коде можно не только считать хеш, но и что-нибудь ещё, можно даже в реальном времени модифицировать данные загружаемого файла, например, сжимать gzip) и т. п. А ещё мы можем хотеть выполнить какие-то дополнительные действия после загрузки (например, внести запись о файле в БД). Причём хотим обеспечить некоторую транзакцинность (на случай неудачи в середине загрузки, закрытия вкладки браузера или потерю соединения пользователем сразу после загрузки и т. д.).
Heggi
Когда формируется presigned url, имя объекта зашивается в ссылке и именно под ним будет сохранен файл, независимо от имени файла на компе пользователя.
Вообще советую почитать документацию для начала, отпадет много вопросов (но скорее всего появятся новые)
https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-presigned-url.html
KivApple Автор
Всё ещё нельзя генерировать имя файла на основе хеша.
Heggi
А, вы считаете хеш по содержимому файла? Ну тогда ой, я бы сделал по другому, но в вашем случае вам виднее как лучше.
Heggi
Быстро нагуглил, оказывается S3 умеет само считать чексуммы файлов (файл грузить на бэк не придется), потом будет достаточно просто файл переименовать.
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectAttributes.html
Еще амазоновский S3 умеет в лямба-функции, которые умеют файлы преобразовывать или обогащать данными. Но скорее всего это амазон-only фишка (никогда не обращал внимания на этот функционал, мне он не нужен был никогда)
https://docs.aws.amazon.com/AmazonS3/latest/userguide/transforming-objects.html