Друзья, приветствую! Наконец-то дошли руки до создания материала и написания этой статьи. Под Новый год всем резко что-то нужно, и писать не получается физически. Надеюсь, что вы ждали выхода этой статьи.

Если вы читали мои статьи и вообще знакомы с моим творчеством, то знаете, что большая часть контента тут посвящена теме FastAPI и смежных технологий, связанных с этим замечательным Python-фреймворком.

Так получилось, что я уже давно обещал рассказать вам про связку FastAPI с другим не менее замечательным фреймворком Celery. Сегодня я закрою этот гештальт и, чтобы было интереснее, чем читать сухую теорию, мы с вами создадим полноценный законченный проект. Так что теории сегодня будет 50 на 50 с практикой. Постараюсь сделать так, чтобы был понятен каждый шаг и было интересно.

Чем сегодня займемся?

Сегодня мы создадим FullStack приложение: файлообменник с временным хранилищем на сервере. Другими словами, мы сделаем так, чтобы пользователь при отправке файлов на наш сервер выбирал срок жизни файла, и по истечении этого срока файл с сервера автоматически удалялся.

Под FullStack я подразумеваю полный цикл разработки:

  • Создание веб-страниц приложения (будет всего 3 страницы)

  • Написание простого JavaScript кода для страниц

  • Создание API-методов нашего приложения

  • Подъем Redis (о том, зачем он нам нужен, подробнее далее)

  • Настройка Celery

  • Деплой приложения (запуск на удаленном сервере)

Какие технологии будем сегодня использовать

Сегодня нам предстоит использование ряда разрозненных технологий, начиная от простых HTML + CSS и заканчивая настройкой связей между FastAPI и Celery. Мы будем использовать следующие технологии:

  • HTML + CSS + JavaScript: для создания фронтенд части нашего приложения. Для того чтобы нам, бэкенд-разработчикам, было проще, мы дополнительно будем использовать шаблонизатор Jinja2.

  • FastAPI: бэкенд фреймворк Python, который позволит нам разрабатывать API методы приложения и который будет поднимать наш фронтенд (рендерить и обслуживать веб-странички).

  • Redis: нереляционная база данных, которая сегодня будет закрывать ряд задач, начиная от брокера сообщений и заканчивая базой данных для FastAPI. Что это такое, зачем и почему, мы подробно поговорим далее.

  • Celery: классический фреймворк Python, который используется для выполнения отложенных задач (worker, beat и flower – подробнее далее).

  • Ряд сопутствующих библиотек: loguru (логирование), pydantic (валидация данных), pydantic-settings (работа с переменными окружения) и так далее. В ходе статьи подробнее остановимся и на них.

Концепция реализации задачи

Перед тем как мы приступим к подробному теоретическому разбору Celery, хочу сразу отметить: FastAPI и Celery – это всегда, подчеркиваю, всегда разные приложения, которые взаимодействуют между собой посредством брокеров и очередей задач. У новичков тут появляется путаница, особенно когда дело доходит до деплоя (удаленного запуска).

Кроме того, всегда в этой связке будет ещё одно приложение — брокер. Это тот, кто сохраняет задачи и затем передает их на реализацию. В качестве брокеров, как правило, выступает Redis (нереляционная база данных) или RabbitMQ (система, которая позволяет приложениям обмениваться сообщениями надежно и эффективно). Поэтому вам в любом случае нужно будет поднимать или Redis, или RabbitMQ.

Следовательно, предстоит поднимать минимум 3 приложения (FastAPI, Celery и, например, Redis).

И тут появляется реальная сложность с тем, чтобы запустить это все дело на удаленном сервере.

Как правило, эта задача закрывается при помощи Docker и Docker Compose. Если простыми словами, то в специальном файле прописываются все инструкции, и после вводится одна команда, которая поднимает сразу ряд приложений (в нашем случае минимум 3). В связи с этим у новичков и у тех, кто просто знакомится с этой технологией, возникают проблемы, недопонимание и ощущение того, что происходит что-то сложное.

Сегодня я намеренно упрощу эту часть. Мы с вами буквально будем запускать несколько изолированных приложений и после их свяжем между собой. Такой подход я выбрал намеренно, ведь он не просто продемонстрирует общую простоту технологий, но и даст вам четкое понимание того, как оно все работает.

В этой простоте понимания и деплоя нам поможет сервис Amvera Cloud. Мы сегодня прямо там развернем: Redis, Celery и FastAPI приложение. То есть 3 независимых приложения, которые будут связаны между собой общим брокером.

Сервис Amera в первую очередь я выбрал за простоту и надежность. Например, для того чтобы там поднять базу данных Redis, достаточно будет потратить 2 минуты. Буквально. И не нужно будет никаких доп инструкций вводить, ну разве что пароль к базе данных придумать и ввести.

Для деплоя Celery и FastAPI достаточно будет закинуть файлы приложений с шаблонным файлом инструкций, и все заработает.

Теперь перейдем к важному теоретическому блоку.

Важный дисклеймер!

Друзья, сегодняшний материал предназначен исключительно для демонстрации возможностей связки FastAPI, Redis и Celery. Это пример реализации, а не полноценная инструкция по созданию файлообменника.

Если ваша цель — разработать реальный сервис для обмена файлами, то стоит учитывать множество факторов, таких как требования к масштабируемости, надежности и стоимости. Для таких задач часто используют решения вроде Amazon S3, которые обеспечивают оптимальную работу с большими объемами данных, но требуют дополнительных ресурсов и могут привести к высоким затратам на хостинг.

Здесь же мы сосредоточимся на практическом примере, который поможет вам понять, как работает взаимодействие между FastAPI, Redis и Celery, и продемонстрируем базовую концепцию распределенной обработки задач.

Что такое Celery и как он работает

Друзья, давайте разберемся с этим загадочным Celery. Представьте, что у вас есть ресторан (ваше приложение), и вам нужно выполнять множество задач: готовить блюда, мыть посуду, обслуживать клиентов. Celery — это как ваш супер-организованный менеджер, который распределяет задачи между сотрудниками (воркерами), чтобы всё работало как часы.

Основные компоненты Celery

  1. Брокер: Это как доска объявлений в вашем ресторане. Когда появляется новая задача, она "вывешивается" на эту доску. Обычно в роли брокера выступает Redis или RabbitMQ, которые хранят задачи и распределяют их между воркерами.

  2. Воркеры (Workers): Это ваши трудолюбивые сотрудники. Они постоянно смотрят на доску объявлений (брокер) и, как только появляется новая задача, хватают её и выполняют. Воркеры могут работать параллельно, что позволяет обрабатывать несколько задач одновременно.

  3. Бэкенд: Представьте, что это журнал выполненных заказов. Сюда записываются результаты выполнения задач. Бэкенд не обязателен, но очень полезен, если вам нужно отслеживать результаты выполнения задач.

Важно понимать, что Celery сам по себе синхронный. Это означает, что каждый воркер выполняет задачи последовательно. Но, поскольку воркеров может быть много, и они работают параллельно, в целом система становится асинхронной.

Теперь представьте, что вам нужно отправить 1000 email-ов. Без Celery ваше приложение "зависнет", пока не отправит все письма. С Celery вы просто добавляете задачу в очередь и продолжаете работать, а воркеры в фоновом режиме займутся отправкой.

Celery удобен тем, что является независимым приложением. Это даёт возможность вынести нагрузку с основного сервера, например, того, где развернуто ваше FastAPI приложение, на отдельный, более мощный сервер. С учётом общей архитектуры, это становится супер-полезным.

Основные режимы работы Celery

Celery может работать в трёх основных режимах:

  1. Worker: Основной режим Celery, отвечающий за выполнение задач. Процессы воркеров обрабатывают задачи из очереди. Сегодня для основной логики мы будем использовать этот режим с дополнением — я покажу, как через Worker установить отложенную задачу (по умолчанию задачи запускаются немедленно).

  2. Beat: Планировщик для периодических задач. Beat отправляет задачи в очередь по заданному расписанию. Обычно применяется для реализации задач, которые нужно выполнять постоянно по расписанию. Например, подготовка сложного отчета в 9 утра каждый день с последующей отправкой на Email. Можно было для нашего проекта взять и его, но мне показалось что тут Beat будет избыточен.

  3. Flower: Инструмент мониторинга и администрирования для Celery. Он позволяет отслеживать состояние задач и воркеров через удобный веб-интерфейс. Сегодня на практике я покажу, как он работает.

Важно понимать, что если вы решите использовать все три режима одновременно, то вам потребуется запустить три отдельных приложения (Flower, Worker и Beat).

В рамках блока с деплоем мы запустим удалённо только Celery в режиме Worker, а мониторинг (Flower) будем запускать локально. Это и безопаснее, и требует меньше усилий.

Почему Celery синхронный и почему это хорошо

Celery часто называют инструментом для асинхронной обработки задач, но внутри него задачи выполняются синхронно. Это важная особенность, которая делает его предсказуемым и надежным инструментом. Давайте разберемся, что это значит и почему это важно.

Как работает Celery

Когда вы отправляете задачу в Celery, она попадает в очередь. Воркер Celery — это процесс, который выбирает задачи из очереди и выполняет их последовательно и синхронно. В отличие от асинхронных систем, где одна задача может быть временно приостановлена, чтобы освободить ресурсы для другой задачи, воркер Celery обрабатывает каждую задачу от начала до конца.

Эта модель отличается от асинхронного подхода, который часто используется, например, в веб-серверах. В таких системах события (например, запросы пользователей) выполняются в одном потоке с постоянным переключением между задачами. Это переключение происходит для оптимизации ресурсов, но может усложнить управление задачами, их отладку и тестирование.

Преимущества синхронного выполнения задач

  1. Предсказуемость выполнения
    Каждая задача выполняется полностью, без переключений и пауз. Это устраняет неопределенность в последовательности выполнения, что особенно важно для критически важных операций.

  2. Простота отладки и тестирования
    Синхронный код проще отслеживать. Когда вы отлаживаете задачу, вы видите полный ее путь выполнения без необходимости учитывать возможные точки переключения контекста.

  3. Эффективное использование ресурсов
    Поскольку воркер фокусируется только на одной задаче за раз, нет затрат на переключение между задачами, как в асинхронной модели. Это может быть особенно полезно для задач, интенсивно использующих процессор или память.

  4. Гибкость в приоритизации
    Celery позволяет создавать несколько очередей с разными приоритетами. Например, задачи с высоким приоритетом могут быть обработаны отдельными воркерами, гарантируя их выполнение без задержек.

Асинхронность на уровне системы

Хотя воркер Celery работает синхронно, асинхронность реализуется на уровне всей системы. За счет запуска нескольких воркеров одновременно (на одном сервере или на разных) достигается высокая параллельность.

Каждый воркер обрабатывает свою задачу синхронно, но вся система работает асинхронно, поскольку воркеры могут обрабатывать задачи из очереди одновременно. Таким образом, вы получаете лучшее из двух миров: надежность и простоту синхронного выполнения задач и высокую производительность асинхронной системы.

Почему это важно понимать?

Асинхронная модель выполнения задач (как в asyncio) идеально подходит для I/O-операций, где процессы простаивают в ожидании ответа от диска, базы данных или сети. Однако для вычислительно сложных задач или процессов, где важна предсказуемость выполнения, синхронная модель Celery оказывается более подходящей.

Используя Celery, вы можете не беспокоиться о переключении контекста внутри воркера — все задачи выполняются линейно. Это делает Celery мощным инструментом для распределенных вычислений, где надежность и масштабируемость играют ключевую роль.

Теперь, когда вы разобрались с теорией, пришло время применить ее на практике!

Поднимаем базу данных Redis

Redis — это быстрое хранилище данных в памяти, которое можно развернуть разными способами в зависимости от ваших потребностей и операционной системы. Вот несколько вариантов:

  1. Ubuntu/Debian:

    • Убедитесь, что у вас установлен apt и репозитории обновлены:

      sudo apt update
      sudo apt install redis-server
      
    • После установки запустите Redis и настройте его как службу:

      sudo systemctl start redis
      sudo systemctl enable redis
      
    • Для проверки работы выполните:

      redis-cli ping
      

      Ответ "PONG" означает, что Redis успешно работает.

  2. CentOS/RHEL:

    • Добавьте репозиторий EPEL и установите Redis:

      sudo yum install epel-release
      sudo yum install redis
      
    • Запустите и включите службу Redis:

      sudo systemctl start redis
      sudo systemctl enable redis
      

2. Установка Redis на macOS

На macOS установка Redis выполняется с помощью Homebrew:

  1. Убедитесь, что Homebrew установлен. Если нет, установите его:

    /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
    
  2. Установите Redis:

    brew install redis
    
  3. Запустите Redis как службу:

    brew services start redis
    
  4. Проверьте, работает ли Redis:

    redis-cli ping
    

3. Установка Redis на Windows

На Windows Redis не имеет официальной поддержки, но существуют рабочие варианты:

  1. Используйте предварительно собранный исполняемый файл из проекта Memurai — это качественная альтернатива Redis для Windows.

  2. Либо установите Redis через WSL (Windows Subsystem for Linux):

    • Установите WSL и выберите дистрибутив Linux (например, Ubuntu).

    • Следуйте инструкциям для Linux (см. выше).

4. Использование Docker

Для всех систем (Linux, macOS, Windows) удобным вариантом может быть использование Docker:

  1. Убедитесь, что Docker установлен (для новичков отлично подойдет установка Docker Desktop).

  2. Выполните команду для запуска Redis:

    docker run --name redis -d -p 6379:6379 redis
  3. Теперь Redis доступен на вашем локальном компьютере по порту 6379.

В статье «Быстрый запуск Redis через Docker Compose» я подробно описал процесс развёртывания Redis с помощью Docker Compose.

5. Использование облачных провайдеров

Если вы не хотите разворачивать Redis локально, можно воспользоваться облачными сервисами, которые предоставляют готовые экземпляры Redis. Примеры:

  • AWS ElastiCache

  • Google Cloud Memorystore

  • Azure Cache for Redis

Эти варианты подходят для производственных сред, так как предлагают масштабируемость и управление без необходимости заботиться о конфигурации.

Поднимаем Redis на Amvera Cloud

В рамках данной статьи я буду поднимать Redis на платформе Amvera Cloud. Этот способ подходит для быстрого развертывания базы данных с минимальной настройкой.

Шаги для развёртывания Redis на Amvera Cloud

  • Регистрируемся на сайте Amvera Cloud, если еще не было регистрации (за регистрацию вы получите 111 рублей на основной баланс)

  • После входа в панель управления выберите опцию «Создать проект». На открывшемся экране вам нужно будет дать имя вашей базе данных, указать, что это база данных, а не приложение, и выбрать тариф. Я рекомендую выбрать «Начальный» тариф, но если вы планируете интенсивное использование вашей базы данных, то лучше рассмотреть более высокий тариф. После этого нажмите «Далее».

  • На следующем экране выберите базу данных Redis и задайте пароль если требуется командой --requirepass ВАШ_ПАРОЛЬ. После жмем на «Завершить».

  • Теперь нужно открыть внешний доступ к вашей базе данных. Для этого переходим в созданный проект. Там перемещаемся на вкладку «Настройки» и добавляем внешнее доменное имя:

Тестируем подключение

Перед тем как продолжить, давайте убедимся, что наше приложение может подключиться к базе данных Redis с локального компьютера. Для этого используем простой скрипт:

import redis
from app.config import settings

# Подключение к Redis
r = redis.Redis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,  # Стандартный порт Redis
    password=settings.REDIS_PASSWORD,
    ssl=True,  # Включаем SSL
    ssl_cert_reqs=None  # Отключаем проверку сертификата, если нужно
)

# Проверка подключения
try:
    # Попытка выполнить команду PING
    response = r.ping()
    if response:
        print("Подключение к Redis успешно!")
    else:
        print("Не удалось подключиться к Redis.")
except Exception as e:
    print(f"Произошла ошибка: {e}")

Переменные для подключения

В моём случае параметры подключения выглядят следующим образом:

REDIS_PORT=6379
REDIS_PASSWORD=my_super_pass
REDIS_HOST=redisdb-yakvenalex.db-msk0.amvera.tech

Установка библиотеки Redis

Перед выполнением скрипта установите библиотеку redis:

pip install redis

Выполнение скрипта

Запустите скрипт. Если вы увидите сообщение:

Подключение к Redis успешно!

это означает, что ваша база данных готова к дальнейшей работе.

Примечание о проверке SSL-сертификатов

Обратите внимание: при подключении через URL (а не напрямую через IP-адрес) могут возникать проблемы с проверкой SSL-сертификатов. Чтобы избежать таких проблем, вы можете явно отключить проверку сертификата, как это сделано в скрипте выше (ssl_cert_reqs=None). Это полезно для отладки, но в боевых условиях рекомендуется настроить сертификаты корректно.

При настройке брокера под Celery сертификаты отключаются по другому и об этом далее.

Пишем Celery приложение

Я выше говорил, что у нас будет полностью изолированные приложения друг от друга, завязанные на общем брокере. Для того чтоб дополнительно это подтвердить предлагаю следовать за мной и создать отдельное приложение в отдельном виртуальном окружении.

Структура проекта следующая:

project_celery/                # Корневая папка проекта
├── app/                # Папка с приложением
│   ├── config.py       # Файл с конфигурацией приложения (настройки, переменные окружения)
│   └── main.py         # Главный файл приложения, точка входа
├── .env                # Файл переменных окружения (например, настройки для Redis, базы данных)
├── requirements.txt    # Список зависимостей Python проекта (например, Flask, Redis)
├── amvera.yml          # Конфигурационный файл для Amvera Cloud (для тех кто будет делать деплой на Amvera Cloud)

Файл requirements.txt

pydantic-settings==2.6.1
celery==5.4.0
redis==5.2.1
loguru==0.7.3
requests==2.32.3
flower==2.0.1

Для установки зависимостей вводим в консоли:

pip install -r requirements.txt

Теперь подготовим файл настроек app/config.py

import os
import ssl

from celery import Celery
from pydantic_settings import BaseSettings, SettingsConfigDict


# Класс для управления настройками приложения через Pydantic
class Settings(BaseSettings):
    REDIS_PORT: int               # Порт для подключения к Redis
    REDIS_PASSWORD: str           # Пароль для подключения к Redis
    BASE_URL: str                 # Базовый URL приложения
    REDIS_HOST: str               # Хост Redis-сервера
    BASE_DIR: str = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))  # Корневая директория проекта

    # Указание файла с переменными окружения
    model_config = SettingsConfigDict(env_file=f"{BASE_DIR}/.env")


# Создание экземпляра настроек
settings = Settings()

# Формирование URL для подключения к Redis через SSL
redis_url = f"rediss://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/0"

# Опции для работы с SSL, отключаем проверку сертификата (подходит для отладки)
ssl_options = {"ssl_cert_reqs": ssl.CERT_NONE}

# Инициализация экземпляра Celery
celery_app = Celery(
    "celery_worker",  # Имя приложения Celery
    broker=redis_url,  # URL брокера задач (Redis)
    backend=redis_url  # URL для хранения результатов выполнения задач
)

Базовый URL приложения — это доменное имя, которое будет использоваться для вашего приложения FastAPI. На данный момент его можно указать следующим образом:

BASE_URL=http://127.0.0.1:8000

8000 - это стандартный порт FastApi приложения после запуска.

Комментарии к коду:

  1. Settings:

    • Класс на базе Pydantic для управления настройками приложения.

    • Загружает переменные окружения из файла .env с помощью model_config.

    • Содержит параметры, необходимые для подключения к Redis.

  2. redis_url:

    • Формируется строка подключения к Redis через протокол rediss:// (SSL-соединение).

  • Включает параметры: пароль, хост и порт. При обычном подключении через стандартный IP-адрес используется redis://

  1. ssl_options:

    • Отключает проверку сертификатов SSL (не рекомендуется в продакшене, но полезно для отладки и для случаев когда по-другому не работает).

  2. celery_app:

    • Создаётся экземпляр Celery, который будет использоваться как диспетчер задач.

    • Параметр broker указывает на брокер задач (Redis).

    • Параметр backend отвечает за хранение результатов выполнения задач.

При запуске Redis создаёт 16 баз данных по умолчанию. Имена баз данных представляют собой целые числа от 0 до 15. В своём примере я использовал базу данных 0 как для брокера, так и для бэкенда, но вы можете выбрать другие базы, если это необходимо.

Теперь настроим само приложение (файл app/main.py).

Выполним импорты:

import requests
from celery import Celery
from app.config import ssl_options, redis_url, settings

Сейчас у вас может появиться непонимание из серии «В смысле requests, он же синхроннный и вообще зачем отправлять с Celery запросы». Отвечаю.

У нас будет один обработчик. Его суть в том, что он должен будет в указанное время и на указанный эндпоинт выполнить отправку DELETE запроса. Этот запрос будет отправлен на наш API-метод.

Так как Celery изначально синхронный я решил следовать его парадигме и реализовал все в синхронном коде. При этом, в случае если одновременно нужно будет выполнить задачу на выполнение этого запроса — проблем не будет, так как задачи будут запаралелены. Получается, что в этом случае синхронный подход может работать быстрее асинхронной реализации.

Создаем экземпляр приложения Celery

celery_app = Celery("celery_worker", broker=redis_url, backend=redis_url)

В качестве аргументов при создании мы даем: имя, брокера и бэкенд. Выше подробно разбирали что это и зачем нужно.

Теперь пропишем дополнительные настройки:

celery_app.conf.update(
    broker_use_ssl=ssl_options,
    redis_backend_use_ssl=ssl_options,
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    enable_utc=True,  # Убедитесь, что UTC включен
    timezone='Europe/Moscow',  # Устанавливаем московское время
    broker_connection_retry_on_startup=True,
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)

Каждая настройка в конфигурации Celery, которую вы предоставили, имеет свою функцию, обеспечивая правильное функционирование асинхронных задач. Вот подробное объяснение каждой из них:

  • broker_use_ssl: Указывает, следует ли использовать SSL для соединения с брокером сообщений (в данном случае Redis). Это повышает безопасность передачи данных или, как в нашем случае, отключает проверку.

  • redis_backend_use_ssl: Аналогично предыдущему, эта настройка включает SSL для соединения с бэкендом результатов, что также улучшает безопасность.

  • task_serializer: Определяет формат сериализации задач. В данном случае используется JSON, что позволяет легко передавать данные между процессами.

  • result_serializer: Указывает формат сериализации результатов выполнения задач. Здесь также используется JSON, что обеспечивает совместимость с сериализацией задач.

  • accept_content: Список типов контента, которые Celery будет принимать. Указание ['json'] означает, что Celery будет обрабатывать только сообщения в формате JSON.

  • enable_utc: Включает использование времени по всемирному координированному времени (UTC). Это важно для синхронизации задач в распределенной системе.

  • timezone: Устанавливает временную зону для задач. В нашем случае это "Europe/Moscow", что позволяет правильно обрабатывать временные метки в московском времени.

  • broker_connection_retry_on_startup: Опция, которая указывает, следует ли повторно пытаться подключиться к брокеру сообщений при запуске приложения. Это полезно для обеспечения надежности.

  • task_acks_late: Указывает, что задачи должны подтверждаться после их завершения. Это предотвращает потерю задач в случае сбоя рабочего процесса до завершения задачи.

  • task_reject_on_worker_lost: Если рабочий процесс теряется, задачи не будут автоматически повторно назначены другим рабочим процессам. Это помогает избежать потери данных и дублирования работы.

Эти настройки помогают оптимизировать работу Celery, обеспечивая безопасность, надежность и правильное управление временными данными при выполнении асинхронных задач.

Теперь опишем наш единственный обработчик.

@celery_app.task(
    name='delete_file_scheduled',
    bind=True,
    max_retries=3,
    default_retry_delay=5
)
def delete_file_scheduled(self, file_id, dell_id):
    """Задача для отложенного удаления файла"""
    try:
        response = requests.delete(f"{settings.BASE_URL}/delete/{file_id}/{dell_id}")
        response.raise_for_status()
        return response.status_code
    except requests.RequestException as exc:
        self.retry(exc=exc)
    except Exception as e:
        return None

Давайте разберем этот код по частям:

Декоратор задачи

@celery_app.task(
    name='delete_file_scheduled',
    bind=True,
    max_retries=3,
    default_retry_delay=5
)
  • @celery_app.task: Декоратор, который определяет функцию как задачу Celery.

  • name='delete_file_scheduled': Указывает имя задачи, которое можно использовать для вызова этой задачи в других частях приложения или для мониторинга. Очень полезный атрибут, который позволит нам из изолированной среды FastApi поставить эту задачу в очередь.

  • bind=True: Позволяет задаче иметь доступ к объекту self, который представляет экземпляр задачи. Это необходимо для использования методов, таких как self.retry().

  • max_retries=3: Указывает максимальное количество попыток повторного выполнения задачи в случае неудачи. В данном случае задача будет повторяться до трех раз.

  • default_retry_delay=5: Указывает задержку (в секундах) перед повторной попыткой выполнения задачи. Здесь задержка составляет 5 секунд.

Основная функция задачи

def delete_file_scheduled(self, file_id, dell_id):
    """Задача для отложенного удаления файла"""
  • def delete_file_scheduled(self, file_id, dell_id): Определяет функцию, которая будет выполняться как задача. Она принимает file_id и dell_id как параметры, которые необходимы для удаления файла.

Основная логика выполнения

try:
    response = requests.delete(f"{settings.BASE_URL}/delete/{file_id}/{dell_id}")
    response.raise_for_status()
    return response.status_code
  • try:: Начало блока обработки исключений. Здесь будет выполняться основной код задачи.

  • requests.delete(...): Выполняет HTTP-запрос на удаление файла по указанному URL. URL формируется с использованием базового адреса из настроек и переданных параметров file_id и dell_id.

  • response.raise_for_status(): Проверяет статус ответа. Если статус код указывает на ошибку (например, 4xx или 5xx), будет возбуждено исключение HTTPError.

  • return response.status_code: Возвращает статус код ответа, что может быть полезно для мониторинга или логирования.

Обработка исключений

except requests.RequestException as exc:
    self.retry(exc=exc)
except Exception as e:
    return None
  • except requests.RequestException as exc:: Обрабатывает все исключения, связанные с HTTP-запросами (например, проблемы с сетью или недоступность сервера).

    • self.retry(exc=exc): Если возникает ошибка запроса, задача будет повторена автоматически с учетом настроек max_retries и default_retry_delay. Это позволяет избежать потери задач из-за временных проблем.

  • except Exception as e:: Обрабатывает любые другие исключения, которые могут возникнуть во время выполнения задачи.

  • return None: Возвращает None, если произошло любое другое исключение. Это может быть полезно для сигнализации о том, что задача завершилась неудачно без повторной попытки.

Наше приложение Celery полностью готово. Теперь мы можем запустить его в одном из трех режимов: Worker, Flower или Beat.

Для запуска в режиме Worker (основной режим, который нас больше всего интересует) необходимо в корне проекта (на один уровень выше папки app) ввести команду:

celery -A app.main worker -l INFO

Команда celery -A app.main worker -l INFO выполняет следующие действия:

  • Запуск воркера Celery: Команда инициирует запуск воркера, который будет обрабатывать задачи, находящиеся в очереди. Воркеры являются основными компонентами системы Celery и отвечают за выполнение асинхронных задач.

  • Определение приложения: Параметр -A app.main указывает на модуль, содержащий экземпляр приложения Celery. В данном случае это app.main, что подразумевает наличие объекта Celery внутри этого модуля.

  • Уровень логирования: Параметр -l INFO устанавливает уровень логирования на "INFO". Это означает, что воркер будет выводить информацию о своих действиях, включая полученные задачи и их статусы, что полезно для мониторинга работы воркера.

Таким образом, эта команда запускает воркер Celery, который будет слушать очередь задач и выполнять их в соответствии с определением в приложении, при этом предоставляя информацию о процессе выполнения через логи.

В Celery есть один важный аспект, о котором стоит упомянуть — это пул для выполнения задач. По умолчанию, если вы запускаете Celery с помощью команды celery -A app.main worker -l INFO, не указывая явно пул, будет запущен пул Prefork Pool.

Вот краткое описание существующих пулов:

1. Prefork Pool

Этот пул основан на многопроцессорности и использует модуль multiprocessing Python. Он создает несколько дочерних процессов для обработки задач.

  • Когда использовать: Рекомендуется для задач, которые являются CPU-bound, то есть требуют значительных вычислительных ресурсов. Это позволяет обойти блокировку GIL (Global Interpreter Lock) и эффективно использовать многопроцессорные системы.

  • Преимущества: Высокая производительность для CPU-bound задач, возможность обработки нескольких задач одновременно.

  • Недостатки: Более высокие накладные расходы на создание и управление процессами. Некорректно работает на операционной системе Windows!

2. Threads Pool

Этот пул использует ThreadPoolExecutor из стандартной библиотеки Python и управляет потоками на уровне операционной системы.

  • Когда использовать: Подходит для задач, которые являются I/O-bound, то есть тратят много времени на операции ввода-вывода (например, сетевые запросы).

  • Преимущества: Легче управлять по сравнению с процессами, меньше накладных расходов.

  • Недостатки: Из-за GIL только один поток может выполнять Python-код в любой момент времени, что ограничивает производительность для CPU-bound задач.

3. Eventlet Pool

Этот пул использует библиотеку eventlet, которая поддерживает асинхронное выполнение с помощью корутин.

  • Когда использовать: Подходит для задач, которые требуют высокой степени параллелизма и являются I/O-bound.

  • Преимущества: Эффективное использование ресурсов при выполнении большого количества операций ввода-вывода.

  • Недостатки: Может быть сложнее в отладке и требует использования совместимых библиотек.

4. Gevent Pool

Пул, основанный на библиотеке gevent, также поддерживает асинхронное выполнение с использованием зеленых потоков (greenlets).

  • Когда использовать: Подходит для I/O-bound задач с высокой параллельностью.

  • Преимущества: Эффективная работа с большим количеством одновременных соединений.

  • Недостатки: Как и eventlet, может требовать совместимости библиотек и сложнее в отладке.

5. Solo Pool

Этот пул выполняет задачи в одном потоке без параллелизма. Он запускает задачи в том же процессе, что и воркер.

  • Когда использовать: Подходит для простых сценариев или отладки, когда не требуется параллельная обработка задач.

  • Преимущества: Нет накладных расходов на управление процессами или потоками; быстрый запуск задач.

  • Недостатки: Может обрабатывать только одну задачу за раз, что делает его неэффективным для высоконагруженных систем.

Как писал выше, Prefork Pool идет по умолчанию и если вы пишите код на Windows и там-же тестируете, то получите ошибку. Кроме того, не всегда есть смысл использовать сложные пулы. Например для текущей задачи я остановился на пуле SOLO, как на этапе разработки, так и на деплоя.

Для того чтоб указать другой пул достаточно к команде запуска добавить флаг -P и дать имя пула. Например:

celery -A app.main worker --loglevel=INFO  -P solo

Теперь, если хотите открыть монитор вам нужно будет запускать отдельное приложение. Копировать код нет необходимости и достаточно просто открыть отдельный терминал, активировать в нем виртуальное окружение и ввести команду:

celery -A app.main worker flower --port=5555

На монитор посмотрим чуть позже, когда запустим наше приложение.

Теперь можно приступать к FastApi приложению.

Пишем FastApi приложение

Под него тоже предлагаю создать отдельный проект. Структуру рассмотрим по ходу дела, так как файлов там будет чуть больше чем в проекте Celery.

В корне сразу создадим папку app, в которой будем писать код приложения и файлы: .env (переменные окружения), requirements.txt (зависимости), amvera.yml (если планируется деплой на Amvera Cloud).

Вот список зависимостей:

fastapi==0.115.6
uvicorn==0.32.1
pydantic==2.10.3
pydantic-settings==2.6.1
Jinja2==3.1.4
celery==5.4.0
redis==5.2.1
python-multipart==0.0.19
loguru==0.7.3

Устанавливаем:

pip install -r requirements.txt

Внимательный читатель может задать резонный вопрос: зачем мы снова устанавливаем Celery, если он уже работает в отдельном изолированном приложении? Позвольте объяснить.

Мы в FastApi не будем отдельно запускать Celery приложение, так как оно уже запущено. Но нам необходимо будет реализовать передачу задач от FastApi приложения в приложение Celery.

Redis, кстати, тут мы будем использовать не только в качестве брокера для Celery, но и в качестве внутренней базы данных просто, чтоб разнообразить постоянное использование PostgreSQL, как в моих прочих проектах.

Файл с переменными окружения (.env) можно продублировать. Отличий не будет. Например:

REDIS_PORT=6379
REDIS_PASSWORD=my_super_pass
REDIS_HOST=redisdb-yakvenalex.db-msk0.amvera.tech
BASE_URL=http://127.0.0.1:8000

В папке app создадим файл config.py и заполним его следующим образом:

import os
import ssl
import redis
from celery import Celery
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    REDIS_PORT: int
    REDIS_PASSWORD: str
    REDIS_HOST: str
    BASE_URL: str
    BASE_DIR: str = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
    UPLOAD_DIR: str = os.path.join(BASE_DIR, 'app/uploads')
    STATIC_DIR: str = os.path.join(BASE_DIR, 'app/static')
    model_config = SettingsConfigDict(env_file=f"{BASE_DIR}/.env")


# Получаем параметры для загрузки переменных среды
settings = Settings()
redis_url = f"rediss://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/0"
celery_app = Celery("celery_worker", broker=redis_url, backend=redis_url)
ssl_options = {"ssl_cert_reqs": ssl.CERT_NONE}
celery_app.conf.update(broker_use_ssl=ssl_options, redis_backend_use_ssl=ssl_options)
redis_client = redis.Redis(host=settings.REDIS_HOST,
                           port=settings.REDIS_PORT,
                           db=0, password=settings.REDIS_PASSWORD,
                           ssl=True, ssl_cert_reqs=None)

Мы добавили две новые переменные: одну для хранения статических файлов на стороне клиента (CSS, JS и фотографии), а другую — для хранения файлов (UPLOAD_DIR).

Отдельно поговорим про строки:

celery_app = Celery("celery_worker", broker=redis_url, backend=redis_url)
ssl_options = {"ssl_cert_reqs": ssl.CERT_NONE}
celery_app.conf.update(broker_use_ssl=ssl_options, redis_backend_use_ssl=ssl_options)

С учетом того, что наша система подразумевает изоляцию между приложением FastAPI и Celery, некоторое дублирование кода становится неизбежным. Тем не менее, количество настроек в этом коде меньше по сравнению с оригинальной конфигурацией Celery. Это связано с тем, что мы повторно инициировали celery_app с одной конкретной задачей — уведомить воркеров о наличии задач для выполнения.

Таким образом, FastAPI не будет самостоятельно выполнять фоновые задачи. Вместо этого мы используем этот подход для того, чтобы Celery знал о существовании задач и мог их обрабатывать. Это позволяет поддерживать четкую архитектуру приложения и эффективно управлять фоновыми процессами.

Кроме того, так как мы будем использовать Redis отдельно в качестве базы данных FastApi мы отдельно инициируем клиента Redis при помощи строк:

redis_client = redis.Redis(host=settings.REDIS_HOST,
                           port=settings.REDIS_PORT,
                           db=0, password=settings.REDIS_PASSWORD,
                           ssl=True, ssl_cert_reqs=None)

В начале статьи мы уже частично рассмотрели этот пример, когда тестировали подключение к базе данных Redis.

После выполнения этих шагов, обращаясь к переменной redis_client на стороне FastApi, мы получим доступ к полноценной базе данных. В неё мы сможем помещать необходимые для работы данные, изменять их, извлекать и удалять по мере необходимости.

Для этих целей можно было использовать классическую табличную базу данных, например PostgreSQL в связке с SQLAlchemy, но мне кажется что пора разнообразить. Тем более у базы данных Redis есть свои преимущества — например скорость работы.

Теперь мы можем приступить к созданию методов нашего API. Для этого в папке app создайте новую папку api и заполните её двумя пустыми файлами:

  1. router.py — в этом файле будут описаны все эндпоинты API.

  2. utils.py — здесь мы реализуем простую утилиту.

Приступим к описанию API-методов (не путаем с методами для рендеринга фронта). Всего, в рамках данного проекта, мы реализуем 2 API метода: метод для загрузки файла со сроком жизни и метод для удаления файла.

Работаем с файлом app/api/router.py.

Выполним импорты:

import os
import datetime
from fastapi import APIRouter, UploadFile, HTTPException, Form
from loguru import logger
from app.api.utils import generate_random_string
from app.config import settings, celery_app, redis_client

Давайте сразу реализуем в файле utils.py утилиту.

import string
from random import choices


def generate_random_string(length: int) -> str:
    return ''.join(choices(string.ascii_letters, k=length))

Эта утилита предназначена для создания уникальных строковых идентификаторов, состоящих из латинских букв разного регистра. Я постараюсь объяснить, как они будут использоваться в нашей системе.

  1. Пользователь отправляет файл

  2. После того, как FastApi прочитает файл он под него сгенерирует 2 идентификатора: айди файла и айди для удаления файла. В этом нам поможет утилита, которую мы создали.

Идентификатор файла будет применяться в нескольких задачах: для создания имени файла на основе его идентификатора и формата, в качестве ключа в Redis, для получения информации о файле по его идентификатору, а также как один из обязательных параметров при удалении файла.

Что касается идентификатора для удаления файла, я добавил его в систему в качестве дополнительной защиты. Этот идентификатор препятствует удалению файла лицами, не имеющими права на это. Чтобы удалить файл раньше запланированного времени в Celery, необходимо указать не только идентификатор самого файла, но и идентификатор его удаления.

FastAPI автоматически обернет эту синхронную функцию в асинхронную обертку, если она используется в асинхронном контексте (асинхронном эндпоинте). Это означает, что вы можете безопасно использовать эту функцию в ваших асинхронных FastAPI маршрутах не усложняя.

Вернемся к файлу api/router.py

Назначим сам роутер:

router = APIRouter(tags=['API'])

Теперь опишем метод для загрузки файла и после разберемся с ним.

@router.post("/api/upload/")
async def upload_file(file: UploadFile, expiration_minutes: int = Form(...)):
    try:
        # Прочитать загруженный файл
        file_content = await file.read()

        max_file_size = 5 * 1024 * 1024  # 5 МБ в байтах
        if len(file_content) > max_file_size:
            raise HTTPException(status_code=413, detail="Превышен максимальный размер файла (5 МБ).")

        upload_dir = settings.UPLOAD_DIR
        total_size = sum(os.path.getsize(os.path.join(upload_dir, f)) for f in os.listdir(upload_dir) if os.path.isfile(os.path.join(upload_dir, f)))
        max_total_size = 100 * 1024 * 1024  # 100 МБ в байтах
        if total_size + len(file_content) > max_total_size:
            raise HTTPException(status_code=507, detail="Превышен общий лимит размера файлов (100 МБ). Освободите место и повторите попытку.")

        start_file_name = file.filename
        # Сгенерировать уникальное имя файла и ID для удаления
        file_extension = os.path.splitext(file.filename)[1]
        file_id = generate_random_string(12)
        dell_id = generate_random_string(12)

        # Сохранить файл на диск
        file_path = os.path.join(settings.UPLOAD_DIR, file_id + file_extension)
        with open(file_path, "wb") as f:
            f.write(file_content)

        # Рассчитать время истечения в секундах
        expiration_seconds = expiration_minutes * 60
        expiration_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=expiration_seconds)

        # Запланировать задачу для удаления файла после истечения времени
        celery_app.send_task('delete_file_scheduled', args=[file_id, dell_id], countdown=expiration_seconds)

        # URL-адреса для метаданных
        download_url = f"{settings.BASE_URL}/files/{file_id + file_extension}"
        view_url = f"{settings.BASE_URL}/view_file/{file_id}"

        # Сохранить метаданные в Redis
        redis_key = f"file:{file_id}"  # Уникальный ключ для файла
        redis_client.hmset(redis_key, {"file_path": file_path,
                                       "dell_id": dell_id,
                                       "download_url": download_url,
                                       "expiration_time": int(expiration_time.timestamp()),
                                       "start_file_name": start_file_name})

        return {
            "message": "Файл успешно загружен",
            "file_id": file_id,
            "dell_id": dell_id,
            "download_url": download_url,
            "view_url": view_url,
            "expiration_time": expiration_time.isoformat(),
            "expiration_seconds": expiration_seconds
        }
    except HTTPException as e:
        raise e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Ошибка загрузки файла: {str(e)}")

Подробный разбор функции upload_file для загрузки файлов

Функция upload_file отвечает за обработку загруженных пользователем файлов и реализует логику их хранения, удаления и предоставления ссылок для доступа.

Определение маршрута и параметров функции

@router.post("/api/upload/")
async def upload_file(file: UploadFile, expiration_minutes: int = Form(...)):
  • @router.post("/api/upload/"): Указывает маршрут для обработки HTTP POST-запросов по адресу /api/upload/.

  • async def upload_file: Асинхронная функция, что позволяет обрабатывать файлы без блокировки выполнения других задач.

  • Параметры:

    • file: UploadFile — загружаемый файл.

    • expiration_minutes: int = Form(...) — время хранения файла, переданное в теле запроса в виде данных формы.

Основные этапы работы функции

  1. Чтение файла и сохранение оригинального имени

    file_content = await file.read()
    start_file_name = file.filename
    
    • Содержимое файла асинхронно считывается и сохраняется.

    • Оригинальное имя сохраняется для использования в метаданных.

  2. Генерация идентификаторов и сохранение файла

    file_extension = os.path.splitext(file.filename)[1]
    file_id = generate_random_string(12)
    dell_id = generate_random_string(12)
    file_path = os.path.join(settings.UPLOAD_DIR, file_id + file_extension)
    with open(file_path, "wb") as f:
        f.write(file_content)
    
    • Генерируются уникальные идентификаторы для файла (file_id) и его удаления (dell_id).

    • Файл сохраняется на диск с уникальным именем, содержащим идентификатор и исходное расширение.

  3. Расчет времени истечения и планирование удаления

    expiration_seconds = expiration_minutes * 60
    expiration_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=expiration_seconds)
    celery_app.send_task('delete_file_scheduled', args=[file_id, dell_id], countdown=expiration_seconds)
    
    • expiration_seconds: Время истечения файла преобразуется из минут в секунды.

    • expiration_time: Рассчитывается точный момент истечения файла.

    • Через Celery отправляется задача на удаление файла. Задача будет выполнена через указанное время (countdown).

  4. Формирование ссылок на файл

    download_url = f"{settings.BASE_URL}/files/{file_id + file_extension}"
    view_url = f"{settings.BASE_URL}/view_file/{file_id}"
    
    • Генерируются ссылки для скачивания и просмотра файла. Эти ссылки позже отправляются клиенту.

  5. Сохранение метаданных в Redis

    redis_key = f"file:{file_id}"
    redis_client.hmset(redis_key, {
        "file_path": file_path,
        "dell_id": dell_id,
        "download_url": download_url,
        "expiration_time": int(expiration_time.timestamp()),
        "start_file_name": start_file_name
    })
    
    • Уникальный ключ (redis_key) формируется для хранения метаданных.

    • Сохраняются данные о пути к файлу, ID для удаления, ссылки, время истечения и оригинальное имя.

  6. Возврат успешного ответа

    return {
        "message": "Файл успешно загружен",
        "file_id": file_id,
        "dell_id": dell_id,
        "download_url": download_url,
        "view_url": view_url,
        "expiration_time": expiration_time.isoformat(),
        "expiration_seconds": expiration_seconds
    }
    

    Клиент получает JSON-ответ, содержащий:

    • Подтверждение успешной загрузки.

    • Метаданные: идентификаторы, ссылки, время истечения.

Обновление: В рамках учебного проекта добавлены ограничения:

  1. Максимальный размер загружаемого файла составляет 5 МБ.

  2. Общий объем всех файлов в хранилище ограничен 100 МБ.

Эти меры позволили сократить нагрузку на систему и избежать дополнительных затрат на хостинг. Данный проект носит исключительно демонстрационный характер, и он не претендует на звание полноценного файлообменника. Его цель — показать, как связка FastAPI, Redis и Celery может быть использована для обработки файлов и управления их жизненным циклом.

Логика взаимодействия FastAPI и Celery

Пример связки FastAPI и Celery иллюстрирует, как с использованием Redis (как брокера задач) передаются данные между компонентами системы:

  • celery_app.send_task(...) передает задачу delete_file_scheduled.

  • Задача получает ID файла и ID удаления (file_id, dell_id).

  • Через параметр countdown указывается время задержки выполнения.

Результат:

  1. Файл загружен и сохранен.

  2. Сформированы уникальные идентификаторы.

  3. Рассчитано время истечения.

  4. Удаление запланировано через Celery.

  5. Клиент получает ссылки для доступа к файлу.

Теперь напишем метод для удаления файла с сервера:

@router.delete("/delete/{file_id}/{dell_id}")
async def delete_file(file_id: str, dell_id: str):
    redis_key = f"file:{file_id}"
    file_info = redis_client.hgetall(redis_key)

    if not file_info:
        raise HTTPException(status_code=404, detail="Файл не найден")

    dell_id_redis = file_info.get(b"dell_id").decode()
    if dell_id_redis != dell_id:
        raise HTTPException(status_code=403, detail="Не совпадает айди удаления с айди удаления файла")

    file_path = file_info.get(b"file_path").decode()

    # Удаление файла и очистка записи в Redis
    try:
        if os.path.exists(file_path):
            os.remove(file_path)
            logger.info(f"Файл {file_path} успешно удален!")
        else:
            logger.warning(f"Файл {file_path} не найден.")

        redis_client.delete(redis_key)
        return {"message": "Файл успешно удален и запись в Redis очищена!"}

    except OSError as e:
        logger.error(f"Error deleting file {file_path}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Ошибка удаления файла: {str(e)}")

Давайте внимательно рассмотрим и обсудим код функции delete_file, которая отвечает за удаление файлов из вашего приложения.

@router.delete("/delete/{file_id}/{dell_id}")
async def delete_file(file_id: str, dell_id: str):
  
  • @router.delete("/delete/{file_id}/{dell_id}"): Декоратор, который определяет маршрут для HTTP DELETE запроса на /delete/{file_id}/{dell_id}. Это означает, что функция будет вызываться при отправке DELETE-запроса на этот URL, где {file_id} и {dell_id} — это параметры пути.

  • async def delete_file(file_id: str, dell_id: str): Определение асинхронной функции delete_file, которая принимает два параметра: file_id (идентификатор файла) и dell_id (идентификатор удаления).

Получение информации о файле

redis_key = f"file:{file_id}"
file_info = redis_client.hgetall(redis_key)
  • redis_key = f"file:{file_id}": Создает уникальный ключ для доступа к метаданным файла в Redis, используя идентификатор файла.

  • file_info = redis_client.hgetall(redis_key): Извлекает все метаданные файла из Redis по заданному ключу. Возвращает словарь с данными файла или пустой результат, если файл не найден.

На этом этапе вы видите, что взаимодействие с не табличной базой данных не сильно отличается от взаимодействия с табличной. То есть, если бы нам нужно было достать информацию с PostgreSQL мы бы тоже взяли бы айди записи и по ней запросили информацию.

Проверка существования файла:

if not file_info:
    raise HTTPException(status_code=404, detail="File not found")
  • if not file_info:: Проверяет, были ли найдены метаданные для данного файла. Если file_info пустой, это означает, что файл не существует.

  • raise HTTPException(...): Если файл не найден, вызывается исключение HTTP с кодом состояния 404 (не найдено) и сообщением об ошибке.

Проверка идентификатора удаления

dell_id_redis = file_info.get(b"dell_id").decode()
if dell_id_redis != dell_id:
    raise HTTPException(status_code=403, detail="Invalid deletion ID")
  • dell_id_redis = file_info.get(b"dell_id").decode(): Извлекает идентификатор удаления из метаданных файла и декодирует его из байтового формата в строку.

  • if dell_id_redis != dell_id:: Сравнивает идентификатор удаления из Redis с переданным в запросе. Если они не совпадают, это может означать попытку несанкционированного удаления.

  • raise HTTPException(...): Если идентификаторы не совпадают, вызывается исключение HTTP с кодом состояния 403 (доступ запрещен) и соответствующим сообщением.

Удаление файла и очистка записи в Redis

file_path = file_info.get(b"file_path").decode()
  • file_path = file_info.get(b"file_path").decode(): Извлекает путь к файлу из метаданных и декодирует его.

Попытка удалить файл

if os.path.exists(file_path):
    os.remove(file_path)
    logger.info(f"Файл {file_path} успешно удален!")
else:
    logger.warning(f"Файл {file_path} не найден.")
  • if os.path.exists(file_path):: Проверяет, существует ли файл по указанному пути.

Далее идет обработка ошибок. С этим должно быть все понятно.

Функция delete_file предназначена для безопасного удаления файлов из системы. Перед выполнением операции она проверяет, существует ли файл и соответствует ли идентификатор удаления указанному. Если удаление прошло успешно, функция очищает запись в Redis и возвращает клиенту подтверждение. В случае возникновения ошибок предусмотрена обработка исключений, а также ведение соответствующих логов и отправка ответов клиенту.

Основная особенность в том, что этот метод будет использовать, как Celery для отправки запроса:

@celery_app.task(
    name='delete_file_scheduled',
    bind=True,
    max_retries=3,
    default_retry_delay=5
)
def delete_file_scheduled(self, file_id, dell_id):
    """Задача для отложенного удаления файла"""
    try:
        response = requests.delete(f"{settings.BASE_URL}/delete/{file_id}/{dell_id}")
        response.raise_for_status()
        return response.status_code
    except requests.RequestException as exc:
        self.retry(exc=exc)
    except Exception as e:
        return None

Так и автор файла, который захочет удалить файл раньше времени срока его жизни на нашем сервере.

Реализация фронтенда

Под фронтенд нам нужны будут, как методы FastApi для рендеринга, так и файлы HTML + CSS + JS для того чтоб было что рендерить. Начнем с файлов фронта.

Наше приложение будет подразумевать наличие трех страниц:

  1. Главная страница с возможностью привязать файл, выбрать его время жизни  и с возможностью его отправить на сервер

  2. Страница с просмотром файла

  3. Страница с «Файл удален или не найден»

Как обычно, тут я не буду разбирать полный код фронтенд части. Кому будет интересно посмотреть полный код всего приложения, в том числе и фронтенд-части, переходите в мой телеграмм канал «Легкий путь в Python». Там вы найдете не только исходники моих проектов, но и тот контент, который я не публикую на Хабре.

Для организации хранения статических файлов в корневом каталоге приложения мы создаем папку static. Внутри этой папки мы организуем три подпапки: style, js и img. Первая содержит файлы стилей (css), вторая — файлы JavaScript, а третья — фотографии.

Затем мы отдельно зарегистрируем эту папку, чтобы наше приложение FastApi могло правильно взаимодействовать со статическими файлами.

Теперь создадим в папке app папку templates. В ней мы будем хранить HTML-шаблоны наших страниц.

Опишем первый шаблон и дадим ему имя home.html.

Скрытый текст
<!DOCTYPE html>
<html lang="ru">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Временное файловое хранилище</title>
    <link href="/static/style/home.css" rel="stylesheet" type="text/css">
</head>
<body>
<div class="upload-container">
    <div class="upload-area" id="dropZone">
        <svg xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" stroke="currentColor">
            <path stroke-linecap="round" stroke-linejoin="round" stroke-width="2"
                  d="M7 16a4 4 0 01-.88-7.903A5 5 0 1115.9 6L16 6a5 5 0 011 9.9M15 13l-3-3m0 0l-3 3m3-3v12"/>
        </svg>
        <h2>Перетащите файл сюда</h2>
        <p>или</p>
        <button type="button" id="selectFile">Выберите файл</button>
        <input type="file" id="fileInput" style="display: none">
    </div>

    <div class="file-info" id="fileInfo" style="display: none;">
        <p>Выбранный файл: <span id="fileName"></span></p>
        <p>Размер: <span id="fileSize"></span></p>
    </div>

    <select id="lifetime">
        {% for item in lifetime_list %}
        <option value="{{ item.value }}">{{ item.text }}</option>
        {% endfor %}
    </select>

    <button id="uploadBtn" disabled>Отправить файл</button>
    <div class="error" id="errorMsg" style="display: none;"></div>
    <div id="file-link"></div>
</div>

<!-- Модальное окно -->
<div id="myModal" class="modal">
    <div class="modal-content">
        <span class="close">&times;</span>
        <h2>Ваш файл будет удален...</h2>
        <p id="deletionTime"></p>
        <button id="shareLinkBtn">Поделиться файлом</button>
        <button id="deleteFileBtn">Удалить файл</button>
    </div>
</div>

<script>
    тут описание JavaScript логики — разберем далее.
</script>
</body>
</html>

JavaScript код этой страницы разберем далее. Сейчас рассмотрим общую логику HTML-документа.

В целом тут все просто. Внимания заслуживает, возможно, эти строки:

<select id="lifetime">
        {% for item in lifetime_list %}
        <option value="{{ item.value }}">{{ item.text }}</option>
        {% endfor %}
</select>

Тут я воспользовался шаблонизатором Jinja2 чтоб отобразить варианты с выбором срока жизни файлов. Они будут передаваться со стороны сервера, а Jinja2 их просто отрисует.

Кроме того, есть у нас модальное окно:

<div id="myModal" class="modal">
    <div class="modal-content">
        <span class="close">&times;</span>
        <h2>Ваш файл будет удален...</h2>
        <p id="deletionTime"></p>
        <button id="shareLinkBtn">Поделиться файлом</button>
        <button id="deleteFileBtn">Удалить файл</button>
    </div>
</div>

По умолчанию оно выключено, но как только пользователь выполнит загрузку файлов он увидит это окно с кнопками «Поделиться файлом» и «Удалить файл».

При клике на «Удалить файл» выполнится тот запрос, который выполняет Celery в своей отложенной задачи, просто раньше.

При клике на «Поделиться файлом» в буфер обмена попадает ссылка на просмотр файла. То есть, тут мы даем ссылку именно на просмотр файла, а не на его загрузку. Ссылку на загрузку мы предоставим на странице просмотра.

Теперь разберем JavaScript, который будет иметь непосредственное отношение к теме сегодняшней статьи.

uploadBtn.addEventListener('click', async () => {
        const file = fileInput.files[0];
        if (!file) return;

        const lifetime = document.getElementById('lifetime').value;

        // Создаем объект FormData и добавляем файл и время истечения
        const formData = new FormData();
        formData.append('file', file);
        formData.append('expiration_minutes', lifetime);

        uploadBtn.disabled = true;
        uploadBtn.textContent = 'Загрузка...';

        try {
            // Замените URL на реальный endpoint вашего сервера
            const response = await fetch('/api/upload/', {
                method: 'POST',
                body: formData
            });

            if (response.ok) {
                const data = await response.json();
                fileLink.textContent = `Ссылка на файл: ${data.download_url}`;
                fileLink.style.display = 'block';

                // Открываем модальное окно с информацией о времени удаления
                const expirationTimeDate = new Date(data.expiration_time);
                deletionTimeText.textContent = `Файл будет удален ${expirationTimeDate.toLocaleString()}`;
                modal.style.display = "block";

                // Обработчик для кнопки "Поделиться файлом"
                document.getElementById("shareLinkBtn").onclick = () => {
                    navigator.clipboard.writeText(data.view_url).then(() => {
                        alert("Ссылка на файл скопирована в буфер обмена!");
                    });
                };

                // Обработчик для кнопки "Удалить файл"
                document.getElementById("deleteFileBtn").onclick = async () => {
                    try {
                        const deleteResponse = await fetch(`/delete/${data.file_id}/${data.dell_id}`, { method: 'DELETE' });
                        if (deleteResponse.ok) {
                            alert("Файл успешно удален!");
                            modal.style.display = "none"; // Закрываем модальное окно
                            location.reload();
                        } else {
                            alert("Ошибка при удалении файла.");
                        }
                    } catch (error) {
                        console.error("Ошибка при удалении файла:", error);
                    }
                };

            } else {
                showError('Ошибка при загрузке файла');
            }

         } catch (error) {
             showError('Ошибка при загрузке файла');
         } finally {
             uploadBtn.disabled = false;
             uploadBtn.textContent = 'Отправить файл';

             // Сброс состояния после загрузки
             fileInput.value = '';
             fileInfo.style.display = 'none';
             uploadBtn.disabled = true;
             errorMsg.style.display = 'none';
         }
     });

Давайте кратко разберем код, который обрабатывает загрузку файла на сервер и управление его удалением.

Основные шаги кода

  1. Обработчик события нажатия кнопки загрузки:

    uploadBtn.addEventListener('click', async () => { ... });
    
    • При нажатии на кнопку uploadBtn запускается асинхронная функция.

  2. Получение файла и времени хранения:

    const file = fileInput.files[0];
    const lifetime = document.getElementById('lifetime').value;
    
    • Извлекается выбранный файл из элемента fileInput и время хранения файла в минутах.

  3. Создание объекта FormData:

    const formData = new FormData();
    formData.append('file', file);
    formData.append('expiration_minutes', lifetime);
    
    • Создается объект FormData, в который добавляются файл и время хранения.

  4. Изменение состояния кнопки загрузки:

    uploadBtn.disabled = true;
    uploadBtn.textContent = 'Загрузка...';
    
    • Кнопка отключается и изменяется текст на "Загрузка...".

  5. Отправка запроса на сервер:

    const response = await fetch('/api/upload/', {
        method: 'POST',
        body: formData
    });
    
    • Выполняется POST-запрос к серверу для загрузки файла.

  6. Обработка ответа от сервера:

    if (response.ok) { ... } else { showError('Ошибка при загрузке файла'); }
    
    • Если ответ успешный, выводится ссылка на загруженный файл и открывается модальное окно с информацией о времени удаления.

    • Если произошла ошибка, вызывается функция showError.

  7. Кнопка "Поделиться файлом":

    document.getElementById("shareLinkBtn").onclick = () => { ... };
    
    • При нажатии копируется ссылка на файл в буфер обмена с уведомлением.

  8. Кнопка "Удалить файл":

    document.getElementById("deleteFileBtn").onclick = async () => { ... };
    
    • При нажатии выполняется DELETE-запрос для удаления файла с сервера. В случае успеха показывается уведомление, а модальное окно закрывается.

  9. Обработка ошибок:

    catch (error) { showError('Ошибка при загрузке файла'); }
    
    • В случае ошибки при загрузке файла выводится сообщение об ошибке.

  10. Сброс состояния после завершения загрузки:

    finally {
        uploadBtn.disabled = false;
        uploadBtn.textContent = 'Отправить файл';
        fileInput.value = '';
        fileInfo.style.display = 'none';
        errorMsg.style.display = 'none';
    }
    
    • В блоке finally восстанавливается состояние кнопки, очищается поле ввода файла и скрываются элементы интерфейса.

Код реализует функциональность загрузки файлов на сервер, отображает ссылки для доступа к загруженным файлам, а также позволяет пользователю делиться ссылками и удалять файлы по запросу. Обработка ошибок и изменение состояния интерфейса обеспечивают удобство использования.

Давайте посмотрим, как эта страница выглядит в реальной жизни.

Страница до того, как файл выбран.
Страница до того, как файл выбран.
Страница после того, как файл выбран.
Страница после того, как файл выбран.
Модальное окно после успешной отправки файла.
Модальное окно после успешной отправки файла.

Теперь давайте реализуем страницу с просмотром файла. Для этого создаем в папке app/templates файл file_info.html

Скрытый текст
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Просмотр файла</title>
    <link href="/static/style/file_info.css" rel="stylesheet" type="text/css">
</head>
<body>
<div class="file-container">
    <div class="countdown" id="countdown" data-expiration="{{ expiration_time }}">
        Файл будет удален через <span id="timer">...</span> секунд
    </div>

    <div class="file-details">
        <svg class="file-icon" xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" stroke="currentColor">
            <path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M7 21h10a2 2 0 002-2V9.414a1 1 0 00-.293-.707l-5.414-5.414A1 1 0 0012.586 3H7a2 2 0 00-2 2v14a2 2 0 002 2z"/>
        </svg>
        <div class="file-name" id="fileName">{{ start_file_name }}</div>
    </div>

    <a href="{{ download_url }}" class="download-btn" id="downloadBtn">
        <svg class="download-icon" xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" stroke="currentColor">
            <path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M4 16v1a3 3 0 003 3h10a3 3 0 003-3v-1m-4-4l-4 4m0 0l-4-4m4 4V4"/>
        </svg>
        Скачать файл
    </a>
    <a href="/" class="home-btn" id="homeBtn">
        <svg class="home-icon" xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" stroke="currentColor">
            <path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M3 10l9-7 9 7v11a2 2 0 01-2 2H5a2 2 0 01-2-2V10z" />
            <path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M9 21V9h6v12" />
        </svg>
        На главную
    </a>
</div>

<script>
document.addEventListener('DOMContentLoaded', () => {
    // Получаем время истечения из атрибута data-expiration
    const countdownElement = document.getElementById('countdown');
    const expirationTimestamp = parseInt(countdownElement.dataset.expiration, 10);

    // Текущее время в секундах с начала эпохи
    const nowTimestamp = Math.floor(Date.now() / 1000);

    // Вычисляем оставшееся время
    const remainingTime = expirationTimestamp - nowTimestamp;

    if (remainingTime > 0) {
        startCountdown(remainingTime);
    } else {
        showError('Время жизни файла истекло');
        document.getElementById('downloadBtn').style.display = 'none';
         setTimeout(function() {
        location.reload();
    }, 2000); // 2000 миллисекунд = 2 секунды
    }
});

function startCountdown(seconds) {
    const timerElement = document.getElementById('timer');
    let remainingTime = seconds;

    function updateTimer() {
        timerElement.textContent = remainingTime;
        if (remainingTime <= 0) {
            showError('Время жизни файла истекло');
            document.getElementById('downloadBtn').style.display = 'none';
            return;
        }
        remainingTime--;
        setTimeout(updateTimer, 1000);
    }

    updateTimer();
}

function showError(message) {
    const container = document.querySelector('.file-container');
    const errorDiv = document.createElement('div');
    errorDiv.style.color = '#dc2626';
    errorDiv.style.marginTop = '1rem';
    errorDiv.textContent = message;
    container.appendChild(errorDiv);
}
</script>
</body>
</html>

Тут я привел его полную реализацию, так как не особо много кода.

Если коротко, то его логику можно описать так:

  1. Пользователь переходит по ссылке просмотра файла

  2. Перед загрузкой страницы сервер из Redis извлекает данные о файле (чуть позже реализуем)

  3. Данные от сервера в приличном виде отображаются на странице.

Дополнительно на странице появляется кнопка «На главную», при клике на которую пользователь возвращается на страницу загрузки файла.

Для большей динамики я сделал отображение срока жизни файла в виде счетчика секунд. Получается следующее. Сервер возвращает срок, когда файл будет удален. JavaScript подхватывает это значение и начинает обратный отчет. Выглядит это так:

Обратите внимание, здесь мы указали первоначальное имя файла, однако на сервере он хранится под другим названием. Чтобы убедиться в этом, нажмите на кнопку «Скачать файл».

Теперь давайте создадим страницу с ошибкой «Файл не найден или удалён». Назовём этот файл file_not_found.html:

Скрытый текст
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Файл не найден</title>
    <link href="/static/style/file_info.css" rel="stylesheet" type="text/css">
</head>
<body>
<div class="file-container">
    <div class="error-message" id="errorMessage">
        Файл с запрашиваемым ID удалён или никогда не существовал.
    </div>

    <a href="/" class="home-btn" id="homeBtn">
        <svg class="home-icon" xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" stroke="currentColor">
            <path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M3 10l9-7 9 7v11a2 2 0 01-2 2H5a2 2 0 01-2-2V10z" />
            <path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M9 21V9h6v12" />
        </svg>
        На главную
    </a>
</div>
</body>
</html>

Тут я использовал те же стили.

Теперь давайте реализуем эндпоинты для рендеринга этих страниц.

Для этого в папке app создаем папку pages, а внутри нее файл router.py и начинаем работать с этим файлом.

Импорты и настройка:

from fastapi import APIRouter, HTTPException
from fastapi.templating import Jinja2Templates
from fastapi.requests import Request
from fastapi.responses import HTMLResponse

from app.api.utils import get_file_data
from app.config import redis_client

router = APIRouter(prefix='', tags=['ФРОНТ'])
templates = Jinja2Templates(directory='app/templates')

Тут важно импортировать не только роутер, но и шаблонизатор. О том как это все работает я подробно рассказывал в статье «Создание собственного API на Python (FastAPI): Подключаем фронтенд и статические файлы».

Теперь опишем метод для рендеринга главной страницы приложения:

@router.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    lifetime_list = [
        {'value': 1, 'text': "1 минута"},
        {'value': 5, 'text': "5 минут"},
        {'value': 15, 'text': "15 минут"},
        {'value': 30, 'text': "30 минут"},
        {'value': 60, 'text': "1 час"},
        {'value': 180, 'text': "3 часа"},
        {'value': 720, 'text': "12 часов"},
        {'value': 1440, 'text': "24 часа"},
    ]

    return templates.TemplateResponse("home.html", {"request": request,
                                                    "lifetime_list": lifetime_list})

Всё предельно просто. В реальных условиях список lifetime_list хранился бы в базе данных, и администратор имел бы возможность вносить в него изменения. Однако я решил не использовать базу данных и создать список вручную, передав его в отдельной переменной в функцию lifetime_list.

Напоминаю, что при помощи специальной конструкции на главной странице мы пробегаемся по этому списку и формируем выпадающий список (select).

<select id="lifetime">
        {% for item in lifetime_list %}
        <option value="{{ item.value }}">{{ item.text }}</option>
        {% endfor %}
</select>

Теперь рассмотрим второй endpoint, который будет отображать страницу с файлом или страницу с сообщением «файл не найден», если файл был удалён или если в переданном файле_id была обнаружена ошибка.

@router.get("/view_file/{file_id}", response_class=HTMLResponse)
async def get_file_info(request: Request, file_id: str):
    redis_key = f"file:{file_id}"
    file_info = redis_client.hgetall(redis_key)

    if not file_info:
        return templates.TemplateResponse(
            "file_not_found.html",
            {
                "request": request,
                "file_id": file_id
            }
        )

    # Извлекаем и декодируем значения из Redis
    file_path = file_info.get(b"file_path").decode()
    download_url = file_info.get(b"download_url").decode()
    expiration_time = int(file_info.get(b"expiration_time").decode())  # Преобразуем в int
    start_file_name = file_info.get(b"start_file_name").decode()

    # Передаем expiration_time как timestamp
    return templates.TemplateResponse(
        "file_info.html",
        {
            "request": request,
            "file_id": file_id,
            "file_path": file_path,
            "download_url": download_url,
            "expiration_time": expiration_time,  # Передаем как timestamp
            "start_file_name": start_file_name,
        }
    )

Тут вы видите уже знакомый подход по извлечению информации о файле из Redis, используя идентификатор файла.

Далее, если Redis не вернул информации о файле, то мы возвращаем страницу «Файл не найден» иначе мы извлекаем важные для работы страницы о файле данные и передаем их на фронтенд:

return templates.TemplateResponse(
        "file_info.html",
        {
            "request": request,
            "file_id": file_id,
            "file_path": file_path,
            "download_url": download_url,
            "expiration_time": expiration_time,  # Передаем как timestamp
            "start_file_name": start_file_name,
        }
    )

Теперь нам остается только настроить main-файл приложения и можно будет переходить к тестированию.

Создаем файл app/main.py и заполняем его следующим образом:

import os
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from app.api.router import router as router_api
from app.pages.router import router as router_pages
from app.config import settings


app = FastAPI()
# Добавляем middleware для CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Разрешаем все источники
    allow_credentials=True,
    allow_methods=["*"],  # Разрешаем все методы
    allow_headers=["*"],  # Разрешаем все заголовки
)
os.makedirs(settings.UPLOAD_DIR, exist_ok=True)


app.mount("/files", StaticFiles(directory=settings.UPLOAD_DIR), name="files")
app.mount("/static", StaticFiles(directory=settings.STATIC_DIR), name="static")


app.include_router(router_api)
app.include_router(router_pages)

Давайте рассмотрим код подробнее и выделим ключевые моменты:

Общая структура приложения

  1. Импорт необходимых модулей:

    import os
    from fastapi import FastAPI
    from fastapi.staticfiles import StaticFiles
    from fastapi.middleware.cors import CORSMiddleware
    from app.api.router import router as router_api
    from app.pages.router import router as router_pages
    from app.config import settings
    
    • Мы импортируем необходимые модули и классы для работы с FastAPI, включая поддержку статических файлов и CORS (Cross-Origin Resource Sharing).

    • Также происходит импорт маршрутизаторов для API и страниц, а также конфигурационных настроек.

  2. Создание экземпляра приложения:

    app = FastAPI()
    
    • Создается экземпляр приложения FastAPI, который будет использоваться для определения маршрутов и обработки запросов.

Настройка CORS

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Разрешаем все источники
    allow_credentials=True,
    allow_methods=["*"],  # Разрешаем все методы
    allow_headers=["*"],  # Разрешаем все заголовки
)
  • CORS Middleware: Добавление middleware для управления CORS позволяет вашему приложению принимать запросы из других источников.

  • allow_origins=["*"]: Разрешает запросы от всех источников. Это может быть полезно в процессе разработки, но в продакшене стоит ограничить этот список конкретными доменами.

  • allow_methods и allow_headers: Позволяют использовать все HTTP методы и заголовки, что также удобно для разработки, но может потребовать более строгой настройки в производственной среде.

Создание директорий

os.makedirs(settings.UPLOAD_DIR, exist_ok=True)
  • Эта строка создает директорию для загрузки файлов, если она еще не существует. Это полезно для обеспечения наличия необходимых папок перед тем, как приложение начнет обрабатывать запросы на загрузку файлов.

Настройка статических файлов

app.mount("/files", StaticFiles(directory=settings.UPLOAD_DIR), name="files")
app.mount("/static", StaticFiles(directory=settings.STATIC_DIR), name="static")
  • app.mount(...): Эти строки монтируют статические директории в приложение.

    • /files: Путь для доступа к загруженным файлам.

    • /static: Путь для доступа к статическим ресурсам (например, CSS, JS изображениям).

  • Использование StaticFiles позволяет FastAPI обслуживать статические файлы напрямую без необходимости создания дополнительных маршрутов.

Подключение маршрутизаторов

app.include_router(router_api)
app.include_router(router_pages)
  • Эти строки подключают маршрутизаторы из различных модулей (API и страниц) к основному приложению. Это помогает организовать код и разделить логику на отдельные компоненты.

  • Это также улучшает читаемость и поддержку кода, позволяя легко добавлять или изменять маршруты в будущем.

Для запуска приложения воспользуемся командой:

uvicorn app.main:app

Вот как выглядит приложение в динамике:

Теперь нам остается последний штрих — выполнить деплой приложения FastApi и Celery.

Деплой Celery и FastApi на Amvera Cloud

Начнем с удаленного запуска FastApi приложения.

Подготовим шаблонные настройки для запуска. Их мы описываем в корне проекта, в файле с именем amvera.yml:

meta:
  environment: python
  toolchain:
    name: pip
    version: 3.12
build:
  requirementsPath: requirements.txt
run:
  persistenceMount: /data
  containerPort: 8000
  command: uvicorn app.main:app --host 0.0.0.0 --port 8000

Если коротко, то этим файлом мы даем понять сервису Amvera как запускать наше приложение.

Теперь деплой будет сводится к следующему:

  1. Создаем новый проект в Amvera Cloud

  2. Доставляем в него файлы FastApi приложения (можно через интерфейс на сайте или через команды GIT)

  3. Подкидываем проекту бесплатное доменное имя https (одна из фишек Amvera).

Давайте теперь пошагово:

  1. Создать проект → даем имя, выбираем приложение (не база данных), жмем на далее

  2. На втором экране вы увидите инструкцию по доставке файлов через GIT и будет вкладка «Через интерфейс». Я выбрал ее и просто перетянул файлы проекта. После жмем на далее.

  3. Проверяем, чтоб корректно были введены настройки, если нет, то вносим правки прямо на этом экране и кликаем на завершить.

Теперь нам необходимо сделать 2 вещи:

  1. Привязать бесплатное доменное имя к проекту (можете, кстати, привязать свое собственное доменное имя, на Amvera это в пару кликов мышкой).

  2. Добавить в .env файл переменную BASE_URL (бесплатное доменное имя с https более чем подойдет)

  3. Пересобрать проект, чтоб к нему корректно привязалось и доменное имя и файл .env (буквально 1 клик мышкой)

Для того чтоб добавить бесплатное доменное имя:

  1. Переходим в проект FastApi

  2. Кликаем на «Настройки»

  3. Привязываем:

Теперь переключаемся на вкладку «Репозиторий», удаляем файл .env и отправляем файл .env уже с обновленной переменной BASE_SITE.

Далее кликаем на кнопку «Пересобрать проект» (верхняя стрелка на скрине выше). Там, буквально через 2-3 минуты вы должны будете увидеть, что приложение запущено. После этого ваше FastApi приложение будет доступно по выделенной ссылке.

В моей случае ссылка: https://fastapidelserv-yakvenalex.amvera.io/ (кому интересно можете протестировать приложение, ссылка боевая).

Деплой Celery приложения

В файл.env, который используется в Celery, необходимо добавить переменную BASE_SITE, в которой будет указано ваше доменное имя. Это позволит упростить процесс развертывания, поскольку Celery в режиме Worker не требует отдельного доменного имени. В результате, нам нужно будет лишь создать файл настроек, сформировать проект и выполнить его установку.

Файл настроек в Celery (amvera.yml) имеет следующий вид:

meta:
  environment: python
  toolchain:
    name: pip
    version: 3.12
build:
  requirementsPath: requirements.txt
run:
  persistenceMount: /data
  containerPort: 8000
  command: celery -A app.main  worker -l INFO -P solo

Меняется только команда запуска:

celery -A app.main  worker -l INFO -P solo

В случае если вы будете брать тариф выше начального, то можно убрать -P solo и оставить:

celery -A app.main  worker -l INFO

Тут все зависит от предполагаемых нагрузок и тарифного плана.

В остальном:

  1. Создали проект

  2. Доставили файлы с файлом настроек

  3. Проверили настройки и нажали на завершить

  4. Ждем 2-3 минуты и все готово!

Теперь покажу вам одну интересную фишку.

Локальный запуск Flower для мониторинга задач

Когда наше приложение уже работает в облаке (сервисы FastAPI и Celery функционируют изолированно), иногда хочется наблюдать за процессом выполнения задач, не разворачивая дополнительное приложение для мониторинга, например, Flower. Хорошая новость: это можно легко сделать локально!

Текущая архитектура

На данный момент:

  1. FastAPI и Celery работают изолированно.

  2. Они могут быть запущены даже на разных сервисах. Например:

    • FastAPI работает в облаке, допустим, на Amvera, так как это удобно и быстро.

    • Celery выполняется на выделенном сервере, который предоставляет больше ресурсов.

Эта изоляция — гибкость системы, которая позволяет развертывать компоненты там, где это наиболее эффективно.

Как запустить Flower локально

Для запуска Flower на вашем локальном компьютере достаточно выполнить следующую команду в терминале:

celery -A app.main flower --port=5555
  • -A app.main: Указывает, где находится точка входа для Celery (в данном случае модуль app.main).

  • --port=5555: Определяет порт, на котором Flower будет доступен.

После выполнения этой команды, вы сможете открыть браузер и перейти по адресу http://localhost:5555/task, чтобы увидеть интерфейс Flower.

Теперь, для проверки, давайте поручим фронтенду выполнить задачу:

Ждем минуту и видим:

Файл успешно удален!
Файл успешно удален!

Заключение

Несмотря на кажущуюся простоту нашего проекта, в процессе его реализации мы затронули множество важных тем, которые часто считаются сложными, особенно для новичков.

Сегодня я постарался развеять миф о том, что работа с нереляционными базами данных, брокерами сообщений и Celery является чем-то недоступным. На самом деле, ничего сложного нет, если вы понимаете основные принципы, по которым функционирует эта связка.

Я надеюсь, что, прочитав эту статью, вы станете более уверенным специалистом, который сможет использовать не только FastAPI в сочетании с реляционными базами данных, но и освоить работу с нереляционными системами. Также надеюсь, что взаимодействие с Celery станет для вас простым и приятным процессом, и что я смог внести свой вклад в ваше обучение.

Важно отметить, что Celery не ограничивается работой только с FastAPI. Этот инструмент отлично зарекомендовал себя и в сочетании с другими фреймворками на Python, такими как Django и Flask. Он также широко используется при разработке сложных Telegram-ботов и в любых других сценариях, где требуется выполнение фоновых задач в изолированном пространстве.

Напоминаю, что полный исходный код двух приложений (FastAPI и Celery) доступен в моем бесплатном Telegram-канале «Легкий путь в Python».

Если статья оказалась для вас полезной, буду рад вашим комментариям или хотя бы лайку.

На этом все. До новых встреч!

Комментарии (23)


  1. OcMaRUS
    14.12.2024 06:47

    Из всех подробных, что видел, это самый!!! Спасибо за столь титаническую работу!


    1. zabanen2
      14.12.2024 06:47

      конечно, с таким помощником, как:

      Давайте разберем этот код по частям:
      Вот краткое описание существующих пулов:

      @chatgpt


      1. pda0
        14.12.2024 06:47

        "Отличная" идея, кстати, для декоратора. Пишем пустой метод, документирующий комментарий и декоратор @chatgtpсам там что-нибудь пишет в тело метода... :-D


    1. yakvenalex Автор
      14.12.2024 06:47

      Спасибо за поддержку.


  1. baldr
    14.12.2024 06:47

    Увидел название и первая мысль - таакк, скорее всего будет асинхронный FastAPI и синхронный вызов Celery внутри. Хотелось ошибиться - специально зашёл проверить.

    Но нет. Функция объявлена как async, но обращение к Redis (и не одно!) синхронное, вызов send_task тоже синхронный.. И вообще ни одного await внутри функции. Возьмите уж тогда обычный Flask.

    Да, стандартного способа вызвать таску celery из асинхронного кода нет. Для этого всем приходится слегка реверсить код celery и для каждого брокера писать свой асинхронный менеджер чтобы запустить таску или получить результаты.

    Плохому учите, гражданин.


    1. codeserg
      14.12.2024 06:47

      То что отдельные методы не оптимальны - это уже вопрос оптимизации. Статья явно для начального или среднего уровня, чтобы погрузиться в тему. Общее понимание архитектуры после прочтения у меня, например, появилось. В комментариях часто нахожу то, что пропускает чатгпт:) Так что благодарю автора, и комментаторов.


      1. yakvenalex Автор
        14.12.2024 06:47

        Спасибо за поддержку.


      1. baldr
        14.12.2024 06:47

        Отнюдь. Я пропустил остальные замечания где можно что-то улучшить или дополнить, как раз потому что это не так важно для начального или среднего уровня.

        Однако, вызов синхронного кода из асинхронной функции - это грубая ошибка. Сейчас этот код кто-то возьмёт и потащит ставить на рабочий проект, а потом будет удивляться тому, что он не справляется с обычной нагрузкой.

        В статье рассматривается именно интеграция FastAPI и Celery и именно это сделано неправильно.


        1. yakvenalex Автор
          14.12.2024 06:47

          Дайте хороший и качественный код. Покажите как нужно. Правда. Зачем писать простыни текста если можно скинуть правильный код. Тут же речь про пару строк. Я думаю, что не только мне это интересно будет.


          1. baldr
            14.12.2024 06:47

            Ну, вообще говоря, я приводил код, 1.5 года назад, в комментариях к другой статье. Я выдирал его из рабочего проекта, убирая кучу деталей, так что он просто показывает куда нужно думать. Написать статью по этой теме - наверное хорошая идея, но у меня на это времени нет совсем.


  1. TIEugene
    14.12.2024 06:47

    except Exception as e:

    Мдо-о-о... No comments.


    1. yakvenalex Автор
      14.12.2024 06:47

      Правильно, мне нужно было в учебной статье написать 50 обработчиков всевозможных ошибок, чтоб у вас были "comments")


    1. NNikolay
      14.12.2024 06:47

      Сильно лучше чем except:


  1. yakvenalex Автор
    14.12.2024 06:47

    Ребята, серьезно? Дизлайкать за "спасибо за поддержку"? Что у вас в голове вообще? Давайте по порядку.

    1. Про GPT и "всё готово за 5 минут":
      Если вы думаете, что можно сделать подобный проект и написать статью такого уровня, просто прокинув запрос в нейросеть — попробуйте. Вот реально, садитесь и сделайте. Но предупреждаю, на это уйдёт не 5 минут, а несколько полных дней работы. Код, идеи, текст — это всё моё. GPT я использую исключительно как инструмент для вычитки и доработки отдельных блоков. Это нормальная практика, и нет ничего плохого в том, чтобы использовать инструменты, которые помогают улучшать результат.

    2. Про "вечный хейт":
      Давайте не будем лицемерить. Делать что-то своими руками, тратить своё время, силы и потом получать вместо нормальной поддержки дизлайки за благодарность — это смешно. Если вам не нравится то, что я делаю, — окей, ваше право. Но неужели вам обязательно тратить свою энергию, чтобы писать или кликать негатив? Если у вас есть конструктив, говорите. Если нет — зачем вообще заходить в эту тему?

    3. Про поддержку:
      Когда кто-то выражает поддержку — я благодарю. Это нормально. Это человеческое. И если даже это вызывает у кого-то желание поставить дизлайк, то, честно говоря, вопрос уже не ко мне, а к вашему внутреннему состоянию.

    Мои проекты и статьи — это труд, а не кнопка "Сгенерировать" в нейросети. Хотите критиковать — сначала покажите, что вы сами можете сделать лучше. А если нечего показать — задумайтесь, что вообще побуждает вас тратить время на негатив.


    1. baldr
      14.12.2024 06:47

      Я понимаю ваше возмущение, но могу только посоветовать привыкнуть - к любой статье будут коментарии. Нет комментариев только к статье, которую не читали.

      Вам указывают на недостатки - они есть. Они есть всегда. Перехват Exception - действительно, не критично. Использовать Redis в качестве брокера - ок, в учебном проекте можно. Я лично написал о том что считаю реально большой ошибкой.

      Ребята, серьезно? Дизлайкать за "спасибо за поддержку"? Что у вас в голове вообще?

      Добро пожаловать на Хабр. Пустые комментарии "спасибо" здесь не очень ценятся - для этого можно стрелочку нажать. (я не ставил этот дизлайк если что)

      К ChatGPT лично у меня особо нет возражений, если в статье написана интересная тема.


      1. yakvenalex Автор
        14.12.2024 06:47

        Принято, но предполагаю что дело не в "пустом комментарии". Что касается кода. Я всегда позитивно отношусь к конструктивной критике и лично к вам вообще вопросов никаких нет, разве что я бы хотел увидеть более корректную реализацию именно в коде.


  1. parus-lead
    14.12.2024 06:47

    Спасибо за такую подробную и интересную статью. Оказалась очень полезной. FastAPI + Redis вообще очень популярна сейчас.
    А насчёт хейтеров: не переживай и не бери близко к сердцу - их везде хватает. Даже не обязательно лютых хейтеров, а просто грубых и необразованных людей.
    Продолжай писать подобные интересные статьи.


    1. yakvenalex Автор
      14.12.2024 06:47

      Спасибо за поддержку. Действительно, этого часто нехватает. Подписался на вас. У вас тоже интересные статьи)


      1. Propan671
        14.12.2024 06:47

        Подписался на обоих) Спасибо за материал!


  1. unstopppable
    14.12.2024 06:47

    В качестве бесплатного локального s3 можно добавить в связку minio


  1. Propan671
    14.12.2024 06:47

    У меня уже глаз дергается от сочетания FastAPI + Celery, но тут он вполне уместен. Отличная работа! Btw, не пробовали сочетание FastAPI + FastStream - интересны ваши мысли на этот счет


    1. yakvenalex Автор
      14.12.2024 06:47

      Спасибо за обратную связь. Да, в планах есть проект FastStream + Aiogram + Redis) Технология интересная, особенно в контексте телеграмм ботов


      1. yakvenalex Автор
        14.12.2024 06:47

        Я недавно вашу статью читал)) С поиска. Очень круто описали)