SQLAlchemy 1.4
Прежде чем мы рассмотрим пример, приведем некоторые важные сведения о новом выпуске SQLAlchemy 1.4:
SQLAlchemy 1.4 представляет изменения, которые будут доработаны в SQLAlchemy 2.0.
SQLAlchemy унифицирует API Core и ORM для обеспечения согласованности.
Теперь Core и ORM поддерживают async с asyncio, но эта фича еще недостаточно готова для продакшна. Она также сопровождается некоторыми ограничениями относительно того, что мы можем делать с ORM, в частности, в отношении ленивой загрузки.
Появился новый способ создания запросов, названный "стилем 2.0", поскольку он станет стилем следующей основной версии SQLAlchemy. Спойлер: синтаксис в основном тот же, но запрос не выполняется непосредственно объектом сессии.
Теперь есть поддержка декларативного маппинга с помощью dataclasses и attrs.
В этой заметке я буду использовать новые асинхронные возможности ORM-слоя вместе с новыми запросами в стиле 2.0. Мы создадим простое FastAPI-приложение с двумя маршрутами. Один — для добавления городов и их жителей, а другой — для вывода списка наиболее населенных объектов.
Предварительные условия
Пример предполагает использование Python 3.9 и SQLAlchemy 1.4. Другие зависимости включают FastAPI с uvicorn, asyncpg (клиент базы данных PostgreSQL для asyncio в Python) и typer для создания табличной структуры из командной строки.
На данный момент поддержка async может быть использована только с базой данных PostgreSQL.
Код для этой статьи можно найти в репо stribny/fastapi-asyncalchemy.
Модель
Существуют новые способы объявления моделей SQLAlchemy, однако я буду использовать декларативный стиль описания подклассов, который по-прежнему хорошо работает и менее многословен.
Наша модель City в models.py будет иметь поля id
, name
и population
:
from sqlalchemy import Column
from sqlalchemy import String
from sqlalchemy import Integer
from fastapi_asyncalchemy.db.base import Base
class City(Base):
__tablename__ = "cities"
id = Column(Integer, autoincrement=True, primary_key=True, index=True)
name = Column(String, unique=True)
population = Column(Integer)
Инициализация асинхронного движка и модели
В base.py мы инициализируем новый движок SQLAlchemy с помощью create_async_engine()
и создадим мейкер асинхронной сессии, передав ему новый класс AsyncSession
:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost/asyncalchemy"
engine = create_async_engine(DATABASE_URL, echo=True)
Base = declarative_base()
async_session = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
Указание echo=True
при инициализации движка позволит нам увидеть сгенерированные SQL-запросы в консоли. Мы должны отключить поведение "expire on commit (завершить при фиксации)" для сессий с expire_on_commit=Fal
se. Это связано с тем, что в настройках async мы не хотим, чтобы SQLAlchemy выдавал новые SQL-запросы к базе данных при обращении к уже закоммиченным объектам.
Давайте также определим асинхронную функцию для очистки и воссоздания таблицы базы данных, которую мы будем использовать позже:
async def init_models():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
Выгрузка и создание таблиц из Base.metadata по умолчанию не выполняются асинхронно, и нам нет смысла это менять. Это просто пример, показывающий, как SQLAlchemy может выполнять синхронные операции с помощью run_sync()
.
Объект асинхронной сессии
Наконец, мы создадим функцию FastAPI Dependency, которая будет формировать для нас будущие сессии по запросу:
# Dependency
async def get_session() -> AsyncSession:
async with async_session() as session:
yield session
С помощью системы зависимостей FastAPI мы можем в дальнейшем использовать эту функцию для добавления новых сессий в наши маршруты.
Сервисный слой и новый стиль запросов
Теперь определим две сервисные (служебные) функции в файле service.py: одну для добавления, а другую для получения городов.
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi_asyncalchemy.models import *
async def get_biggest_cities(session: AsyncSession) -> list[City]:
result = await session.execute(select(City).order_by(City.population.desc()).limit(20))
return result.scalars().all()
def add_city(session: AsyncSession, name: str, population: int):
new_city = City(name=name, population=population)
session.add(new_city)
return new_city
В get_biggest_cities
для создания запроса используется новая функция select
. Запрос выглядит так же, как если бы он выполнялся непосредственно на объекте сессии. Но теперь, вместо вызова session().query()
, мы ожидаем session.execute()
, который выполнит запрос и сохранит результаты. Метод scalars()
обеспечивает доступ к результатам.
Функция add_city
просто помещает новый объект City в сессию — мы будем управлять транзакцией в контроллере (маршрут).
Командная строка
Прежде чем мы рассмотрим маршруты, нам нужно создать таблицу. Просто определим функцию в main.py
, которая будет запускать init_models()
в цикле событий. Я использовал Typer для создания команды CLI, хотя, когда определена только одна команда, ее выполнение сводится к вызову python main.py
без дополнительных аргументов.
import asyncio
import typer
cli = typer.Typer()
@cli.command()
def db_init_models():
asyncio.run(init_models())
print("Done")
if __name__ == "__main__":
cli()
Маршруты
Давайте теперь посмотрим на маршруты FastAPI (с CitySchema для представления переданного JSON):
from fastapi import FastAPI
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError
from fastapi_asyncalchemy.exceptions import DuplicatedEntryError
from fastapi_asyncalchemy.db.base import init_models
from fastapi_asyncalchemy.db.base import get_session
from fastapi_asyncalchemy import service
app = FastAPI()
class CitySchema(BaseModel):
name: str
population: int
@app.get("/cities/biggest", response_model=list[CitySchema])
async def get_biggest_cities(session: AsyncSession = Depends(get_session)):
cities = await service.get_biggest_cities(session)
return [CitySchema(name=c.name, population=c.population) for c in cities]
@app.post("/cities/")
async def add_city(city: CitySchema, session: AsyncSession = Depends(get_session)):
city = service.add_city(session, city.name, city.population)
try:
await session.commit()
return city
except IntegrityError as ex:
await session.rollback()
raise DuplicatedEntryError("The city is already stored")
Мы видим, что сессия может быть внедрена (инжектирована) с помощью Depends. Таким образом, вызов каждого из маршрутов создаст новую сессию. Для получения данных единственное изменение заключается в том, что теперь мы хотим применить await для нашей сервисной функции.
Чтобы продемонстрировать ситуацию с появлением какого-либо сбоя, я перехватываю ошибку нарушения целостности от уникального ограничения, определенного ранее. Мы просто используем await session.commit()
или await session.rollback()
.
Заключительные слова
Я не являюсь экспертом по асинхронному использованию SQLAlchemy, а просто хотел посмотреть, что потребуется для преобразования стандартного приложения FastAPI с SQLAlchemy в асинхронный формат.
Приглашаем на два открытых занятия для новичков в IT: «Основы ООП в Python» и «Продвинутое ООП и исключения в Python». Подробнее и регистрация — по ссылке.