Идея написать данную статью родилась после моего фейла по разработке данного сервиса. Суть задачи была проста — написать сервер с базовыми методами сохранения и отдачи файлов и сервисными методами по специфичной обработке файлов. Обмен данными (тело запроса, возвращаемые данные) я реализовал через json, про асинхрон идею упустил. По началу всё было хорошо, файлы не превышали размер нескольких мегабайтов, методы использовались редко. Но буквально через пару месяцев размеры файлов стали измеряться десятками мегабайт, количество запросов сотни в минуту. Сервис стал тормозить, возникали ошибки совместного доступа к файлам. «Никогда Штирлиц не был так близок к провалу».

В этом кейсе я покажу как я переписал код базовых методов.

В проекте будут использованы библиотеки asyncio, aiohttp для обеспечения асинхронности сервиса.

На этот раз я использовал специальный метод передачи данных порциями по HTTP протоколу — Chunked transfer encoding. Зачем он нужен и как работает подробно описано тут — в англоязычной статье Википедии. Хорошо, что aiohttp поддерживает такой метод из «коробки» — StreamResponse.enable_chunked_encoding.

Ссылка на полный код проекта будет внизу статьи.

Пишем скрипт потокового вызова post-метода сохранения файла

Этот скрипт вспомогательный, его задача — стримить байтовый поток в файловый сервис. Основа скрипта взята из документации aiohttp, в которой показано показано как отправлять большие файлы, не считывая их полностью в память.

Также в заголовок запроса я добавил информацию об имени файла. Для этого создал в заголовке запроса раздел CONTENT‑DISPOSITION.

Итого скрипт выглядит так:

async def file_sender(file_name: str, chunk_size: int) -> Generator[bytes, None, None]:
    """
    Генератор считывания файла по частям
            Параметры:
                    file_name (str): имя файла, включая путь
                    chunk_size (int): размер порции для считывания файла в память
            Возвращаемое значение:
                    chunk (bytes): часть байтового потока файла
    """

    async with aiofiles.open(file_name, 'rb') as f:
        chunk = await f.read(chunk_size)

        while chunk:
            yield chunk
            chunk = await f.read(chunk_size)


async def main() -> None:
    """Функция генерации post-запроса в адрес файлового сервиса"""

    args = get_args()  # код этой функции доступен в репозитории для этой статьт

    url = urljoin(f'{args.protocol}://{args.host}:{args.port}', args.url)
    headers = {
        'CONTENT-DISPOSITION': f'attachment;filename={os.path.basename(args.path)}',
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(
                url,
                headers=headers,
                data=file_sender(file_name=args.path, chunk_size=args.chunk_size)
        ) as resp:
            logger.info(await resp.text())

Разрабатываем post-метод сохранения файла

В первом приближении код хендлера по обработке post‑метода выглядит следующим образом:

async def save_archive(request: Request) -> web.Response:
    """Хендлер сохранения байтового потока из запроса в файл"""

    # https://github.com/aio-libs/aiohttp-demos
    content = await request.content.read()

    async with aiofiles.open(NEW_FILE_NAME, 'bw') as fh:
        await fh.write(content)
        await fh.flush()

    return web.Response(text='file accepted')

Далее я научил хэндлер доставать из заголовка запроса имя файла и сохранять информацию в таблицу базу данных. Таблица имеет два поля — айдишник и имя файла, которое мы получаем из CONTENT‑DISPOSITION.

Итоговый код хендлера:

async def save_file(request: Request) -> web.Response:
    """
    Хендлер сохранения байтового потока из http-запроса в файл
            Параметры:
                    request (aiohttp.web_request): объект http-запроса
            Возвращаемое значение:
                    response (aiohttp.Response): объект ответа
    """

    logger.info(request.headers)

    _, params = cgi.parse_header(request.headers['CONTENT-DISPOSITION'])
    file_name = params['filename']
    file_id = str(uuid.uuid4())
    file_path = os.path.join(app['archive_dir'], file_id)

    # https://github.com/aio-libs/aiohttp-demos
    content = await request.content.read()

    async with aiofiles.open(file_path, 'bw') as fh:
        await fh.write(content)
        await fh.flush()
    logger.debug(f'Файл принят {file_name} и записан на диск')

    # Сохраняем информацию о загруженном файле в базе данных
    async with engine.begin() as conn:
        response = await conn.execute(files.insert().values(id=file_id, name=file_name))
        logger.debug(f'Файл сохранен под id={response.inserted_primary_key[0]}')

    return web.Response(status=201, reason='OK', text=response.inserted_primary_key[0])

Обратите внимание, файл мы сохраняем под именем айдишника. В нашем случае это UUID4. Данный прием позволил мне избежать проблем с сохранением файлов с одинаковым именем.

Для работы с БД я использовал sqlalchemy. Код подключения к БД ниже:

metadata = sqlalchemy.MetaData()
engine: AsyncEngine
files = sqlalchemy.Table(
    "file",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.String(38), primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(255)),
)


async def get_db_engine(database_url: str):
    """
    Подключает движок базы данных и создает таблицу для хранения информации о файлах
            Параметры:
                    database_url (str): адрес базы данных
            Возвращаемое значение:
                    None
    """

    global engine
    engine = create_async_engine(database_url, echo=True)
    async with engine.begin() as conn:
        await conn.run_sync(metadata.create_all)

Разрабатываем get-метод получения файла

У пользователей бывает разная скорость сетевого соединения. Поэтому процесс отдачи файла я делил на части. Это основная идея в интерфейсе файлового сервиса. Все будут в позиции «выиграл — выиграл»: клиент начнет скачивать файл сразу, а нам не придется хранить в памяти сервера файл целиком:

async def get_file(request: Request) -> web.StreamResponse:
    """
    Хендлер формирования архива и скачивания его в файл
            Параметры:
                    request (aiohttp.web_request): объект http-запроса
            Возвращаемое значение:
                    response (aiohttp.StreamResponse): объект ответа в виде байтового потока
    """

    file_id = request.match_info['id']
    folder_path = os.path.join(os.getcwd(), app['archive_dir'])

    if not (os.path.exists(folder_path) and os.path.isdir(folder_path)):
        logger.warning(f'Запрошена несуществующая папка {folder_path}')
        raise web.HTTPNotFound(text='Архив не существует или был удален')

    # Получаем из базы данных информацию о файле    
    async with engine.connect() as conn:
        statement = select(files.c.id, files.c.name).where(files.c.id == file_id)

        file_rows = await conn.execute(statement)
        file = file_rows.fetchone()

        if file is None:
            raise web.HTTPNotFound(text='Файла по указанному id не существует')
        file_path = os.path.join(app['archive_dir'], file_id)

    response = web.StreamResponse(  # aiohttp требует возвращать объект StreamResponse из функции-хендлера
        status=200,
        reason='OK',
        headers={
            'Content-Type': 'multipart/x-mixed-replace',
            'CONTENT-DISPOSITION': f'attachment;filename={file.name}'
        }
    )

    # Отправляем клиенту HTTP заголовки
    await response.prepare(request)

    try:
        async with aiofiles.open(file_path, 'rb') as f:
            chunk = await f.read(app['chunk_size'])

            while chunk:
                # Начинаем (или продолжаем) стримить байтовый поток
                await response.write(chunk)
                chunk = await f.read(app['chunk_size'])

    except asyncio.CancelledError:
        logger.error("Download was interrupted ")

        # отпускаем перехваченный CancelledError
        raise

    return response

Итого

Я создал асинхронный файловый сервис, который помог разгрузить ОЗУ сервера и при этом был способен работать в HL‑режиме, что важно для приложений, которыми пользуется большое количество клиентов.

Репозиторий файлового сервиса для данной статьи доступен по ссылке. В README.md вы найдете инструкцию по запуску проекта. Обратите внимание что база данных там виртуальная, т. е. эмулируется только на время работы сервиса. Это поведение легко изменить указав в settings.ini адрес «натуральной» базы данных.

Фишка данного сервиса заключается в том, что получить файл вы сможете поместив ссылку в адресной строке браузера. Например http://localhost:8080/files/ff6b51b5–4030–48ed‑aa5a-1b8aba37d1c4/. А это значит, что для получения файла из сервиса вам достаточно сформировать урл в шаблоне web‑страницы.

Комментарии (14)


  1. rSedoy
    21.04.2023 12:55
    +1

    А вот в этом всём, меня смущает что logger синхронный, как быть?


    1. pomponchik
      21.04.2023 12:55

      Можно использовать polog в асинхронном режиме.


      1. rSedoy
        21.04.2023 12:55

        интересно, но маленькая популярность и локальность, конечно напрягает, чтобы такое к себе тащить(


    1. SergeyKlimov_Creator Автор
      21.04.2023 12:55

      Спасибо за замечание, переделаю на aiologger


      1. rSedoy
        21.04.2023 12:55

        я вот пока в поисках, есть loguru, но там как-то не очень очевидное с await logger.complete(), глянул aiologger, там тоже какой-то await logger.shutdown(), вот еще новое в комментах предложили, пока внимательно не смотрел.


  1. pomponchik
    21.04.2023 12:55
    +1

    В целом сервис выглядит хорошо, но:

    1. Не хватает тестов.

    2. Не предусмотрены репликация / шардирование.

    3. Что будет, если в процессе загрузки данных в файл выключится электричество на сервере? На диске окажется битый файл, а в БД о нем записей не будет. Со временем при постоянном пользовании сервисом такие файлы неизбежно будут возникать и накапливаться. Одно из возможных решений - писать в файл внутри транзакции БД. Но это порождает риск долгих транзакций, от которого можно избавиться, если хранить не файлы целиком, а чанки (также это может частично решить проблему с фрагментацией диска).

    4. await request.content.read() будто бы таки считывает файл в память полностью, после чего уже записывает на диск. То есть не соблюдается главная гарантия вашего сервиса - что он не держит файлы в памяти целиком.

    5. aiofiles - немного сомнительная библиотека.

    6. Не вижу возможности параметризовать сервис при помощи переменных окружения.

    7. Нет возможности запуска в контейнере.

    8. Инициализация движка БД выглядит странно. Если по итогу все равно создается глобальная переменная, зачем это пихать в функцию?


    1. SergeyKlimov_Creator Автор
      21.04.2023 12:55

      Спасибо большое за развернутый комментарий. Почемуaiofiles сомнительная библиотека? Какие предлагаете альтернативы?


      1. pomponchik
        21.04.2023 12:55

        Это обертка, насколько я помню, над тред-пулом. То есть реальной кооперативной многозадачности там нет, только ее видимость. Альтернатив, к сожалению, не знаю.


        1. mcferden
          21.04.2023 12:55

          Есть aiofile, заявляется нативная асинхронная работа с файлами в Linux


    1. sirejja
      21.04.2023 12:55
      +2

      По п3. Решение довольно простое. Перед загрузкой файла делать некоторый черновик записи о файле со статусом pending. Загружать файл. Апдейтить запись на статус processed. А дальше в фоне подчищать все, что не в processed статусе и со старой временной меткой.


      1. pomponchik
        21.04.2023 12:55

        Это может быть опасно, поскольку неизвесно, сколько точно должна длиться загрузка. Лимитов на размер файлов тут нет, теоретически пользователь может из тайги по GPRS 100-гиговый рип "аватара 2" качать. А если автор запилит еще и возобновление загрузки после обрыва соединения, то вообще.


        1. pfffffffffffff
          21.04.2023 12:55
          +2

          Можно периодически чекать не изменился ли размер файла с прошлой проверки и уже тогда грохать файл


          1. pomponchik
            21.04.2023 12:55

            Я не вижу, чем это проще, нежели организовать код правильно, чтобы запись файлов осуществлялась внутри транзакции БД. Вы предлагаете дополнить проект новыми сущностями, при том, что по итогу гарантий это дает даже меньше.


  1. funca
    21.04.2023 12:55

    При обрыве соединения клиент сможет докачивать только недостающие данные или будет грузить все по новой?