Друзья, приветствую! Если вы читали мои прошлые статьи из серии «Создание собственного API на Python (FastAPI) и повторяли за мной, то сейчас вы готовы к полноценной интеграции PostgreSQL в ваше API.

Для того чтоб это сделать у вас должны быть:

  • развернутой PostgreSQL

  • настроены миграции через Alembic

  • должен быть настроенный проект FastApi

  • должны быть модели таблиц и сами таблицы в PostgreSQL.

Если это пока не так, то настоятельно рекомендую ознакомиться с моими прошлыми статьями:

Надеюсь, что у вас получилось.

Прежде чем мы приступим к написанию кода для работы с созданными таблицами в PostgreSQL через SQLAlchemy (далее просто алхимия), нам необходимо немного доработать наш проект, внедрив в него роуты.

Напоминаю, что после последней публикации наш проект имел такой вид:

 my_fastapi_project/

├── tests/

│   └── (тут мы будем добавлять функции для Pytest)

├── app/

│   ├── database.py

│   ├── config.py

│   ├── main.py

│   └── students/

│       └── models.py

│   └── migration/

│       └── (файлы миграций Alembic)

├── alembic.ini

├── .env

└── requirements.txt

Теперь немного дополним его следующим образом.

В каждую нашу отдельную сущность (к примеру это студенты, под которых мы выделили отдельную папку) нужно будет добавить файл router.py, schemas.py, rb.py и dao.py. Можете прямо сейчас их создать, а я пока опишу что такое Router в FastApi.

Router в FastApi

В FastAPI, Router — это инструмент, который помогает организовывать и группировать маршруты (пути) вашего веб-приложения. Представьте себе, что у вас есть несколько функций, каждая из которых отвечает за разные URL-адреса. Router позволяет вам собрать эти функции в одно место и затем добавить их в ваше основное приложение.

На примере нашего старого кода, мы все функции прописывали в файл main.py (туда же мы прописывали и мадели Pydantic, но это другая история). Router позволит нам сгруппировать функции, которые взаимодействуют с конкретной сущностью (со студентами, преподавателями, авторизацией и прочее).

Простая аналогия: если ваше приложение — это большой дом, то Router — это комната, где собраны вещи одного типа.

  • Файл  schemas.py

В этот файл мы вынесем наши модели Pydantic, просто чтоб не перегружать код данными.

  • Файл   rb.py

В этот файл мы вынесем наши классы, описывающие тело запроса (в прошлых статьях мы создали один такой класс).

  • Файл  dao.py

В этот файл мы будем выносить индивидуальные функции, относящиеся к конкретной сущности. К примеру такой сущностью может выступить наши студенты и функции базы данных, которые относятся исключительно к студентам.

DAO в контексте баз данных расшифровывается как «Data Access Object» (объект доступа к данным), поэтому я привык называть этот файл именно dao.py. В других проектах вы можете встретить название core.py или service.py. Тут как кому удобно.

По изменениям в проекте, кроме того, давайте создадим папку dao в корне дирректории app, а внутрь мы положим файл base.py. В данном файле мы с вами опишем класс с универсальными методами по работе с базой данных.

Примеры такого метода: получение записи по её id, получение всех записей, получение записей по определенному фильтру, удаление записи по определенному фильтры. Как вы понимаете, большого смысла в каждом методе дублировать такой код нет.

Логика такая. В файл base.py мы выносим универсальные методы, а в файле dao.py, каждой отдельной сущности, будем прописывать индивидуальные методы под каждую конкретную задачу.

После небольших изменений структура нашего проекта должна иметь такой вид:

my_fastapi_project/

├── tests/

│   └── (тут мы будем добавлять функции для Pytest)

├── app/

│   ├── database.py

│   ├── config.py

│   ├── main.py

│   └── students/

│      ├── router.py

│      ├── schemas.py

│      ├── dao.py

│      ├── rb.py

│   └── dao/

│       └── base.py

│   └── migration/

│       └── (файлы миграций Alembic)

├── alembic.ini

├── .env

└── requirements.txt

Надеюсь, что со структурой вы разобрались. Теперь давайте начнем писать код.

Подготовим данные.

Таблица со студентами (1 запись пока)
Таблица со студентами (1 запись пока)
Таблица факультетов (пока 1 запись)
Таблица факультетов (пока 1 запись)

Для начала мы оформим наш первый простой Router со студентами, добавив в него один эндпоинт для получения данных с базы данных о всех студентах (я предварительно добавил одного студента в базу данных, чтоб было что получать).

Далее логика такая:

  1. Настраиваем роутер

  2. Пишем простой эндпоинт для получения с базы данных информаци о студенте

На данном примере мы добавим метод для получения данных о студенте прямо из базы данных. Затем, мы данный метод вынесем в локальный dao, повторив запрос, а после вынесем информацию уже в базовый dao.

Для начала импортируем нужные модули и настроим Router

Импорты:

from fastapi import APIRouter 
from sqlalchemy import select 
from app.database import async_session_maker 
from app.students.models import Student

Разберемся

  • from fastapi import APIRouter: Импортирует APIRouter из FastAPI, который используется для создания маршрутов (routes) для вашего API.

  • from sqlalchemy.future import select: Импортирует функцию select из SQLAlchemy для создания  SELECT запросов к базе данных (получение данных).

  • from app.database import async_session_maker: Импортирует async_session_maker, который используется для создания асинхронных сессий с базой данных.

  • from app.students.models import Student: Импортирует модель Student, которая представляет таблицу students в базе данных. Это необходимо для конкретного указания алхимии с какой таблицей ей работать.

Настраиваем Router

router = APIRouter(prefix='/students', tags=['Работа со студентами'])
  • router = APIRouter(...): Создает экземпляр APIRouter.

  • prefix='/students': Устанавливает префикс для всех маршрутов, определенных в этом роутере. Это означает, что все маршруты, добавленные к этому роутеру, будут начинаться с /students.

  • tags=['Работа со студентами']: Добавляет тег к роутеру, который будет использоваться в документации Swagger для группировки и описания маршрутов.

Напишем наш первый эндпоинт Router

@router.get("/", summary="Получить всех студентов")
async def get_all_students():
    async with async_session_maker() as session: 
        query = select(Student)
        result = await session.execute(query)
        students = result.scalars().all()
        return students

Давайте разберем функцию get_all_students простыми словами, шаг за шагом.

Маршрут для получения всех студентов

@router.get("/", summary="Получить всех студентов")

Эта строка говорит FastAPI, что когда кто-то делает GET-запрос на адрес /students/, нужно выполнить функцию get_all_students(). Описание «Получить всех студентов» будет показано в документации.

Асинхронная функция

async def get_all_students():

Это начало асинхронной функции. Асинхронность позволяет обрабатывать несколько запросов одновременно, не блокируя другие операции.

Создание сессии

async with async_session_maker() as session:

Здесь создается асинхронная сессия для работы с базой данных. Эта сессия автоматически закроется после выполнения всех операций внутри блока with.

Создание запроса

query = select(Student)

Создается запрос для выбора всех записей из таблицы Student.

Выполнение запроса

result = await session.execute(query)

Запрос отправляется в базу данных, и результат сохраняется в переменной result.

Извлечение результатов

students = result.scalars().all()

Все строки результата запроса извлекаются и собираются в список. Каждый элемент этого списка представляет собой объект Student.

Возвращение результата

return students

Список студентов возвращается в виде JSON-ответа. FastAPI автоматически преобразует его в формат JSON.

Итог

Функция get_all_students делает следующее:

  1. При получении GET-запроса на /students/ она открывает сессию с базой данных.

  2. Создает запрос для получения всех студентов из базы данных.

  3. Выполняет запрос и получает все записи.

  4. Преобразует полученные записи в список объектов Student.

  5. Возвращает этот список как JSON-ответ.

Таким образом, когда кто-то обращается к вашему API по адресу /students/, он получает полный список всех студентов, зарегистрированных в системе.

Подключим Router

Для того чтобы созданный Router заработал, его необходимо подключить в файле main.py. Подключение выглядит следующим образом:

from fastapi import FastAPI
from app.students.router import router as router_students


app = FastAPI()


@app.get("/")
def home_page():
    return {"message": "Привет, Хабр!"}


app.include_router(router_students)

Я тут оставил эндпоинт главной страницы. В части про подключение Front-end мы ещё доработаем эндпоинт home_page.

Для начала нам необходимо импортировать наш Router. Так как каждый файл из каждой сущности будет иметь название router, мы добавим псевдоним роутеру студентов и назовем его router_students.

Далее, для того чтобы подключить (включить) роутер, нам достаточно выполнить:

app.include_router(router_students)

Теперь можно запустить наше FastAPI приложение (не забудьте подготовить данные в таблицах). Из корня проекта выполняем команду:

uvicorn app.main:app --reload

Зайдем в документацию по адресу http://127.0.0.1:8000/docs и видим следующее:

Тут мы видим и подпись «Работа со студентами» (tags=['Работа со студентами']) и «Получить всех студентов» (summary="Получить всех студентов").

Тут мы видим что никаких подсказок, ни подсказок для формирования тела запроса, ни описания модели ответа запроса от Pydantic нет, но мы это уже совсем скоро поправим. Пока просто выполним функцию и посмотрим на результат выполнения. 

Произошла настоящая магия. Мы не указывали ни в одном месте, что необходимо трансформировать данные в JSON, но FastApi сделал это автоматически.

Это все здорово, но в эндпоинтах никто не пишет прямые запросы к базе данных, да и сам запрос было бы неплохо «прокачать», чтоб он ещё и фильтрованные значения возвращал, правда?

Давайте вынесем код взаимодействия с БД пока в файл dao.py, а после уже трансформируем его в универсальную функцию.

from sqlalchemy import select
from app.students.models import Student
from app.database import async_session_maker


class StudentDAO:
    @classmethod
    async def find_all_students(cls):
        async with async_session_maker() as session:
            query = select(Student)
            students = await session.execute(query)
            return students.scalars().all()

Класс будет выглядеть так. Обратите внимание, что мы использовали тут @classmethod. Это нам нужно для удобства обращения. Мы сможем теперь импортировать класс и обращаться через точку, не объявляя каждый раз объект класса.

Импортируем наш класс router.py и, после изменений, код router.py будет иметь такой вид:

from fastapi import APIRouter
from app.students.dao import StudentDAO


router = APIRouter(prefix='/students', tags=['Работа со студентами'])


@router.get("/", summary="Получить всех студентов")
async def get_all_students():
    return await StudentDAO.find_all_students()

Уже приятнее, правда?

Проверим.

Супер, тот же результат! Но, спросите вы, «Ты ж нам там что-то рассказывал про какой-то там Pydantic и что-то там про его модели, где это все!?» и будете абсолютно правы. Давайте добавим модель ответа (response_model) в файл schemas.py.

from datetime import datetime, date
from typing import Optional
import re
from pydantic import BaseModel, Field, EmailStr, validator


class SStudent(BaseModel):
    id: int
    phone_number: str = Field(..., description="Номер телефона в международном формате, начинающийся с '+'")
    first_name: str = Field(..., min_length=1, max_length=50, description="Имя студента, от 1 до 50 символов")
    last_name: str = Field(..., min_length=1, max_length=50, description="Фамилия студента, от 1 до 50 символов")
    date_of_birth: date = Field(..., description="Дата рождения студента в формате ГГГГ-ММ-ДД")
    email: EmailStr = Field(..., description="Электронная почта студента")
    address: str = Field(..., min_length=10, max_length=200, description="Адрес студента, не более 200 символов")
    enrollment_year: int = Field(..., ge=2002, description="Год поступления должен быть не меньше 2002")
    major_id: int = Field(..., ge=1, description="ID специальности студента")
    course: int = Field(..., ge=1, le=5, description="Курс должен быть в диапазоне от 1 до 5")
    special_notes: Optional[str] = Field(None, max_length=500, description="Дополнительные заметки, не более 500 символов")

    @validator("phone_number")
    def validate_phone_number(cls, value):
        if not re.match(r'^\+\d{1,15}$', value):
            raise ValueError('Номер телефона должен начинаться с "+" и содержать от 1 до 15 цифр')
        return value

    @validator("date_of_birth")
    def validate_date_of_birth(cls, value):
        if value and value >= datetime.now().date():
            raise ValueError('Дата рождения должна быть в прошлом')
        return value

Видим, что в документации появилась модель ответа. Отлично.

Запускаем и видим, что был получен корректный результат. В более ранних версиях Pydantic была необходимость добавлять в модели

class Config:
    orm_mode = True

В более поздних версиях данная запись изменилась на такую конструкцию:

model_config = ConfigDict(from_attributes=True)

Указывается она в начале описания модели и я советую вам ее всегда указывать, когда речь идет про взаимодействие с базой данных через алхимию. Тем самым Pydantic понимает явно, что работать он будет с аргументами базы данных.

from datetime import datetime, date
from typing import Optional
import re
from pydantic import BaseModel, Field, EmailStr, validator


class SStudent(BaseModel):
    model_config = ConfigDict(from_attributes=True)
    id: int
    phone_number: str = Field(..., description="Номер телефона в международном формате, начинающийся с '+'")
    first_name: str = Field(..., min_length=1, max_length=50, description="Имя студента, от 1 до 50 символов")
    last_name: str = Field(..., min_length=1, max_length=50, description="Фамилия студента, от 1 до 50 символов")
    date_of_birth: date = Field(..., description="Дата рождения студента в формате ГГГГ-ММ-ДД")
    email: EmailStr = Field(..., description="Электронная почта студента")
    address: str = Field(..., min_length=10, max_length=200, description="Адрес студента, не более 200 символов")
    enrollment_year: int = Field(..., ge=2002, description="Год поступления должен быть не меньше 2002")
    major_id: int = Field(..., ge=1, description="ID специальности студента")
    course: int = Field(..., ge=1, le=5, description="Курс должен быть в диапазоне от 1 до 5")
    special_notes: Optional[str] = Field(None, max_length=500, description="Дополнительные заметки, не более 500 символов")

    @validator("phone_number")
    def validate_phone_number(cls, value):
        if not re.match(r'^\+\d{1,15}$', value):
            raise ValueError('Номер телефона должен начинаться с "+" и содержать от 1 до 15 цифр')
        return value

    @validator("date_of_birth")
    def validate_date_of_birth(cls, value):
        if value and value >= datetime.now().date():
            raise ValueError('Дата рождения должна быть в прошлом')
        return value

Разбор описания полей, как и всего смысла Pydantic я делал в этой статье: Создание собственного API на Python (FastAPI): Гайд по POST, PUT, DELETE запросам и моделям Pydantic

Внимательный читатель уже сейчас заметил, что функцию для получения всех студентов было бы неплохо вынести в общий класс, так как большой разницы нет, хотим мы получить всех студентов, всех преподавателей или полные данные с любой другой таблицы.

Так мы и сделаем. Тем более мы уже подготовили под это дело специальный файл dao/base.py. Опишем его:

from sqlalchemy.future import select
from app.database import async_session_maker
from app.students.models import Student


class BaseDAO:
    @classmethod
    async def find_all_students(cls):
        async with async_session_maker() as session:
            query = select(Student)
            students = await session.execute(query)
            return students.scalars().all()

Мы тут сразу видим, что для того чтоб этот класс был универсальным, как минимум, нужно добавить возможность указания модели, с которой должен работать класс и нужно изменить название метода и переменных.

 После изменений код получил такой вид: 

from sqlalchemy.future import select
from app.database import async_session_maker


class BaseDAO:
    model = None
    
    @classmethod
    async def find_all(cls):
        async with async_session_maker() as session:
            query = select(cls.model)
            result = await session.execute(query)
            return result.scalars().all()

Тут мы вынесли указание модели на уровень класса. Тем самым мы добавили возможность наследоваться от данного класса, передавая нужную модель (на примере далее это будет понятнее).

Далее мы дали более универсальное название методу и переменным.

Теперь классу StudentDAO необходимо наследоваться от созданного класса BaseDao. После правок файл dao.py будет иметь такой вид:

from app.dao.base import BaseDAO
from app.students.models import Student


class StudentDAO(BaseDAO):
    model = Student

Код стал максимально лаконичным.

Теперь внесем изменения в наш роутер, так как название метода для получения студентов изменилось:

@router.get("/", summary="Получить всех студентов", response_model=list[SStudent])
async def get_all_students():
    return await StudentDAO.find_all()
  

Проверим работает ли он теперь? Работает!

Хорошенько посмотрев на код и подумав, мы можем прийти к выводу, что, в целом, было бы неплохо начать как-то фильтровать студентов и получать только тех, которые будут соответствовать нашим фильтрам. Да легко, ведь мы работаем с SQLAlchemy 2.0!

Внесем небольшие изменения в класс BaseDao.

class BaseDAO:
    model = None
    
    @classmethod
    async def find_all(cls, **filter_by):
        async with async_session_maker() as session:
            query = select(cls.model).filter_by(**filter_by)
            result = await session.execute(query)
            return result.scalars().all()

В этом коде:

  • Метод find_all класса BaseDAO теперь принимает неограниченное количество именованных аргументов через **filter_by.

  • Внутри метода проверяется наличие переданных фильтров. Если filter_by содержит какие-то аргументы (например, course=4 и enrollment_year=2018), то создается запрос select(cls.model).filter_by(**filter_by), который фильтрует записи по этим аргументам.

  • Если filter_by пуст или не указан, выполняется базовый запрос select(cls.model), который выбирает все записи без фильтрации.

  • Запрос выполняется в асинхронной сессии async_session_maker(), результат извлекается и возвращается в виде списка объектов.

Теперь метод find_all может использоваться с различными комбинациями фильтров, передаваемых в виде именованных аргументов или распакованных из словаря. 

Отлично. Теперь немного изменим наш эндпоинт, передав в него тело запроса. Для начала напишем код в файл rb.py

class RBStudent:
    def __init__(self, student_id: int | None = None,
                 course: int | None = None,
                 major_id: int | None = None,
                 enrollment_year: int | None = None):
        self.id = student_id
        self.course = course
        self.major_id = major_id
        self.enrollment_year = enrollment_year

        
    def to_dict(self) -> dict:
        data = {'id': self.id, 'course': self.course, 'major_id': self.major_id,
                'enrollment_year': self.enrollment_year}
        # Создаем копию словаря, чтобы избежать изменения словаря во время итерации
        filtered_data = {key: value for key, value in data.items() if value is not None}
        return filtered_data

Обратите внимание. Все параметры тела запроса у нас не обязательные. Кроме того, я использовал обновленный синтаксис. Вместо: course: Optional[int] = None использовал запись course: int | None = None. С 2021-го года они эквивалентны друг другу. 

Так-же, в данном классе вы можете увидеть метод, который возвращает данные в виде питоновского словаря. Это нам будет полезно, когда мы будем формировать наш SELECT запрос с фильтрами.

Метод организован таким образом, что если значение у любого поля будет None, то в финальный словарь оно не попадет.

from fastapi import APIRouter, Depends
from app.students.dao import StudentDAO
from app.students.rb import RBStudent
from app.students.schemas import SStudent


router = APIRouter(prefix='/students', tags=['Работа со студентами'])


@router.get("/", summary="Получить всех студентов")
async def get_all_students(request_body: RBStudent = Depends()) -> list[SStudent]:
    return await StudentDAO.find_all(**request_body.to_dict())

А это, как вы понимаете, уже что-то.

Посмотрим что там у нас в документации.

Мы получили форму. Отлично!

Попробуем ею воспользоваться:

У нас все работает, а это значит что мы можем двигаться дальше.

Теперь было бы неплохо написать универсальную функцию, которая будет возвращать запись по id либо пускай она возвращает None если записи с таким id нет.

@classmethod
async def find_one_or_none_by_id(cls, data_id: int):
    async with async_session_maker() as session:
        query = select(cls.model).filter_by(id=data_id)
        result = await session.execute(query)
        return result.scalar_one_or_none()

Тут из нового мы применили конструкцию result.scalar_one_or_none(). Как вы догадались из названия, возвращать такая функция будет или None, что будет говорить про отсутствие записи с указанным id или саму запись.

Думаю, что лучше всего тут подойдет интеграция в эндпоинт с параметром пути.

@router.get("/{id}", summary="Получить одного студента по id")
async def get_student_by_id(student_id: int) -> SStudent | None:
    return await StudentDAO.find_one_or_none_by_id(student_id)

Посмотрим что там у нас в документации:

Студент с ID 1 найден
Студент с ID 1 найден
Студент с ID 100 не найден
Студент с ID 100 не найден

Видим, что никаких ошибок у нас нет, но null как-то не очень, правда?

Давайте добавим небольшую обработку. Добавим сообщение, которое получит пользователь, если студента нет.

Укажем, что если студент не будет найден, то мы вернем или модель студента Sstudent или наш кастомный словарь (JSON).

@router.get("/{id}", summary="Получить одного студента по id")
async def get_student_by_id(student_id: int) -> SStudent | dict:
    rez = await StudentDAO.find_one_or_none_by_id(student_id)
    if rez is None:
        return {'message': f'Студент с ID {student_id} не найден!'}
    return rez
  

Отлично. Теперь давайте добавим метод, похожий на find_one_or_none_by_id, но пусть он принимает случайное значение (любой фильтр) и возвращает или одного студента или информацию о том, что студент с такими параметрами не найден.

@classmethod
async def find_one_or_none(cls, **filter_by):
    async with async_session_maker() as session:
        query = select(cls.model).filter_by(**filter_by)
        result = await session.execute(query)
        return result.scalar_one_or_none()

Напишем эндпонит.

@router.get("/by_filter", summary="Получить одного студента по фильтру")
async def get_student_by_filter(request_body: RBStudent = Depends()) -> SStudent | dict:
    rez = await StudentDAO.find_one_or_none(**request_body.to_dict())
    if rez is None:
        return {'message': f'Студент с указанными вами параметрами не найден!'}
    return rez
  

Это все круто, но что там с факультетами. Мы же не будем запоминать ID каждого факультета, а просто хотим получить такой расклад «Василий Петров — студент 2 курса факультета Информатики».

Верно и для решения этой задачи есть 2 варианта:

  1. Мы напишем отдельную функцию, которая, при получении студента выполнит обращение к двум таблицам (возьмем id факультета, затем по id факультета вытянем название и свяжем эти 2 массива в один массив)

  2. Мы изменим модели наших таблиц (students и majors) и откроем для себя возможность более комфортной и быстрой работы в связанных таблицах.

Первый вариант я оставил по той причине, что не всегда данные в нескольких таблицах будут связаны даже по общему ключу. Иногда такого варианта нет, а определенный массив сгенерировать нужно. В таких случаях пишут более сложные SQL запросы.

Перед тем как начать писать функции давайте немного изменим наши модели таблиц. Предлагаю добавить специальный метод, который будет превращать объект алхимии, который мы получаем при SELECT запросе в питоновский словарь.

Сделаем это по причине того, что, зачастую, удобнее работать со словарями, чем с объектами алхимии. К примеру, у словаря есть методы по добавлению, обновлению и удалению ключей. Да и в целом, родные словари все же ближе, чем объекты, не так ли?

class Student(Base):
    id: Mapped[int_pk]
    phone_number: Mapped[str_uniq]
    first_name: Mapped[str]
    last_name: Mapped[str]
    date_of_birth: Mapped[date]
    email: Mapped[str_uniq]
    address: Mapped[str] = mapped_column(Text, nullable=False)
    enrollment_year: Mapped[int]
    course: Mapped[int]
    special_notes: Mapped[str_null_true]
    major_id: Mapped[int] = mapped_column(ForeignKey("majors.id"), nullable=False)

    def __str__(self):
        return (f"{self.__class__.__name__}(id={self.id}, "
                f"first_name={self.first_name!r}, "
                f"last_name={self.last_name!r})")

    def __repr__(self):
        return str(self)

    def to_dict(self):
        return {
            "id": self.id,
            "phone_number": self.phone_number,
            "first_name": self.first_name,
            "last_name": self.last_name,
            "date_of_birth": self.date_of_birth,
            "email": self.email,
            "address": self.address,
            "enrollment_year": self.enrollment_year,
            "course": self.course,
            "special_notes": self.special_notes,
            "major_id": self.major_id
        }

Как вы видите, все просто. Мы добавили метод to_dict, который просто будет трансформировать все полученные данные в простой словарь. В модель Major можем метод не добавлять.

Добавление метода to_dict() в класс Student не оказывает никакого влияния на структуру таблицы в базе данных. Этот метод представляет собой чисто программистский удобный интерфейс для преобразования объекта Student в формат словаря, что упрощает его использование в различных частях вашего приложения, таких как сериализация в JSON для API или вывод в консоль для отладки.

Теперь давайте добавим эксклюзивный метод в файл dao.py, который будет возвращать полную информацию о студенте с названием факультета.

 Файл students/dao.py:

class StudentDAO(BaseDAO):
    model = Student

    @classmethod
    async def find_full_data(cls, student_id: int):
        async with async_session_maker() as session:
            # Первый запрос для получения информации о студенте
            query_student = select(cls.model).filter_by(id=student_id)
            result_student = await session.execute(query_student)
            student_info = result_student.scalar_one_or_none()

            # Если студент не найден, возвращаем None
            if not student_info:
                return None

            # Второй запрос для получения информации о специальности
            query_major = select(Major).filter_by(id=student_info.major_id)
            result_major = await session.execute(query_major)
            major_info = result_major.scalar_one()

            student_data = student_info.to_dict()
            student_data['major'] = major_info.major_name

            return student_data

Обратите внимание. Тут мы, сначала, проверили есть ли у нас студент с указанным id. Если его нет, то сценарий сразу остановится.

В случае же если студент есть, то мы автоматически тянем информацию по его факультету. Мы можем это делать с уверенностью, так как в таблице со студентами есть прямая связь с таблицей факультетов (ForeignKey))

Так как у нас добавилось новое поле — давайте изменим Pydantic модель Sstudent. В нее нам необходимо добавить описание всего одного поля:

major: Optional[str] = Field(..., description="Название факультета")

Напоминаю, что данное описание говорит, что поле major обязательное.

Теперь внесем правки в эндпоинт.

@router.get("/{id}", summary="Получить одного студента по id")
async def get_student_by_id(student_id: int) -> SStudent | dict:
    rez = await StudentDAO.find_full_data(student_id)
    if rez is None:
        return {'message': f'Студент с ID {student_id} не найден!'}
    return rez

Запускаем и проверяем.

Видим, что подгрузилось название факультета
Видим, что подгрузилось название факультета

Все отработало, но, какой-то у нас слишком длинный запрос получился к базе данных. Не так-ли, когда, казалось бы, все просто: свяжи данные и достань их разом. Если вы так подумали, то вы абсолютно правы!

В алхимии предусмотрен механизм отношений (relationships) о котором я сейчас расскажу подробнее.

Отношения в SQLAlchemy

Отношения в SQLAlchemy позволяют удобно управлять связями между таблицами в базе данных, делая код более читаемым и упрощая работу с данными.

Как это работает:

  1. Определение отношений:

В моделях определяются связи между таблицами с помощью конструкций типа relationship. Например, связь «один ко многим» или «многие ко многим».

  1. Создание SQL-запросов:

ORM автоматически генерирует нужные SQL-запросы для загрузки связанных данных.

  1. Загрузка данных:

SQLAlchemy может загружать связанные объекты автоматически при обращении к ним, используя «жадную» или «ленивую» загрузку для оптимизации производительности.

Преимущества:

  1. Удобство и читаемость:

Связи между объектами базы данных описаны прямо в коде Python, что упрощает его понимание.

  1. Меньше дублирования:

Не нужно писать сложные SQL-запросы вручную; ORM делает это за вас.

  1. Интеграция с бизнес-логикой:

ORM позволяет легко интегрировать бизнес-логику с данными, используя объектно-ориентированный подход.

  1. Целостность данных:

Автоматически поддерживается целостность данных, например, каскадное удаление связанных объектов.

Надеюсь, что доступно объяснил.

Теперь давайте добавим отношения в между таблицами (моделями) majors и students.

Для добавления отношений в наши модели SQLAlchemy, нужно использовать конструкцию relationship. Это позволит вам связать модели Student и Major, чтобы можно было легко обращаться к факультету студента и к студентам факультета.

Вот как можно добавить отношения в ваши модели:

from sqlalchemy import ForeignKey, text, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base, str_uniq, int_pk, str_null_true
from datetime import date


# создаем модель таблицы студентов
class Student(Base):
    id: Mapped[int_pk]
    phone_number: Mapped[str_uniq]
    first_name: Mapped[str]
    last_name: Mapped[str]
    date_of_birth: Mapped[date]
    email: Mapped[str_uniq]
    address: Mapped[str] = mapped_column(Text, nullable=False)
    enrollment_year: Mapped[int]
    course: Mapped[int]
    special_notes: Mapped[str_null_true]
    major_id: Mapped[int] = mapped_column(ForeignKey("majors.id"), nullable=False)

    # Определяем отношения: один студент имеет один факультет
    major: Mapped["Major"] = relationship("Major", back_populates="students")

    def __str__(self):
        return (f"{self.__class__.__name__}(id={self.id}, "
                f"first_name={self.first_name!r}, "
                f"last_name={self.last_name!r})")

    def __repr__(self):
        return str(self)

    def to_dict(self):
        return {
            "id": self.id,
            "phone_number": self.phone_number,
            "first_name": self.first_name,
            "last_name": self.last_name,
            "date_of_birth": self.date_of_birth,
            "email": self.email,
            "address": self.address,
            "enrollment_year": self.enrollment_year,
            "course": self.course,
            "special_notes": self.special_notes,
            "major_id": self.major_id
        }

        
# создаем модель таблицы факультетов (majors)
class Major(Base):
    id: Mapped[int_pk]
    major_name: Mapped[str_uniq]
    major_description: Mapped[str_null_true]
    count_students: Mapped[int] = mapped_column(server_default=text('0'))

    # Определяем отношения: один факультет может иметь много студентов
    students: Mapped[list["Student"]] = relationship("Student", back_populates="major")

    def __str__(self):
        return f"{self.__class__.__name__}(id={self.id}, major_name={self.major_name!r})"

    def __repr__(self):
        return str(self)

Что добавлено:

В модели Student:

  • major: добавлена связь с моделью Major с использованием relationship.

  • back_populates="students": указывает, что обратная связь идет через атрибут students в модели Major.

В модели Major: 

  • students: добавлена связь с моделью Student с использованием relationship.

  • back_populates="major": указывает, что обратная связь идет через атрибут major в модели Student.

Эти изменения позволяют вам обращаться к факультету студента через student.major и к студентам факультета через major.students.

Теперь, когда мы настроили связи, мы можем упростить класс StudentDAO

Чтобы использовать связи, которые мы добавили в модели Student и Major, можно упростить класс StudentDAO и избежать второго запроса к базе данных, так как SQLAlchemy автоматически загрузит связанные данные.

Вот как можно переписать класс StudentDAO:

from sqlalchemy import insert, update, delete
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.future import select
from sqlalchemy.orm import joinedload
from app.dao.base import BaseDAO
from app.majors.models import Major
from app.students.models import Student
from app.database import async_session_maker


class StudentDAO(BaseDAO):
    model = Student

    @classmethod
    async def find_full_data(cls, student_id: int):
        async with async_session_maker() as session:
            # Запрос для получения информации о студенте вместе с информацией о факультете
            query = select(cls.model).options(joinedload(cls.model.major)).filter_by(id=student_id)
            result = await session.execute(query)
            student_info = result.scalar_one_or_none()

            # Если студент не найден, возвращаем None
            if not student_info:
                return None

            student_data = student_info.to_dict()
            student_data['major'] = student_info.major.major_name
            return student_data

          

Что изменено:

1. Использование joinedload:

  • joinedload(cls.model.major) загружает данные о факультете вместе с данными о студенте, что позволяет избежать второго запроса.

2. Упрощение логики:

  • Удалён второй запрос для получения информации о факультете.

  • Теперь мы можем получить данные о факультете непосредственно из объекта student_info благодаря использованию связей.

Теперь, когда у вас есть данные о студенте, вы можете легко получить данные о факультете через связь, и SQLAlchemy автоматически позаботится о выполнении нужных SQL-запросов.

Надеюсь, что получилось доступно объяснить тему связей между таблицами.

Я сделал большой акцент на тему получения данных (SELECT), так как на старте вы будете чаще всего взаимодействовать с таким запросом.

Далее, так как тут, все таки, не курс по SQLAlchemy 2.0, а изучение FastApi методы по добавлению, обновлению и удалению данных через алхимию мы рассмотрим не так подробно.

Возможно, если это будет вам нужно, когда-то я подробно и основательно рассмотрю алхимию в нескольких статьях, с ее возможностями, фишками и трюками. Если это так, то найдите способ сообщить мне об этом.

 Добавление данных (POST)

Напоминаю, что в нашей базе данных пока 2 таблицы: students и majors. При чем студента мы не сможем добавить с несуществующим факультетом. Это говорит о том, что нужно сначала добавить факультет, а после добавлять студента.

Технически, и добавление факультета и добавление студента — это не более чем одинаковые действия (за исключением обновления счетчика студентов в таблице факультетов, но об этом чуть далее):

  • принимаем данные

  • записываем их в таблицу

Но сущностей у нас, по сути, две. Следовательно, для чистоты кода мы можем разделить факультеты и студентов на две папки.

После изменений структура проекта будет следующей:

my_fastapi_project/

├── tests/

│   └── (тут мы будем добавлять функции для Pytest)

├── app/

│   ├── database.py

│   ├── config.py

│   ├── main.py

│   └── majors/

│      ├── router.py

│      ├── schemas.py

│      ├── dao.py

│      ├── rb.py

│   └── students/

│      ├── router.py

│      ├── schemas.py

│      ├── dao.py

│      ├── rb.py

│   └── dao/

│       └── base.py

│   └── migration/

│       └── (файлы миграций Alembic)

├── alembic.ini

├── .env

└── requirements.txt

Корректно вынесем код из таблицы студентов в таблицу факультетов.

Файл majors/models.py

from sqlalchemy import text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base, str_uniq, int_pk, str_null_true
from app.students.models import Student


# создаем модель таблицы факультетов (majors)
class Major(Base):
    id: Mapped[int_pk]
    major_name: Mapped[str_uniq]
    major_description: Mapped[str_null_true]
    count_students: Mapped[int] = mapped_column(server_default=text('0'))

    # Определяем отношения: один факультет может иметь много студентов
    students: Mapped[list[Student]] = relationship("Student", back_populates="major")

    def __str__(self):
        return f"{self.__class__.__name__}(id={self.id}, major_name={self.major_name!r})"

    def __repr__(self):
        return str(self)

Файл majors/dao.py

from app.dao.base import BaseDAO
from app.majors.models import Major


class MajorsDAO(BaseDAO):
    model = Major

Теперь удаляем ненужные данные у студентов.

from sqlalchemy import ForeignKey, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base, str_uniq, int_pk, str_null_true
from datetime import date


# создаем модель таблицы студентов
class Student(Base):
    id: Mapped[int_pk]
    phone_number: Mapped[str_uniq]
    first_name: Mapped[str]
    last_name: Mapped[str]
    date_of_birth: Mapped[date]
    email: Mapped[str_uniq]
    address: Mapped[str] = mapped_column(Text, nullable=False)
    enrollment_year: Mapped[int]
    course: Mapped[int]
    special_notes: Mapped[str_null_true]
    major_id: Mapped[int] = mapped_column(ForeignKey("majors.id"), nullable=False)
    
    # Определяем отношения: один студент имеет один факультет
    major: Mapped["Major"] = relationship("Major", back_populates="students")

    def __str__(self):
        return (f"{self.__class__.__name__}(id={self.id}, "
                f"first_name={self.first_name!r},"
                f"last_name={self.last_name!r})")

    def __repr__(self):
        return str(self)

    def to_dict(self):
        return {
            "id": self.id,
            "phone_number": self.phone_number,
            "first_name": self.first_name,
            "last_name": self.last_name,
            "date_of_birth": self.date_of_birth,
            "email": self.email,
            "address": self.address,
            "enrollment_year": self.enrollment_year,
            "course": self.course,
            "special_notes": self.special_notes,
            "major_id": self.major_id
        }

Есть очень важный момент. Когда вы настраиваете связи между таблицами, импортировать модели друг в друга не нужно. То есть, если у вас в таблице students есть связь с таблицей majors, импортировать модель Major не нужно, а просто указывайте в таком формате:

major: Mapped["Major"] = relationship("Major", back_populates="students")

Указание без кавычек возможно только в случае, когда обе модели описываются в одном файле. Будьте внимательны.

Теперь необходимо корректно импортировать новое расположение к таблицам в файле migration/env.py

from app.database import DATABASE_URL, Base
from app.students.models import Student
from app.majors.models import Major

Новые модели необходимо добавлять таким же образом.

Теперь, с учетом того что мы начинаем добавлять данные программно, через наше API, я предлагаю удалить существующие таблицы и выполнить новую миграцию.

Я удалил таблицы с базы данных и все файлы с папки migration/versions

Теперь выполняем команду из корня проекта:

alembic revision --autogenerate -m "Initial revision"

Теперь выполним upgrade:

alembic upgrade head

После этого мы получили новые файлы, а вы теперь знаете как легко и просто расширять проект. В дальнейшем, при появлении новых папок, делайте следующее:

  1. Создаете папку

  2. Наполняете ее файлами

  3. Описываете модели

  4. В файле migration/env.py добавляете корректные импорты моделей как на примере выше

  5. Добавляете миграцию (ревизию)

  6. Обновляете

Придерживайтесь такого алгоритма и у вас не будет никаких проблем, как с расширением проекта, так и с миграциями через Alembic.

Мы можем сделать универсальный метод для добавления данных в таблицу.

В файл dao/base.py добавим следующее:

@classmethod
async def add(cls, **values):
    async with async_session_maker() as session:
        async with session.begin():
            new_instance = cls.model(**values)
            session.add(new_instance)
            try:
                await session.commit()
            except SQLAlchemyError as e:
                await session.rollback()
                raise e
            return new_instance

Короткие комментарии:

  • Создаем асинхронную сессию.

  • Начинаем транзакцию.

  • Создаем новый экземпляр модели с переданными значениями.

  • Добавляем новый экземпляр в сессию.

  • Пытаемся зафиксировать изменения в базе данных.

  • В случае ошибки откатываем транзакцию и пробрасываем исключение дальше.

  • Возвращаем созданный экземпляр.

Тут важно понять, что если мы не будем выполнять commit, то изменения в базе данных не сохраняться.

Сам метод будет принимать некий массив данных (словарь), который мы после распакуем и добавим.

Метод add будет работать и без блока async with session.begin(), но он добавляет важное преимущество, обеспечивая управление транзакциями. Вот почему это важно:

Управление транзакциями:

  • Блок async with session.begin() автоматически начинает транзакцию и завершает её после выхода из блока, что гарантирует целостность данных.

  • Без этого блока вам нужно вручную начинать и завершать транзакцию.

  • Если вы хотите упростить метод и отказаться от использования блока async with session.begin(), вам нужно будет явно управлять транзакциями.

Для простоты опишем Router из папки majors на добавление факультетов. Но, для начала, опишем модель Pydantic для добавления.

Тут сразу хочу обратить внимание на один момент. Обычно отдельно описываются модели для добавления данных и для их получения, что, в целом, логично.

Мы будем делать так-же. В своих проектах я обычно добавляю в название схемы пометку Add и Get. Удобно.

Напишем модель (файл majors/schemas.py):

from pydantic import BaseModel, Field


class SMajorsAdd(BaseModel):
    major_name: str = Field(..., description="Название факультета")
    major_description: str = Field(None, description="Описание факультета")
    count_students: int = Field(0, description="Количество студентов")

Тут мы не указывали id, так как наша база данных его и так сформирует.

Теперь напишем Router для обработки POST запроса (файл majors/router.py)

from fastapi import APIRouter
from app.majors.dao import MajorsDAO
from app.majors.schemas import SMajorsAdd


router = APIRouter(prefix='/majors', tags=['Работа с факультетами'])


@router.post("/add/")
async def register_user(major: SMajorsAdd) -> dict:
    check = await MajorsDAO.add(**major.dict())
    if check:
        return {"message": "Факультет успешно добавлен!", "major": major}
    else:
        return {"message": "Ошибка при добавлении факультета!"}

Подключим новый роутер в файл app/main.py:

from fastapi import FastAPI
from app.students.router import router as router_students
from app.majors.router import router as router_majors

app = FastAPI()


@app.get("/")
def home_page():
    return {"message": "Привет, Хабр!"}


app.include_router(router_students)
app.include_router(router_majors)

Тестируем

На ID не обращайте внимание. Это я в процессе написания добавил несколько факультетов.
На ID не обращайте внимание. Это я в процессе написания добавил несколько факультетов.

Отлично. Все работает. Теперь добавим несколько факультетов через наше апи. Чтоб было интереснее, воспользуемся httpx. Тем самым, имитируя реальную работу с API.

import asyncio
import httpx


async def add_major(major_name: str, major_description: str):
    url = 'http://127.0.0.1:8000/majors/add/'
    headers = {
        'accept': 'application/json',
        'Content-Type': 'application/json'
    }
    data = {
        "major_name": major_name,
        "major_description": major_description,
        "count_students": 0
    }

    async with httpx.AsyncClient() as client:
        response = await client.post(url, headers=headers, json=data)
        return response.json()


# вызов функции
response = asyncio.run(add_major(major_name='Философия', major_description='Тут мы обучаем философов')) 
print(response)

В ответ мы получили:

{'message': 'Факультет успешно добавлен!', 'major': {'major_name': 'Философия', 'major_description': 'Тут мы обучаем философов', 'count_students': 0}}

На данном примере вы видите, что апи уже начинает полноценно работать, что не может не радовать. Давайте добавим ещё пару факультетов, на свое усмотрение и перейдем к добавлению студентов.

Факультеты
Факультеты

Отлично, с этим мы разобрались. Теперь добавим обработчик PUT запроса (изменение данных). Для этого, как вы понимаете, мы снова можем написать универсальную функцию в dao/base.py

from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.future import select
from sqlalchemy import update as sqlalchemy_update, delete as sqlalchemy_delete
from app.database import async_session_maker



@classmethod
async def update(cls, filter_by, **values):
    async with async_session_maker() as session:
        async with session.begin():
            query = (
                sqlalchemy_update(cls.model)
                .where(*[getattr(cls.model, k) == v for k, v in filter_by.items()])
                .values(**values)
                .execution_options(synchronize_session="fetch")
            )
            result = await session.execute(query)
            try:
                await session.commit()
            except SQLAlchemyError as e:
                await session.rollback()
                raise e
            return result.rowcount

Этот метод обновляет записи в базе данных. Вот шаги, что он делает:

  1. Объявление метода:

    @classmethod говорит о том, что метод принадлежит классу, а не его экземпляру.

    async def update(cls, filter_by, **values) определяет асинхронный метод update, который принимает два параметра: filter_by (условия фильтрации) и values (значения для обновления).

  2. Создание сессии:

async with async_session_maker() as session: Открывается асинхронная сессия для взаимодействия с базой данных.

  1. Начало транзакции:

async with session.begin(): Начинается транзакция (группировка операций для выполнения как одной единицы).

  1. Создание запроса:

query = (sqlalchemy_update(cls.model)...): Создаётся запрос на обновление записей в таблице, связанной с моделью cls.model.

.where(*[getattr(cls.model, k) == v for k, v in filter_by.items()]): Добавляются условия фильтрации, чтобы обновить только те записи, которые соответствуют заданным условиям.

.values(**values): Устанавливаются новые значения для обновляемых записей.

.execution_options(synchronize_session="fetch"): Опция, чтобы синхронизировать состояние сессии с базой данных после выполнения запроса.

  1. Выполнение запроса:

result = await session.execute(query): Выполняется запрос на обновление.

  1. Фиксация транзакции:

await session.commit(): Сохраняются изменения в базе данных.

except SQLAlchemyError as e: await session.rollback(): Если возникает ошибка, транзакция откатывается (отмена всех изменений).

  1. Возвращение результата:

return result.rowcount: Возвращается количество обновлённых строк.

Итак, этаот метод берёт условия для фильтрации записей и новые значения для обновления этих записей. Затем он обновляет соответствующие записи в базе данных и возвращает количество обновлённых строк.

Теперь, для того чтоб протестировать данный метод, предлагаю написать обработчик PUT запроса, который будет принимать на входе название Факультета и его новое описание, а после будет происходить обновление данных.

Для начала напишем соответствующую модель Pydantic:

from pydantic import BaseModel, Field


class SMajorsUpdDesc(BaseModel):
    major_name: str = Field(..., description="Название факультета")
    major_description: str = Field(None, description="Новое описание факультета")

Теперь напишем соответствующий эндпоинт в файле router.py:

@router.put("/update_description/")
async def update_major_description(major: SMajorsUpdDesc) -> dict:
    check = await MajorsDAO.update(filter_by={'major_name': major.major_name},
                                   major_description=major.major_description)
    if check:
        return {"message": "Описание факультета успешно обновлено!", "major": major}
    else:
        return {"message": "Ошибка при обновлении описания факультета!"}

Обратите внимание на синтаксис. Для распаковки нам не обязательно принимать словарь. В некоторых случаях, как в описанном выше (major_description=major.major_description), достаточно просто передать нужное значение для обновления.

Проверим.

Мы видим что обновление прошло успешно. А это значит, что остается разобраться с DELETE.

Уже по сложившейся традиции напишем универсальный метод, но теперь уже под удаление.

Добавим новый метод в файл dao/base.py

@classmethod
async def delete(cls, delete_all: bool = False, **filter_by):
    if not delete_all and not filter_by:
        raise ValueError("Необходимо указать хотя бы один параметр для удаления.")

    async with async_session_maker() as session:
        async with session.begin():
            query = sqlalchemy_delete(cls.model).filter_by(**filter_by)
            result = await session.execute(query)
            try:
                await session.commit()
            except SQLAlchemyError as e:
                await session.rollback()
                raise e
            return result.rowcount

Этот метод асинхронно удаляет записи из базы данных. Вот его краткое описание:

  1. Параметры:

delete_all: Если True, удаляются все записи. Если False, удаляются только те записи, которые соответствуют указанным фильтрам.

filter_by: Ключевые параметры для фильтрации записей, которые нужно удалить (игнорируются, если delete_all = True).

  1. Логика:

    • Если delete_all = False и filter_by пустой, выбрасывается исключение.

    • Открывается асинхронная сессия для взаимодействия с базой данных.

    • Начинается транзакция.

    • Формируется и выполняется запрос на удаление записей по указанным условиям.

    • Если выполнение успешно, изменения фиксируются, иначе транзакция откатывается.

    • Возвращается количество удалённых строк.

Краткое описание:

Метод delete удаляет записи из базы данных, фильтруя их по заданным параметрам, или удаляет все записи, если указано delete_all=True. Возвращает количество удалённых строк.

Давайте под данный метод опишем простой обработчик DELETE запросов. Смысл будет в том, чтоб удалить конкретную запись из таблицы majors, принимая id записи.

Так как параметр всего один — отдельную модель под него писать не будем, а сам запрос оформим через параметр пути.

@router.delete("/delete/{major_id}")
async def delete_major(major_id: int) -> dict:
    check = await MajorsDAO.delete(id=major_id)
    if check:
        return {"message": f"Факультет с ID {major_id} удален!"}
    else:
        return {"message": "Ошибка при удалении факультета!"}

Все у нас отработало и факультет удалился.

Теперь отдельно рассмотрим добавление студентов.

Дело тут в том, что при добавлении студента у нас должен обновляться счетчик студентов в таблице факультетов (увеличиваться на 1), а при удалении студента счетчик должен снижаться на 1.

Напишем метод для добавления студентов (students/dao.py)

@classmethod
async def add_student(cls, student_data: dict):
    async with async_session_maker() as session:
        async with session.begin():
            # Вставка нового студента
            stmt = insert(cls.model).values(**student_data).returning(cls.model.id, cls.model.major_id)
            result = await session.execute(stmt)
            new_student_id, major_id = result.fetchone()

            # Увеличение счетчика студентов в таблице Major
            update_major = (
                update(Major)
                .where(Major.id == major_id)
                .values(count_students=Major.count_students + 1)
            )
            await session.execute(update_major)

            try:
                await session.commit()
            except SQLAlchemyError as e:
                await session.rollback()
                raise e

            return new_student_id

Объяснение:

Метод add_student:

  • Принимает словарь student_data, содержащий данные нового студента.

  • Открывает асинхронную сессию для взаимодействия с базой данных.

  • Вставляет нового студента в таблицу students, возвращая ID нового студента и ID факультета (major_id).

  • Формирует и выполняет запрос для увеличения счетчика студентов (student_count) в таблице major на 1 для соответствующего факультета.

  • Сохраняет изменения в базе данных.

  • В случае ошибки откатывает транзакцию и выбрасывает исключение.

  • Возвращает ID нового студента.

Таким образом, этот метод добавляет нового студента в таблицу students и увеличивает счетчик студентов в связанной таблице majors.

Есть более оптимальные подходы для обновления данных в связанных таблицах и один из таких вариантов мы сейчас рассмотрим.

В файл students/dao.py добавим следующую конструкцию:

from sqlalchemy import update, delete, event


@event.listens_for(Student, 'after_insert')
def receive_after_insert(mapper, connection, target):
    major_id = target.major_id
    connection.execute(
        update(Major)
        .where(Major.id == major_id)
        .values(count_students=Major.count_students + 1)
    )

Тем самым мы создали событие after_insert для модели Student, которое автоматически обновляет счетчик студентов в таблице Major после добавления нового студента.

Теперь изменим сам метод по добавлению студентов:

@classmethod
async def add_student(cls, **student_data: dict):
    async with async_session_maker() as session:
        async with session.begin():
            new_student = Student(**student_data)
            session.add(new_student)
            await session.flush()
            new_student_id = new_student.id
            await session.commit()
            return new_student_id

Код стал заметно лаконичнее и более читаемым. Саму тему событий мы, возможно, когда-то подробно рассмотрим отдельно в серии публикаций про Sqlalchemy 2.0

Опишем Pydantic модель:

class SStudentAdd(BaseModel):
    phone_number: str = Field(..., description="Номер телефона в международном формате, начинающийся с '+'")
    first_name: str = Field(..., min_length=1, max_length=50, description="Имя студента, от 1 до 50 символов")
    last_name: str = Field(..., min_length=1, max_length=50,
                           description="Фамилия студента, от 1 до 50 символов")
    date_of_birth: date = Field(..., description="Дата рождения студента в формате ГГГГ-ММ-ДД")
    email: EmailStr = Field(..., description="Электронная почта студента")
    address: str = Field(..., min_length=10, max_length=200,
                         description="Адрес студента, не более 200 символов")
    enrollment_year: int = Field(..., ge=2002, description="Год поступления должен быть не меньше 2002")
    major_id: int = Field(..., ge=1, description="ID специальности студента")
    course: int = Field(..., ge=1, le=5, description="Курс должен быть в диапазоне от 1 до 5")
    special_notes: Optional[str] = Field(None, max_length=500,
                                         description="Дополнительные заметки, не более 500 символов")

    @field_validator("phone_number")
    @classmethod
    def validate_phone_number(cls, values: str) -> str:
        if not re.match(r'^\+\d{1,15}$', values):
            raise ValueError('Номер телефона должен начинаться с "+" и содержать от 1 до 15 цифр')
        return values

    @field_validator("date_of_birth")
    @classmethod
    def validate_date_of_birth(cls, values: date):
        if values and values >= datetime.now().date():
            raise ValueError('Дата рождения должна быть в прошлом')
        return values

id убрал, так как это значение генирирует наша база данных.

Опишем метод, который будет обрабатывать POST запрос для добавления студента в базу данных. 

@router.post("/add/")
async def add_student(student: SStudentAdd) -> dict:
    check = await StudentDAO.add_student(**student.dict())
    if check:
        return {"message": "Студент успешно добавлен!", "student": student}
    else:
        return {"message": "Ошибка при добавлении студента!"}

Вы можете заметить, что отличий больших с блоком по добавлению факультетов в базу данных нет.

Методы на изменения данных мы можем взять из универсального класса Base, а вот для удаления студента можем добавить отдельный метод уже в класс StudentDAO, но, предварительно, добавим новое событие:

@event.listens_for(Student, 'after_delete')
def receive_after_delete(mapper, connection, target):
    major_id = target.major_id
    connection.execute(
        update(Major)
        .where(Major.id == major_id)
        .values(count_students=Major.count_students - 1)
    )

Как вы поняли из синтаксиса, данное событие будет выполняться после удаления записи из таблицы со студентами.

Теперь напишем отдельный метод для удаления студента по айди

@classmethod
async def delete_student_by_id(cls, student_id: int):
    async with async_session_maker() as session:
        async with session.begin():
            query = select(cls.model).filter_by(id=student_id)
            result = await session.execute(query)
            student_to_delete = result.scalar_one_or_none()

            if not student_to_delete:
                return None

            # Удаляем студента
            await session.execute(
                delete(cls.model).filter_by(id=student_id)
            )

            await session.commit()
            return student_id

Пишем эндпоинт:

@router.delete("/dell/{student_id}")
async def dell_student_by_id(student_id: int) -> dict:
    check = await StudentDAO.delete_student_by_id(student_id=student_id)
    if check:
        return {"message": f"Студент с ID {student_id} удален!"}
    else:
        return {"message": "Ошибка при удалении студента!"}

Далее, настоятельно рекомендую вам написать прочие методы для обновления, добавления, удаления и прочих действий с базой данных через ваше API самостоятельно. В идеале вам самим добавить ещё и свою таблицу, например, с расписанием.

Я, со своей стороны, добавлю «за кадром» эти методы. Так же я добавлю некоторые универсальные методы по работе с базой данных и дам их описание (формат документации). Это, как и прочий код, который я использовал в проекте, вы найдете в моем телеграмм канале «Легкий путь в Python».

 У нас большая проблема

На данный момент вы понимаете как выстраивать архитектуру в базе данных, как описывать методы для обработки POST, GET, PUT и DELETE запросов. Вы, если немного покопаетесь в документации и попрактикуетесь, сможете создавать свои собственные многоуровневые и сложные запросы, оптимизировав их, но все это теряет смысл без одного важного момента. Догадались?

Абсолютно верно – авторизация и аутентификация. Для того, чтоб все работало корректно необходимо в своем API добавлять уровни пользователей.

Что примечательно, авторизация и аутентификация закрывают не только проблему уровней доступа, но и позволяет системе понимать, что вот сейчас авторизован студент с id 4 и что нужно выводить информацию именно по нему, а не по кому-то другому.

В таком случаем пользователю будет достаточно нажать на кнопку «Мой профиль», а система уже подгрузит все данные о нем, которые ему доступны.

Другому же пользователю, например, администратору, система даст ещё и особый функционал для управления базой данных.

К примеру вы, так как являетесь разработчиком своего API будете иметь уровень супер-админа. У вас будет опция смотреть всех студентов, опция добавления факультетов, удаления, изменения любых данных и прочее.

Другой же пользователь, например преподаватель с факультета информатики, сможет только просматривать студентов своего факультета и, например, сможет устанавливать оценки или пометки в стиле «прогулял».

И, как вы уже догадались, этим мы займемся уже в следующей публикации. Там, кроме аввторизации и аутентификация мы рассмотрим тему JWT-токенов.

Заключение

Друзья, мы официально завершили знакомство с фреймворком FastAPI. Надеюсь, что мне удалось заложить в вас, если не прочный фундамент, то хотя бы общее понимание того, как работает API и как его пишут с использованием FastAPI.

Важно отметить, что многие аспекты были намеренно упрощены, и все, что было описано в предыдущих статьях и этой статье, следует воспринимать как демонстрацию возможностей! Нам предстоит еще многое обсудить, и при вашей поддержке мы обязательно продолжим наши исследования.

Особую благодарность хочу выразить каждому, кто поддержал мои публикации. Произошло приятное событие: на канале «Легкий путь в Python» уже более 100 участников.

Не секрет, что написание таких статей занимает много времени и усилий. Друзья, я надеюсь, что эта статья получит большой и позитивный отклик.

На этом пока всё. Спасибо за внимание и до новых встреч!

Комментарии (12)


  1. yesworldd
    12.07.2024 11:24

        @classmethod
        async def find_all(cls):
            async with async_session_maker() as session:
                query = select(cls.model)
                result = await session.execute(query)
                return result.scalars().all()

    Автору прежде чем писать гайды, надо почитать документацию, почему не надо создавать сессию в самом DAO
    https://docs.sqlalchemy.org/en/20/orm/session_basics.html


    1. yakvenalex Автор
      12.07.2024 11:24

      Да и я тут её и не создавал как-бы. Вы же знаете что такое async_session_maker() и откуда он тянется? Наверняка читали прошлые статьи?


      1. yesworldd
        12.07.2024 11:24
        +1

        async with async_session_maker() не должно быть в DAO, он должен передаваться как аргумент в функции или же в конструкторе


  1. yesworldd
    12.07.2024 11:24

        def __str__(self):
            return f"{self.__class__.__name__}(id={self.id}, major_name={self.major_name!r})"
    
        def __repr__(self):
            return str(self)

    Ужас... Можно же просто переопределить метод repr


    1. yakvenalex Автор
      12.07.2024 11:24

      Пожалуйста, указывайте лучший код с комментариями. Чтоб было всем понятно о чем речь. Благодарю


      1. yesworldd
        12.07.2024 11:24
        +1

        def __repr__(self):
          return f"{self.__class__.__name__}(id={self.id}, major_name={self.major_name!r})"

        Все, больше ничего не надо делать. Если метод dunder str не определен, то питон автоматом вызывает dunder repr метод.
        А прежде чем писать такие статьи, стоит почитать документацию питона. Вы же понимаете что многие люди будут ваши статьи, особенно новички, потом нахватаются от вас плохого кода. Благодарю


        1. yakvenalex Автор
          12.07.2024 11:24

          Я посчитал, что такая запись сложная для новичов, но за код благодарю. Уверен, что он кому-то пригодится. Прошу смотреть на серию данных публикаций, как обучение для новичков. Об этом, кстати, я неоднократно указывал в самом тексте.


  1. IvanZaycev0717
    12.07.2024 11:24
    +3

      @classmethod
        async def find_all(cls):
            async with async_session_maker() as session:
                query = select(cls.model)
                result = await session.execute(query)
                return result.scalars().all()

    Хорошо, что вы попробовали и не побоялись выложить, учитывая какие софт-скиллы у многих комментаторов на Хабре. Можно сделать лучше. Приведу примеры для синхронной и асинхронной алхимии, как сделать лучше

    ПОРАБОТАЕМ В СИНХРОННОЙ АЛХИМИИ

    В обычной синхронной алхимии используется, как правило, либо execute для выполнения произвольных SQL-запросов и возвращает объект ResultProxy, либо scalars для выполнения запросов, которые возвращают ровно один столбец и одну строку, и возвращает значение этой единственной ячейки напрямую. То есть если бы это была обычная синхронная сессия, такой синтаксис был бы далеко не самым лучшим решиением.

    Код для синхронной алхимии:

    # импортируем то, что нужно
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    
    # где-то ранее создаем движок и сессию
    engine = create_engine(os.environ.get('DATABASE_URL'))
    Session = sessionmaker(engine)
    
    # теперь работаем в классе
    @classmethod
    def find_all(cls):
        with Session() as session:
            query = select(cls.model)
            return session.scalars(query)

    У scalars не нужно писать all() - оно по умолчанию установлено

    ТЕПЕРЬ ПОГОВОРИМ ОБ АСИНХРОННОЙ АЛХИМИИ

    Как верно было замечено выше надо создать отдельную асинхронную сессию. У движка дожен быть асинхронный драйвер, например, asyncpg для PostgreSQL

    # импортируем все необходимое для работы
    from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
    
    # создаем движок и сессию
    engine = create_async_engine(os.environ.get('DATABASE_URL'))
    Session = async_sessionmaker(engine, expire_on_commit=False)
    
    # теперь работаем в классе
    @classmethod
    async def find_all(cls):
        async with Session() as session:
            query = select(cls.model)
            return await session.stream_scalars(query)

    Метод session.stream_scalars(query) обеспечивают возврат асинхронной версии объекта, который поддерживает протокол асинхронной итерации Python.То есть для его итерации вы будете использовать не простой цикл for, а асинхронный async for . Он позволяет выполнять асинхронные операции в процессе итерации. Итерации выполняются в асинхронном контексте, что позволяет эффективно использовать время CPU при ожидании завершения асинхронных операций вместо блокировки процесса.

    И на последок небольшую психологическую поддержку Вам хочу оказать: продолжайте дальше писать статьи, вы работаете с довольно сложными вещами. И официальный туториал по SQLAlchemy написан, слабо говоря, не лучшим образом. Книга только одна вменяемая по этой теме - вот эта. Ну а к хамству и грубости надо просто привыкнуть - как сказал не последний человек в мире Python Никита Соболев, что в России все отлично с хардами, но все ужасно с софтами. Грубость в комментариях к вам никакого отношения не имеет, этих людей так с детсва воспитало наше общество


    1. yakvenalex Автор
      12.07.2024 11:24

      Благодарю за обратную связь. Правда. Очень радует, что есть такие комментаторы как вы. Благодарю и за примеры качественного кода. Обязательно внедрю его в будущие статьи.


    1. ValeryIvanov
      12.07.2024 11:24
      +2

      У scalars не нужно писать all() - оно по умолчанию установлено

      Это не так. result.scalars() привязан к соединению из пула и к сессии, result.scalars().all() нет. В теории, это может вызвать проблемы при формировании ответа на запрос, ведь сессия в данный момент уже будет закрыта.

      Метод session.stream_scalars(query) обеспечивают возврат асинхронной версии объекта, который поддерживает протокол асинхронной итерации Python.

      stream_scalars по большей части нужен для работы с большим объёмом данных. Если данных немного, то лучше сразу забрать их все через scalars().all().

      То есть для его итерации вы будете использовать не простой цикл for, а асинхронный async for . Он позволяет выполнять асинхронные операции в процессе итерации.

      async for служит для других целей. С синхронным итератором вы тоже можете выполнять асинхронные операции в процессе итерации.


  1. ndrwK
    12.07.2024 11:24
    +2

    У вас в разных pydantic классах используется синтаксис версий 1 и 2 (декораторы validator и field_validator). Тут либо одно, либо другое.

    https://docs.pydantic.dev/latest/migration/


    1. yakvenalex Автор
      12.07.2024 11:24

      Проблема с набитым опытом работы с первой версей. Замечание принято и в будующих статьях переберусь на чистый field_validator. Спасибо за обратную связь)