Друзья, приветствую!
Надеюсь, вы ждали выхода третьей статьи из серии «Асинхронный SQLAlchemy 2». Напоминаю, что на Хабре уже можно найти мои предыдущие статьи:
Ознакомьтесь с ними перед изучением данного материала, так как я буду опираться на предыдущие наработки.
Чем займемся сегодня?
В этой статье мы сделаем значительный шаг вперед в освоении асинхронного SQLAlchemy 2.
Программа действий:
Оптимизация кода: усовершенствуем базовый класс (
BaseDao
) для работы с таблицами и декоратор для генерации сессий, сделав их более гибкими и эффективными.Обновление данных: научимся выполнять одиночные и массовые обновления записей в таблицах.
Удаление данных: освоим методы удаления отдельных записей и групп данных.
Асинхронный подход: все операции будут выполняться асинхронно, что позволит приложениям работать быстрее и эффективнее.
Материала будет много, так что запасаемся терпением, открываем IDE и начинаем кодить!
Улучшение кода и реорганизация проекта
Сразу начнем с улучшения и реорганизации кода.
Переместим файл
database.py
в пакетdao
.В пакете
dao
создадим файлsession_maker.py
.Перенесем декоратор
connection
в файлsession_maker.py
.
Вот, как выглядел старый код декоратора:
def connection(method):
async def wrapper(*args, **kwargs):
async with async_session_maker() as session:
try:
# Явно не открываем транзакции, так как они уже есть в контексте
return await method(*args, session=session, **kwargs)
except Exception as e:
await session.rollback() # Откатываем сессию при ошибке
raise e # Поднимаем исключение дальше
finally:
await session.close() # Закрываем сессию
return wrapper
Основная цель этого декоратора — избежать ручного создания сессии каждый раз. Мы его немного переработаем и добавим пояснения.
Новый декоратор connection (файл dao/session_maker.py):
from functools import wraps
from typing import Optional
from sqlalchemy import text
from dao.database import async_session_maker
def connection(self, isolation_level: Optional[str] = None, commit: bool = True):
"""
Декоратор для управления сессией с возможностью настройки уровня изоляции и коммита.
Параметры:
- `isolation_level`: уровень изоляции для транзакции (например, "SERIALIZABLE").
- `commit`: если `True`, выполняется коммит после вызова метода.
"""
def decorator(method):
@wraps(method)
async def wrapper(*args, **kwargs):
async with self.session_maker() as session:
try:
if isolation_level:
await session.execute(text(f"SET TRANSACTION ISOLATION LEVEL {isolation_level}"))
result = await method(*args, session=session, **kwargs)
if commit:
await session.commit()
return result
except Exception as e:
await session.rollback()
raise
finally:
await session.close()
return wrapper
return decorator
Основные улучшения нового декоратора:
-
Управление уровнем изоляции
Теперь можно выбирать подходящий уровень изоляции для разных операций:
READ COMMITTED — для обычных запросов (по умолчанию в PostgreSQL).
SERIALIZABLE — для финансовых операций, требующих максимальной надежности.
REPEATABLE READ — для отчетов и аналитики.
-
Управление коммитами
Гибкое управление коммитами полезно в следующих случаях:
Объединение нескольких операций в одну транзакцию.
Дополнительная логика до коммита.
Коммит выполняется в другом месте кода.
При чтении данных, где коммит не требуется.
-
Улучшенная обработка результатов
result = await method(*args, session=session, **kwargs) if commit: await session.commit() return result
Сравнение с предыдущей версией:
Гибкость: была одинаковая для всех операций, теперь можно настроить под каждую задачу.
Безопасность: отсутствовал контроль изоляции, теперь гарантирован уровень изоляции для критичных операций.
Контроль транзакций: не было управления коммитами, теперь есть гибкие настройки для различных сценариев.
Примеры использования:
# Чтение данных
@connection(isolation_level="READ COMMITTED")
async def get_user(self, session, user_id: int):
...
# Финансовая операция
@connection(isolation_level="SERIALIZABLE", commit=False)
async def transfer_money(self, session, from_id: int, to_id: int):
...
Новый декоратор существенно повышает гибкость и безопасность работы с БД, позволяя точно настраивать поведение транзакций под конкретные задачи.
В сегодняшней статье мы будем использовать подход декоратора, но в будущем мы создадим универсальный класс, который сможет генерировать методы для создания сессии:
В виде декоратора (как в текущей реализации).
В виде зависимостей для FastAPI.
Этот класс можно описать всего один раз, а затем использовать в самых разных проектах, где требуется работа с базами данных: от простых Telegram-ботов до сложных FastApi-сервисов.
В моем Telegram‑канале «Легкий путь в Python» вы можете найти готовую заготовку для разработки приложений на FastAPI, которая включает:
Классовую реализацию
session_maker
.Расширенный класс
BaseDao
с богатым функционалом.Модуль для авторизации и аутентификации.
Улучшаем BaseDao
Мы вынесли коммиты в декоратор. Следовательно, теперь нет необходимости использовать commit
в методах класса. Давайте заменим все await session.commit()
на await session.flush()
.
В прошлой статье мы подробно рассматривали, зачем нужен метод flush()
и как он работает. Если кратко, то, в отличие от commit()
, flush()
выполняет временное сохранение в базе данных. С учетом того, что commit()
мы вынесли на уровень декоратора, использование flush()
в некоторых сценариях может дать значительный прирост производительности.
Кроме того, если в рамках ваших функций необходимо выполнять несколько связанных фиксаций в базе данных за пределами базового класса BaseDao
, вы также можете использовать flush()
.
Наша реализация декоратора теперь позволяет выполнять commit()
(сохранение изменений в базе данных) автоматически. Это создает ситуацию, при которой вы можете выполнить множество временных фиксаций через flush()
быстро и оптимально, а затем не переживать о том, что изменения будут сохранены.
Надеюсь, что объяснение было понятным.
Это не все изменения, которые мы сегодня внесем в код. Следующее важное изменение — отказ от использования **values для передачи параметров в методы.
Текущая реализация, хоть и удобная, гибкая и быстрая, может привести к множеству проблем, таким как передача несуществующих параметров или неверных ключей, что потенциально может вызвать серьезные ошибки и проблемы.
Для улучшения класса в этой части нам поможет Pydantic. Напоминаю, что в прошлой статье мы учились его использовать для получения данных. Кроме того, в моем блоге на Хабре вы найдете подробную статью про Pydantic 2, которая имеет название "Pydantic 2: Полное руководство для Python-разработчиков — от основ до продвинутых техник". Желательно ознакомиться с ней перед тем, как продолжить чтение.
Для примера, внесем изменения в метод find_one_or_none
и разберем их на практике:
@classmethod
async def find_one_or_none(cls, session: AsyncSession, filters: BaseModel):
# Найти одну запись по фильтрам
filter_dict = filters.model_dump(exclude_unset=True)
try:
query = select(cls.model).filter_by(**filter_dict)
result = await session.execute(query)
record = result.scalar_one_or_none()
return record
except SQLAlchemyError as e:
raise
Вместо передачи фильтров напрямую через **kwargs
, теперь метод ожидает объект класса BaseModel
из Pydantic.
Затем мы преобразуем объект класса BaseModel
в обычный питоновский словарь с помощью метода model_dump
. Параметр exclude_unset=True
гарантирует, что в словарь попадут только явно заданные поля.
Благодаря флагу exclude_unset=True
исключаются поля со значениями по умолчанию, которые не были явно установлены. Это позволяет создавать более гибкие и точные запросы, учитывая только те фильтры, которые действительно были заданы пользователем.
Далее логика работы остается как в старой реализации. Полученный словарь фильтров применяется к запросу с помощью filter_by(**filter_dict)
, и мы получаем либо нужное значение, либо None
.
Динамические и обычные модели в Pydantic
Перед началом тестирования нового подхода в методах хочу поделиться трюком, который позволяет «на лету» формировать простые модели Pydantic. Это будет полезно, когда не требуется глубокая валидация и описание полей.
Для этой цели в Pydantic есть функция create_model
. Она позволяет описывать модели, передавая в них простое описание полей. Вот простой пример применения:
from pydantic import create_model
MyModel = create_model('DynamicModel',
name=(str, ...),
age=(int, 42))
instance = MyModel(name="test")
print(instance.dict())
print(MyModel.__name__) # Выведет 'DynamicModel'
MyModel
— переменная, в которой сохраняется созданная модель.DynamicModel
(первый аргумент) — строка, задающая имя модели для внутреннего представления и вывода информации о ней.
Это разделение позволяет создавать модели с одинаковыми именами, но разными структурами в разных частях кода, избегая конфликтов имен.
Практическое применение create_model
Ранее у нас был пример старого подхода к методу find_one_or_none
:
@connection
async def select_full_user_info_email(session, user_id, email):
rez = await UserDAO.find_one_or_none(session=session, id=user_id, email=email)
if rez:
return UserPydantic.from_orm(rez).dict()
return {'message': f'Пользователь с ID {user_id} не найден!'}
Теперь у нас многое изменилось. Во-первых, наш декоратор стал фабрикой декораторов, поэтому его нужно использовать со скобками, и в данном контексте можно передать commit=False
, так как мы просто получаем данные из базы.
Метод find_one_or_none
больше не принимает kwargs
. Теперь нам нужна модель Pydantic, в которую мы передадим корректные значения.
Кроме того, для преобразования данных из SQLAlchemy ORM в модель Pydantic я использовал методы from_orm()
и dict()
для создания словаря. В новой версии Pydantic 2 эти методы были переименованы, хотя их прежние названия пока остаются поддерживаемыми:
from_orm()
теперь называетсяmodel_validate()
dict()
заменён наmodel_dump()
Вот итоговая реализация:
@connection(commit=False)
async def select_full_user_info_email(session: AsyncSession, user_id: int, email: str):
FilterModel = create_model(
'FilterModel',
id=(int, ...),
email=(EmailStr, ...)
)
user = await UserDAO.find_one_or_none(session=session, filters=FilterModel(id=user_id, email=email))
if user:
# Преобразуем ORM-модель в Pydantic-модель и затем в словарь
return UserPydantic.model_validate(user).model_dump()
return {'message': f'Пользователь с ID {user_id} не найден!'}
Также можно описать модель фильтра следующим образом:
class FilterModel(BaseModel):
id: int
email: EmailStr
Этот вариант работал бы аналогично.
В итоге, кода стало немного больше, но это дает ясность для вас и других разработчиков, какие параметры нужно передавать и чего ожидать. Поэтому настоятельно рекомендую внедрять Pydantic в свои проекты.
Добавление дженериков
Прежде чем перепишем оставшиеся методы BaseDAO
, давайте сделаем еще одно улучшение в нашем базовом классе с методами, а именно, добавим дженерики.
Сначала пропишем код, а после будем разбираться.
from typing import Generic, TypeVar, List
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from dao.database import Base
# Объявляем типовой параметр T с ограничением, что это наследник Base
T = TypeVar("T", bound=Base)
class BaseDAO(Generic[T]):
model: type[T]
Это изменение связано с улучшением типизации и использованием дженериков в Python.
Давайте разберем ключевые аспекты:
Generic[T]:
Делает класс
BaseDAO
дженериком.Позволяет указать конкретный тип модели при наследовании.
T = TypeVar("T", bound=Base):
Создает типовую переменную T.
Ограничивает T только наследниками класса
Base
(SQLAlchemy модели).
model: type[T]:
Указывает, что атрибут
model
будет типом, являющимся подклассом T.
Преимущества:
Улучшенная статическая типизация.
Более точные подсказки IDE.
Возможность использовать специфичные методы модели в дочерних DAO.
Предотвращение ошибок при работе с неправильными типами моделей.
Это изменение делает код более надежным и удобным для разработки, особенно в больших проектах с множеством моделей и DAO.
Изменения теперь касаются и дочерних классов. Теперь мы не просто наследуемся от BaseDAO
, а можем указывать конкретные модели, с которыми работает дочерний класс.
Например, было:
class UserDAO(BaseDAO):
model = User
Стало:
class UserDAO(BaseDAO[User]):
model = User
Преимущества такого подхода:
Статическая типизация: IDE и инструменты статического анализа кода смогут правильно определить тип модели, с которой работает DAO.
Безопасность типов: Вы получите предупреждения или ошибки, если попытаетесь использовать методы с неправильными типами.
Улучшенные подсказки: IDE сможет предоставлять более точные подсказки при работе с методами DAO.
Теперь мы можем переписать все методы BaseDAO
под новый формат и двигаться дальше.
from typing import Generic, TypeVar, List
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from dao.database import Base
# Объявляем типовой параметр T с ограничением, что это наследник Base
T = TypeVar("T", bound=Base)
class BaseDAO(Generic[T]):
model: type[T]
@classmethod
async def find_one_or_none_by_id(cls, data_id: int, session: AsyncSession):
# Найти запись по ID
try:
query = select(cls.model).filter_by(id=data_id)
result = await session.execute(query)
record = result.scalar_one_or_none()
return record
except SQLAlchemyError as e:
raise
@classmethod
async def find_one_or_none(cls, session: AsyncSession, filters: BaseModel):
# Найти одну запись по фильтрам
filter_dict = filters.model_dump(exclude_unset=True)
try:
query = select(cls.model).filter_by(**filter_dict)
result = await session.execute(query)
record = result.scalar_one_or_none()
return record
except SQLAlchemyError as e:
raise
@classmethod
async def find_all(cls, session: AsyncSession, filters: BaseModel | None):
if filters:
filter_dict = filters.model_dump(exclude_unset=True)
else:
filter_dict = {}
try:
query = select(cls.model).filter_by(**filter_dict)
result = await session.execute(query)
records = result.scalars().all()
return records
except SQLAlchemyError as e:
raise
@classmethod
async def add(cls, session: AsyncSession, values: BaseModel):
# Добавить одну запись
values_dict = values.model_dump(exclude_unset=True)
new_instance = cls.model(**values_dict)
session.add(new_instance)
try:
await session.flush()
except SQLAlchemyError as e:
await session.rollback()
raise e
return new_instance
@classmethod
async def add_many(cls, session: AsyncSession, instances: List[BaseModel]):
# Добавить несколько записей
values_list = [item.model_dump(exclude_unset=True) for item in instances]
new_instances = [cls.model(**values) for values in values_list]
session.add_all(new_instances)
try:
await session.flush()
except SQLAlchemyError as e:
await session.rollback()
raise e
return new_instances
Как видите, изменения не особо глобальные. Тут мы просто добавили больше порядка, безопасности и читаемости кода.
Обновление данных через SQLAlchemy
К этому моменту у вас должны быть:
База данных с таблицами и заполненными строками в PostgreSQL
Описаны модели таблиц
Создан декоратор для генерации сессий
Создан базовый и дочерний классы для работы с информацией в базе данных (в базовом классе должны быть прописаны методы для добавления и получения данных из таблиц).
Если это так, то мы можем приступить к методу update
.
Что касается обновлений данных, то в SQLAlchemy можно выделить два основных подхода:
Обновление отдельного объекта
Использование метода
update()
Обновление отдельного объекта
Этот метод использует паттерн Unit of Work и идеально подходит для обновления одной записи:
async def update_user(session: AsyncSession, user_id: int, new_name: str):
user = await session.get(User, user_id)
if user:
user.name = new_name
await session.commit()
return user
return None
В этом подходе:
Мы извлекаем объект из базы данных
Изменяем его атрибуты
SQLAlchemy отслеживает изменения
При вызове
session.commit()
изменения сохраняются в базе
Синхронный метод get
Обратите внимание, что я использовал метод get
для получения данных. В прошлой статье я ввел вас в небольшое заблуждение: в последней версии SQLAlchemy этот метод стал асинхронным, тогда как раньше он был только синхронным.
Суть метода get
в том, что он принимает модель таблицы и идентификатор PrimaryKey
. Примечательно, что неважно, как называется колонка с PrimaryKey
— get
автоматически подставит имя нужной колонки.
Пример использования метода get
:
@connection(commit=False)
async def get_select(session: AsyncSession, user_id: int):
user = await session.get(User, user_id)
print(UserPydantic.model_validate(user).model_dump())
asyncio.run(get_select(user_id=21))
Результат:
{
"username": "bob_smith",
"email": "bob.smith@example.com",
"profile": {
"first_name": "Bob",
"last_name": "Smith",
"age": 25,
"gender": "мужчина",
"profession": "дизайнер",
"interests": ["gaming", "photography", "traveling"],
"contacts": {
"phone": "+987654321",
"email": "bob.smith@example.com"
}
}
}
Теперь можно изменить метод базового класса BaseDao
для получения информации о пользователе или None
по ID.
Было:
@classmethod
async def find_one_or_none_by_id(cls, data_id: int, session: AsyncSession):
# Найти запись по ID
try:
query = select(cls.model).filter_by(id=data_id)
result = await session.execute(query)
record = result.scalar_one_or_none()
return record
except SQLAlchemyError as e:
raise
Стало:
@classmethod
async def find_one_or_none_by_id(cls, data_id: int, session: AsyncSession):
# Найти запись по ID
try:
return await session.get(cls.model, data_id)
except SQLAlchemyError as e:
print(f"Error occurred: {e}")
raise
Проверка:
@connection(commit=False)
async def get_select(session: AsyncSession, user_id: int):
user = await UserDAO.find_one_or_none_by_id(session=session, data_id=user_id)
print(UserPydantic.model_validate(user).model_dump())
asyncio.run(get_select(user_id=21))
Результат остаётся таким же.
Использование метода update()
Этот метод эффективен для массового обновления или когда нам не нужно загружать объект в память:
from sqlalchemy import update
async def update_users_status(session: AsyncSession, age: int, new_status: str):
stmt = (
update(User)
.where(User.age > age)
.values(status=new_status)
)
result = await session.execute(stmt)
await session.commit()
return result.rowcount
Преимущества этого метода:
Более эффективен для массовых обновлений
Не требует предварительной загрузки объектов
Позволяет использовать сложные условия в where
Как мы видим, в этом случае снова необходимо обратиться к ранее изученной теме выборки данных. Вы можете использовать как метод where (другое название filter), так и filter_by. Однако важно помнить, что основной принцип остается неизменным.
Сначала вы должны определить, с какими объектами или одним объектом будет работать SQLAlchemy, а затем выполнить необходимые действия по обновлению.
Универсальные методы для обновления данных
Теперь, когда мы знаем общие подходы к обновлению информации, мы можем прописать универсальные методы в BaseDao
.
Первый метод будет принимать идентификатор и Pydantic
модель с новыми значениями и выполнять обновление.
@classmethod
async def update_one_by_id(cls, session: AsyncSession, data_id: int, values: BaseModel):
values_dict = values.model_dump(exclude_unset=True)
try:
record = await session.get(cls.model, data_id)
for key, value in values_dict.items():
setattr(record, key, value)
await session.flush()
except SQLAlchemyError as e:
print(e)
raise e
Метод update_one_by_id
демонстрирует элегантное применение объектно-ориентированного программирования (ООП) в Python для обновления записей в базе данных. Вот как это работает:
-
Получение объекта записи:
Используя уникальный идентификатор (
data_id
), мы извлекаем соответствующую запись из базы данных.Эта запись представлена в виде объекта модели таблицы, что позволяет работать с ней как с обычным Python-объектом.
-
Обновление атрибутов объекта:
Новые значения, предоставленные через
Pydantic
модель, применяются к атрибутам объекта.Мы используем
setattr()
для динамического обновления каждого атрибута, что обеспечивает гибкость.
-
Фиксация изменений:
После обновления атрибутов вызываем
session.flush()
, синхронизируя изменения объекта с базой данных без полного коммита транзакции.
Этот подход объединяет мощь ORM (Object-Relational Mapping) с гибкостью динамического обновления атрибутов.
Протестируем метод
Создадим файл update_methods_dao.py
и попробуем обновить имя конкретного пользователя:
import asyncio
from pydantic import create_model
from sqlalchemy.ext.asyncio import AsyncSession
from dao.dao import UserDAO
from dao.session_maker import connection
@connection(commit=True)
async def update_username(session: AsyncSession, user_id: int, new_username: str):
ValueModel = create_model('ValueModel', username=(str, ...))
await UserDAO.update_one_by_id(session=session, data_id=user_id, values=ValueModel(username=new_username))
asyncio.run(update_username(user_id=1, new_username='yakvenalexx'))
Метод массовой обработки по переданному ID
@connection(commit=True)
async def update_user(session: AsyncSession, user_id: int, new_username: str, email: int):
ValueModel = create_model('ValueModel', username=(str, ...), email=(EmailStr, ...))
await UserDAO.update_one_by_id(session=session, data_id=user_id, values=ValueModel(username=new_username, email=email))
asyncio.run(update_user(user_id=1, email='mail@mail.ru', new_username='admin'))
Чтобы обновить значение username из таблицы users и возраст из связанной таблицы profiles, нам нужно создать отдельный метод в дочернем классе UserDAO. Этот метод будет явно указывать, где хранятся соответствующие переменные. В остальном логика работы не будет сильно отличаться от универсального метода из BaseDAO.
Вот пример кода:
class UserDAO(BaseDAO[User]):
model = User
@classmethod
async def update_username_age_by_id(cls, session: AsyncSession, data_id: int, username: str, age: int):
user = await session.get(cls.model, data_id)
user.username = username
user.profile.age = age
await session.flush()
Массовое обновление через SQLAlchemy update
Пример массового обновления, где выбираются все пользователи с фамилией Smith
, и каждому устанавливается возраст в 22 года:
@connection(commit=True)
async def update_age_mass(session: AsyncSession, new_age: int, last_name: str):
try:
stmt = (
update(Profile)
.filter_by(last_name=last_name)
.values(age=new_age)
)
result = await session.execute(stmt)
updated_count = result.rowcount
print(f'Обновлено {updated_count} записей')
return updated_count
except SQLAlchemyError as e:
print(f"Error updating profiles: {e}")
raise
asyncio.run(update_age_mass(new_age=22, last_name='Smith'))
Универсальный метод для массового обновления
Опираясь на полученные знания, мы можем разработать универсальный метод, который будет использоваться для массового обновления записей в нашем классе BaseDao.
@classmethod
async def update_many(cls, session: AsyncSession, filter_criteria: BaseModel, values: BaseModel):
filter_dict = filter_criteria.model_dump(exclude_unset=True)
values_dict = values.model_dump(exclude_unset=True)
try:
stmt = (
update(cls.model)
.filter_by(**filter_dict)
.values(**values_dict)
)
result = await session.execute(stmt)
await session.flush()
return result.rowcount
except SQLAlchemyError as e:
print(f"Error in mass update: {e}")
raise
Напишем на основании него функцию.
@connection(commit=True)
async def update_age_mass_dao(session: AsyncSession, new_age: int, last_name: str):
filter_criteria = create_model('FilterModel', last_name=(str, ...))
values = create_model('ValuesModel', age=(int, ...))
await ProfileDAO.update_many(session=session,
filter_criteria=filter_criteria(last_name=last_name),
values=values(age=new_age))
asyncio.run(update_age_mass_dao(new_age=33, last_name='Smith'))
Обратите внимание, тут я уже импортировал ProfileDAO. Сама запись получилась достаточно читаемой, а сам код понятный.
Удаление в ООП-стиле
При объектно-ориентированном подходе сначала нужно получить нужный объект из базы данных. Это можно сделать через методы get()
или select()
:
Метод
get()
позволяет быстро найти запись по первичному ключу (например,id
). Это полезно, когда нужно удалить запись, ID которой известен заранее.Метод
select()
позволяет создавать более сложные запросы, например, на основе нескольких полей, чтобы выбрать нужные объекты.
После получения объекта его можно удалить с помощью session.delete(obj)
, а затем подтвердить изменения вызовом commit()
:
# Пример удаления одного объекта
obj = await session.get(User, 1) # Получаем объект по первичному ключу
if obj:
await session.delete(obj) # Удаляем объект
await session.commit() # Подтверждаем удаление
В этом примере, если объект с ID = 1
существует, он будет удален. Такой подход удобен, если перед удалением нужно выполнить проверки или другую логику.
Если вы выбираете несколько объектов через select
, удаление можно выполнить в цикле:
# Получаем объекты, которые нужно удалить
result = await session.execute(select(User).where(User.age > 30))
users_to_delete = result.scalars().all()
# Удаляем объекты в цикле
for user in users_to_delete:
await session.delete(user)
# Подтверждаем изменения
await session.commit()
Этот способ подходит для выборочного удаления, но менее удобен для массового удаления большого числа записей.
Массовое удаление на уровне базы данных
Когда нужно удалить сразу несколько записей, лучше выполнять массовое удаление на стороне базы данных. Такой подход экономит ресурсы, так как позволяет избежать загрузки объектов в память.
Для этого используется метод delete()
на уровне Session
, что позволяет SQLAlchemy отправить один запрос на удаление напрямую в базу данных:
# Пример массового удаления
await session.execute(
delete(User).where(User.age > 30)
)
await session.commit() # Подтверждаем массовое удаление
В этом примере все записи пользователей старше 30 лет удаляются одним запросом. Этот метод эффективен, когда нет необходимости в проверках перед удалением каждого объекта, и важна производительность.
Практика удаления
Теперь закрепим на практике удаление записей с нашей базы данных, а далее добавим универсальные методы для удаления записей с наших таблиц. Для написания функций для удаления записей создадим новый файл delete_methods_dao.py
.
Для начала удалим пользователя с ID 5
. Код будет выглядеть так:
from sqlalchemy.ext.asyncio import AsyncSession
from dao.session_maker import connection
from models import User
@connection(commit=True)
async def delete_user_by_id(session: AsyncSession, user_id: int):
user = await session.get(User, user_id)
if user:
await session.delete(user)
asyncio.run(delete_user_by_id(user_id=5))
Как вы видите, код максимально "питоновский". Строкой user = await session.get(User, user_id)
мы получаем объект пользователя, а далее его удаляем. Commit
тут не прописан, так как он предусмотрен на уровне сессии в декораторе.
Тут сразу вырисовывается универсальный метод для BaseDao
. Давайте его напишем.
@classmethod
async def delete_one_by_id(cls, data_id: int, session: AsyncSession):
# Найти запись по ID
try:
data = await session.get(cls.model, data_id)
if data:
await session.delete(data)
await session.flush()
except SQLAlchemyError as e:
print(f"Error occurred: {e}")
raise
Теперь напишем функцию уже с новым методом:
@connection(commit=True)
async def delete_user_by_id_dao(session: AsyncSession, user_id: int):
await UserDAO.delete_one_by_id(session=session, data_id=user_id)
asyncio.run(delete_user_by_id_dao(user_id=10))
Теперь напишем метод для удаления записей по условию. Тоже пока на уровне обычной функции, которую затем трансформируем в универсальный метод BaseDao
.
Давайте удалим всех пользователей, у которых username
начинается на "ja". Код будет выглядеть так:
@connection(commit=True)
async def delete_user_username_ja(session: AsyncSession, start_letter: str = 'ja'):
stmt = delete(User).where(User.username.like(f"{start_letter}%"))
await session.execute(stmt)
asyncio.run(delete_user_username_ja())
В этом методе сначала мы формируем SQL-запрос для удаления записи с базы данных:
stmt = delete(User).where(User.username.like(f"{start_letter}%"))
Далее, методом session.execute()
мы просто выполняем этот запрос. Подход удобен тем, что он гибкий в связке с where
/ filter_by
, и у нас нет необходимости сохранять данные на стороне приложения перед удалением, как в методе get
, когда мы выполняем запрос await session.delete(obj)
.
Напишем универсальный метод для удаления записей с базы данных, но в его основу положим filter_by
. Сложные фильтры, такие как where
(filter), будем выносить на сторону дочерних классов, когда это будет необходимо.
Метод будет иметь следующий вид:
@classmethod
async def delete_many(cls, session: AsyncSession, filters: BaseModel | None):
if filters:
filter_dict = filters.model_dump(exclude_unset=True)
stmt = delete(cls.model).filter_by(**filter_dict)
else:
stmt = delete(cls.model)
try:
result = await session.execute(stmt)
await session.flush()
return result.rowcount
except SQLAlchemyError as e:
print(f"Error occurred: {e}")
raise
Я сделал метод универсальным и для удаления всех данных из таблицы, так что будьте внимательны. Если, используя его, вы не передадите никаких фильтров (модели Pydantic), то все данные будут удалены.
Проверим.
@connection(commit=True)
async def delete_user_by_password(session: AsyncSession, password: str):
filter_criteria = create_model('FilterModel', password=(str, ...))
await UserDAO.delete_many(session=session, filters=filter_criteria(password=password))
asyncio.run(delete_user_by_password(password='asdasd'))
Этой записью мы удалили всех пользователей с паролем "asdasd".
Для удаления всех данных о пользователях можно было бы написать такую функцию:
@connection(commit=True)
async def delete_all_users(session: AsyncSession):
await UserDAO.delete_many(session=session, filters=None)
Заключение
Сегодня мы подробно разобрали ключевые аспекты работы с SQLAlchemy, сосредоточив внимание на операциях удаления и обновления данных. Я старался донести материал так, чтобы у вас сформировалось не только понимание конкретных действий, но и целостное представление о принципах работы с SQLAlchemy.
Хотя этот фреймворк может показаться многослойным и сложным, SQLAlchemy остаётся гибким и удобным инструментом, который доступен для освоения каждому. Главное — понять основные концепции классов и постепенно углубляться в практическую работу с кодом. Если вы внимательно изучили предыдущие статьи об асинхронном SQLAlchemy 2 и сегодняшнюю, у вас уже есть все необходимые знания для уверенного использования табличных баз данных в Python. Дальнейшее развитие — это лишь вопрос опыта и практики.
Мы обсудили два основных подхода к обновлению и удалению данных:
Объектно‑ориентированный подход, при котором данные загружаются на сторону приложения и обрабатываются как объекты. Этот метод интуитивно понятен и удобен, но может потребовать больше ресурсов.
Прямое выполнение SQL‑запросов на стороне базы данных, что помогает оптимизировать использование ресурсов приложения, особенно при массовых операциях.
Полный исходный код проекта и эксклюзивные материалы доступны в моем Telegram‑канале «Легкий путь в Python», где уже более 1000 единомышленников! А если вам требуется развернуть проект на сервере и доставлять в него обновления тремя командами в IDE, зарегистрируйтесь в Amvera Cloud, и получите 111 руб. на тестирование функционала.
В планах — ещё несколько статей по асинхронной работе с SQLAlchemy. Продолжение этой серии будет зависеть от вашего отклика и интереса.
Спасибо за внимание! До скорых встреч!
Комментарии (3)
TLemur
03.11.2024 10:51Интересно было бы сравнить скорость работы с async и без него именно в случае SQLAlchemy. Существуют ли такие бенчи?
Superman_1
Cпасибо за третью часть по работе с SqlAlchemy, очень ждал её выхода, каждый день проверял на наличие уведомления о ней !)
1) Что по поводу метода merge? О нем вообще ни сказано ни слова было, хотя это достаточно удобный метод для обновления модели по её ID:
async def update(self, entity: TEntity) -> int:
"""
Updates an existing entity with id (!) or creates a new one if it does not exist.
This method uses the SQLAlchemy `merge` function to either update an existing
entity with the same id in the database or create a new entity if no such
entity exists. Return id
Args:
entity (Model): The entity to be updated or created.
Returns:
int: The updated or newly created entity's id.
"""
await self.__session.merge(entity)
await self.__session.commit()
return entity.id
2) Есть вопрос небольшой: у меня есть слой сервисов, который будет получать DTO в зависимости от типа объекта и делать разные манипуляции. Так вот, чтобы обратно смаппить entity из БД в dto из слоя сервисов, нам нужно модели БД также унаследовать от BaseModel от pydantic, верно?