Друзья, приветствую! Надеюсь, вы соскучились. Сегодня я снова с вами, и на этот раз мы займемся созданием полноценного проекта — мини‑блога с использованием замечательного Python‑фреймворка FastAPI. Чтобы блог был функциональным, мы частично визуализируем его (разработаем фронтенд). Для визуализации API будем использовать чистый HTML, CSS и JS, а задачи по рендерингу страниц переложим на шаблонизатор Jinja2. Но обо всем по порядку.

Идея проекта

Сегодня я хочу показать вам, что разработать собственный блог на FastAPI с использованием SQLAlchemy 2 для работы с базой данных — это простая и увлекательная задача.

Идея создать такой проект пришла ко мне во время работы с клиентом. Вдаваться в подробности не буду, но основная проблема заключалась в необходимости автоматического оформления больших текстов в виде постов.

Конечный пользователь должен просто получать текст в формате Markdown, который будет отображаться как оформленные страницы (ссылки). Изначально я использовал для этой задачи сервис Telegraf (наверняка вы с ним знакомы, если занимались разработкой телеграм-ботов).

Однако у Telegraf есть множество ограничений: например, нельзя публиковать тексты в формате Markdown — поддерживается только HTML, да и то с ограниченным функционалом. Но это не столь большая проблема, ведь частично задачу можно решить с помощью готовых библиотек и регулярных выражений.

Однако настоящая проблема заключалась в ограничении на длину постов. В конечном итоге я решил отказаться от Telegraf и написать собственный мини-блог, который будет заимствовать лучшее от Telegraf, при этом устранив его недостатки.

Сегодня я расскажу, какие решения были использованы, и вы поймете, что знаний SQLAlchemy, FastAPI и базового понимания фронтенда вполне достаточно, чтобы собрать сервис, похожий на Telegraf, всего за пару дней. Я проведу вас шаг за шагом через весь процесс разработки такого проекта.

Стек технологий

Для разработки проекта мы будем использовать следующие технологии:

  • Веб‑фреймворк: FastAPI

  • ORM: SQLAlchemy с асинхронной поддержкой через aiosqlite

  • База данных: SQLite (можно заменить на любую SQL‑СУБД)

  • Миграции: Alembic

  • Аутентификация: bcrypt для хеширования паролей, python‑jose для работы с JWT‑токенами

  • Фронтенд: HTML + CSS + JS (с использованием Jinja2)

  • Запуск веб‑сервера: Uvicorn

  • Трансформация Markdown в HTML: markdown2

Функционал проекта

Давайте разберемся, какой функционал будет у нашего проекта и чем мы сегодня займемся

Несмотря на то, что мы создадим единое приложение, которое будет включать как API-часть (логи), так и визуальную составляющую, проект будет разделен на несколько частей, принятых называть микросервисами. Ниже рассмотрим микросервисы, которые мы сегодня разработаем.

База данных и взаимодействие с ней

Сегодня мы будем работать с SQLite, чтобы избежать затрат времени на настройку PostgreSQL. В этом нам поможет асинхронный ORM SQLAlchemy 2, который позволит описывать таблицы (модели) и, самое главное, удобно взаимодействовать с базой данных через встроенные механизмы. Как вы понимаете, работа с базой данных станет важной частью нашей задачи.

Чтобы наши модели таблиц преобразовались в полноценные таблицы SQLite, мы будем использовать Alembic.

Для более глубокого понимания работы с этим микросервисом рекомендую ознакомиться с моими предыдущими статьями:

  1. Асинхронный SQLAlchemy 2: простой пошаговый гайд по настройке, моделям, связям и миграциям с использованием Alembic.

  2. Асинхронный SQLAlchemy 2: пошаговый гайд по управлению сессиями, добавлению и извлечению данных с Pydantic.

  3. Асинхронный SQLAlchemy 2: улучшение кода, методы обновления и удаления данных.

В этих материалах я подробно рассматриваю аспекты асинхронной работы с SQLAlchemy 2. Далее, в разборе кода, я буду предполагать, что вы уже знакомы с этим фреймворком.

Если вы ожидали выхода четвертой части моего цикла по SQLAlchemy, то сегодня я подробно расскажу о нескольких новых технологиях этого инструмента. В частности, мы разберем, как работает связка ManyToMany в SQLAlchemy 2. Кроме того, мы напишем несколько методов с более сложными запросами к базе данных.

Регистрация, авторизация и аутентификация

В нашем блоге публикация статьи будет невозможна без входа в систему. Следовательно, без регистрации и авторизации нам не обойтись.

Для реализации этой логики мы будем использовать систему JWT-токенов.

В рамках этого блока мы также подробно разберем:

  • Зависимости (Dependencies) в FastAPI,

  • Роли пользователей, их управление и ограничение доступа.

На выходе мы создадим набор API-методов, которые обеспечат регистрацию, авторизацию и аутентификацию пользователей.

API-логика

В данном блоке мы опишем следующие методы:

  • Удаление блога

  • Изменение статуса блога (опубликовано / черновик)

  • Добавление нового блога с тегами

  • Получение информации по блогу (чтение блога)

  • Получение всех блогов в статусе «Опубликовано»

Фронтенд

Под фронтенд мы реализуем 2 эндпоинта FastApi:

  • Рендеринг страницы с блогом / страницы "блог не найден"

  • Рендеринг страницы с блогами (тут мы дополнительно усложним фильтрацией по автору и по тегам)

Деплой на Amvera Cloud

И, на завершающем этапе, мы выполним деплой нашего проекта в облачный хостинг Amvera Cloud. Просто потому что это быстро, бюджетно и удобно. К примеру, при работе с обычным VPS вам нужно будет: покупать домен, привязывать домен к VPS серверу через Nginx/Apache, настраивать сам сервер и так далее. В Amvera достаточно будет доставить файлы блога с файлом настроек (настройки стандартные и их можно просто скопировать), после активировать бесплатное доменное имя и подождать пару минут пока Amevera сообщит, что проект успешно развернут.

Дисклеймер

Как вы могли заметить в разделе функционала проекта, сегодня нам предстоит написать большое количество кода. В связи с этим предупреждаю, что в статье будет много ссылок на мои предыдущие материалы, если тема уже подробно рассматривалась ранее.

Новый функционал и технологии мы будем разбирать детально, тогда как повторяющийся или ранее рассмотренный материал будет освещен кратко с указанием на соответствующие статьи.

Кроме того, чтобы не писать FastAPI-приложение с нуля (например, блок авторизации и регистрации), я буду использовать свою стартовую заготовку, которую подробно описывал в предыдущих статьях (наиболее подробное описание "болванки" вы найдете в этой статье).

Если вы хотите работать сразу с готовым исходным кодом проекта, приглашаю вас в мой Telegram-канал «Легкий путь в Python». Там вы найдете не только полный исходный код сегодняшнего проекта, но и эксклюзивные материалы, которых нет на Хабре.

На этом заканчиваем с вводной частью и переходим к практике!

Начало работы

Первым шагом подготовим проект для работы на вашем локальном компьютере. Убедитесь, что у вас установлен Git.

Шаг 1. Клонирование репозитория

Откройте терминал и выполните следующую команду:

git clone https://github.com/Yakvenalex/FastApiWithAuthSample

Эта команда создаст новую папку с проектом, содержащую копию репозитория.

Шаг 2. Переход в директорию проекта

Перейдите в папку с проектом:

cd FastApiWithAuthSample

Теперь проект готов для дальнейшей настройки. Рекомендуется открыть его в удобной среде разработки, например, PyCharm.

Добавление модуля для Markdown

Для обработки текста с форматированием в стиле Markdown нам понадобится модуль markdown2. Он преобразует текст из Markdown в HTML, так как браузеры не поддерживают Markdown "из коробки".

Добавьте в файл requirements.txt следующую строку:

markdown2==2.5.1

Затем установите все зависимости проекта командой:

pip install -r requirements.txt

Настройка AUTH и проверка готовности

В проекте уже реализован модуль авторизации, аутентификации и ролей, а также подготовлена база данных. Если вы клонировали репозиторий, миграции через Alembic не потребуются.

Для проверки запустите приложение FastAPI из корневой директории проекта:

uvicorn app.main:app --reload  

Если порт 8000 уже занят, используйте другой, например:

uvicorn app.main:app --port 8900 --reload  

Проверка работы

После запуска приложения в терминале появится информация о его состоянии. Откройте браузер и перейдите по адресу:

http://127.0.0.1:8900/docs

На этой странице вы сможете протестировать методы API, проверить их корректность и убедиться, что система работает.

После успешной проверки API можно переходить к следующему этапу разработки.

Подробнее про JWT-токены и общую логику аутентификации в FastAPI вы можете прочитать в моей статье:

Подготовка таблиц для API блога

Хочу напомнить, что базовый класс, на основании которого создаются все модели таблиц, имеет следующий вид:

class Base(AsyncAttrs, DeclarativeBase):
    __abstract__ = True

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    created_at: Mapped[datetime] = mapped_column(TIMESTAMP, server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        TIMESTAMP,
        server_default=func.now(),
        onupdate=func.now()
    )

    @declared_attr
    def __tablename__(cls) -> str:
        return cls.__name__.lower() + 's'

    def to_dict(self) -> Dict[str, Any]:
        return {c.name: getattr(self, c.name) for c in self.__table__.columns}

    def __repr__(self) -> str:
        """Строковое представление объекта для удобства отладки."""
        return f"<{self.__class__.__name__}(id={self.id}, created_at={self.created_at}, updated_at={self.updated_at})>"

Колонки id, created_at и updated_at будут генерироваться автоматически, как и названия таблиц. Это сделано для того, чтобы ничего не смущало в блоке с описанием таблиц для нашего API блога.

Создадим папку app/api и в ней файл models.py. Теперь можно приступить к описанию моделей.

Начнем с простого и опишем таблицу с блогами:

class Blog(Base):
    # Заголовок статьи
    title: Mapped[str_uniq]

    # Автор (внешний ключ на таблицу Users)
    author: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
    
    # Содержание статьи в формате Markdown
    content: Mapped[str] = mapped_column(Text)

    short_description: Mapped[str] = mapped_column(Text)

    # Статус статьи
    status: Mapped[str] = mapped_column(default="published", server_default='published')

Внимание заслуживает поле author. Через ForeignKey мы связываем таблицу блогов с таблицей авторов. Обратите внимание, что пока я не использовал основной фишки ORM SQLAlchemy - связей между таблицами, но мы это скоро исправим.

Следующая простая таблица будет для хранения тегов. В ней мы будем хранить каждый тег как отдельную запись:

class Tag(Base):
    # Название тега
    name: Mapped[str] = mapped_column(String(50), unique=True)

Теперь усилим нашу модель. Логично, что нам не помешает возможность получать все статьи автора, обращаясь отдельным запросом, и было бы полезно узнавать, кто автор той или иной статьи. Для этого мы можем использовать систему связей ORM SQLAlchemy (relationship).

Добавим в модель Blog следующую строку:

# Ссылка на объект пользователя
user: Mapped["User"] = relationship("User", back_populates="blogs")

Здесь мы указали, что у одного блога может быть только один автор, что логично. Теперь настроим обратную связь, указав, что один пользователь может быть автором сразу нескольких блогов:

blogs: Mapped[list["Blog"]] = relationship(
    back_populates="user"  # Должно совпадать с именем в модели Blogs
)

Теперь мы замкнули связь. Благодаря этому мы сможем с легкостью получать как информацию по автору блога, так и извлекать все статьи конкретного автора.

Теперь приступим к связке "многие-ко-многим", которую я ранее не рассматривал.

Для реализации связи блогов и тегов используем промежуточную таблицу:

class BlogTag(Base):
    # Внешние ключи для связи блогов и тегов
    blog_id: Mapped[int] = mapped_column(ForeignKey("blogs.id", ondelete="CASCADE"), nullable=False)
    tag_id: Mapped[int] = mapped_column(ForeignKey("tags.id", ondelete="CASCADE"), nullable=False)
    
    # Уникальное ограничение для предотвращения дублирования
    __table_args__ = (
        UniqueConstraint('blog_id', 'tag_id', name='uq_blog_tag'),
    )

Расширим модели блога и тега для поддержки связи "многие-ко-многим":

class Blog(Base):
    # ... предыдущие поля ...
    
    # Связь с тегами через промежуточную таблицу
    tags: Mapped[list["Tag"]] = relationship(
        secondary="blog_tags",  # Указываем промежуточную таблицу
        back_populates="blogs"
    )

class Tag(Base):
    # ... предыдущие поля ...
    
    # Обратная связь с блогами
    blogs: Mapped[list["Blog"]] = relationship(
        secondary="blog_tags",
        back_populates="tags"
    )

Разберем связь "многие-ко-многим" подробнее на примере блогов и тегов.

Представьте, что у вас блог о программировании. Одна статья может иметь несколько тегов - например, "#python", "#backend", "#sqlalchemy", а один тег может быть применен к множеству разных статей. Например, тег "#python" может быть у статей про введение в язык, про фреймворки и про базы данных.

Технически сделать прямую связь между блогами и тегами невозможно - нужна промежуточная таблица blog_tags. Она работает как своеобразный "мост" между блогами и тегами.

Зачем нужна промежуточная таблица?

Она решает сразу несколько задач:

  1. Позволяет создавать множество связей между блогами и тегами

  2. Предотвращает дублирование записей

  3. Обеспечивает целостность данных В нашем примере промежуточная таблица BlogTag содержит только два внешних ключа: blog_id и tag_id. Это означает, что каждая запись в этой таблице - это уникальная связь между конкретным блогом и конкретным тегом.

Уникальное ограничение UniqueConstraint('blog_id', 'tag_id') гарантирует, что мы не сможем случайно добавить один и тот же тег к одному блогу дважды.

Проще говоря, промежуточная таблица похожа на like-таблицу в Instagram, которая связывает пользователей и их лайки. Только в нашем случае это связь блогов и тегов.

Магия SQLAlchemy в том, что вам не придется напрямую работать с этой промежуточной таблицей. Вы можете просто добавлять теги к блогу через .tags.append() или доставать все теги блога через .tags, и ORM сама позаботится о корректном заполнении связующей таблицы.

Надеюсь, что удалось понятно объяснить эту тему.

Выполним миграции

На данном этапе мы только описали наши таблицы, но они еще не созданы в базе данных. Чтобы они появились в базе, необходимо выполнить миграцию с помощью Alembic.

Шаг 1: Регистрация новых таблиц

Для начала, добавим новые импорты в файл app/migration/env.py:

from app.api.models import Tag, Blog, BlogTag

Теперь, чтобы создать файл с инструкциями для миграций, откроем терминал и в корне проекта выполним команду:

alembic revision --autogenerate -m "add new tables"

Шаг 2: Применение миграции

Для применения миграции используем команду:

alembic upgrade head

Если все прошло успешно, вы должны увидеть новые таблицы в своей базе данных.

Я надеюсь, у вас всё получилось!

Хочу напомнить, что полный исходный код этого проекта доступен в моём телеграм-канале. Это будет полезно, если вы не хотите тратить время на написание полного кода приложения, а просто хотите ознакомиться с ним.

Создание методов взаимодействия с новыми таблицами

Напоминаю, что в проекте используется подход BaseDAO. Это значит, что существует некий класс, в котором собраны универсальные методы для работы с базой данных. К счастью или сожалению, сегодня нам предстоит реализовать ряд эксклюзивных методов для новых таблиц, поскольку логика будет нестандартной.

Для этого мы воспользуемся дочерними классами, а точнее, расширим их.

В папке app/api создаем файл dao.py и для начала просто привяжем каждую новую таблицу к базовому классу BaseDAO:

from typing import Optional
from loguru import logger
from sqlalchemy import select, func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload, selectinload
from app.api.schemas import BlogFullResponse
from app.dao.base import BaseDAO
from app.api.models import BlogTag, Tag, Blog


class TagDAO(BaseDAO):
    model = Tag


class BlogDAO(BaseDAO):
    model = Blog


class BlogTagDAO(BaseDAO):
    model = BlogTag

На импорты пока не обращайте внимания.

Сейчас мы сделали следующее: связали каждую новую таблицу с классом BaseDAO, и теперь у нас появилась возможность вызывать универсальные методы для работы с конкретной таблицей. Для этого достаточно использовать точку. Например:

blogs = await BlogDAO.find_all(session=session, filters=None)

Для понимания кода, который мы будем рассматривать далее, потребуются базовые знания SQLAlchemy, так как код не самый простой. Все мои статьи в структурированном виде вы найдете тут (к примеру там есть разделы, посвященные SQLAlchemy, FastApi, Aiogram и так далее). Просто структурировал все, что написал на Хабре за последние пол года в одном месте.

Логика добавления нового блога

Начнем с реализации логики для добавления нового блога. Здесь я решил сильно не усложнять, чтобы не запутать вас, и не описывать сложную логику добавления с вложенностями именно в классе BlogDAO, поскольку, по сути, в добавлении блога нет ничего примечательного. Нам достаточно просто использовать метод добавления записи из BaseDAO. Сейчас мы сосредоточимся на новой логике.

В нашем блоге реализована система тегов. Чтобы сделать процесс интереснее, я решил показать, как можно реализовать динамическое создание тегов в таблице тегов, если тег не существовал. После того как тег создан (или проверено, что он существует), необходимо связать блог с тегом в промежуточной таблице (связка многие-ко-многим).

То есть логика следующая:

  1. Мы добавляем блог в таблицу блогов.

  2. Проверяем, все ли указанные автором теги есть в таблице тегов (проверка по русскому названию). Создаем теги, которых не существовало.

  3. Связываем блог с тегами в промежуточной таблице.

Эта логика обеспечивает корректную работу всех заложенных в систему связей. Она станет еще понятнее, когда мы реализуем API-метод для добавления блога.

Реализация логики добавления тегов

Начнем с добавления тегов в таблицу тегов. Для этого в классе TagDAO опишем следующий метод:

@classmethod
async def add_tags(cls, session: AsyncSession, tag_names: list[str]) -> list[int]:
    """
    Метод для добавления тегов в базу данных.
    Принимает список строк (тегов), проверяет, существуют ли они в базе данных,
    добавляет новые и возвращает список ID тегов.

    :param session: Сессия базы данных.
    :param tag_names: Список тегов в нижнем регистре.
    :return: Список ID тегов.
    """
    tag_ids = []
    for tag_name in tag_names:
        tag_name = tag_name.lower()  # Приводим тег к нижнему регистру
        # Пытаемся найти тег в базе данных
        stmt = select(cls.model).filter_by(name=tag_name)
        result = await session.execute(stmt)
        tag = result.scalars().first()

        if tag:
            # Если тег найден, добавляем его ID в список
            tag_ids.append(tag.id)
        else:
            # Если тег не найден, создаем новый тег
            new_tag = cls.model(name=tag_name)
            session.add(new_tag)
            try:
                await session.flush()  # Это создает новый тег и позволяет получить его ID
                logger.info(f"Тег '{tag_name}' добавлен в базу данных.")
                tag_ids.append(new_tag.id)
            except SQLAlchemyError as e:
                await session.rollback()
                logger.error(f"Ошибка при добавлении тега '{tag_name}': {e}")
                raise e

    return tag_ids

Метод принимает список строк (тегов). Далее мы итерируемся по списку тегов, приводя их к нижнему регистру. Затем проверяем, существует ли тег в таблице тегов. Если он есть, добавляем его ID в выходной список. Если тега нет, создаем запись в таблице тегов и добавляем полученный ID в список.

На выходе получаем список ID всех тегов, которые указал автор при добавлении блога.

Обратите внимание: на этапе добавления тега мы не используем commit. Это сделано намеренно для оптимизации. Коммит (постоянную фиксацию изменений в базе данных) мы выполним только один раз, когда все операции будут завершены уже на уровне эндпоинта FastAPI.

Связка ID блога с ID тегов

Теперь нам нужно выполнить связку ID нового блога с идентификаторами тегов во временной таблице. Для этого опишем специальный метод в BlogTagDAO:

@classmethod
async def add_blog_tags(cls, session: AsyncSession, blog_tag_pairs: list[dict]) -> None:
    """
    Метод для массового добавления связок блогов и тегов в базу данных.
    Принимает список словарей с blog_id и tag_id, добавляет соответствующие записи.

    :param session: Сессия базы данных.
    :param blog_tag_pairs: Список словарей с ключами 'blog_id' и 'tag_id'.
    :return: None
    """
    # Сначала создаем все объекты BlogTag
    blog_tag_instances = []
    for pair in blog_tag_pairs:
        blog_id = pair.get('blog_id')
        tag_id = pair.get('tag_id')
        if blog_id and tag_id:
            # Создаем объект BlogTag
            blog_tag = cls.model(blog_id=blog_id, tag_id=tag_id)
            blog_tag_instances.append(blog_tag)
        else:
            logger.warning(f"Пропущен неверный параметр в паре: {pair}")

    if blog_tag_instances:
        session.add_all(blog_tag_instances)  # Добавляем все объекты за один раз
        try:
            await session.flush()  # Применяем изменения и сохраняем записи в базе данных
            logger.info(f"{len(blog_tag_instances)} связок блогов и тегов успешно добавлено.")
        except SQLAlchemyError as e:
            await session.rollback()
            logger.error(f"Ошибка при добавлении связок блогов и тегов: {e}")
            raise e
    else:
        logger.warning("Нет валидных данных для добавления в таблицу blog_tags.")

Логика похожа на предыдущую, за исключением того, что мы сначала генерируем инстансы (будущие записи), а затем все их разом фиксируем в базе данных.

Таким образом мы готовы к тому, чтоб реализовать метод для добавления нового блога в таблицу с блогами. Давайте опишем этот метод, а после уже вернемся к BlogDao и опишем дополнительные методы, которые нам нужны будут для других задач в нашей системе.

Начинаем создавать API блога

Для этого в папке app/api создаем файл router.py. Теперь выполним импорты и подготовим роутер:

from fastapi import APIRouter, Depends, HTTPException, status, Query
from loguru import logger
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import IntegrityError
from fastapi.responses import JSONResponse
from app.api.schemas import BlogCreateSchemaBase, BlogCreateSchemaAdd, BlogFullResponse, BlogNotFind
from app.auth.dependencies import get_current_user, get_blog_info
from app.auth.models import User
from app.dao.session_maker import SessionDep, TransactionSessionDep
from app.api.dao import BlogDAO, BlogTagDAO, TagDAO

router = APIRouter(prefix='/api', tags=['API'])

Некоторых сущностей для импортов у нас пока нет. Напишем их по ходу дела.

Теперь опишем первый API-метод для добавления блога:

@router.post("/add_post/", summary="Добавление нового блога с тегами")
async def add_blog(
        add_data: BlogCreateSchemaBase,
        user_data: User = Depends(get_current_user),
        session: AsyncSession = TransactionSessionDep
):
    blog_dict = add_data.model_dump()
    blog_dict['author'] = user_data.id
    tags = blog_dict.pop('tags', [])

    try:
        blog = await BlogDAO.add(session=session, values=BlogCreateSchemaAdd.model_validate(blog_dict))
        blog_id = blog.id

        if tags:
            tags_ids = await TagDAO.add_tags(session=session, tag_names=tags)
            await BlogTagDAO.add_blog_tags(session=session,
                                           blog_tag_pairs=[{'blog_id': blog_id, 'tag_id': i} for i in tags_ids])

        return {'status': 'success', 'message': f'Блог с ID {blog_id} успешно добавлен с тегами.'}
    except IntegrityError as e:
        if "UNIQUE constraint failed" in str(e.orig):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
                                detail="Блог с таким заголовком уже существует.")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Ошибка при добавлении блога.")

Сразу начнем разбор с аргументов эндпоинта. Первым аргументом идет add_data или, другими словами, сама информация о новом блоге.

В качестве валидатора тут используется схема Pydantic BlogCreateSchemaBase. Давайте её опишем. Для этого в папке api создаем файл schemas.py:

from datetime import datetime
from typing import List
from pydantic import BaseModel, ConfigDict, computed_field, Field


class BaseModelConfig(BaseModel):
    model_config = ConfigDict(from_attributes=True)


class BlogCreateSchemaBase(BaseModelConfig):
    title: str
    content: str
    short_description: str
    tags: List[str] = []

Тут я создал класс BaseModelConfig и прописал там параметр from_attributes=True, просто чтобы не пробрасывать эту настройку в каждую схему Pydantic. Кстати, подробно про Pydantic вы можете почитать в моей статье: Pydantic 2: Полное руководство для Python-разработчиков — от основ до продвинутых техник.

Сама схема BlogCreateSchemaBase описывает поля нашего блога: название, контент (напоминаю, что в Markdown), короткое описание и теги.

user_data: User = Depends(get_current_user)

Этой строкой мы установили четкую зависимость между этим эндпоинтом и функцией:

async def get_current_user(token: str = Depends(get_token), session: AsyncSession = SessionDep):
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=settings.ALGORITHM)
    except JWTError:
        raise NoJwtException

    expire: str = payload.get('exp')
    expire_time = datetime.fromtimestamp(int(expire), tz=timezone.utc)
    if (not expire) or (expire_time &lt; datetime.now(timezone.utc)):
        raise TokenExpiredException

    user_id: str = payload.get('sub')
    if not user_id:
        raise NoUserIdException

    user = await UsersDAO.find_one_or_none_by_id(data_id=int(user_id), session=session)
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='User not found')
    return user

Если говорить коротко, то перед выполнением эндпоинта с добавлением блога, сначала выполняется функция проверки, авторизован ли пользователь в системе. Здесь либо мы ловим ошибку, либо получаем данные о пользователе. Подробнее эту тему я рассматривал в статье про JWT-авторизацию в FastAPI.

session: AsyncSession = TransactionSessionDep

Этот аргумент устанавливает ещё одну зависимость, но уже с функцией для генерации сессии. Обратите внимание: тут нужно использовать именно TransactionSessionDep, а не SessionDep. Дело в том, что после завершения логики эндпоинта add_post, нам необходимо будет сохранить изменения в базе данных. SessionDep эти изменения просто не зафиксирует.

Далее, после всех проверок мы приступаем к основной логике эндпоинта. И тут уже нет ничего сложного — мы просто последовательно выполняем ряд методов:

  1. Добавили блог.

  2. Добавили недостающие теги.

  3. Получили список ID тегов.

  4. Связали теги и блог в таблице временных связей.

На этом этапе у вас всё должно было встать на свои места. То есть, мы с вами разложили по полочкам достаточно сложную логику и поняли, что всё, на самом деле, не так уж и трудно, правда?

Функционал добавления блога мы сегодня визуализировать не будем, поэтому для тестов можно воспользоваться автосгенерированной документацией FastAPI. Перед этим давайте зарегистрируем наш роутер в main-файле:

from app.api.router import router as router_api


app.include_router(router_api)

Теперь можно выполнить блок с добавлением поста:

  1. Открываем документацию:

    http://127.0.0.1:8900/docs
  2. Выполняем вход в систему.

  3. Выполняем запрос на добавление поста. Ниже представлен пример такого запроса.

{
  "title": "Зачем нужен Python?",
  "content": "Python — один из самых популярных языков программирования, который ценится за простоту синтаксиса, богатую экосистему библиотек и универсальность. Вот основные причины, почему Python так востребован:\n\n1. **Легкость изучения**  \nБлагодаря лаконичному синтаксису, Python идеально подходит для начинающих разработчиков. Он позволяет сосредоточиться на логике решения задач, а не на сложностях языка.\n\n2. **Многофункциональность**  \nPython используется в самых разных областях: от веб-разработки и автоматизации процессов до анализа данных, машинного обучения и научных исследований.\n\n3. **Богатая экосистема**  \nБиблиотеки вроде NumPy, Pandas, TensorFlow, Django и Flask делают Python мощным инструментом для решения задач любой сложности.\n\n4. **Сообщество и поддержка**  \nPython имеет огромное сообщество разработчиков, готовых помочь новичкам и делиться готовыми решениями.\n\n5. **Кроссплатформенность**  \nПрограммы на Python можно запускать на любых операционных системах — Windows, macOS и Linux.\n\nPython — это язык, который позволяет не только быстро начинать программировать, но и эффективно решать сложные задачи в профессиональной сфере.",
  "short_description": "Текст о том зачем вообще нужно учить Python!",
  "tags": ["кодинг"]
}

После добавления можно выполнить проверку: добавилась ли запись в базу данных. У вас должно было произойти следующее:

  • Должен был добавиться сам блог.

  • Должен был добавиться тег кодинг.

  • Должна появиться связка между тегом и блогом.

Вот как выглядят мои таблицы после добавления нескольких постов с разными тегами и от разных авторов.

Таблица с постами
Таблица с постами
Таблица с тегами
Таблица с тегами
Таблица связывающая посты и теги
Таблица связывающая посты и теги

Получаем блог

Получение блога оказалось задачей чуть сложнее, чем кажется на первый взгляд, и сейчас я объясню почему. Дело в том, что система предусматривает возможность скрывать блоги в черновик. Логично, что если блог находится в черновике, то просматривать его может только автор блога. Следовательно, нужна логика, при которой пользователь должен быть авторизован на сайте, как минимум для того, чтобы наш сайт проверил, является ли этот человек автором блога или нет.

Но есть и другой случай: мы можем захотеть, чтобы опубликованные блоги могли видеть вообще все пользователи, которые попали на наш сайт. Не так ли?

Вот и получается, что нам нужно либо реализовывать два метода для просмотра блога (например, если я автор, то должен зайти по одной ссылке, а иначе по другой), либо искать более универсальное решение. Два метода — это неудобно и вызывает избыточность кода. Поэтому мы пойдем другим путем.

Во-первых, мы реализуем отдельную зависимость. Она будет работать похожим образом на Depends(get_current_user), но при этом позволит возвращать информацию о блоге даже неавторизованным пользователям.

Для этого внесем корректировки в файл app/auth/dependencies.py. Начнем с метода для получения JWT-токена пользователя:

def get_token_optional(request: Request) -> str | None:
    return request.cookies.get('users_access_token')

Здесь мы не получим ошибки, если токена не будет, а просто получим None, что нас вполне устроит.

Теперь добавим новый метод для получения информации о пользователе:

async def get_current_user_optional(
        token: str | None = Depends(get_token_optional),
        session: AsyncSession = SessionDep
) -> User | None:
    if not token:
        return None

    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=settings.ALGORITHM)
    except JWTError:
        return None

    expire: str = payload.get('exp')
    expire_time = datetime.fromtimestamp(int(expire), tz=timezone.utc)
    if (not expire) or (expire_time < datetime.now(timezone.utc)):
        return None

    user_id: str = payload.get('sub')
    if not user_id:
        return None

    user = await UsersDAO.find_one_or_none_by_id(data_id=int(user_id), session=session)
    return user

Этот метод продолжает традиции get_current_user и возвращает либо информацию о пользователе, либо None. Напоминаю, что метод get_current_user у нас возвращал либо ошибку, либо информацию о пользователе.

Перед тем как описать нашу зависимость, давайте подготовим метод, позволяющий получить полную информацию о блоге. Опишем его в BlogDAO:

@classmethod
async def get_full_blog_info(cls, session: AsyncSession, blog_id: int, author_id: int = None):
    """
    Метод для получения полной информации о блоге, включая данные об авторе и тегах.
    Для опубликованных блогов доступ к информации открыт всем пользователям.
    Для черновиков доступ открыт только автору блога.
    """
    # Строим запрос с подгрузкой данных о пользователе и тегах
    query = (
        select(cls.model)
        .options(
            joinedload(Blog.user),  # Подгружаем данные о пользователе (авторе)
            selectinload(Blog.tags)  # Подгружаем связанные теги
        )
        .filter_by(id=blog_id)  # Фильтруем по ID блога
    )

    # Выполняем запрос
    result = await session.execute(query)
    blog = result.scalar_one_or_none()

    # Если блог не найден или нет прав для его просмотра
    if not blog:
        return {
            'message': f"Блог с ID {blog_id} не найден или у вас нет прав на его просмотр.",
            'status': 'error'
        }

    # Если блог в статусе 'draft', проверяем, является ли пользователь автором
    if blog.status == 'draft' and (author_id != blog.author):
        return {
            'message': "Этот блог находится в статусе черновика, и доступ к нему имеют только авторы.",
            'status': 'error'
        }

    # Возвращаем данные блога (если он опубликован или автор имеет доступ к черновику)
    return blog

Теперь разберемся в логике. "Необычное" начинается сразу: мы загружаем не просто информацию о блоге, но и сразу подгружаем данные о пользователе и связанных тегах.

Затем идет уже знакомый запрос на получение одной записи, в данном случае — полной информации о блоге, или None, что будет означать, что блог не существует.

Если блог с указанным ID не найден, тут всё просто: мы возвращаем сообщение, что блог не найден (а дальше это сообщение обработается на фронтенде). Но если блог найден, то всё чуть сложнее. Мы проверяем, находится ли он в статусе "черновик". Если это так, то возвращаем сообщение, что просмотреть этот блог может только автор.

Если же блог существует и у пользователя есть право на его просмотр, мы возвращаем полную информацию о блоге. Логика, как видите, не особо трудная.

Теперь мы готовы описать зависимость для получения блога:

async def get_blog_info(
        blog_id: int,
        session: AsyncSession = SessionDep,
        user_data: User | None = Depends(get_current_user_optional)
) -> BlogFullResponse | BlogNotFind:
    author_id = user_data.id if user_data else None
    return await BlogDAO.get_full_blog_info(session=session, blog_id=blog_id, author_id=author_id)

На основе этой зависимости мы опишем API-метод для получения блога:

@router.get('/get_blog/{blog_id}', summary="Получить информацию по блогу")
async def get_blog_endpoint(
        blog_id: int,
        blog_info: BlogFullResponse | BlogNotFind = Depends(get_blog_info)
) -> BlogFullResponse | BlogNotFind:
    return blog_info

Этот метод работает благодаря механизму интроспекции (reflection) в Python и аннотациям типов. FastAPI анализирует сигнатуры функций, видит, что в зависимости есть параметр blog_id, и автоматически находит соответствующий параметр в пути маршрута с таким же именем и типом.

Проще говоря, фреймворк "угадывает" нужный параметр по имени и типу, независимо от порядка объявления. За счет этого мы получили достаточно лаконичный API-метод, который позволяет достать информацию о блоге любому пользователю, имеющему право на его просмотр, даже если он не авторизован в системе.

Далее, эту зависимость мы будем использовать для описания эндпоинта, который генерирует страницу с блогом.

Метод удаления блога

Опишем простой эндпоинт для удаления блога:

@router.delete('/delete_blog/{blog_id}', summary="Удалить блог")
async def delete_blog(
        blog_id: int,
        session: AsyncSession = TransactionSessionDep,
        current_user: User = Depends(get_current_user)
):
    result = await BlogDAO.delete_blog(session, blog_id, current_user.id)
    if result['status'] == 'error':
        raise HTTPException(status_code=400, detail=result['message'])
    return result

Здесь мы не можем разрешить удалять блоги просто всем авторизованным пользователям. Важно, чтобы была проверка: действительно ли пользователь является автором блога. Поэтому метод для удаления блога выглядит так:

@classmethod
async def delete_blog(cls, session: AsyncSession, blog_id: int, author_id: int) -> dict:
    """
    Метод для удаления блога. Удаление возможно только автором блога.

    :param session: Асинхронная сессия SQLAlchemy
    :param blog_id: ID блога
    :param author_id: ID автора, пытающегося удалить блог
    :return: Словарь с результатом операции
    """
    try:
        # Находим блог по ID
        query = select(cls.model).filter_by(id=blog_id)
        result = await session.execute(query)
        blog = result.scalar_one_or_none()

        if not blog:
            return {
                'message': f"Блог с ID {blog_id} не найден.",
                'status': 'error'
            }

        # Проверяем, является ли пользователь автором блога
        if blog.author != author_id:
            return {
                'message': "У вас нет прав на удаление этого блога.",
                'status': 'error'
            }

        # Удаляем блог
        await session.delete(blog)
        await session.flush()

        return {
            'message': f"Блог с ID {blog_id} успешно удален.",
            'status': 'success'
        }

    except SQLAlchemyError as e:
        await session.rollback()
        return {
            'message': f"Произошла ошибка при удалении блога: {str(e)}",
            'status': 'error'
        }

Это стандартный подход для удаления блога, с единственным отличием: перед фактом удаления мы проверяем, имеет ли пользователь право на удаление.

Метод изменения статуса блога (черновик/опубликован)

Теперь опишем метод для изменения статуса блога: "черновик" → "опубликован" или наоборот.

@classmethod
async def change_blog_status(cls, session: AsyncSession, blog_id: int, new_status: str, author_id: int) -> dict:
    """
    Метод для изменения статуса блога. Изменение возможно только автором блога.

    :param session: Асинхронная сессия SQLAlchemy
    :param blog_id: ID блога
    :param new_status: Новый статус блога ('draft' или 'published')
    :param author_id: ID автора, пытающегося изменить статус
    :return: Словарь с результатом операции
    """
    if new_status not in ['draft', 'published']:
        return {
            'message': "Недопустимый статус. Используйте 'draft' или 'published'.",
            'status': 'error'
        }

    try:
        # Находим блог по ID
        query = select(cls.model).filter_by(id=blog_id)
        result = await session.execute(query)
        blog = result.scalar_one_or_none()

        if not blog:
            return {
                'message': f"Блог с ID {blog_id} не найден.",
                'status': 'error'
            }

        # Проверяем, является ли пользователь автором блога
        if blog.author != author_id:
            return {
                'message': "У вас нет прав на изменение статуса этого блога.",
                'status': 'error'
            }

        # Если текущий статус совпадает с новым, возвращаем сообщение без изменений
        if blog.status == new_status:
            return {
                'message': f"Блог уже имеет статус '{new_status}'.",
                'status': 'info',
                'blog_id': blog_id,
                'current_status': new_status
            }

        # Меняем статус блога
        blog.status = new_status
        await session.flush()

        return {
            'message': f"Статус блога успешно изменен на '{new_status}'.",
            'status': 'success',
            'blog_id': blog_id,
            'new_status': new_status
        }

    except SQLAlchemyError as e:
        await session.rollback()
        return {
            'message': f"Произошла ошибка при изменении статуса блога: {str(e)}",
            'status': 'error'
        }

Здесь задействованы те же проверки. В конечном итоге мы меняем статус блога, если у пользователя есть права.

API-эндпоинт для изменения статуса будет выглядеть так:

@router.patch('/change_blog_status/{blog_id}', summary="Изменить статус блога")
async def change_blog_status(
        blog_id: int,
        new_status: str,
        session: AsyncSession = TransactionSessionDep,
        current_user: User = Depends(get_current_user)
):
    result = await BlogDAO.change_blog_status(session, blog_id, new_status, current_user.id)
    if result['status'] == 'error':
        raise HTTPException(status_code=400, detail=result['message'])
    return result

Эндпоинт аккуратно реализует всю описанную выше логику, делая процесс изменения статуса блога безопасным и четко контролируемым.

Получаем список блогов с фильтрами

Теперь мы переходим к действительно сложной части. Нам предстоит описать универсальный метод, позволяющий:

  • Получать все блоги;

  • Получать все блоги с фильтрацией по автору;

  • Получать все блоги с фильтрацией по тегу;

  • Генерировать пагинацию на сервере;

  • Получать общее количество блогов, удовлетворяющий критериям поиска.

Если вы пишете код вместе со мной, пока не смотрите реализацию кода — постарайтесь сделать её самостоятельно.

Моя реализация:

@classmethod
async def get_blog_list(cls, session: AsyncSession, author_id: Optional[int] = None, tag: Optional[str] = None,
                        page: int = 1, page_size: int = 10):
    """
    Получает список опубликованных блогов с возможностью фильтрации и пагинации.

    :param session: Асинхронная сессия SQLAlchemy
    :param author_id: ID автора для фильтрации (опционально)
    :param tag: Название тега для фильтрации (опционально)
    :param page: Номер страницы (начиная с 1)
    :param page_size: Количество записей на странице (от 3 до 100)
    :return: Словарь с ключами page, total_page, total_result, blogs
    """
    # Ограничение параметров
    page_size = max(3, min(page_size, 100))
    page = max(1, page)

    # Начальная сборка базового запроса
    base_query = select(cls.model).options(
        joinedload(cls.model.user),
        selectinload(cls.model.tags)
    ).filter_by(status='published')

    # Фильтрация по автору
    if author_id is not None:
        base_query = base_query.filter_by(author=author_id)

    # Фильтрация по тегу
    if tag:
        base_query = base_query.join(cls.model.tags).filter(cls.model.tags.any(Tag.name.ilike(f"%{tag.lower()}%")))

    # Подсчет общего количества записей
    count_query = select(func.count()).select_from(base_query.subquery())
    total_result = await session.scalar(count_query)

    # Если записей нет, возвращаем пустой результат
    if not total_result:
        return {
            "page": page,
            "total_page": 0,
            "total_result": 0,
            "blogs": []
        }

    # Расчет количества страниц
    total_page = (total_result + page_size - 1) // page_size

    # Применение пагинации
    offset = (page - 1) * page_size
    paginated_query = base_query.offset(offset).limit(page_size)

    # Выполнение запроса и получение результатов
    result = await session.execute(paginated_query)
    blogs = result.scalars().all()

    # Удаление дубликатов блогов по их ID
    unique_blogs = []
    seen_ids = set()
    for blog in blogs:
        if blog.id not in seen_ids:
            unique_blogs.append(BlogFullResponse.model_validate(blog))
            seen_ids.add(blog.id)

    # Логирование
    filters = []
    if author_id is not None:
        filters.append(f"author_id={author_id}")
    if tag:
        filters.append(f"tag={tag}")
    filter_str = " & ".join(filters) if filters else "no filters"

    logger.info(f"Page {page} fetched with {len(blogs)} blogs, filters: {filter_str}")
    # Формирование результата
    return {
        "page": page,
        "total_page": total_page,
        "total_result": total_result,
        "blogs": unique_blogs
    }

Разберем код детально, объясняя каждый этап простым языком:

  1. Ограничение параметров:

    page_size = max(3, min(page_size, 100))
    page = max(1, page)
    
    • Устанавливаем границы для количества записей на странице (от 3 до 100);

    • Гарантируем, что номер страницы не будет меньше 1.

  2. Базовый запрос с загрузкой связанных данных:

    base_query = select(cls.model).options(
        joinedload(cls.model.user),
        selectinload(cls.model.tags)
    ).filter_by(status='published')
    
    • Выбираем только опубликованные блоги;

    • joinedload и selectinload — оптимизации для подгрузки связанных данных (пользователь и теги).

  3. Фильтрация по дополнительным параметрам:

    if author_id is not None:
        base_query = base_query.filter_by(author=author_id)
    
    if tag:
        base_query = base_query.join(cls.model.tags).filter(cls.model.tags.any(Tag.name.ilike(f"%{tag.lower()}%")))
    
    • Можно отфильтровать блоги по автору;

    • Можно найти блоги по тегу (нечувствительный к регистру поиск).

  4. Подсчет общего количества записей:

    count_query = select(func.count()).select_from(base_query.subquery())
    total_result = await session.scalar(count_query)
    
    • Создаем подзапрос для подсчета общего числа блогов;

    • Используем func.count() для подсчета.

  5. Пагинация:

    total_page = (total_result + page_size - 1) // page_size
    offset = (page - 1) * page_size
    paginated_query = base_query.offset(offset).limit(page_size)
    
    • Расчет общего количества страниц;

    • Вычисление смещения для текущей страницы;

    • Ограничение запроса конкретным количеством записей.

  6. Удаление дубликатов:

    unique_blogs = []
    seen_ids = set()
    for blog in blogs:
        if blog.id not in seen_ids:
            unique_blogs.append(BlogFullResponse.model_validate(blog))
            seen_ids.add(blog.id)
    
    • Гарантируем уникальность блогов по ID;

    • Преобразуем блоги в модель ответа.

  7. Логирование:

    filters = []
    if author_id is not None:
        filters.append(f"author_id={author_id}")
    if tag:
        filters.append(f"tag={tag}")
    logger.info(f"Page {page} fetched with {len(blogs)} blogs, filters: {filter_str}")
    
    • Создаем строку с применёнными фильтрами;

    • Логируем информацию о выполненном запросе.

Финальный результат:

return {
    "page": page,
    "total_page": total_page,
    "total_result": total_result,
    "blogs": unique_blogs
}

API-метод для получения списка блогов

Теперь, когда вы разобрались с методом, опишем API-метод:

@router.get('/blogs/', summary="Получить все блоги в статусе 'publish'")
async def get_blog_info(
        author_id: int | None = None,
        tag: str | None = None,
        page: int = Query(1, ge=1, description="Номер страницы"),
        page_size: int = Query(10, ge=10, le=100, description="Записей на странице"),
        session: AsyncSession = SessionDep,
):
    try:
        result = await BlogDAO.get_blog_list(session=session, author_id=author_id, tag=tag, page=page,
                                             page_size=page_size)
        return result if result['blogs'] else BlogNotFind(message="Блоги не найдены", status='error')
    except Exception as e:
        logger.error(f"Ошибка при получении блогов: {e}")
        return JSONResponse(status_code=500, content={"detail": "Ошибка сервера"})

Особенности:

  • Используем Query для описания параметров;

  • Проверяем, есть ли блоги в результате, и возвращаем соответствующий ответ;

  • Логируем ошибки для отладки.

Прежде чем мы приступим к визуализации нашего проекта, хочу обратить ваше внимание на важный аспект разработки веб-приложений.

На данном этапе мы успешно реализовали полную логику блога. Для бэкенд-разработчика это может означать завершение основной части работы. Однако, процесс создания полноценного веб-приложения на этом не заканчивается.

Обычно дальнейшая разработка включает в себя:

  1. Работу веб-дизайнеров, которые создают визуальный макет страниц, например, в Figma.

  2. Деятельность фронтенд-разработчиков, которые:

    • Преобразуют дизайн-макеты в код (вёрстка).

    • Интегрируют API-методы с помощью JavaScript.

Для фронтенд-разработки могут использоваться различные технологии:

  • Базовый стек: HTML, CSS, JavaScript

  • Современные фреймворки: React, Next.js, Angular и другие

В рамках нашего проекта мы пойдем нестандартным путем и самостоятельно визуализируем наш API, используя комбинацию FastAPI и шаблонизатора Jinja2. Это позволит нам создать полноценное веб-приложение без привлечения отдельной команды фронтенд-разработчиков.

Кстати, если вам интересно углубиться в тему языков программирования и их применения в веб-разработке, я недавно подготовил подробную методичку в своем телеграм-канале. В ней я раскрыл особенности бэкенд-языков и уникальную роль JavaScript как единственного полноценного языка программирования для фронтенда.

Надеюсь с этим понятно, а мы переходим к визуализации.

Создаем страницы

Для начала, перед тем как мы будем связывать наш API с фронтендом, нам необходимо подготовить HTML + CSS шаблоны будущих страниц. Всего у нас будет 3 страницы:

  • Страница «Блог не найден или не доступен».

  • Страница с блогом.

  • Страница со списком блогов.

Так как я не ставлю задачу обучить вас созданию фронтенда, сегодня мы не будем рассматривать стили страничек, а сосредоточимся исключительно на HTML-шаблоне и JavaScript, который, к сожалению, нам предстоит написать (не волнуйтесь, я сократил его до минимума).

Начнем со страницы с блогом

Для её создания в папке app/templates нужно создать файл post.html и внутри него прописать следующий код:

<!DOCTYPE html>
<html lang="ru">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>{{ article.title }}</title>
    <link rel="stylesheet" href="/static/style/post.css">
</head>
<body>
<article class="article-container"
         data-blog-id="{{ article.id }}"
         data-blog-status="{{ article.status }}"
         data-blog-author="{{ article.author }}">
    <h1 class="article-title">{{ article.title }}</h1>
    <div class="article-meta">
        <a href="/blogs?author_id={{ article.author }}">{{ article.author_name }}</a> •
        {{ article.created_at.strftime('%d %B %Y') }}
    </div>
    <div class="article-content">
        {{ article.content|safe }}
    </div>
    <div class="tags-container">
        {% if article.tags %}
        <ul class="tags">
            {% for tag in article.tags %}
            <li><a href="/blogs?tag={{ tag.name }}" class="tag">{{ tag.name }}</a></li>
            {% endfor %}
        </ul>
        {% else %}
        <p>Нет тегов для этой статьи.</p>
        {% endif %}
    </div>
    {% if current_user_id == article.author %}
    <div class="article-actions">
        {% if article.status == 'published' %}
        <button class="button status-button" data-action="change-status" data-new-status="draft">
            Переместить в черновики
        </button>
        {% else %}
        <button class="button status-button" data-action="change-status" data-new-status="published">
            Опубликовать
        </button>
        {% endif %}
        <button class="button delete-button" data-action="delete">Удалить блог</button>
    </div>
    {% endif %}
</article>

<div class="view-blogs">
    <a href="/blogs" class="button view-blogs-button">Смотреть все блоги</a>
</div>

<script src="/static/js/post.js"></script>
</body>
</html>

Обратите внимание: наш HTML основан на синтаксисе Jinja2. В принципе, мы могли бы обойтись без Jinja2, реализовав всю логику на JavaScript, так как все API методы уже реализованы. Однако мы, как бэкенд-разработчики, предпочитаем оставаться в зоне Python и избегать лишнего JavaScript.

Кратко поясню, что происходит в шаблоне:

  • В заготовленные ячейки вставляется соответствующая информация.

  • Если пользователь является автором блога, ему предоставляется возможность удалить блог или изменить его статус (поместить в черновики или опубликовать).

Теперь о строке:

<script src="/static/js/post.js"></script>

Она намекает, что нам придется написать JS-код, который мы потом импортируем в этот шаблон. Сам код будет лежать в файле app/static/js/post.js. Поспешу вас утешить: это единственный JS-код, который мы сегодня напишем.

Функция для получения куки

function getCookie(name) {
    return document.cookie
        .split(';')
        .map(cookie =&gt; cookie.trim())
        .find(cookie =&gt; cookie.startsWith(`${name}=`))
        ?.split('=')[1] || null;
}

Универсальная функция для запросов к API

async function sendRequest(url, method, body = null, jwtToken = null) {
    const headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
    };

    if (jwtToken) {
        headers['Authorization'] = `Bearer ${jwtToken}`;
    }

    try {
        const response = await fetch(url, {
            method,
            headers,
            body: body ? JSON.stringify(body) : null,
        });

        if (!response.ok) {
            let errorData = {};
            try {
                errorData = await response.json();
            } catch (jsonError) {
                console.error('Ошибка парсинга JSON:', jsonError);
            }
            throw new Error(errorData.detail || `HTTP Error: ${response.status}`);
        }

        return await response.json();
    } catch (error) {
        if (error.name === 'TypeError') {
            console.error('Сетевая ошибка или проблема с CORS:', error);
        }
        console.error(`Ошибка в запросе: ${error.message}`);
        throw error;
    }
}

Инициализация данных

Код, который выполняется при загрузке страницы, отвечает за обработку действий пользователя с элементами блога.

document.addEventListener('DOMContentLoaded', () =&gt; {
    const articleContainer = document.querySelector('.article-container');
    if (!articleContainer) {
        console.error('Элемент .article-container не найден. Убедитесь, что он присутствует в DOM.');
        return;
    }

    const BLOG_DATA = {
        id: articleContainer.dataset.blogId,
        status: articleContainer.dataset.blogStatus,
        author: articleContainer.dataset.blogAuthor,
        jwtToken: getCookie('users_access_token'),
    };

    console.log('BLOG_DATA:', BLOG_DATA);

    const deleteButton = document.querySelector('[data-action="delete"]');
    if (deleteButton) {
        deleteButton.addEventListener('click', () =&gt; {
            if (confirm('Вы уверены, что хотите удалить этот блог?')) {
                deleteBlog(BLOG_DATA);
            }
        });
    }

    const statusButton = document.querySelector('[data-action="change-status"]');
    if (statusButton) {
        statusButton.addEventListener('click', () =&gt; {
            const newStatus = statusButton.dataset.newStatus;
            changeBlogStatus(BLOG_DATA, newStatus);
        });
    }
});

Функции для удаления и изменения статуса блога

async function deleteBlog({id, jwtToken}) {
    try {
        await sendRequest(`/api/delete_blog/${id}`, 'DELETE', null, jwtToken);
        alert('Блог успешно удалён. Перенаправление...');
        window.location.href = '/blogs/';
    } catch (error) {
        console.error('Не удалось удалить блог:', error);
    }
}

async function changeBlogStatus({id, jwtToken}, newStatus) {
    try {
        const url = `/api/change_blog_status/${id}?new_status=${encodeURIComponent(newStatus)}`;
        await sendRequest(url, 'PATCH', null, jwtToken);
        alert('Статус успешно изменён. Страница будет обновлена.');
        location.reload();
    } catch (error) {
        console.error('Не удалось изменить статус блога:', error);
        alert('Ошибка при изменении статуса блога. Попробуйте ещё раз.');
    }
}

После удаления блога мы перенаправляем пользователя на страницу с блогами, а после изменения статуса обновляем страницу.

Скажу сразу, код JS, который я описал не претендует на «лучшие практики», но я могу ручаться, что работает он ровно так, как я планировал. Так что если вы являетесь опытным фронтенд-разработчиком, то буду надеяться на вашу конструктивную критику в комментариях. Это все не моя стихия.

Визуализация страницы «Блог не найден»

Теперь реализуем шаблон страницы «Блог не найден». Я назвал файл 404.html. Вот как он выглядит:

<!DOCTYPE html>
<html lang="ru">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Блог ID {{ blog_id }} не найден</title>
    <link rel="stylesheet" href="/static/style/404.css">
</head>
<body>
<div class="message-container">
    <h1 class="message-title">Блог не найден</h1>
    <p class="message-text">
        К сожалению, блог <strong>с ID {{ blog_id }} </strong>не найден. Возможно, его автор удалил запись
        или переместил её в черновики.
    </p>
    <a href="/blogs/" class="button">Смотреть опубликованные блоги</a>
</div>
</body>
</html>

Визуализация страницы со списком блогов

И последняя страница — со списком блогов. Я назвал её posts.html:

<!DOCTYPE html>
<html lang="ru">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Блоги</title>
    <link rel="stylesheet" href="/static/style/posts.css">
</head>
<body>
<div class="content-container">
    <div class="page-header">
        <h1><a href="/blogs/">Все блоги</a></h1>
    </div>

    <!-- Список статей -->
    <ul class="articles-list">
        {% for blog in article.blogs %}
        <li class="article-card">
            <h2><a href="/blogs/{{ blog.id }}">{{ blog.title }}</a></h2>
            <div class="article-meta">
                {% if blog.author_id %}
                <a href="?author_id={{ blog.author_id }}">{{ blog.author_name }}</a>
                {% else %}
                {{ blog.author_name }}
                {% endif %}
                • {{ blog.created_at.strftime('%d %B %Y') }}
            </div>
            <p class="article-excerpt">{{ blog.short_description }}</p>
            {% if blog.tags %}
            <div class="article-tags">
                {% for tag in blog.tags %}
                <a href="/blogs?tag={{ tag.name }}" class="tag">{{ tag.name }}</a>
                {% endfor %}
            </div>
            {% endif %}
        </li>
        {% endfor %}
    </ul>

    <!-- Пагинация -->
    <div class="pagination">
        {% if article.page > 1 %}
        <a href="?page={{ article.page - 1 }}{% if filters.author_id %}&author_id={{ filters.author_id }}{% endif %}{% if filters.tag %}&tag={{ filters.tag }}{% endif %}"
           class="pagination-link">←</a>
        {% endif %}
        {% for p in range(1, article.total_page + 1) %}
        <a href="?page={{ p }}{% if filters.author_id %}&author_id={{ filters.author_id }}{% endif %}{% if filters.tag %}&tag={{ filters.tag }}{% endif %}"
           class="pagination-link {% if p == article.page %}active{% endif %}">{{ p }}</a>
        {% endfor %}
        {% if article.page < article.total_page %}
        <a href="?page={{ article.page + 1 }}{% if filters.author_id %}&author_id={{ filters.author_id }}{% endif %}{% if filters.tag %}&tag={{ filters.tag }}{% endif %}"
           class="pagination-link">→</a>
        {% endif %}
    </div>
</div>
</body>
</html>

Тут, кроме использования шаблонизатора Jinja2, ничего сложного нет. Единственное, что может вызвать вопросы, — это блок с пагинацией. Однако, если внимательно посмотреть, это всего лишь стандартный Python-цикл for, который обрабатывает данные, возвращённые сервером. На данном этапе у вас уже должна сложиться общая картина того, зачем мы всё это делали и почему описывали метод для получения списка блогов.

Подготовка FastAPI к рендерингу страниц

Для более глубокого понимания предстоящего материала рекомендую обратиться к моей статье "Создание собственного API на Python (FastAPI): Подключаем фронтенд и статические файлы". В этой публикации я детально рассмотрел фундаментальные принципы работы со статическими файлами и интеграции фронтенда, используя возможности FastAPI. Эта статья послужит отличным фундаментом для освоения концепций, которые мы будем обсуждать далее. Вы можете ознакомиться с ней по ссылке [здесь].

Теперь, чтобы наши странички начали рендериться в FastAPI, необходимо реализовать соответствующие API-методы.

Создайте папку app/pages, а в ней файл router.py. Импорты для работы с роутером:

from fastapi import APIRouter, Depends, Request
from fastapi.templating import Jinja2Templates
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.dao import BlogDAO
from app.api.schemas import BlogFullResponse, BlogNotFind
from app.auth.dependencies import get_blog_info, get_current_user_optional
import markdown2

from app.auth.models import User
from app.dao.session_maker import SessionDep

router = APIRouter(tags=['ФРОНТЕНД'])

templates = Jinja2Templates(directory='app/templates')

Обратите внимание: мы импортировали библиотеку markdown2, которая пригодится для корректного преобразования markdown-текстов в HTML. Также настроили шаблонизатор с помощью строки:

templates = Jinja2Templates(directory='app/templates')

Теперь можно использовать шаблоны в эндпоинтах FastAPI.

Эндпоинт для получения блога

@router.get('/blogs/{blog_id}/')
async def get_blog_post(
        request: Request,
        blog_id: int,
        blog_info: BlogFullResponse | BlogNotFind = Depends(get_blog_info),
        user_data: User | None = Depends(get_current_user_optional)
):
    if isinstance(blog_info, dict):
        return templates.TemplateResponse(
            "404.html", {"request": request, "blog_id": blog_id}
        )
    else:
        blog = BlogFullResponse.model_validate(blog_info).model_dump()
        # Преобразование Markdown в HTML
        blog['content'] = markdown2.markdown(blog['content'], extras=['fenced-code-blocks', 'tables'])
        return templates.TemplateResponse(
            "post.html",
            {"request": request, "article": blog, "current_user_id": user_data.id if user_data else None}
        )

Ключевые моменты метода:

  • Аргумент request обязателен для передачи объекта запроса в шаблон.

  • Валидация и преобразование данных блога выполняется через модель BlogFullResponse.

  • Если блог не найден, рендерится 404.html. Если найден, преобразуется контент из markdown в HTML и рендерится post.html.

Особого внимания тут заслуживает следующая строка кода:

blog['content'] = markdown2.markdown(blog['content'], extras=['fenced-code-blocks', 'tables'])

Она наглядно демонстрирует, что с помощью Python можно легко преобразовывать Markdown в HTML прямо перед рендерингом страницы. В связи с этим возникает вопрос: почему было сложно реализовать нечто подобное в Telegraf?

Эндпоинт для получения списка блогов

@router.get('/blogs/')
async def get_blog_post(
        request: Request,
        author_id: int | None = None,
        tag: str | None = None,
        page: int = 1,
        page_size: int = 3,
        session: AsyncSession = SessionDep,
):
    blogs = await BlogDAO.get_blog_list(
        session=session,
        author_id=author_id,
        tag=tag,
        page=page,
        page_size=page_size
    )
    return templates.TemplateResponse(
        "posts.html",
        {
            "request": request,
            "article": blogs,
            "filters": {
                "author_id": author_id,
                "tag": tag,
            }
        }
    )

Регистрация роутера

В main-файле подключите роутер:

from app.pages.router import router as router_page
app.include_router(router_page)

Убедитесь, что у вас есть строка для подключения статических файлов:

app.mount('/static', StaticFiles(directory='app/static'), name='static')

Эта строка регистрирует папку с статическими файлами в FastAPI и указывает, что они будут доступны по пути /static в URL.

Этот код говорит FastAPI: "Все файлы, которые лежат в папке app/static, будут доступны через URL-адрес, начинающийся с /static". Например, изображение, сохраненное в app/static/logo.png, будет доступно по адресу http://localhost:8000/static/logo.png.

Теперь можно приступать к тестированию блога!

Так у меня выглядит страничка с блогом.
Так у меня выглядит страничка с блогом.

При клике на автора меня перебрасывает на страницу постов автора (/blogs/?author_id=2). При клике на тег перебрасывает на страницу с тегами (/blogs/?tag=веб-разработка). При клике на название статьи открывается сама статья.

После авторизации через API-документацию (да, я не визуализировал форму входа в блог) и после открытия статьи за моим авторством я получаю варианты с изменение статуса статьи и с ее удалением.

После перемещения публикации в черновик статья будет видна только мне как автору.

Кнопка изменилась на «Опубликовать».
Кнопка изменилась на «Опубликовать».

Открою эту статью в режиме инкогнито:

После изменения статуса на «Опубликовано» статья становится доступной даже не авторизованным пользователям.

Возможности редактировать статью, естественно, нет.
Возможности редактировать статью, естественно, нет.

Теперь нам остается финальный шаг — сделать наш блог доступным всем по ссылке. Для этого мы сделаем деплой на сервис Amvera Cloud.

Деплой FastAPI приложения

Деплой FastAPI приложения займет не более нескольких минут. Приготовьтесь — процесс будет быстрым и простым.

Шаг 1: Подготовка настроек

Первым делом необходимо создать файл настроек, который поможет сервису Amvera понять, как развертывать и запускать ваше приложение. В корне проекта, на одном уровне с файлом requirements.txt, создайте файл amvera.yml. В моем случае он выглядит так:

meta:
  environment: python
  toolchain:
    name: pip
    version: 3.12
build:
  requirementsPath: requirements.txt
run:
  persistenceMount: /data
  containerPort: 8000
  command: uvicorn app.main:app --host 0.0.0.0 --port 8000

Что делает этот файл:

  • meta: Указывает, что для работы используется среда Python и инструмент управления зависимостями — pip.

  • build: Здесь указан путь к файлу requirements.txt, чтобы Amvera знал, где искать зависимости.

  • run: Настраивает путь для хранения данных (/data), порт контейнера (8000) и команду для запуска приложения через uvicorn.

Шаг 2: Регистрация и создание проекта в Amvera Cloud

  1. Регистрация: Перейдите на сайт Amvera Cloud и зарегистрируйтесь, если у вас еще нет аккаунта. Новые пользователи получают 111 рублей на основной баланс, так что протестировать сервис можно прямо сейчас и бесплатно.

  2. Создание проекта: Нажмите кнопку "Создать проект".

  3. Настройка проекта:

    • Укажите имя проекта.

    • Выберите тарифный план — для большинства проектов подойдет тариф "Начальный". После этого нажмите «Далее».

  4. Загрузка файлов:

    • На следующем этапе выберите способ загрузки файлов: через GIT или загрузив их вручную.

    • Для удобства выберите метод загрузки вручную и просто перетащите все файлы проекта. Убедитесь, что вы добавили файлы amvera.yml и requirements.txt. Нажмите «Далее».

  5. Проверка конфигурации: На последнем экране подтвердите настройки проекта и проверьте их корректность.

Шаг 3: Настройка домена

  1. Войдите в свой проект и перейдите в раздел «Настройки».

  2. Нажмите «Добавить домен».

  3. Выберите тип домена — HTTPS.

  4. Укажите бесплатный домен от Amvera или добавьте свой собственный (например, с REG.RU), если у вас уже есть зарегистрированный домен.

Шаг 4: Ожидание завершения деплоя

Через 2-3 минуты ваш проект будет готов к запуску и доступен по уникальной ссылке, похожей на эту:

Ссылка на проект (эта ссылка будет уникальной для вашего проекта).

Полный исходный код проекта вы можете найти в моём бесплатном телеграм-канале.

Заключение

Сегодня мы успешно завершили масштабный и увлекательный проект. С одной стороны, мы погрузились в "закулисную жизнь" блога и заложили прочную основу для визуализации API с помощью фронтенд-фреймворков, а с другой - научились визуализировать свой API самостоятельно.

В процессе работы мы разобрались с такими важными темами, как связи многие-ко-многим, принципы работы тегов и пагинации, а также углубились в создание сложных запросов к базе данных через SQLAlchemy.

Если вам понравился проект и вы хотите увидеть его продолжение с дальнейшим развитием и визуализацией блога, обязательно пишите об этом в комментариях или в нашем Python-сообществе. Ваши лайки также будут для меня отличным показателем того, что тема вас заинтересовала и вы с нетерпением ждете следующую часть.

На этом пока всё. Спасибо за внимание и до скорых встреч!

Комментарии (10)


  1. Dynasaur
    03.12.2024 07:25

    В стеке питон не упомянут :-)


  1. Awaitable
    03.12.2024 07:25

    Код красиво написан, но есть замечания

    Во-первых, эндпоинты.

    /api/blogs/delete_blog
    /api/blogs/create_blog

    Это не соответствует naming-конвенциями REST

    Follow the rest.

    POST /api/blogs
    GET /api/blogs
    GET /api/blogs/{blog_id}
    PATCH /api/blogs/{blog_id}

    Так выглядит чище.

    Во-вторых, бизнес-логика в контроллере

    @router.post("/add_post/", summary="Добавление нового блога с тегами")
    async def add_blog(
            add_data: BlogCreateSchemaBase,
            user_data: User = Depends(get_current_user),
            session: AsyncSession = TransactionSessionDep
    ):
        blog_dict = add_data.model_dump()
        blog_dict['author'] = user_data.id
        tags = blog_dict.pop('tags', [])
    
        try:
            blog = await BlogDAO.add(session=session, values=BlogCreateSchemaAdd.model_validate(blog_dict))
            blog_id = blog.id
    
            if tags:
                tags_ids = await TagDAO.add_tags(session=session, tag_names=tags)
                await BlogTagDAO.add_blog_tags(session=session,
                                               blog_tag_pairs=[{'blog_id': blog_id, 'tag_id': i} for i in tags_ids])
    
            return {'status': 'success', 'message': f'Блог с ID {blog_id} успешно добавлен с тегами.'}
        except IntegrityError as e:
            if "UNIQUE constraint failed" in str(e.orig):
                raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
                                    detail="Блог с таким заголовком уже существует.")
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Ошибка при добавлении блога.")

    По идее, с CRUD надо взаимодействовать в Service, делая связку используя интерфейсы, а тут все смешано.

    Кстати, в блоке try не принято делать больше одного действия, оберни создание в отдельный метод и вызывай его, это улучшит читабельность блока.

    В третьих, CRUD-методы.

    @classmethod
    async def get_blog_list(cls, session: AsyncSession, author_id: Optional[int] = None, tag: Optional[str] = None,
                            page: int = 1, page_size: int = 10):
        """
        Получает список опубликованных блогов с возможностью фильтрации и пагинации.
    
        :param session: Асинхронная сессия SQLAlchemy
        :param author_id: ID автора для фильтрации (опционально)
        :param tag: Название тега для фильтрации (опционально)
        :param page: Номер страницы (начиная с 1)
        :param page_size: Количество записей на странице (от 3 до 100)
        :return: Словарь с ключами page, total_page, total_result, blogs
        """
        # Ограничение параметров
        page_size = max(3, min(page_size, 100))
        page = max(1, page)
    
        # Начальная сборка базового запроса
        base_query = select(cls.model).options(
            joinedload(cls.model.user),
            selectinload(cls.model.tags)
        ).filter_by(status='published')
    
        # Фильтрация по автору
        if author_id is not None:
            base_query = base_query.filter_by(author=author_id)
    
        # Фильтрация по тегу
        if tag:
            base_query = base_query.join(cls.model.tags).filter(cls.model.tags.any(Tag.name.ilike(f"%{tag.lower()}%")))
    
        # Подсчет общего количества записей
        count_query = select(func.count()).select_from(base_query.subquery())
        total_result = await session.scalar(count_query)
    
        # Если записей нет, возвращаем пустой результат
        if not total_result:
            return {
                "page": page,
                "total_page": 0,
                "total_result": 0,
                "blogs": []
            }
    
        # Расчет количества страниц
        total_page = (total_result + page_size - 1) // page_size
    
        # Применение пагинации
        offset = (page - 1) * page_size
        paginated_query = base_query.offset(offset).limit(page_size)
    
        # Выполнение запроса и получение результатов
        result = await session.execute(paginated_query)
        blogs = result.scalars().all()
    
        # Удаление дубликатов блогов по их ID
        unique_blogs = []
        seen_ids = set()
        for blog in blogs:
            if blog.id not in seen_ids:
                unique_blogs.append(BlogFullResponse.model_validate(blog))
                seen_ids.add(blog.id)
    
        # Логирование
        filters = []
        if author_id is not None:
            filters.append(f"author_id={author_id}")
        if tag:
            filters.append(f"tag={tag}")
        filter_str = " & ".join(filters) if filters else "no filters"
    
        logger.info(f"Page {page} fetched with {len(blogs)} blogs, filters: {filter_str}")
        # Формирование результата
        return {
            "page": page,
            "total_page": total_page,
            "total_result": total_result,
            "blogs": unique_blogs
        }

    Пагинацию лучше вынести в отдельный метод, который будет принимать некий select

    Для логгинга - создай отдельный логгер, кстати, f-strings не принято использовать в логгерах, в logger.info можно передавать аргументы, которые будут подставлены в финальную строку.

    Это самое заметное, конкретно все - не читал.


    1. yakvenalex Автор
      03.12.2024 07:25

      Принял. Спасибо за обратную связь и конструктивную критику)


    1. LittleMeN
      03.12.2024 07:25

      Одного меня смущает, что FastAPI спроектирован для программирования API. И «прикручивание» фронтенда — рендеринга страниц, это бред!? Почему бы сразу не взять Flask например?