Привет, друзья! Сегодня я подготовил для вас увлекательную практическую статью о создании мини-чата на FastApi. Мы погрузимся в мир вебсокетов, узнаем, зачем они нужны и как применяются в реальных приложениях. Также я продемонстрирую работу с асинхронной SQLAlchemy на примере взаимодействия с базой данных SQLite.

Для создания современного интерфейса мы обратимся к интересному и бесплатному сервису Websim.ai, который за пару минут сгенерирует нам интерфейс, включая страничку для входа/регистрации и страницу самого чата.

Чтобы наш чат мог обслуживать множество пользователей одновременно, мы выполним деплой нашего FastApi приложения. Для этого воспользуемся сервисом Amvera Cloud. Нам нужно будет подготовить файлы приложения, написать код, создать файл с настройками (можно сгенерировать на сайте или скопировать мой код), а затем доставить файлы на сервис. Для доставки можно использовать встроенный терминал или GIT, используя стандартные команды PUSH/PULL.

Но прежде чем мы погрузимся в код, давайте кратко обсудим, что такое вебсокеты и как они работают в контексте FastApi.

Что такое вебсокеты?

Вебсокеты — это технология, позволяющая устанавливать постоянное соединение между клиентом и сервером.

В отличие от традиционных HTTP-запросов, где каждое взаимодействие требует нового соединения, вебсокеты поддерживают открытое соединение, через которое данные могут передаваться в обоих направлениях в реальном времени. Это делает их идеальными для приложений, требующих мгновенного обмена данными, таких как чаты, онлайн-игры и системы уведомлений.

В контексте FastApi вебсокеты позволяют легко создавать и управлять такими постоянными соединениями, обеспечивая высокую производительность и простоту интеграции с другими компонентами вашего приложения.

Важное предисловие!

Друзья, сегодня нас ждет объемный материал, поэтому я постараюсь максимально сосредоточиться на практической части, обходя теоретические объяснения, где это возможно. Мы будем сегодня касаться таких тем, как SQLAlchemy, Alembic, JWT-токены, авторизация и аутентификация, а также работа со статическими файлами в FastApi.

Эти темы не то, чтобы сложные, но без базового их понимания вам будет сложно понять код, который мы сегодня напишем.

В других своих статьях, которые вы можете найти в моем профиле на Хабре, я подробно и глубоко разбирал все эти вопросы. Ниже приведены ссылки на мои материалы, с которыми стоит ознакомиться перед прочтением сегодняшней статьи:

Кроме того, сегодня мы не обойдемся без JavaScript, HTML и CSS. Для понимания происходящего желательно иметь базовые представления о том, как работает фронтенд.

План на сегодня

  1. Подготовка базы данных с миграциями

    • Установим SQLAlchemy и Alembic.

    • Создадим модели для двух таблиц.

    • Опишем методы взаимодействия с базой данных.

  2. Написание логики бэкенда на FastAPI

    • Опишем эндпоинты для регистрации и авторизации пользователей.

    • Реализуем методы для получения и сохранения сообщений.

  3. Подготовка фронтенда

    • Используем сервис WebSim.ai для генерации HTML‑шаблона со стилями и JavaScript‑кода.

    • Адаптируем и доработаем сгенерированный код для подключения к бэкенду.

  4. Подключение бэкенда к фронтенду

    • На этом этапе пока не будем использовать вебсокеты.

    • Подключим существующие методы бэкенда к фронтенду.

  5. Подключение вебсокетов

    • Реализуем двунаправленный канал для общения между пользователями чата средствами FastApi (websocets).

    • Внесем значительные изменения в JavaScript‑код для мини‑чата, адаптируя его под вебсокеты.

    • Подробно прокомментируем код FastAPI и JavaScript.

  6. Деплой

    • Сделаем проект общедоступным, используя AmveraCloud.

Создание и настройка проекта

Создадим проект под наше будущее приложение. Я буду использовать Pycharm.

Внутри проекта сгенерируем файл requirements.txt. Заполним его следующим образом:

Скрытый текст
fastapi==0.115.0
uvicorn==0.31.0
sqlalchemy==2.0.35
alembic==1.13.3
pydantic[email]==2.9.2
bcrypt==4.0.1
passlib==1.7.4
python-jose==3.3.0
websockets==13.1
aiosqlite==0.20.0
pydantic_settings==2.5.2
jinja2==3.1.4

Установим

pip install -r requirements.txt

Теперь подготовим файл  .env. В текущем проекте меня будет интересовать всего 2 переменные, но оставляю данный файл просто для чистоты проекта.

SECRET_KEY=jlkjdasSSSSkiidmn
ALGORITHM=HS256

Данные переменные нам нужны будут для работы с JWT-токеном, авторизацией и аутентификацией. Придумайте надежный SECRET_KEY!

Теперь сразу заготовим несколько файлов и папок, которые после постепенно наполним кодом.

В корне создадим папку app. Далее, наполним эту папку следующей структурой.

Скрытый текст
chat (папка)
├── dao.py            # Логика доступа к данным для работы с сообщениями
├── models.py         # Описание моделей данных (сообщения) для базы данных
├── router.py         # Роутеры для обработки запросов, связанных с чатом
└── schemas.py        # Схемы Pydantic для валидации и сериализации данных чата

dao (папка)
└── base.py           # Базовый класс DAO для доступа к данным, используемый в других модулях

static (папка)        # Статические файлы проекта (JS, CSS)
├── js
│   ├── auth.js       # JavaScript логика для авторизации
│   └── chat.js       # JavaScript логика для чата
└── styles
    ├── auth.css      # Стили для страницы авторизации
    └── chat.css      # Стили для страницы чата

templates (папка)     # HTML шаблоны для фронтенда
├── auth.html         # Шаблон страницы авторизации
└── chat.html         # Шаблон страницы чата

users (папка)
├── auth.py           # Логика авторизации и регистрации пользователей
├── dao.py            # Логика доступа к данным для работы с пользователями
├── dependensies.py   # Зависимости для маршрутизации (например, текущий пользователь)
├── models.py         # Модели данных для пользователей (SQLAlchemy)
├── router.py         # Роутеры для обработки запросов, связанных с пользователями
└── schemas.py        # Схемы Pydantic для валидации и сериализации данных пользователей

config.py             # Конфигурационный файл для настроек проекта
database.py           # Настройки подключения к базе данных и инициализация
exceptions.py         # Обработка пользовательских исключений и ошибок
main.py               # Основной файл запуска приложения FastAPI

Надеюсь, что структура готова. Теперь приступим к подготовке базы данных. Совсем скоро мы подготовим базу данных под свой проект и напишем первые миграции.

Блок работы с базой данной

Многое из того, что будет упомянуто далее, я уже подробно освещал в предыдущих статьях. Поэтому, если вам не хватит пояснений к коду, вы всегда можете обратиться к этим публикациям для получения более детальной информации.

Начнем с файла app/database.py. В этом файле мы пропишем базовые настройки для работы с базой данных.

from sqlalchemy import func
from datetime import datetime
from sqlalchemy.orm import Mapped, mapped_column, DeclarativeBase
from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, create_async_engine, AsyncSession

database_url = 'sqlite+aiosqlite:///db.sqlite3'
engine = create_async_engine(url=database_url)
async_session_maker = async_sessionmaker(engine, class_=AsyncSession)


class Base(AsyncAttrs, DeclarativeBase):
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(server_default=func.now(), onupdate=func.now())

Этот файл настраивает асинхронное взаимодействие с базой данных SQLite с использованием SQLAlchemy.

Далее, при создании моделей таблиц мы будем наследоваться от базового класса, а для любого подключения к базе данных мы будем использовать async_session_maker.

Описываем модели таблиц

Теперь мы готовы к описанию моделей наших будущих таблиц. Для начала подготовим модель с пользователями. Для этого в файле users/models.py опишем поля наших таблиц.

Модель пользователей:

from sqlalchemy import String, Integer
from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base


class User(Base):
    __tablename__ = 'users'

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String, nullable=False)
    hashed_password: Mapped[str] = mapped_column(String, nullable=False)
    email: Mapped[str] = mapped_column(String, nullable=False)

Как видите, колонок будет не много. Мы будем генерировать айди пользователя и будем записывать имя пользователя, его email и хэшированный пароль.

Теперь опишем вторую модель. Она нам будет нужна для хранения сообщений от пользователей. Эту модель мы опишем в файле chat/models.py

Модель сообщений:

from sqlalchemy import Integer, Text, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base


class Message(Base):
    __tablename__ = 'messages'

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    sender_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"))
    recipient_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"))
    content: Mapped[str] = mapped_column(Text)

В этой таблице мы указываем, что тут будет храниться: id сообщения, айди получателя сообщения, айди отправителя сообщения и контент сообщения (в нашем случае текст).

Модели мы создали, но таблицы не появились. Исправим это при помощи Alembic, но предварительно настроим его асинхронную версию.

Настройка Alembic и первая миграция

Теперь в терминале перейдем в папку app

cd app

Далее выполним команду, которая инициализирует проект миграций с использованием Alembic для асинхронного взаимодействия с базой данных.

alembic init -t async migration

После этого в корне app у вас появится папка migration и файл alembic.ini. Перенесем файл alembic.ini на уровень выше в папку самого проекта.

В файле alembic.ini заменим одну строку для корректной работы Alembic.

script_location = migration

на

script_location = app/migration

Это мы сделаем для удобства, так как все последующие терминальные команды мы будем вводить с корневой папки проекта, а не app.

Теперь нам нужно настроить alembic для работы с базой данных. Для этого зайдем в файл app/migration/emv.py и откорректируем его следующим образом.

Было

import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context


config = context.config


if config.config_file_name is not None:
    fileConfig(config.config_file_name)


target_metadata = None

Стало

import sys
from os.path import dirname, abspath

sys.path.insert(0, dirname(dirname(abspath(__file__))))

import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from app.database import Base, database_url
from app.users.models import User
from app.chat.models import Message


config = context.config
config.set_main_option("sqlalchemy.url", database_url)
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

    
target_metadata = Base.metadata

Остальные части кода файла env.py оставляем без изменений.

Из главного мы добавили корневую директорию проекта в системный путь, чтобы модули могли быть импортированы корректно. Это позволит нам запускать проект и выполнять миграции базы данных с корня проекта.

Далее, мы импортировали наш базовый класс, ссылку для подключения к базе данных и установили метаданные, тем самым, окончательно завершив подготовку Alembic. Теперь, в случае появления новых моделей таблиц просто импортируем их в этот файл, все остальное оставляя без изменений.

Теперь мы готовы к тому, чтоб создать свою первую миграцию и после сгенерировать нашу базу данных с таблицами.

Для этого возвращаемся в терминале в корень проекта:

cd ../

Далее вводим команду

alembic revision --autogenerate -m "Initial revision"

Таблицы по-прежнему не созданы, а сейчас мы только сгенерировали файл миграций. Теперь проведем миграции, воспользовавшись следующей командой.

alembic upgrade head

И теперь мы получим созданную базу данных SQLITE с именем db.sqlite3 и несколькими таблицами.

Теперь пропишем универсальные методы для взаимодействия с базой данных, которые могут использоваться для всех моделей. Для этого заполним файл dao/base.py

Скрытый текст
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.future import select
from sqlalchemy import update as sqlalchemy_update, delete as sqlalchemy_delete, func
from app.database import async_session_maker


class BaseDAO:
    model = None

    @classmethod
    async def find_one_or_none_by_id(cls, data_id: int):
        """
        Асинхронно находит и возвращает один экземпляр модели по указанным критериям или None.

        Аргументы:
            data_id: Критерии фильтрации в виде идентификатора записи.

        Возвращает:
            Экземпляр модели или None, если ничего не найдено.
        """
        async with async_session_maker() as session:
            query = select(cls.model).filter_by(id=data_id)
            result = await session.execute(query)
            return result.scalar_one_or_none()

    @classmethod
    async def find_one_or_none(cls, **filter_by):
        """
        Асинхронно находит и возвращает один экземпляр модели по указанным критериям или None.

        Аргументы:
            **filter_by: Критерии фильтрации в виде именованных параметров.

        Возвращает:
            Экземпляр модели или None, если ничего не найдено.
        """
        async with async_session_maker() as session:
            query = select(cls.model).filter_by(**filter_by)
            result = await session.execute(query)
            return result.scalar_one_or_none()

    @classmethod
    async def find_all(cls, **filter_by):
        """
        Асинхронно находит и возвращает все экземпляры модели, удовлетворяющие указанным критериям.

        Аргументы:
            **filter_by: Критерии фильтрации в виде именованных параметров.

        Возвращает:
            Список экземпляров модели.
        """
        async with async_session_maker() as session:
            query = select(cls.model).filter_by(**filter_by)
            result = await session.execute(query)
            return result.scalars().all()

    @classmethod
    async def add(cls, **values):
        """
        Асинхронно создает новый экземпляр модели с указанными значениями.

        Аргументы:
            **values: Именованные параметры для создания нового экземпляра модели.

        Возвращает:
            Созданный экземпляр модели.
        """
        async with async_session_maker() as session:
            async with session.begin():
                new_instance = cls.model(**values)
                session.add(new_instance)
                try:
                    await session.commit()
                except SQLAlchemyError as e:
                    await session.rollback()
                    raise e
                return new_instance

Подробно каждый из этих методов я рассматривал в статье, в ней же представлена более расширенная версия класса BaseDao. Если коротко, то далее мы будем наследоваться от этого класса при работе с пользователями и при работе с чатом. Для этого я ввел переменную model = None.

Теперь мы готовы к настройке блока с регистрацией / авторизацией пользователей.

JWT-токены и работа с пользователями

Подробно этот блок я рассматривал в статье «Создание собственного API на Python (FastAPI): Авторизация, Аутентификация и роли пользователей». Код будет практически идентичный. Поэтому, разбор его буду делать максимально коротким.

Сразу подготовим класс для работы с моделью пользователей. Для этого в папке users/dao.py пропишем следующее.

from app.dao.base import BaseDAO
from app.users.models import User


class UsersDAO(BaseDAO):
    model = User

Тут мы наследуемся от базового класса BaseDAO и передаем в класс модель таблицы пользователей.

Теперь, используя класс UserDAO мы сможем использовать все методы из класса BaseDAO. Такой подход удобен тем, что если нам нужны будут какие-то особенные методы для пользователей, например, то мы сможем их прописать уже не в родительском классе, а внутри класса UserDAO. Подход достаточно удобный и гибкий, так что берите на вооружение.

Теперь опишем файл users/auth.py. В этом файле мы реализуем несколько важных функций для аутентификации пользователей. Эти функции включают генерацию JWT-токенов, хэширование паролей, проверку паролей и аутентификацию пользователей.

Скрытый текст
from passlib.context import CryptContext
from pydantic import EmailStr
from jose import jwt
from datetime import datetime, timedelta, timezone
from app.config import get_auth_data
from app.users.dao import UsersDAO


def create_access_token(data: dict) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + timedelta(days=366)
    to_encode.update({"exp": expire})
    auth_data = get_auth_data()
    encode_jwt = jwt.encode(to_encode, auth_data['secret_key'], algorithm=auth_data['algorithm'])
    return encode_jwt


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)


async def authenticate_user(email: EmailStr, password: str):
    user = await UsersDAO.find_one_or_none(email=email)
    if not user or verify_password(plain_password=password, hashed_password=user.hashed_password) is False:
        return None
    return user

Краткий комментарий по коду:

  1. Импорт библиотек и модулей: Импортируются необходимые библиотеки и модули для работы с JWT, хэшированием паролей и асинхронными операциями.

  2. Функция create_access_token: Создает JWT‑токен с заданными данными и сроком действия.

  3. Контекст хэширования паролей: Настраивается контекст для хэширования паролей с использованием алгоритма bcrypt.

  4. Функция get_password_hash: Хэширует пароль.

  5. Функция verify_password: Проверяет соответствие введенного пароля и хэшированного пароля.

  6. Функция authenticate_user: Проверяет наличие пользователя и корректность его пароля.

Вы обратили внимание на строку:

from app.config import get_auth_data

Данной строкой мы импортируем информацию из файла с настройками (app/config.py). Файл с настройками:

import os
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    SECRET_KEY: str
    ALGORITHM: str
    model_config = SettingsConfigDict(
        env_file=os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".env")
    )


settings = Settings()


def get_auth_data():
    return {"secret_key": settings.SECRET_KEY, "algorithm": settings.ALGORITHM}

Тут я использовал pydantic_settings для извлечения двух переменных из файла .env. Данный процесс можно было закрыть и по-другому, например, через python-decouple, но мне захотелось продемонстрировать вам этот способ.

Благодаря такому синтаксису можно импортировать settings — объект класса Settings() — и через точку получать доступ к переменным. Удобно.

Где почитать подробнее про эти функции, вы уже знаете.

Прежде чем мы приступим к описанию файла dependencies.py, выполним небольшую подготовку, а именно, заранее опишем возможные ошибки.

Пользовательские исключения в FastApi

Для этого заполним файл exceptions.py из папки app следующим образом.

Скрытый текст
from fastapi import status, HTTPException


class TokenExpiredException(HTTPException):
    def __init__(self):
        super().__init__(status_code=status.HTTP_401_UNAUTHORIZED, detail="Токен истек")


class TokenNoFoundException(HTTPException):
    def __init__(self):
        super().__init__(status_code=status.HTTP_401_UNAUTHORIZED, detail="Токен не найден")


UserAlreadyExistsException = HTTPException(status_code=status.HTTP_409_CONFLICT,
                                           detail='Пользователь уже существует')

PasswordMismatchException = HTTPException(status_code=status.HTTP_409_CONFLICT, detail='Пароли не совпадают!')

IncorrectEmailOrPasswordException = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                                                  detail='Неверная почта или пароль')

NoJwtException = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                               detail='Токен не валидный!')

NoUserIdException = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                                  detail='Не найден ID пользователя')

ForbiddenException = HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Недостаточно прав!')

В этом фрагменте кода мы определили несколько пользовательских исключений для обработки различных ошибок, связанных с аутентификацией и авторизацией в FastAPI. Вот краткое описание:

  1. TokenExpiredException: Исключение, вызываемое при истечении срока действия токена (HTTP 401).

  2. TokenNoFoundException: Исключение, вызываемое при отсутствии токена (HTTP 401).

  3. UserAlreadyExistsException: Исключение, вызываемое при попытке создать пользователя, который уже существует (HTTP 409).

  4. PasswordMismatchException: Исключение, вызываемое при несовпадении паролей (HTTP 409).

  5. IncorrectEmailOrPasswordException: Исключение, вызываемое при неверной почте или пароле (HTTP 401).

  6. NoJwtException: Исключение, вызываемое при отсутствии или недействительности токена (HTTP 401).

  7. NoUserIdException: Исключение, вызываемое при отсутствии ID пользователя (HTTP 401).

  8. ForbiddenException: Исключение, вызываемое при недостатке прав доступа (HTTP 403).

Эти исключения помогают более точно и понятно обрабатывать различные ошибки, возникающие при работе с пользователями и их аутентификацией.

TokenExpiredException и TokenNoFoundException мы описали в виде классов и чуть дальше я объясню зачем мы так сделали.

Описание файла с зависимостями

Теперь в файле users/dependensies.py опишем код, который будет отвечать за получение и проверку JWT-токена из запроса, а также за аутентификацию текущего пользователя.

Скрытый текст
from fastapi import Request, HTTPException, status, Depends
from jose import jwt, JWTError
from datetime import datetime, timezone
from app.config import get_auth_data
from app.exceptions import TokenExpiredException, NoJwtException, NoUserIdException, TokenNoFoundException
from app.users.dao import UsersDAO


def get_token(request: Request):
    token = request.cookies.get('users_access_token')
    if not token:
        raise TokenNoFoundException
    return token


async def get_current_user(token: str = Depends(get_token)):
    try:
        auth_data = get_auth_data()
        payload = jwt.decode(token, auth_data['secret_key'], algorithms=auth_data['algorithm'])
    except JWTError:
        raise NoJwtException

    expire: str = payload.get('exp')
    expire_time = datetime.fromtimestamp(int(expire), tz=timezone.utc)
    if (not expire) or (expire_time < datetime.now(timezone.utc)):
        raise TokenExpiredException

    user_id: str = payload.get('sub')
    if not user_id:
        raise NoUserIdException

    user = await UsersDAO.find_one_or_none_by_id(int(user_id))
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='User not found')
    return user

Вот краткое описание:

  1. Функция get_token: Извлекает токен из cookies запроса. Если токен отсутствует, выбрасывается исключение TokenNoFoundException.

  2. Функция get_current_user:

    • Декодирует токен с использованием секретного ключа и алгоритма из конфигурации.

    • Проверяет срок действия токена. Если токен истек, выбрасывается исключение TokenExpiredException.

    • Извлекает ID пользователя из токена. Если ID отсутствует, выбрасывается исключение NoUserIdException.

    • Ищет пользователя в базе данных по ID. Если пользователь не найден, выбрасывается исключение HTTPException с кодом 401.

    • Возвращает найденного пользователя.

Этот код обеспечивает безопасную аутентификацию пользователей, проверяя валидность и срок действия их токенов. Использоваться код может универсально под любые проекты, так что берем на вооружение.

Теперь, прежде чем мы приступим к подготовке эндпоинтов FastApi, нам нужно подготовить app/main.py файл, через который будет происходить запуск нашего FastApi приложения.

Код файла main.py

from fastapi import FastAPI, Request
from fastapi.responses import RedirectResponse
from fastapi.exceptions import HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from app.exceptions import TokenExpiredException, TokenNoFoundException
from app.users.router import router as users_router
from app.chat.router import router as chat_router

app = FastAPI()
app.mount('/static', StaticFiles(directory='app/static'), name='static')

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Разрешить запросы с любых источников. Можете ограничить список доменов
    allow_credentials=True,
    allow_methods=["*"],  # Разрешить все методы (GET, POST, PUT, DELETE и т.д.)
    allow_headers=["*"],  # Разрешить все заголовки
)

app.include_router(users_router)
app.include_router(chat_router)


@app.get("/")
async def redirect_to_auth():
    return RedirectResponse(url="/auth")


@app.exception_handler(TokenExpiredException)
async def token_expired_exception_handler(request: Request, exc: HTTPException):
    # Возвращаем редирект на страницу /auth
    return RedirectResponse(url="/auth")


# Обработчик для TokenNoFound
@app.exception_handler(TokenNoFoundException)
async def token_no_found_exception_handler(request: Request, exc: HTTPException):
    # Возвращаем редирект на страницу /auth
    return RedirectResponse(url="/auth")

Этот код создает приложение FastAPI с настройками для работы со статическими файлами, CORS, маршрутизацией и обработкой исключений:

  1. Импорт модулей: Импортируются необходимые компоненты FastAPI, middleware для CORS и маршрутизаторы.

  2. Создание экземпляра приложения:

    app = FastAPI()
  3. Обслуживание статических файлов:

    app.mount('/static', StaticFiles(directory='app/static'), name='static')

    Это позволяет приложению обслуживать статические файлы из папки app/static.

  4. Добавление CORS middleware:

    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],  # Разрешить запросы с любых источников
        allow_credentials=True,
        allow_methods=["*"],  # Разрешить все HTTP-методы
        allow_headers=["*"],  # Разрешить все заголовки
    )

    Настраивается политика CORS для разрешения запросов с любых источников и методов.

  5. Подключение маршрутизаторов:

    app.include_router(users_router)
    app.include_router(chat_router)

    Подключаются маршруты для работы с пользователями и чатом. Эти маршруты мы скоро опишем в файлах router.py.

  6. Перенаправление на страницу аутентификации:

    @app.get("/")
    async def redirect_to_auth():
        return RedirectResponse(url="/auth")

    Этот маршрут перенаправляет запросы с корневого URL на страницу аутентификации.

  7. Обработчики исключений:

    • TokenExpiredException и TokenNoFoundException: оба исключения перенаправляют пользователя на страницу авторизации, если токен истек или не найден. Принимать этот тип хендлеров может только классы, поэтому данные типов ошибок мы описывали в классовом стиле.

    @app.exception_handler(TokenExpiredException)
    @app.exception_handler(TokenNoFoundException)
    async def handle_token_exceptions(request: Request, exc: HTTPException):
        return RedirectResponse(url="/auth")

Таким образом, приложение настроено для работы со статическими файлами, CORS, маршрутизацией и безопасностью через обработку исключений.

Теперь опишем схемы Pydantic для работы с запросами для регистрации и авторизации пользователей. Для этого в файле users/schamas.py пропишем следующее:

from pydantic import BaseModel, EmailStr, Field


class SUserRegister(BaseModel):
    email: EmailStr = Field(..., description="Электронная почта")
    password: str = Field(..., min_length=5, max_length=50, description="Пароль, от 5 до 50 знаков")
    password_check: str = Field(..., min_length=5, max_length=50, description="Пароль, от 5 до 50 знаков")
    name: str = Field(..., min_length=3, max_length=50, description="Имя, от 3 до 50 символов")


class SUserAuth(BaseModel):
    email: EmailStr = Field(..., description="Электронная почта")
    password: str = Field(..., min_length=5, max_length=50, description="Пароль, от 5 до 50 знаков")

Тут я описал две простые модели.

  • Первая модель (SUserRegister) будет использоваться при регистрации пользователей. В этой модели прописан email, password, password_check и name – те поля, которые будет заполнять пользователь при регистрации.

  • Вторая модель (SUserAuth) будет использоваться при авторизации пользователя. Тут он будет отправлять свой  email и пароль.

И, наконец-таки, мы готовы к тому чтоб написать эндпоинты для работы с пользователями (регистрация и авторизация).

Для этого в файле users/router.py пропишем следующее.

Скрытый текст
from fastapi import APIRouter, Response
from fastapi.requests import Request
from fastapi.responses import HTMLResponse
from app.exceptions import UserAlreadyExistsException, IncorrectEmailOrPasswordException, PasswordMismatchException
from app.users.auth import get_password_hash, authenticate_user, create_access_token
from app.users.dao import UsersDAO
from app.users.schemas import SUserRegister, SUserAuth

router = APIRouter(prefix='/auth', tags=['Auth'])


@router.post("/register/")
async def register_user(user_data: SUserRegister) -> dict:
    user = await UsersDAO.find_one_or_none(email=user_data.email)
    if user:
        raise UserAlreadyExistsException

    if user_data.password != user_data.password_check:
        raise PasswordMismatchException("Пароли не совпадают")
    hashed_password = get_password_hash(user_data.password)
    await UsersDAO.add(
        name=user_data.name,
        email=user_data.email,
        hashed_password=hashed_password
    )

    return {'message': 'Вы успешно зарегистрированы!'}


@router.post("/login/")
async def auth_user(response: Response, user_data: SUserAuth):
    check = await authenticate_user(email=user_data.email, password=user_data.password)
    if check is None:
        raise IncorrectEmailOrPasswordException
    access_token = create_access_token({"sub": str(check.id)})
    response.set_cookie(key="users_access_token", value=access_token, httponly=True)
    return {'ok': True, 'access_token': access_token, 'refresh_token': None, 'message': 'Авторизация успешна!'}


@router.post("/logout/")
async def logout_user(response: Response):
    response.delete_cookie(key="users_access_token")
    return {'message': 'Пользователь успешно вышел из системы'}

Как вы видите, тут у нас только POST методы, что не случайно. Фронта у нас пока нет, который будет использоваться для GET запросов, так что тестировать методы будем на чистом SWAGGER.

Вот краткий комментарий к вашему коду:

  1. Импорт модулей и зависимостей: Импортируются необходимые модули и функции из FastAPI и других частей приложения.

  2. Создание маршрутизатора: Создается объект APIRouter с префиксом /auth и тегом Auth.

  3. Регистрация пользователя:

    • Проверка, существует ли пользователь с данным email.

    • Проверка совпадения паролей.

    • Хеширование пароля и добавление нового пользователя в базу данных.

    • Возвращение сообщения об успешной регистрации.

  4. Авторизация пользователя:

    • Аутентификация пользователя по email и паролю.

    • Создание и установка токена доступа в cookie.

    • Возвращение сообщения об успешной авторизации.

  5. Выход пользователя:

    • Удаление cookie с токеном доступа.

    • Возвращение сообщения об успешном выходе.

Этот код реализует базовые функции регистрации, авторизации и выхода пользователя в веб-приложении на FastAPI. 

Подробнее про этот код я писал в статье, на которую ссылался в начале этой главы.

Теперь мы готовы к тому, чтоб сделать первый запуск нашего приложения. Для этого в терминале, с корня проекта, вводим:

uvicorn app.main:app --reload

Видим, что проект запущен. Теперь зайдем в автоматически сгенерированную документацию (http://127.0.0.1:8000/docs) и протестируем наши методы.

У меня уже написан метод для возврата главной страницы. У вас тут должно быть 3 метода: для регистрации, авторизации и выхода из системы.

Проверим регистрацию.

Получаю сообщение:

{"message": "Вы успешно зарегистрированы!"}

Сделаю проверку в базе данных.

Остальные методы тоже работают. Надеюсь, что вы поверите мне на слово.

Отлично. FastApi мы подготовили. Теперь нам остается подкинуть интерфейс, для того, чтоб пользователь мог с удобством для себя выполнить вход в систему или произвести регистрацию.

Кроме того, мы помним, что и для чата нам нужен фронт. Поэтому подготовим файлы интерфейса сразу под весь фронт нашего приложения.

Готовим фронтенд для мини-чата

Попросим бесплатный сервис Websim.ai сгенерировать фронт за нас. Для этого переходим на сайт: https://websim.ai/ и в поле для ввода текста вводим нужный вам промт. Далее я продемонстрирую промт — результат промта в виде скринов. Специально ничего не корректировал.

страница регистрации авторизации в мини чат на FastApi. Сделай табами Авторизация - регистрация. Поля для входа email + пароль. поля для регистрации email, имя + пароль дважды
страница регистрации авторизации в мини чат на FastApi. Сделай табами Авторизация - регистрация. Поля для входа email + пароль. поля для регистрации email, имя + пароль дважды

Знаете. А мне нравится результат. Оставлю его.

Для сохранения жмем на три точки в правом верхнем углу и на иконку «Загрузить». Совсем скоро полученный файлик мы адаптируем под наше FastApi приложение, внеся некоторые корректировки.

Вот так, всего за один запрос мы получили весьма приличную страницу регистрации / авторизации.

Теперь попросим сервис сгенерировать нам страничку чата. Тут уже будет несколько запросов.

создай страницу самого чата, которая будет открываться после входа в систему. в левой части список пользователей которым можно написать. выбираем пользователя и открывается поле для ввода сообщение. Похоже сделай на веб версию ватсапа только в дизайне формы авторизации
создай страницу самого чата, которая будет открываться после входа в систему. в левой части список пользователей которым можно написать. выбираем пользователя и открывается поле для ввода сообщение. Похоже сделай на веб версию ватсапа только в дизайне формы авторизации
при входе сделай возможность чтоб не был активен никакой чат. И весело сообщение "Выберите чат для общения".
при входе сделай возможность чтоб не был активен никакой чат. И весело сообщение "Выберите чат для общения".
добавь кнопку выхода. При клике на "Выход" должен будет выполниться пост запрос на /auth/logout и должна произойти переадресация на /auth
добавь кнопку выхода. При клике на "Выход" должен будет выполниться пост запрос на /auth/logout и должна произойти переадресация на /auth

Готово!

Сохраняем полученный результат и переходим к прикручиванию фронта к бэку.

Подключаем фронтенд к бэкенду

Websim выдал нам код в двух файлах. Конечно, это неудобно. Мы разобьём каждый из файлов на HTML (шаблон страницы), JS (JavaScript страницы) и CSS (стили страницы). Предварительную подготовку мы сделали на этапе создания и настройки проекта.

Начнем со страницы регистрации / авторизации пользователей.

Фронтенд для страницы регистрации / авторизации

Для начала подготовим HTML шаблон. В моем случае получился такой вид файла templates/auth.html:

Скрытый текст
<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Мини-чат: Вход и Регистрация</title>
    <link rel="stylesheet" type="text/css" href="{{ url_for('static', path='styles/auth.css') }}">
</head>
<body>
<div class="container">
    <div class="tabs">
        <div class="tab active" data-tab="login">Авторизация</div>
        <div class="tab" data-tab="register">Регистрация</div>
    </div>
    <div class="content">
        <form id="loginForm" class="form active">
            <input type="email" placeholder="Email" required>
            <input type="password" placeholder="Пароль" required>
            <button id="loginButton" type="submit">Войти</button>
        </form>
        <form id="registerForm" class="form">
            <input type="email" placeholder="Email" required>
            <input type="text" placeholder="Имя" required>
            <input type="password" placeholder="Пароль" required>
            <input type="password" placeholder="Повторите пароль" required>
            <button id="registerButton" type="submit">Зарегистрироваться</button>
        </form>
    </div>
</div>

<script src="{{ url_for('static', path='js/auth.js') }}"></script>
</body>
</html>

Весьма лаконично. Обратите внимание на то, как я импортировал статические файлы стилей и js. Стили я тут описывать не буду, кому интересно – смотрите их в полном исходнике кода данного проекта. С полным исходником кода, как и с другим эксклюзивным материалом, который я не публикую на Хабре, вы сможете ознакомиться в моем телеграмм канале «Легкий путь в Python».

Файл JS рассмотрим тут в полном виде, так как в него необходимо будет прописать логику по отправке POST запросов нашему бэкенду. Кроме того, в этом файле останется JS код для работы с табами страницы регистрации / авторизации, который нам написал Websim. Будет полезно ознакомиться.

Скрытый текст
// Обработка кликов по вкладкам
document.querySelectorAll('.tab').forEach(tab => {
    tab.addEventListener('click', () => showTab(tab.dataset.tab));
});

// Функция отображения выбранной вкладки
function showTab(tabName) {
    document.querySelectorAll('.tab').forEach(tab => tab.classList.remove('active'));
    document.querySelectorAll('.form').forEach(form => form.classList.remove('active'));

    document.querySelector(`.tab[data-tab="${tabName}"]`).classList.add('active');
    document.getElementById(`${tabName}Form`).classList.add('active');
}

// Функция для валидации данных формы
const validateForm = fields => fields.every(field => field.trim() !== '');

// Функция для отправки запросов
const sendRequest = async (url, data) => {
    try {
        const response = await fetch(url, {
            method: "POST",
            headers: {"Content-Type": "application/json"},
            body: JSON.stringify(data)
        });

        const result = await response.json();

        if (response.ok) {
            alert(result.message || 'Операция выполнена успешно!');
            return result;
        } else {
            alert(result.message || 'Ошибка выполнения запроса!');
            return null;
        }
    } catch (error) {
        console.error("Ошибка:", error);
        alert('Произошла ошибка на сервере');
    }
};

// Функция для обработки формы
const handleFormSubmit = async (formType, url, fields) => {
    if (!validateForm(fields)) {
        alert('Пожалуйста, заполните все поля.');
        return;
    }

    const data = await sendRequest(url, formType === 'login'
        ? {email: fields[0], password: fields[1]}
        : {email: fields[0], name: fields[1], password: fields[2], password_check: fields[3]});

    if (data && formType === 'login') {
        window.location.href = '/chat';
    }
};

// Обработка формы входа
document.getElementById('loginButton').addEventListener('click', async (event) => {
    event.preventDefault();

    const email = document.querySelector('#loginForm input[type="email"]').value;
    const password = document.querySelector('#loginForm input[type="password"]').value;

    await handleFormSubmit('login', 'login/', [email, password]);
});

// Обработка формы регистрации
document.getElementById('registerButton').addEventListener('click', async (event) => {
    event.preventDefault();

    const email = document.querySelector('#registerForm input[type="email"]').value;
    const name = document.querySelector('#registerForm input[type="text"]').value;
    const password = document.querySelectorAll('#registerForm input[type="password"]')[0].value;
    const password_check = document.querySelectorAll('#registerForm input[type="password"]')[1].value;

    if (password !== password_check) {
        alert('Пароли не совпадают.');
        return;
    }

    await handleFormSubmit('register', 'register/', [email, name, password, password_check]);
});

Я оставил комментарии в коде. В целом, этот код отвечает за работу с табами (вкладками, которые открывают или форму регистрации или форму авторизации) и за отправку запросов бэкенду.

JS код взаимодействия фронтенд и бэкенд части достаточно универсальный, так что берите на вооружение.

Теперь прикрутим этот HTML шаблон в users/router.py.

Предварительно в этом файле необходимо выполнить дополнительный импорт

from fastapi.templating import Jinja2Templates

и прописать

templates = Jinja2Templates(directory='app/templates')

Этим самым мы настраиваем возможность рендеринга HTML-шаблонов в нашем FastAPI-приложении. После этого мы можем использовать метод TemplateResponse для возврата отрендеренных HTML-страниц в ответ на запросы.

Теперь опишем эндпоинт, который будет возвращать HTML страницу регистрации / авторизации.

@router.get("/", response_class=HTMLResponse, summary="Страница авторизации")
async def get_categories(request: Request):
    return templates.TemplateResponse("auth.html", {"request": request})

Этот код представляет собой обработчик маршрута для FastAPI, который возвращает HTML-страницу авторизации, которую мы недавно подготовили.

Кроме того, напоминаю, что мы ставили переадресацию на страницу авторизации /auth в файле main.py. Проверим (http://127.0.0.1:8000).

После перехода у меня открылась страница регистрации / авторизации.

Зарегистрирую нового пользователя.

Проверим, действительно ли прошла регистрация.

Регистрация прошла успешно.
Регистрация прошла успешно.

Попробуем войти в систему.

Вход выполнен успешно
Вход выполнен успешно

Таким образом, пользовательскую часть мы полностью закрыли и у нас появилась возможность через веб-инерфейс произвести регистрацию и авторизацию в системе нашего чата. Это говорит о том, что мы можем приступать к настройке логики чата.

Пишем логику чата

Как я говорил выше, сначала мы опишем эндпоинты и подключим фронт без вебсокетов. На выходе все будет работать, но для получения новых сообщений необходимо будет каждый раз обновлять страницу (выполнять GET-запрос) руками.

После, на следующем шаге, мы организуем логику вебсокетов или, другими словами, сделаем чтоб между пользователями открывался двунаправленный канал связи с автоматическими обновлениями.

Сразу подготовим класс для работы с базой данных в файле chat/dao.py

Скрытый текст
from sqlalchemy import select, and_, or_
from app.dao.base import BaseDAO
from app.chat.models import Message
from app.database import async_session_maker


class MessagesDAO(BaseDAO):
    model = Message

Для оптимизации процесса получения сообщений между пользователями при открытии чата, мы добавим новый метод в класс MessagesDAO. Этот метод будет извлекать как отправленные, так и полученные сообщения между двумя пользователями.

@classmethod
async def get_messages_between_users(cls, user_id_1: int, user_id_2: int):
    """
    Асинхронно находит и возвращает все сообщения между двумя пользователями.

    Аргументы:
        user_id_1: ID первого пользователя.
        user_id_2: ID второго пользователя.

    Возвращает:
        Список сообщений между двумя пользователями.
    """
    async with async_session_maker() as session:
        query = select(cls.model).filter(
            or_(
                and_(cls.model.sender_id == user_id_1, cls.model.recipient_id == user_id_2),
                and_(cls.model.sender_id == user_id_2, cls.model.recipient_id == user_id_1)
            )
        ).order_by(cls.model.id)
        result = await session.execute(query)
        return result.scalars().all()

Этот метод отличается от стандартных методов в базовом классе DAO тем, что использует метод filter и логические операторы для создания более сложных SQL-запросов. Метод принимает ID двух пользователей и возвращает все сообщения между ними, независимо от того, кто является отправителем или получателем.

На выходе, данный метод получает id двух пользователей. Сделал универсально, так что не важно теперь было отправлено ID

Теперь в файле schemas.py опишем две простые Pydantic модели, которые позволят нам создавать и получать сообщения.

from pydantic import BaseModel, Field


class MessageRead(BaseModel):
    id: int = Field(..., description="Уникальный идентификатор сообщения")
    sender_id: int = Field(..., description="ID отправителя сообщения")
    recipient_id: int = Field(..., description="ID получателя сообщения")
    content: str = Field(..., description="Содержимое сообщения")


class MessageCreate(BaseModel):
    recipient_id: int = Field(..., description="ID получателя сообщения")
    content: str = Field(..., description="Содержимое сообщения")

Все поля обязательные к заполнению. В description описал за что отвечает то или иное поле.

Теперь подготовим HTML-шаблон в файле templates/chat.html.

Скрытый текст
<!DOCTYPE html>
<html lang="ru">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Мини-чат</title>
    <link rel="stylesheet" type="text/css" href="/static/styles/chat.css">
</head>
<body>
<div class="chat-container">
    <div class="user-list" id="userList">
        <!-- Сначала выводим "Избранное" -->
        <div class="user-item" data-user-id="{{ user.id }}">
            Избранное
        </div>
        <!-- Выводим остальных пользователей -->
        {% for chat in users_all %}
            {% if chat.id != user.id %}
                <div class="user-item" data-user-id="{{ chat.id }}">
                    {{ chat.name }}
                </div>
            {% endif %}
        {% endfor %}
    </div>
    <div class="chat-area">
        <div class="chat-header" id="chatHeader">
            <span>Мини-чат</span>
            <button class="logout-button" id="logoutButton" onclick="logout()">Выход</button>
        </div>
        <div class="messages" id="messages">
            <div class="welcome-message">Выберите чат для общения</div>
        </div>
        <div class="input-area">
            <input type="text" id="messageInput" placeholder="Введите сообщение..." disabled>
            <button id="sendButton" disabled>Отправить</button>
        </div>
    </div>
</div>
<script src="/static/js/chat.js"></script>
</body>
</html>

HTML-код тут достаточно простой. Основная сложность будет в JS. Из того на что тут стоит обратить внимание – это то, что я вынес «Избранное» вверх списка.

Для получения истории чата с пользователем мы будем каждый раз отправлять GET-запрос, передавая в параметрах пути ID пользователя и, если ID пользователя равно ID того, кто этот запрос оставил, то у нас появляется «Избранное».

Опишем теперь корневой роутер, который будет возвращать  HTML-шаблон нашего чата в файле chat/router.py.

from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Request, Depends
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from typing import List, Dict
from app.chat.dao import MessagesDAO
from app.chat.schemas import MessageRead, MessageCreate
from app.users.dao import UsersDAO
from app.users.dependencies import get_current_user
from app.users.models import User
import asyncio
import logging

router = APIRouter(prefix='/chat', tags=['Chat'])
templates = Jinja2Templates(directory='app/templates')


# Страница чата
@router.get("/", response_class=HTMLResponse, summary="Chat Page")
async def get_chat_page(request: Request, user_data: User = Depends(get_current_user)):
    users_all = await UsersDAO.find_all()
    return templates.TemplateResponse("chat.html",
                                      {"request": request, "user": user_data, 'users_all': users_all})

Импорты чуть позже прокомментирую. Из того, на что сейчас стоит обратить внимание, – это генерация шаблона чата. Вызываться он будет по пути /chat, так как это мы описали в префиксе при настройке роутера.

Далее мы подключили шаблонизатор.

На страницу, кроме HTML шаблона, мы возвращаем список всех пользователей и из зависимости, через токен пользователя, подставляем информацию по авторизованному пользователю.

Регистрируем роутер в main.py файле и выполняем авторизацию.

Страничка загрузилась, и в списке пользователей мы видим два пользователя, что корректно.

Теперь, не обращаясь к вебсокетам и JS, пропишем несколько эндпоинтов, которые позволят нам получать сообщения из базы данных и позволят сообщения в базу данных добавлять.

Эндпоинт для получения сообщений:

@router.get("/messages/{user_id}", response_model=List[MessageRead])
async def get_messages(user_id: int, current_user: User = Depends(get_current_user)):
    return await MessagesDAO.get_messages_between_users(user_id_1=user_id, user_id_2=current_user.id) or []

Тут я использовал уже подготовленный метод для получения истории общения между пользователями. Для получения достаточно выполнить GET-запрос по пути messages/user_id.

Теперь опишем эндпоинт для добавления сообщения в базу данных.

@router.post("/messages", response_model=MessageCreate)
async def send_message(message: MessageCreate, current_user: User = Depends(get_current_user)):
    # Add new message to the database
    await MessagesDAO.add(
        sender_id=current_user.id,
        content=message.content,
        recipient_id=message.recipient_id
    )
    
    return {'recipient_id': message.recipient_id, 'content': message.content, 'status': 'ok', 'msg': 'Message saved!'}

Тут я использовал общий метод для добавления записей из класса BaseDAO. После успешного добавления записи мы будем получать JSON. Данный метод мы скоро усилим, так как на нем будет завязана логика вебсокетов.

В целом логика понятна. Пользователь открывает чат, тем самым мы захватываем айди пользователя, которому нужно отправить сообщение, а айди пользователя, который будет отправлять сообщение, мы и так знаем, так как эта информация всегда хранится в зависимостях (в токене) — User = Depends(get_current_user). Дальше останется захватить текст сообщения и можно делать запись в базу данных.

А теперь мы перейдем к более сложной теме — к установке вебсокетов и настройке JS-скрипта. На этом этапе мне придется добавить больше теории, так как без этого будет трудно понять, что происходит, но перед этим давайте добавим логику вебсокетов в наш код.

# Активные WebSocket-подключения: {user_id: websocket}
active_connections: Dict[int, WebSocket] = {}


# Функция для отправки сообщения пользователю, если он подключен
async def notify_user(user_id: int, message: dict):
    """Отправить сообщение пользователю, если он подключен."""
    if user_id in active_connections:
        websocket = active_connections[user_id]
        # Отправляем сообщение в формате JSON
        await websocket.send_json(message)


# WebSocket эндпоинт для соединений
@router.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: int):
    # Принимаем WebSocket-соединение
    await websocket.accept()
    # Сохраняем активное соединение для пользователя
    active_connections[user_id] = websocket
    try:
        while True:
            # Просто поддерживаем соединение активным (1 секунда паузы)
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        # Удаляем пользователя из активных соединений при отключении
        active_connections.pop(user_id, None)

И изменим логику добавления сообщений в базу данных.

@router.post("/messages", response_model=MessageCreate)
async def send_message(message: MessageCreate, current_user: User = Depends(get_current_user)):
    # Добавляем новое сообщение в базу данных
    await MessagesDAO.add(
        sender_id=current_user.id,
        content=message.content,
        recipient_id=message.recipient_id
    )
    # Подготавливаем данные для отправки сообщения
    message_data = {
        'sender_id': current_user.id,
        'recipient_id': message.recipient_id,
        'content': message.content,
    }
    # Уведомляем получателя и отправителя через WebSocket
    await notify_user(message.recipient_id, message_data)
    await notify_user(current_user.id, message_data)

    # Возвращаем подтверждение сохранения сообщения
    return {'recipient_id': message.recipient_id, 'content': message.content, 'status': 'ok', 'msg': 'Message saved!'}

Тут я включил новую функцию notify_user. Теперь к пояснениям.

  1. Подключение через WebSocket. Когда пользователь открывает страницу чата, сервер устанавливает WebSocket-соединение:

@router.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: int):
    await websocket.accept()  # Принимаем соединение
    active_connections[user_id] = websocket  # Сохраняем соединение пользователя
    try:
        while True:
            await asyncio.sleep(1)  # Поддерживаем соединение активным
    except WebSocketDisconnect:
        active_connections.pop(user_id, None)  # Удаляем пользователя при отключении
  • При подключении пользователь проходит через await websocket.accept().

  • Соединение сохраняется в словарь active_connections, где ключ — user_id, а значение — объект websocket. Это позволяет серверу отправлять сообщения конкретным пользователям.

  1. Активное соединение
    После подключения WebSocket-соединение остается открытым. Сервер поддерживает его активным в цикле while True. Если пользователь отключается, соединение закрывается, и сервер удаляет его из списка активных.

  2. Отправка сообщений через WebSocket
    Когда пользователь отправляет сообщение, оно сохраняется в базе данных:

await MessagesDAO.add(
    sender_id=current_user.id,
    content=message.content,
    recipient_id=message.recipient_id
)

После этого сервер сразу отправляет сообщение участникам через WebSocket:

await notify_user(message.recipient_id, message_data)  # Получатель
await notify_user(current_user.id, message_data)  # Отправитель

Функция notify_user проверяет, подключен ли пользователь, и отправляет ему сообщение, если он онлайн:

async def notify_user(user_id: int, message: dict):
    if user_id in active_connections:
        websocket = active_connections[user_id]
        await websocket.send_json(message)  # Отправляем сообщение

Таким образом, WebSocket обеспечивает "живое" взаимодействие в чате с мгновенной доставкой сообщений.

Пишем JS-код логики чата

А теперь давайте напишем наш JS-код. Весь код я пропишу в файле chat.js. Пишу сразу с подробными комментариями код сверху вниз. В моем телеграм-канале можно будет посмотреть этот код одним куском без лишних комментариев.

Скрытый текст
// Переменные
let selectedUserId = null;  // Хранит ID пользователя, с которым мы общаемся в чате
let socket = null;          // Хранит объект WebSocket для соединения с сервером
let messagePollingInterval = null;  // Таймер для периодической загрузки сообщений

// Функция для выхода из аккаунта
async function logout() {
    try {
        const response = await fetch('/auth/logout', { 
            method: 'POST', 
            credentials: 'include' // Передаем куки, чтобы сервер знал, кто выходит
        });

        if (response.ok) {
            window.location.href = '/auth'; // Если все ок, перенаправляем на страницу авторизации
        } else {
            console.error('Ошибка при выходе'); // Ловим ошибки, если что-то пошло не так
        }
    } catch (error) {
        console.error('Ошибка при выполнении запроса:', error); // Обрабатываем ошибки сети
    }
}

// Выбор пользователя для общения
async function selectUser(userId, userName, event) {
    selectedUserId = userId;  // Запоминаем, с кем ведется беседа
    document.getElementById('chatHeader').innerHTML = `<span>Чат с ${userName}</span><button class="logout-button" id="logoutButton">Выход</button>`;
    document.getElementById('messageInput').disabled = false; // Активируем ввод сообщений
    document.getElementById('sendButton').disabled = false;   // Активируем кнопку отправки

    document.querySelectorAll('.user-item').forEach(item => item.classList.remove('active')); // Убираем выделение с других пользователей
    event.target.classList.add('active');  // Подсвечиваем текущего пользователя

    const messagesContainer = document.getElementById('messages');
    messagesContainer.innerHTML = '';  // Очищаем окно сообщений
    messagesContainer.style.display = 'block';  // Показываем окно сообщений

    document.getElementById('logoutButton').onclick = logout;  // Привязываем функцию выхода

    await loadMessages(userId);  // Загружаем предыдущие сообщения с этим пользователем
    connectWebSocket();  // Открываем WebSocket для обмена сообщениями в реальном времени
    startMessagePolling(userId);  // Начинаем периодически проверять новые сообщения
}

// Загрузка предыдущих сообщений
async function loadMessages(userId) {
    try {
        const response = await fetch(`/chat/messages/${userId}`);  // Делаем запрос на сервер, чтобы получить старые сообщения
        const messages = await response.json();  // Преобразуем ответ в JSON

        const messagesContainer = document.getElementById('messages');
        messagesContainer.innerHTML = messages.map(message =>
            createMessageElement(message.content, message.recipient_id)  // Преобразуем каждое сообщение в HTML-элемент
        ).join('');  // Склеиваем элементы и вставляем их в контейнер сообщений
    } catch (error) {
        console.error('Ошибка загрузки сообщений:', error);  // Ловим ошибки при загрузке
    }
}

// Соединение с WebSocket
function connectWebSocket() {
    if (socket) socket.close();  // Если соединение уже было, закрываем его

    socket = new WebSocket(`wss://${window.location.host}/chat/ws/${selectedUserId}`);  // Открываем новое WebSocket-соединение

    socket.onopen = () => console.log('WebSocket соединение установлено');  // Логируем успешное подключение

    socket.onmessage = (event) => {
        const incomingMessage = JSON.parse(event.data);  // Получаем новое сообщение от сервера
        if (incomingMessage.recipient_id === selectedUserId) {  // Проверяем, кому адресовано сообщение
            addMessage(incomingMessage.content, incomingMessage.recipient_id);  // Добавляем сообщение в чат
        }
    };

    socket.onclose = () => console.log('WebSocket соединение закрыто');  // Логируем закрытие соединения
}

// Отправка сообщения
async function sendMessage() {
    const messageInput = document.getElementById('messageInput');
    const message = messageInput.value.trim();  // Берем текст сообщения

    if (message && selectedUserId) {  // Проверяем, что сообщение не пустое и выбран собеседник
        const payload = {recipient_id: selectedUserId, content: message};  // Формируем данные для отправки

        try {
            await fetch('/chat/messages', {
                method: 'POST',
                headers: {'Content-Type': 'application/json'},
                body: JSON.stringify(payload)  // Отправляем сообщение на сервер
            });

            socket.send(JSON.stringify(payload));  // Также отправляем сообщение через WebSocket
            addMessage(message, selectedUserId);  // Добавляем сообщение в чат
            messageInput.value = '';  // Очищаем поле ввода
        } catch (error) {
            console.error('Ошибка при отправке сообщения:', error);  // Ловим ошибки
        }
    }
}

// Добавление нового сообщения в чат
function addMessage(text, recipient_id) {
    const messagesContainer = document.getElementById('messages');
    messagesContainer.insertAdjacentHTML('beforeend', createMessageElement(text, recipient_id));  // Добавляем новое сообщение в конец чата
    messagesContainer.scrollTop = messagesContainer.scrollHeight;  // Прокручиваем чат вниз, чтобы показать последнее сообщение
}

// Формирование HTML-элемента для сообщения
function createMessageElement(text, recipient_id) {
    const userID = parseInt(selectedUserId, 10);  // Преобразуем ID в число
    const messageClass = userID === recipient_id ? 'my-message' : 'other-message';  // Определяем, кто отправил сообщение: мы или другой пользователь
    return `<div class="message ${messageClass}">${text}</div>`;  // Возвращаем HTML для отображения сообщения
}

// Периодическая проверка новых сообщений
function startMessagePolling(userId) {
    clearInterval(messagePollingInterval);  // Очищаем старый таймер
    messagePollingInterval = setInterval(() => loadMessages(userId), 1000);  // Каждую секунду загружаем новые сообщения
}

// Привязка действий к элементам
document.querySelectorAll('.user-item').forEach(item => {
    item.onclick = event => selectUser(item.getAttribute('data-user-id'), item.textContent, event);  // Привязываем обработчик клика для выбора пользователя
});

document.getElementById('sendButton').onclick = sendMessage;  // Привязываем отправку сообщения на кнопку "Отправить"

document.getElementById('messageInput').onkeypress = async (e) => {
    if (e.key === 'Enter') {  // Если пользователь нажимает Enter в поле ввода сообщения
        await sendMessage();  // Отправляем сообщение
    }
};

В итоге, весь этот скрипт реализует чат с возможностью выбора пользователя, отправки сообщений через WebSocket, обновления чата в реальном времени и периодической проверки новых сообщений.

Очень надеюсь, что получилось все доступно объяснить. В случае вопросов по этому коду – пишите их в комментариях. На все отвечу.

Кроме того, у нас есть ещё одна проблема. Нет динамического обновления списка новых пользователей. Решить эту проблему можно двумя способами вебсокетами и AJAX-запросами. Я выберу второй способ, так как с первым способом мы уже знакомы.

Опишем метод для получения всех пользователей в users/router.py

@router.get("/users", response_model=List[SUserRead])
async def get_users():
    users_all = await UsersDAO.find_all()
    # Используем генераторное выражение для создания списка
    return [{'id': user.id, 'name': user.name} for user in users_all]

Модель Pydantic, которая опишет получаемые данные

class SUserRead(BaseModel):
    id: int = Field(..., description="Идентификатор пользователя")
    name: str = Field(..., min_length=3, max_length=50, description="Имя, от 3 до 50 символов")

Теперь напишем JS-код для периодического обновления списка пользователей.

Сам код будет не особо трудным, но нам нужно будет его корректно адаптировать под общую логику. Для начала внесем небольшие изменения в HTML-шаблон чата.

<!DOCTYPE html>
<html lang="ru">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Мини-чат</title>
    <link rel="stylesheet" type="text/css" href="/static/styles/chat.css">
</head>
<body>
<div class="chat-container">
    <div class="user-list" id="userList">
        <!-- Добавляем фиксированный элемент "Избранное" -->
        <div class="user-item" data-user-id="{{ user.id }}">
            Избранное
        </div>
        <!-- Выводим остальных пользователей, исключая текущего пользователя -->
        {% for chat in users_all %}
            {% if chat.id != user.id %}
                <div class="user-item" data-user-id="{{ chat.id }}">
                    {{ chat.name }}
                </div>
            {% endif %}
        {% endfor %}
    </div>
    <div class="chat-area">
        <div class="chat-header" id="chatHeader">
            <span>Мини-чат</span>
            <button class="logout-button" id="logoutButton" onclick="logout()">Выход</button>
        </div>
        <div class="messages" id="messages">
            <div class="welcome-message">Выберите чат для общения</div>
        </div>
        <div class="input-area">
            <input type="text" id="messageInput" placeholder="Введите сообщение..." disabled>
            <button id="sendButton" disabled>Отправить</button>
        </div>
    </div>
</div>

<script>
    // Передаем идентификатор текущего пользователя в JavaScript
    const currentUserId = parseInt("{{ user.id }}", 10);
</script>

<script src="/static/js/chat.js"></script>
</body>
</html>

Из нового мы пробросили переменную currentUserId с шаблона в JS. Данная переменная нам будет необходима для проверки отображения «Избранное».

Теперь напишем функцию для получения списка пользователей.

async function fetchUsers() {
    try {
        const response = await fetch('/auth/users');
        const users = await response.json();
        const userList = document.getElementById('userList');

        // Очищаем текущий список пользователей
        userList.innerHTML = '';

        // Создаем элемент "Избранное" для текущего пользователя
        const favoriteElement = document.createElement('div');
        favoriteElement.classList.add('user-item');
        favoriteElement.setAttribute('data-user-id', currentUserId);
        favoriteElement.textContent = 'Избранное';

        // Добавляем "Избранное" в начало списка
        userList.appendChild(favoriteElement);

        // Генерация списка остальных пользователей
        users.forEach(user => {
            if (user.id !== currentUserId) {
                const userElement = document.createElement('div');
                userElement.classList.add('user-item');
                userElement.setAttribute('data-user-id', user.id);
                userElement.textContent = user.name;
                userList.appendChild(userElement);
            }
        });

        // Повторно добавляем обработчики событий для каждого пользователя
        addUserClickListeners();
    } catch (error) {
        console.error('Ошибка при загрузке списка пользователей:', error);
    }
}

И настроем ее вызов сразу после обновления страницы и с интервалом в 10 секунд.

document.addEventListener('DOMContentLoaded', fetchUsers);
setInterval(fetchUsers, 10000); // Обновление каждые 10 секунд

Теперь я полностью очищу базу данных с пользователями и с сообщениями и запишу для вас небольшую демку того как работает сервис. Мы захватим с вами все аспекты, начиная от входа в систему и заканчивая динамическим появлением новых сообщений.

Есть места, которые можно оптимизировать. К примеру, добавить звуковые уведомления, подсветку новых сообщений, отправку медиа и прочее и, если будет запрос, я готов продолжить развитие этого проекта.

Теперь, когда у нас все готово, остается только выполнить деплой проекта в облако. Для этого воспользуемся сервисом Amvera Cloud.

Деплой проекта

Для начала в корне проекта (на уровне с базой данных) создадим файл amvera.yml и заполним его следующим образом:

meta:
  environment: python
  toolchain:
    name: pip
    version: 3.12
build:
  requirementsPath: requirements.txt
run:
  persistenceMount: /data
  containerPort: 8000
  command: uvicorn app.main:app --host 0.0.0.0 --port 8000

Этим кодом мы описали простую инструкцию для запуска нашего проекта в облаке.

По сути, на этом подготовка к деплою завершена. Теперь нам останется доставить файлы проекта на сервис Amvera Cloud и активировать бесплатный домен, который будет выделен под проект. Приступим.

  • Выполняем регистрацию в Amvera Cloud, если ее не было (новенькие получают 111 рублей на основной счет в подарок)

  • Переходим в раздел проектов и нажимаем на «Создать проект»

  • Даем проекту название и выбираем тариф. Для учебного проекта достаточно «Начального».

  • Теперь загрузим файлы приложения в проект. Можно использовать GIT или «Через интерфейс». Я выбираю второй вариант.

  • На следующем экране вы заметите, что настройки автоматически подгрузились. Проверяем чтоб все было корректно и жмем на «Завершить».

  • После этого необходимо активировать бесплатный домен или привязать существующий. Для этого заходим в проект и перемещаемся на вкладку «Настройки». Там кликаем на «Добавить домен».

После этого ждем пока проект запустится и станет активным. В случае если не привяжется доменное имя кликните на кнопку «Пересобрать проект». На эту же кнопку стоит нажимать каждый раз, если вы вносите изменения в проект.

После деплоя, если возникнут проблемы с отправкой запросов по API, в блоке command следует добавить следующие параметры:

--forwarded-allow-ips='.' --proxy-headers

Общий вид команды будет таким:

command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --forwarded-allow-ips='.' --proxy-headers

Пояснение:

  • --forwarded-allow-ips='.': Разрешает использование всех IP-адресов в заголовке X-Forwarded-For. Это полезно, когда приложение находится за прокси-сервером или балансировщиком нагрузки для корректного определения IP-адреса клиента.

  • --proxy-headers: Указывает серверу обрабатывать заголовки, связанные с прокси-сервером, такие как X-Forwarded-For, чтобы определить реальный IP клиента. Особенно важно при работе за прокси или CDN (например, Cloudflare).

Также можно указать конкретный IP-адрес сервера в параметре --forwarded-allow-ips — его можно найти во вкладке "Лог приложения".

Заключение

Конечно, этот проект можно было бы усовершенствовать и доработать. Я и не претендую на звание «гуру фронтенд-разработки», да и такой задачи перед собой не ставил. Основная цель этой статьи — показать вам, как вебсокеты работают на практике, и на примере проекта продемонстрировать их реальное применение.

Кроме того, в своих предыдущих статьях я, как мне кажется, недостаточно подробно рассмотрел взаимодействие фронтенда на чистом JavaScript, CSS и HTML с бэкендом на FastAPI. Здесь я постарался восполнить этот пробел и еще раз показать, насколько удобна и эффективна работа с асинхронной SQLAlchemy, особенно если проект имеет четко выстроенную структуру.

Сегодня мы рассмотрели важные темы, такие как интеграция асинхронной SQLAlchemy с FastAPI для работы с базой данных SQLite, настройка Alembic для миграций, создание с нуля системы регистрации и авторизации с использованием JWT-токенов, а также закрыли вопрос работы с вебсокетами.

Полный исходный код проекта вы найдете в моем Telegram-канале «Легкий путь в Python», где также публикуется эксклюзивный контент.

Если вы хотите, чтобы я продолжил развитие проекта, напишите об этом в комментариях и участвуйте в голосовании. Ваши отзывы очень важны для меня!

На этом у меня всё. До скорой встречи!

Комментарии (2)