SQLAlchemy 1.4

Прежде чем мы рассмотрим пример, приведем некоторые важные сведения о новом выпуске SQLAlchemy 1.4:

  • SQLAlchemy 1.4 представляет изменения, которые будут доработаны в SQLAlchemy 2.0.

  • SQLAlchemy унифицирует API Core и ORM для обеспечения согласованности.

  • Теперь Core и ORM поддерживают async с asyncio, но эта фича еще недостаточно готова для продакшна. Она также сопровождается некоторыми ограничениями относительно того, что мы можем делать с ORM, в частности, в отношении ленивой загрузки.

  • Появился новый способ создания запросов, названный "стилем 2.0", поскольку он станет стилем следующей основной версии SQLAlchemy. Спойлер: синтаксис в основном тот же, но запрос не выполняется непосредственно объектом сессии.

  • Теперь есть поддержка декларативного маппинга с помощью dataclasses и attrs.

В этой заметке я буду использовать новые асинхронные возможности ORM-слоя вместе с новыми запросами в стиле 2.0. Мы создадим простое FastAPI-приложение с двумя маршрутами. Один — для добавления городов и их жителей, а другой — для вывода списка наиболее населенных объектов.

Предварительные условия

Пример предполагает использование Python 3.9 и SQLAlchemy 1.4. Другие зависимости включают FastAPI с uvicorn, asyncpg (клиент базы данных PostgreSQL для asyncio в Python) и typer для создания табличной структуры из командной строки.

На данный момент поддержка async может быть использована только с базой данных PostgreSQL.

Код для этой статьи можно найти в репо stribny/fastapi-asyncalchemy.

Модель

Существуют новые способы объявления моделей SQLAlchemy, однако я буду использовать декларативный стиль описания подклассов, который по-прежнему хорошо работает и менее многословен.

Наша модель City в models.py будет иметь поля id, name и population:

from sqlalchemy import Column
from sqlalchemy import String
from sqlalchemy import Integer
from fastapi_asyncalchemy.db.base import Base


class City(Base):
    __tablename__ = "cities"

    id = Column(Integer, autoincrement=True, primary_key=True, index=True)
    name = Column(String, unique=True)
    population = Column(Integer)

Инициализация асинхронного движка и модели

В base.py мы инициализируем новый движок SQLAlchemy с помощью create_async_engine() и создадим мейкер асинхронной сессии, передав ему новый класс AsyncSession:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker


DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost/asyncalchemy"


engine = create_async_engine(DATABASE_URL, echo=True)
Base = declarative_base()
async_session = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

Указание echo=True при инициализации движка позволит нам увидеть сгенерированные SQL-запросы в консоли. Мы должны отключить поведение "expire on commit (завершить при фиксации)" для сессий с expire_on_commit=False. Это связано с тем, что в настройках async мы не хотим, чтобы SQLAlchemy выдавал новые SQL-запросы к базе данных при обращении к уже закоммиченным объектам.

Давайте также определим асинхронную функцию для очистки и воссоздания таблицы базы данных, которую мы будем использовать позже:

async def init_models():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

Выгрузка и создание таблиц из Base.metadata по умолчанию не выполняются асинхронно, и нам нет смысла это менять. Это просто пример, показывающий, как SQLAlchemy может выполнять синхронные операции с помощью run_sync().

Объект асинхронной сессии

Наконец, мы создадим функцию FastAPI Dependency, которая будет формировать для нас будущие сессии по запросу:

# Dependency
async def get_session() -> AsyncSession:
    async with async_session() as session:
        yield session

С помощью системы зависимостей FastAPI мы можем в дальнейшем использовать эту функцию для добавления новых сессий в наши маршруты.

Сервисный слой и новый стиль запросов

Теперь определим две сервисные (служебные) функции в файле service.py: одну для добавления, а другую для получения городов.

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi_asyncalchemy.models import *


async def get_biggest_cities(session: AsyncSession) -> list[City]:
    result = await session.execute(select(City).order_by(City.population.desc()).limit(20))
    return result.scalars().all()


def add_city(session: AsyncSession, name: str, population: int):
    new_city = City(name=name, population=population)
    session.add(new_city)
    return new_city

В get_biggest_cities для создания запроса используется новая функция select. Запрос выглядит так же, как если бы он выполнялся непосредственно на объекте сессии. Но теперь, вместо вызова session().query(), мы ожидаем session.execute(), который выполнит запрос и сохранит результаты. Метод scalars() обеспечивает доступ к результатам.

Функция add_city просто помещает новый объект City в сессию — мы будем управлять транзакцией в контроллере (маршрут).

Командная строка

Прежде чем мы рассмотрим маршруты, нам нужно создать таблицу. Просто определим функцию в main.py, которая будет запускать init_models() в цикле событий. Я использовал Typer для создания команды CLI, хотя, когда определена только одна команда, ее выполнение сводится к вызову python main.py без дополнительных аргументов.

import asyncio
import typer

cli = typer.Typer()

@cli.command()
def db_init_models():
    asyncio.run(init_models())
    print("Done")


if __name__ == "__main__":
    cli()

Маршруты

Давайте теперь посмотрим на маршруты FastAPI (с CitySchema для представления переданного JSON):

from fastapi import FastAPI
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError
from fastapi_asyncalchemy.exceptions import DuplicatedEntryError
from fastapi_asyncalchemy.db.base import init_models
from fastapi_asyncalchemy.db.base import get_session
from fastapi_asyncalchemy import service


app = FastAPI()


class CitySchema(BaseModel):
    name: str
    population: int


@app.get("/cities/biggest", response_model=list[CitySchema])
async def get_biggest_cities(session: AsyncSession = Depends(get_session)):
    cities = await service.get_biggest_cities(session)
    return [CitySchema(name=c.name, population=c.population) for c in cities]


@app.post("/cities/")
async def add_city(city: CitySchema, session: AsyncSession = Depends(get_session)):
    city = service.add_city(session, city.name, city.population)
    try:
        await session.commit()
        return city
    except IntegrityError as ex:
        await session.rollback()
        raise DuplicatedEntryError("The city is already stored")

Мы видим, что сессия может быть внедрена (инжектирована) с помощью Depends. Таким образом, вызов каждого из маршрутов создаст новую сессию. Для получения данных единственное изменение заключается в том, что теперь мы хотим применить await для нашей сервисной функции.

Чтобы продемонстрировать ситуацию с появлением какого-либо сбоя, я перехватываю ошибку нарушения целостности от уникального ограничения, определенного ранее. Мы просто используем await session.commit() или await session.rollback().

Заключительные слова

Я не являюсь экспертом по асинхронному использованию SQLAlchemy, а просто хотел посмотреть, что потребуется для преобразования стандартного приложения FastAPI с SQLAlchemy в асинхронный формат. 


Приглашаем на два открытых занятия для новичков в IT: «Основы ООП в Python» и «Продвинутое ООП и исключения в Python». Подробнее и регистрация — по ссылке.

Комментарии (0)