Приветствую всех! Продолжаем наш цикл статей, посвящённых работе с асинхронной SQLAlchemy 2 в стиле ORM. Если вы ещё не успели ознакомиться с первой статьёй, настоятельно рекомендую сделать это, так как сегодняшний материал основывается на знаниях, изложенных ранее.

Резюме прошлой статьи

В прошлой статье под названием "Асинхронный SQLAlchemy 2: простой пошаговый гайд по настройке, моделям, связям и миграциям с использованием Alembic" мы:

  • Развернули базу данных PostgreSQL.

  • Рассмотрели три различных способа размещения базы данных, выбрав в итоге сервис Amvera Cloud. Напоминаю, что процесс настройки базы с удалённым доступом занял не более двух минут!

  • Ознакомились с основными принципами работы SQLAlchemy и обсудили её значимость для Python-разработчиков.

  • Настроили базу для работы в стиле ORM.

  • Описали модели таблиц и установили связи между ними.

  • Выполнили миграции с использованием Alembic.

Эти шаги заложили прочный фундамент для дальнейшего изучения темы. Перед прочтением этого материала убедитесь, что у вас развернута база PostgreSQL и созданы таблицы users и profiles, так как они понадобятся для практической части.

О чём будем говорить сегодня?

В этой статье детально рассмотрим несколько ключевых тем:

  • Сессии и фабрики сессий: как управлять сессиями для взаимодействия с базой данных и как применять их через декораторы.

  • Добавление данных в таблицы: разберём безопасные методы добавления записей с использованием ORM. Также обсудим метод flush и разницу между ним и commit.

  • Извлечение данных из таблиц: большой блок, в котором научимся извлекать данные через select, используя фильтры (например, where, filter, filter_by). Также обсудим работу с «грязными» данными и преобразование объектов SQLAlchemy в удобные словари Python с помощью Pydantic. В этом блоке разберём и методы SQLAlchemy, такие как scalar, scalars, scalar_one_or_none, all и другие.

После прочтения вы сможете уверенно работать с добавлением и извлечением данных через SQLAlchemy для любых табличных баз данных.

Сессия в контексте SQLAlchemy

Что такое сессия?

Сессия в SQLAlchemy — это основной инструмент для взаимодействия с базой данных. Представьте её как рабочую область, где происходят все операции: добавление, удаление, извлечение, обновление данных. Все запросы к базе данных выполняются через сессию, без неё никакие операции невозможны.

Сессия управляет транзакциями и следит за состоянием объектов, с которыми вы работаете. Она не устанавливает прямого соединения с базой, а абстрагирует этот процесс. Все изменения отправляются в базу данных через метод commit(). В случае ошибки их можно отменить с помощью rollback().

Фабрика сессий

Фабрика сессий — это специальная функция для создания новых сессий по мере необходимости. В SQLAlchemy это реализуется с помощью sessionmaker(). Этот объект создаёт сессии, которые можно использовать для работы с базой данных. В синхронном варианте запись будет такой:

from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine)
session = Session()

Важный момент: сессии в асинхронном коде не работают автоматически, поэтому для этого необходимо использовать асинхронные версии объектов.

Асинхронная фабрика сессий

Асинхронная фабрика создаёт сессии, которые поддерживают асинхронное выполнение. Это позволяет не блокировать основной поток программы во время работы с базой данных. Для создания асинхронных сессий используется async_sessionmaker. Мы уже заложили основу под асинхронную сессию в прошлой статье, используя следующую запись:

DATABASE_URL = settings.get_db_url()

# Создаем асинхронный движок для работы с базой данных
engine = create_async_engine(url=DATABASE_URL)

# Создаем фабрику сессий для взаимодействия с базой данных
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)

Далее необходимо получить сессию (достать ее из фабрики сессий). Затем выполняется необходимая логика работы с базой данных. После чего сессию необходимо закрывать. Проще всего сделать это используя асинхронный менеджер async with. Работа в рамках одной сессии, в таком случае, будет иметь следующий вид:

async with AsyncSessionLocal() as session:
    # логика работы

То есть, для того чтоб начать взаимодействовать с базой данных (например для получения или добавления туда информации) нам всегда необходимо быть в рамках сессии. Вопрос только в том какое количество операций вы будете выполнять в рамках одной такой сессии, до ее закрытия.

Основные подходы к управлению сессиями:

  • Открытие сессии на каждое действие: для каждого действия с базой данных создаётся новая сессия. Этот подход эффективен для небольших проектов, но на крупных проектах он может привести к дополнительным накладным расходам.

  • Открытие сессии на весь блок операций: сессия создаётся один раз перед серией операций и закрывается по завершению всех действий. Это позволяет объединить несколько запросов в одну сессию, что экономит ресурсы и повышает производительность.

Мы будем использовать второй подход, так как он лучше подходит для крупных асинхронных приложений.

Декоратор для создания сессии

Декораторы в Python позволяют оборачивать одну функцию другой, добавляя дополнительную логику. В контексте работы с базой данных это позволяет автоматизировать создание и закрытие сессии. Пример реализации:

def connection(method):
    async def wrapper(*args, **kwargs):
        async with async_session_maker() as session:
            try:
                # Явно не открываем транзакции, так как они уже есть в контексте
                return await method(*args, session=session, **kwargs)
            except Exception as e:
                await session.rollback()  # Откатываем сессию при ошибке
                raise e  # Поднимаем исключение дальше
            finally:
                await session.close()  # Закрываем сессию

    return wrapper

Давайте сразу добавим этот декоратор в файл database.py, так как сегодня мы будем его использовать регулярно.

Как работает этот декоратор:

  1. connection принимает исходную функцию для обёртки.

  2. wrapper — это функция-обёртка, которая принимает все аргументы исходной функции.

  3. async with async_session_maker() автоматически создаёт и закрывает сессию в асинхронном режиме, освобождая вас от необходимости управлять сессией вручную.

  4. Сессия передаётся в исходную функцию через аргумент session.

  5. В случае ошибки выполняется откат транзакции через rollback(), а затем сессия закрывается.

Пример использования

@connection
async def get_users(session):
    return await session.execute(select(User))

Когда вызывается get_users(), декоратор создаёт сессию, передаёт её в функцию, а после выполнения автоматически закрывает сессию, что упрощает и обезопасивает работу с базой данных. Мы сегодня ещё успеем подробно разобраться с методами извлечения и добавление данных. На этом этапе хотел показать общий подход к вопросу работы с сессией.

Добавление данных в таблицы

Теперь, когда мы разобрались с созданием сессий, давайте перейдём к добавлению данных в базу данных. Логично начать именно с этого, ведь прежде чем говорить о извлечении данных, нужно сначала их туда внести, не так ли?

Процесс добавления данных в SQLAlchemy можно разделить на несколько шагов:

  1. Открытие сессии — это стандартный шаг для всех операций с базой данных. Мы уже реализовали его через декоратор, который автоматически создаёт и закрывает сессию.

  2. Создание инстанса (экземпляра) — это объект, который будет представлять строку в таблице базы данных. Мы инициализируем его, передавая необходимые значения полей таблицы.

  3. Формирование запроса — SQLAlchemy автоматически переводит действия с объектом модели в SQL‑запрос. Этот запрос сообщит базе данных, какую информацию нужно добавить.

  4. Коммит (сохранение изменений) — после того, как объект создан и сессия знает, что его нужно добавить, мы подтверждаем действие с помощью commit(), чтобы изменения сохранились в базе данных.

Рассмотрим всё это на простом примере:

@connection
async def add_user(name: str, age: int, session):
    # Создаём объект User (инстанс модели)
    new_user = User(name=name, age=age)
    # Добавляем объект в сессию
    session.add(new_user)
    # Сохраняем изменения в базе данных
    await session.commit()

Детальный разбор

  1. Создание инстанса Внутри функции add_user мы создаём новый объект модели User, передавая в него значения для полей name и age. Это и есть процесс создания инстанса — нового представления строки в таблице. Пока этот объект существует только в памяти, его ещё нет в базе данных.

  2. Добавление в сессию Чтобы подготовить SQLAlchemy к добавлению данных в базу, мы вызываем session.add(new_user). Это говорит сессии: «Этот объект должен быть добавлен в базу данных». Однако до тех пор, пока мы не вызовем commit(), изменения фактически не применяются. Другими словами, на этом этапе мы формируем SQL‑запрос к базе данных для добавления информации, но, при этом, мы не выполняем сам запрос.

  3. Выполнение коммита await session.commit() — это финальный шаг, который сохраняет изменения в базе данных. Без этого шага SQLAlchemy не отправит запрос в базу и не зафиксирует изменения. После коммита данные становятся постоянными.

Пример использования

await add_user("Алексей Яковенко", 31)

В этом примере вызывается функция add_user, которая создаёт нового пользователя с именем "Алексей Яковенко" и возрастом 31. Благодаря декоратору @connection сессия создаётся автоматически, данные добавляются, и сессия закрывается после выполнения операции.

Попрактикуемся в добавлении данных в наши таблицы

Я напоминаю, что на данный момент у нас существуют следующие таблицы (модели таблиц):

  • Users: Основная таблица с пользователями

  • Profiles: связанная таблица с Users в формате 1 к 1

  • Posts: таблица со статьями пользователя

  • Comments: таблица с комментариями пользователя

Все таблицы связаны между собой логически связями «Много к одному», «Один к многим» или «Один к одному».

При написании кода для добавления записей, как и в дальнейшем, для извлечения, мы должны помнить про эти особенности.

Давайте создадим файл add_methods.py и внутри него будем описывать необходимые методы по добавлению данных в виде функций с декораторами, а затем мы сделаем наш код более универсальным и начнем его писать в виде класса.

from sqlalchemy.ext.asyncio import AsyncSession
from database import connection
from asyncio import run
from models import User


@connection
async def create_user_example_1(username: str, email: str, password: str, session: AsyncSession) -> int:
    """
    Создает нового пользователя с использованием ORM SQLAlchemy.

    Аргументы:
    - username: str - имя пользователя
    - email: str - адрес электронной почты
    - password: str - пароль пользователя
    - session: AsyncSession - асинхронная сессия базы данных

    Возвращает:
    - int - идентификатор созданного пользователя
    """

    user = User(username=username, email=email, password=password)
    session.add(user)
    await session.commit()
    return user.id

В этом коде реализована функция создания нового пользователя в базе данных с использованием асинхронной сессии SQLAlchemy.

  1. Функция create_user_example_1:

    • Принимает аргументы username, email, и password, а также объект session (асинхронная сессия).

    • Создаёт экземпляр класса User (модель пользователя) с указанными данными.

    • Добавляет пользователя в сессию с помощью session.add().

    • Сохраняет изменения в базе данных через await session.commit().

    • Возвращает идентификатор созданного пользователя user.id.

  2. Запуск функции:

    • Используется run() для выполнения асинхронной функции create_user_example_1.

    • Выводится сообщение с идентификатором нового пользователя.

Код автоматизирует процесс создания записи пользователя и выполнения всех необходимых действий с базой данных асинхронно.

Выполним код:

new_user_id = run(create_user_example_1(username="yakvenalex",
                                        email="example@example.com",
                                        password="asdasd"))

print(f"Новый пользователь с идентификатором {new_user_id} создан")

Проверим

 Запись успешно создана.
Запись успешно создана.

Тут мы видим что поле profile_id у нас пустое и такой сценарий допускает формат нашей архитектуры.

К примеру, на вашем сайте прошла регистрация пользователя первичная, но он пока не указал ничего о себе.

Теперь представим, что у нас невозможен такой сценарий. То есть, представим, что при регистрации пользователь обязательно указывает информацию из своего профиля, а информация просто сохраняется в двух разных таблицах для удобства работы.

Внесем изменение в структуру таблиц

И, перед тем как мы напишем новую функцию, давайте небольшие правки сделаем в структуре наших моделей для удобства. Мы уберем колонку profile_id с основной таблицы users и заменим ее на колонку user_id в таблице profiles. Это позволит нам с меньшим количеством запросов добавить пользователя с профилем.

После небольших правок у меня получилось следующее описание моделей.

Скрытый текст
class User(Base):
    username: Mapped[uniq_str_an]
    email: Mapped[uniq_str_an]
    password: Mapped[str]

    # Связь один-к-одному с Profile
    profile: Mapped["Profile"] = relationship(
        "Profile",
        back_populates="user",
        uselist=False,  # Обеспечивает связь один-к-одному
        lazy="joined"  # Автоматически загружает связанные данные из Profile при запросе User
    )

    # Связь один-ко-многим с Post
    posts: Mapped[list["Post"]] = relationship(
        "Post",
        back_populates="user",
        cascade="all, delete-orphan"
    )

    # Связь один-ко-многим с Comment
    comments: Mapped[list["Comment"]] = relationship(
        "Comment",
        back_populates="user",
        cascade="all, delete-orphan"
    )


class Profile(Base):
    first_name: Mapped[str]
    last_name: Mapped[str | None]
    age: Mapped[int | None]
    gender: Mapped[GenderEnum]
    profession: Mapped[ProfessionEnum] = mapped_column(
        default=ProfessionEnum.DEVELOPER,
        server_default=text("'UNEMPLOYED'")
    )
    interests: Mapped[array_or_none_an]
    contacts: Mapped[dict | None] = mapped_column(JSON)

    # Внешний ключ на таблицу users
    user_id: Mapped[int] = mapped_column(ForeignKey('users.id'), unique=True)

    # Обратная связь один-к-одному с User
    user: Mapped["User"] = relationship(
        back_populates="profile",
        uselist=False
    )

Создадим файл с миграциями.

alembic revision --autogenerate -m "update tables"

И выполним миграцию.

alembic upgrade head

Проверим применились ли изменения в базе данных.

 В таблице пользователей пропала колонка user_id
В таблице пользователей пропала колонка user_id
 В таблице профилей появилась колонка user_id.
В таблице профилей появилась колонка user_id.

Теперь мы готовы к описанию метода, который добавит информацию в две смежные таблицы, связанные по типу «Один к одному».

@connection
async def get_user_by_id_example_2(username: str, email: str, password: str,
                                   first_name: str,
                                   last_name: str | None,
                                   age: str | None,
                                   gender: GenderEnum,
                                   profession: ProfessionEnum | None,
                                   interests: list | None,
                                   contacts: dict | None,
                                   session: AsyncSession) -> dict[str, int]:
    user = User(username=username, email=email, password=password)
    session.add(user)
    await session.commit()

    profile = Profile(
        user_id=user.id,
        first_name=first_name,
        last_name=last_name,
        age=age,
        gender=gender,
        profession=profession,
        interests=interests,
        contacts=contacts)

    session.add(profile)
    await session.commit()
    print(f'Создан пользователь с ID {user.id} и ему присвоен профиль с ID {profile.id}')
    return {'user_id': user.id, 'profile_id': profile.id}

Давайте выполним этот код, а после применим более оптимальный подход для добавления данных в таблицу, используя волшебный метод flush.

Пример выполнения текущего кода:

user_profile = run(get_user_by_id_example_2(
    username="john_doe",
    email="john.doe@example.com",
    password="password123",
    first_name="John",
    last_name="Doe",
    age=28,
    gender=GenderEnum.MALE,
    profession=ProfessionEnum.ENGINEER,
    interests=["hiking", "photography", "coding"],
    contacts={"phone": "+123456789", "email": "john.doe@example.com"},
))

Проверим

Таблица users.
Таблица users.
Таблица profiles
Таблица profiles

Теперь напишем ещё один пример выполнения кода добавления в связанные таблицы, но сделаем это более оптимально.

@connection
async def get_user_by_id_example_3(username: str, email: str, password: str,
                                   first_name: str,
                                   last_name: str | None,
                                   age: str | None,
                                   gender: GenderEnum,
                                   profession: ProfessionEnum | None,
                                   interests: list | None,
                                   contacts: dict | None,
                                   session: AsyncSession) -> dict[str, int]:
    try:
        user = User(username=username, email=email, password=password)
        session.add(user)
        await session.flush()  # Промежуточный шаг для получения user.id без коммита

        profile = Profile(
            user_id=user.id,
            first_name=first_name,
            last_name=last_name,
            age=age,
            gender=gender,
            profession=profession,
            interests=interests,
            contacts=contacts
        )
        session.add(profile)

        # Один коммит для обоих действий
        await session.commit()

        print(f'Создан пользователь с ID {user.id} и ему присвоен профиль с ID {profile.id}')
        return {'user_id': user.id, 'profile_id': profile.id}

    except Exception as e:
        await session.rollback()  # Откатываем транзакцию при ошибке
        raise e

Код прекрасно отработал и стоит отметить, что выполнился код быстрее чем в первый раз. Теперь давайте разберемся что такое flush и почему это более оптимально чем в нашем первом варианте.

Как работает flush в SQLAlchemy

flush в SQLAlchemy отправляет изменения в базу данных без их окончательной фиксации, то есть без выполнения коммита. Это полезно, когда нужно сгенерировать данные, такие как идентификаторы (например, user.id), чтобы использовать их до фактического сохранения данных в базе. При этом сама транзакция остаётся открытой, и окончательное сохранение происходит позже, при вызове commit.

Почему это более оптимально:

  1. Работа с промежуточными данными: flush позволяет работать с данными, которые ещё не записаны в базу окончательно, но уже доступны для использования. Например, после создания пользователя вы можете получить его user.id и использовать его для добавления профиля, не выполняя коммит между этими операциями.

  2. Сокращение количества транзакций: Используя flush, мы избегаем нескольких коммитов. Это снижает нагрузку на базу данных, так как все изменения будут зафиксированы одним коммитом в конце транзакции. Таким образом, база данных фиксирует изменения только один раз, что ускоряет выполнение операций.

Массовое добавление данных в таблицу

Для массового добавления данных в SQLAlchemy существует удобный метод add_all. Он работает аналогично методу add, но принимает на вход список экземпляров (инстансов). С его помощью мы можем добавить сразу несколько записей за одну операцию. Давайте рассмотрим пример, где добавим пять пользователей с использованием этого метода.

@connection
async def create_user_example_4(users_data: list[dict], session: AsyncSession) -> list[int]:
    """
    Создает нескольких пользователей с использованием ORM SQLAlchemy.

    Аргументы:
    - users_data: list[dict] - список словарей, содержащих данные пользователей
      Каждый словарь должен содержать ключи: 'username', 'email', 'password'.
    - session: AsyncSession - асинхронная сессия базы данных

    Возвращает:
    - list[int] - список идентификаторов созданных пользователей
    """
    users_list = [
        User(
            username=user_data['username'],
            email=user_data['email'],
            password=user_data['password']
        )
        for user_data in users_data
    ]
    session.add_all(users_list)
    await session.commit()
    return [user.id for user in users_list]

При необходимости тут тоже можно выполнить flush, но решил не перегружать примером.

Подготовим список пользователей для добавления.

users = [
    {"username": "michael_brown", "email": "michael.brown@example.com", "password": "pass1234"},
    {"username": "sarah_wilson", "email": "sarah.wilson@example.com", "password": "mysecurepwd"},
    {"username": "david_clark", "email": "david.clark@example.com", "password": "davidsafe123"},
    {"username": "emma_walker", "email": "emma.walker@example.com", "password": "walker987"},
    {"username": "james_martin", "email": "james.martin@example.com", "password": "martinpass001"}
]

Выполним код.

run(create_user_example_4(users_data=users))
 Информация добавлена.
Информация добавлена.

Создаем универсальный базовый класс и работаем с ним

Если вы внимательно рассмотрели написанный код, то наверняка заметили закономерности: каждый метод принимает набор параметров в формате "ключ-значение" и затем передаёт их в функции. Это открывает перед нами возможность ещё больше упростить и оптимизировать код, выделив универсальные методы.

Здесь нам на помощь придёт объектно-ориентированное программирование (ООП) в Python. Мы создадим базовый класс, который упростит работу с повторяющимися действиями.

Для удобства создадим новую папку dao, а внутри неё файлы base.py и __init.py__, чтобы избежать проблем с импортами в будущем.

В файле base.py мы опишем наш универсальный базовый класс. Для начала добавим два метода: один для добавления одной записи, а второй — для массового добавления записей. Приготовьтесь, сейчас начнётся настоящая магия!

from typing import List, Any, Dict
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession


class BaseDAO:
    model = None  # Устанавливается в дочернем классе

    @classmethod
    async def add(cls, session: AsyncSession, **values):
        # Добавить одну запись
        new_instance = cls.model(**values)
        session.add(new_instance)
        try:
            await session.commit()
        except SQLAlchemyError as e:
            await session.rollback()
            raise e
        return new_instance

    @classmethod
    async def add_many(cls, session: AsyncSession, instances: List[Dict[str, Any]]):
        new_instances = [cls.model(**values) for values in instances]
        session.add_all(new_instances)
        try:
            await session.commit()
        except SQLAlchemyError as e:
            await session.rollback()
            raise e
        return new_instances

Давайте разбираться, что делает этот код.

Этот класс BaseDAO — это базовый класс для работы с базой данных, который предоставляет универсальные методы для добавления данных в таблицы. Он использует SQLAlchemy для взаимодействия с базой данных в асинхронном режиме.

Основные моменты:

  1. Атрибут model: Это переменная, которая будет установлена в дочернем классе. В дочерних классах будет указана конкретная модель базы данных (например, User, Profile и т. д.), с которой будет работать этот класс. Благодаря этой переменной мы сможем гибко создавать дочерние классы, внутри которых уже будем добавлять конкретную модель. Этот подход позволяет как легко использовать универсальные методы базового класса, так и позволяет описывать собственные методы (этот механизм мы рассмотрим подробно сегодня).

  2. Метод add: Этот метод позволяет добавить одну запись (например, одного пользователя) в базу данных.

    • Он принимает сессию базы данных и значения для полей записи в виде именованных аргументов (**values).

    • Создаётся новый экземпляр модели с переданными данными, затем он добавляется в сессию.

    • После этого вызывается commit, чтобы зафиксировать изменения в базе данных.

    • В случае ошибки, происходит откат (rollback), и ошибка выбрасывается.

  3. Метод add_many: Этот метод используется для добавления сразу нескольких записей в базу данных за один раз.

    • Он принимает список словарей, где каждый словарь содержит данные для одной записи.

    • Из этих словарей создаются экземпляры модели и добавляются в сессию с помощью add_all.

    • После добавления всех экземпляров вызывается коммит для сохранения изменений.

    • Если возникает ошибка, как и в первом методе, вызывается откат транзакции и ошибка поднимается дальше.

Теперь, для того, чтоб это все работало, нам необходимо создать дочерний класс под каждую таблицу. Для этого, в корне пакета dao, давайте создадим файл dao.py и опишем дочерние классы.

from dao.base import BaseDAO
from models import User, Profile, Post, Comment


class UserDAO(BaseDAO):
    model = User


class ProfileDAO(BaseDAO):
    model = Profile


class PostDAO(BaseDAO):
    model = Post


class CommentDAO(BaseDAO):
    model = Comment

Теперь у нас появилась возможность обращаясь через точку вызывать необходимые нам универсальные методы.

Если нам потребуется описать индивидуальные методы, такие как регистрация пользователя с занесением информации в две связанные таблицы, то мы эти методы сможем описать уже в дочерних классах.

Кроме того, вам ничего не мешает в дочерние классы, явно, импортировать другие модели таблиц. То есть, не стоит воспринимать базовый класс, как некое ограничение для вас. Нет. Он должен только помогать.

Давайте теперь создадим в корне проекта файл add_methods_dao.py и там, по-новому, перезапишем простые методы для добавления данных в одну таблицу (одиночное добавление данных и массовое).

Выполним импорты:

from dao.dao import UserDAO
from database import connection
from asyncio import run
from sqlalchemy.ext.asyncio import AsyncSession

Опишем сам метод.

@connection
async def add_one(user_data: dict, session: AsyncSession):
    new_user = await UserDAO.add(session=session, **user_data)
    print(f"Добавлен новый пользователь с ID: {new_user.id}")
    return new_user.id

Как видите, все максимально гибко.

На вход теперь мы будем получать словарь с данными о пользователе и будем генерировать новую сессию через наш декоратор.

Далее мы просто передадим сессию и распакуем словарь.

Подготовим данные для добавления.

one_user = {"username": "oliver_jackson", "email": "oliver.jackson@example.com", "password": "jackson123"}

Выполним код:

run(add_one(user_data={"username": "oliver_jackson", "email": "oliver.jackson@example.com", "password": "jackson123"}))

Теперь опишем метод для массового добавления пользователей новым способом.

@connection
async def add_many_users(users_data: List[dict], session: AsyncSession):
    new_users = await UserDAO.add_many(session=session, instances=users_data)
    user_ilds_list = [user.id for user in new_users]
    print(f"Добавлены новые пользователи с ID: {user_ilds_list}")
    return user_ilds_list

Как видите, отличие только в том, что мы передаем не обычный словарь, а список словарей.

Наш подготовленный метод add_many работает таким образом, чтоб принимать на вход список словарей. Предварительную распаковку делать не нужно.

Подготовим набор данных для добавления.

users = [
    {"username": "amelia_davis", "email": "amelia.davis@example.com", "password": "davispassword"},
    {"username": "lucas_white", "email": "lucas.white@example.com", "password": "whiteSecure"},
    {"username": "mia_moore", "email": "mia.moore@example.com", "password": "moorepass098"},
    {"username": "benjamin_hall", "email": "benjamin.hall@example.com", "password": "hallben123"},
    {"username": "sophia_hill", "email": "sophia.hill@example.com", "password": "hillSophia999"},
    {"username": "liam_green", "email": "liam.green@example.com", "password": "greenSecure789"},
    {"username": "isabella_clark", "email": "isabella.clark@example.com", "password": "clarkIsabella001"},
    {"username": "ethan_baker", "email": "ethan.baker@example.com", "password": "bakerEthan555"},
    {"username": "charlotte_scott", "email": "charlotte.scott@example.com", "password": "scottcharl333"},
    {"username": "logan_young", "email": "logan.young@example.com", "password": "younglogan876"}
]

Выполним код

run(add_many_users(users_data=users))

Проверим:

 Пользователи в базе данных.
Пользователи в базе данных.

У нас есть ещё задача, связанная с добавлением пользователя сразу в две таблицы: users и profiles. Ее мы закроем через дочерний класс, описав там новым метод.

class UserDAO(BaseDAO):
    model = User

    @classmethod
    async def add_user_with_profile(cls, session: AsyncSession, user_data: dict) -> User:
        """
        Добавляет пользователя и привязанный к нему профиль.

        Аргументы:
        - session: AsyncSession - асинхронная сессия базы данных
        - user_data: dict - словарь с данными пользователя и профиля

        Возвращает:
        - User - объект пользователя
        """
        # Создаем пользователя из переданных данных
        user = cls.model(
            username=user_data['username'],
            email=user_data['email'],
            password=user_data['password']
        )
        session.add(user)
        await session.flush()  # Чтобы получить user.id для профиля

        # Создаем профиль, привязанный к пользователю
        profile = Profile(
            user_id=user.id,
            first_name=user_data['first_name'],
            last_name=user_data.get('last_name'),
            age=user_data.get('age'),
            gender=user_data['gender'],
            profession=user_data.get('profession'),
            interests=user_data.get('interests'),
            contacts=user_data.get('contacts')
        )
        session.add(profile)

        # Один коммит для обеих операций
        await session.commit()

        return user  # Возвращаем объект пользователя

Технически, мы уже описывали этот метод, просто немного другим способом. Тут самое главное что я хотел показать – это гибкость подхода при работе с базовым и дочерними классами.

Кроме того, чтоб убрать избыточность кода в аргументах решил все значения передать питоновским словарем.

Добавим нового пользователя обновленным методом и перейдем к блоку получения данных (SELECT).

@connection
async def add_full_user(user_data: dict, session: AsyncSession):
    new_user = await UserDAO.add_user_with_profile(session=session, user_data=user_data)
    print(f"Добавлен новый пользователь с ID: {new_user.id}")
    return new_user.id

Подготовим информацию по пользователю.

user_data_bob = {
    "username": "bob_smith",
    "email": "bob.smith@example.com",
    "password": "bobsecure456",
    "first_name": "Bob",
    "last_name": "Smith",
    "age": 25,
    "gender": GenderEnum.MALE,
    "profession": ProfessionEnum.DESIGNER,
    "interests": ["gaming", "photography", "traveling"],
    "contacts": {"phone": "+987654321", "email": "bob.smith@example.com"}
}

Выполним код.

run(add_full_user(user_data=user_data_bob))
Пользователь добавлен.
Пользователь добавлен.

Проверим.

На данный момент у вас есть полное понимание того, как правильно добавлять данные в базу данных. Это относится как одиноких таблиц, так и смежных. Теперь постараемся получить данные с базы данных разными способами, используя механизмы фильтров, join и связей.

Получение данных

Для выборки данных в SQLAlchemy используется метод select(), который служит основной точкой для формирования запросов в асинхронных приложениях. На этом этапе мы сосредоточимся на использовании этого метода в дочерних классах для закрепления материала.

В дальнейшем, по мере развития, мы сможем перенести общие операции выборки данных в базовый класс BaseDao, чтобы стандартизировать подход к работе с данными.

Метод get()

Хотя метод get() является удобным для быстрого получения одной записи по первичному ключу, он не поддерживает асинхронный режим. По этой причине мы не будем его рассматривать подробно в контексте асинхронных приложений.

Метод select()

Мы сосредоточим внимание на методе select(), который обеспечивает большую гибкость за счет поддержки сложных фильтров, сортировки и объединений.

Начнем с простого примера получения всех записей из таблицы. Для этого в дочернем классе UserDao опишем следующий метод:

@classmethod
async def get_all_users(cls, session: AsyncSession):
    # Создаем запрос для выборки всех пользователей
    query = select(cls.model)

    # Выполняем запрос и получаем результат
    result = await session.execute(query)

    # Извлекаем записи как объекты модели
    records = result.scalars().all()

    # Возвращаем список всех пользователей
    return records

Теперь разберемся с этим кодом поэтапно. Сам процесс получения данных у нас делится на несколько шагов:

  1. Открытие сессии — здесь всё понятно, это необходимо для работы с базой данных.

  2. Формирование запроса: query = select(cls.model). На этом этапе мы только создаем SQL‑запрос. Фактически это подготовленная строка, которая пока не отправлена в базу данных.

  3. Выполнение запроса: при помощи метода execute() мы отправляем этот запрос в базу данных и получаем ответ.

  4. Преобразование результата: после выполнения запроса нам нужно преобразовать результат в удобный для работы формат — в данном случае это объекты модели.

Стоит обратить внимание на переменную query (или stmt, как её часто называют). Этот блок кода будет часто встречаться, так как на этом этапе мы задаем структуру SQL-запроса. Здесь важно понимать, что SQLAlchemy предоставляет нам множество способов фильтрации и модификации запросов, но на этом этапе это всё ещё просто строка запроса и ничего больше.

SQLAlchemy помогает нам легко и безопасно формировать правильные SQL-запросы благодаря своим методам и связям, которые мы заранее описали в моделях таблиц.

В данном примере запрос query = select(cls.model) будет преобразован в SQL-запрос вида: SELECT * FROM users;.

Давайте детально разберем процесс преобразования результатов запроса в SQLAlchemy и основные методы, которые используются для получения данных. Это важный этап, так как после выполнения SQL-запроса нам нужно превратить "сырые" результаты в удобные для работы объекты.

Преобразование результата запроса

После того как запрос был выполнен с помощью метода session.execute(query), SQLAlchemy возвращает объект result. Этот объект содержит строки, полученные из базы данных, но чтобы превратить их в объекты моделей или нужный формат, нужно выполнить преобразование. Для этого используются различные методы: scalars(), all(), first(), и другие.

Основные методы для работы с результатами запроса

1) scalars()

Метод scalars() используется, когда мы ожидаем получить одну колонку результата, а не несколько полей. Например, когда мы запрашиваем всю модель (как в нашем случае) или одно конкретное поле (например, только имена пользователей).

Пример:

result = await session.execute(query)
records = result.scalars().all()

В этом случае scalars() преобразует результат в объекты модели, так как мы запрашиваем всю модель (например, User). Если запрос вернет несколько колонок (например, id, name, email), то scalars() "выберет" только ту колонку, которая соответствует нашей модели (или конкретному полю, если оно было запрошено).

2) all()

Метод all() возвращает список всех записей, которые удовлетворяют нашему запросу. Когда мы применяем его к результатам scalars(), мы получаем список всех объектов модели, которые были выбраны.

Пример:

records = result.scalars().all()

Здесь all() извлекает все результаты и возвращает их в виде списка. Это значит, что если в базе данных 10 пользователей, all() вернет список из 10 объектов User.

3) first()

Метод first() возвращает только первую запись из результатов запроса. Это полезно, когда нам нужно получить одну запись, и нас не интересуют остальные.

Пример:

record = result.scalars().first()

Здесь first() вернет первый объект из результатов, или None, если записей не найдено.

4) scalar()

Метод scalar() используется, когда мы ожидаем одну запись и одно поле в результате запроса. Если запрос вернет больше одной строки, метод выбросит исключение.

Пример:

record = result.scalar()

Этот метод вернет одно значение, например, если запрос направлен на выборку одного конкретного поля.

5) scalar_one() и scalar_one_or_none()

Эти методы похожи на scalar(), но с важными отличиями:

  • scalar_one() — возвращает одно значение. Если запрос вернет более одной строки, произойдет ошибка.

  • scalar_one_or_none() — вернет либо одно значение, либо None, если записей не найдено. Если вернется более одной строки, также произойдет ошибка.

Пример:

record = result.scalar_one_or_none()

Этот метод используется, когда мы точно знаем, что либо будет одна запись, либо её не будет вовсе.

Важность понимания работы с результатами

SQLAlchemy предоставляет гибкость в том, как мы можем работать с результатами запросов. Это позволяет оптимизировать работу с базой данных: получать либо всю информацию, либо только необходимую часть (например, одну строку или одно поле).

Методы, такие как scalars() и first(), позволяют легко преобразовать результат в объекты модели и работать с ними дальше в коде, без необходимости вручную извлекать данные из сырых строк запроса.

Таким образом, выбор нужного метода для обработки результата зависит от конкретной задачи: нужно ли получить все записи, только первую запись, одно поле или вообще проверить факт существования записи в базе данных.

То есть все зависит от конкретной задачи. Так как часто бывает, что обращаться к сырому результату запроса и вызывать через точку нужное значение / значения, вполне себе оптимальный способ.

Получаем данные с наших таблиц

Теперь в корне проекта давайте создадим файл select_methods_dao.py и там запросим все данные с таблицы с пользователями (users).

from dao.dao import UserDAO
from database import connection
from asyncio import run


@connection
async def select_all_users(session):
    return await UserDAO.get_all_users(session)

Вот такая лаконичная запись позволяет получить данные о всех пользователях в виде списка.

Давайте выполним код и распечатаем результаты.

all_users = run(select_all_users())
for i in all_users:
    print(i)

Что-то похожее на то что нам нужно мы получили, но хотелось бы информацию получить в более удобном формате.

Из того что мы имеем сейчас мы можем обращаться через точку к данным и получать результат, что уже не плохо. Вот пример.

for i in all_users:
    data = {'username': i.username, 'password': i.password, 'email': i.email}
    print(data)

Уже говорил выше. Иногда такой подход приемлем, но так бывает не всегда. Для того чтоб было проще работать с полученными данными без необходимости получать каждый параметр через точку я, в практике, использую два подхода, которыми сейчас с вами поделюсь.

Удобнее всего работать с объектами моделей таблиц, трансформируя их в питоновские словари (списки питоновских словарей). Для этого в базовом классе моделей, который у нас в файле database.py, можно написать следующий метод.

    def to_dict(self) -> dict:
        """Универсальный метод для конвертации объекта SQLAlchemy в словарь"""
        # Получаем маппер для текущей модели
        columns = class_mapper(self.__class__).columns
        # Возвращаем словарь всех колонок и их значений
        return {column.key: getattr(self, column.key) for column in columns}

class_mapper импортируем из sqlalchemy.orm

Объяснение:

  1. class_mapper(self.__class__) — этот метод возвращает объект маппера SQLAlchemy, который содержит информацию о всех колонках модели.

  2. {column.key: getattr(self, column.key)} — создает словарь, в котором ключи — это названия колонок, а значения — данные этих колонок для текущего объекта.

  3. Этот метод универсален и будет работать с любой таблицей или моделью, унаследованной от класса Base.

Под этот новый метод миграцию в Alembic делать не нужно, так как работать он будет только на стороне нашего приложения.

all_users = run(select_all_users())
for i in all_users:
    print(i.to_dict())

Все выглядит намного привлекательнее и приятнее, но тут сразу появляется несколько проблем.

  • Может появится желание отобразить не все значения, а только определенные

  • Мы знаем, что сейчас должна автоматически подгрузится информация по профилям пользователей, ведь мы в прошлый раз указывали в описании модели «lazy=»joined»», а я гарантировал, что информация подтянется автоматически.

Мы можем убедиться, что информация по профилю действительно есть:

for i in all_users:
    print(i.profile)

Информация по профилям для тех кто их имеет вполне себе подгружается, но просто не отображается. В этом случае у нас появляется необходимость в более детальном управлении отображением данных и тут на сцену выходит очень полезная библиотека Pydantic, которую мы установили в прошлый раз.

Благодаря этой библиотеке мы можем описать так называемую модель отображения данных, которую можно будет максимально удобно использовать в своих приложениях.

Перед тем как мы ее напишем хочу обратить внимание на важный момент. Мы на уровне моделей таблиц указали необходимость подгрузки автоматическую со связанной таблицы профилей. То есть, для постов и комментариев эта схема работать не будет и нам необходимо будет данные подгружать отдельно. Если сейчас я уберу из модели User строку lazy="joined", то получу следующую ошибку:

Ошибка sqlalchemy.orm.exc.DetachedInstanceError говорит о том, что объект модели, с которым мы работаем (в данном случае User), был отсоединен от сессии, и SQLAlchemy не может выполнить ленивую загрузку (lazy loading) связанного объекта profile, что логично, ведь этот объект не загружен.

На этом моменте у вас должно появиться понимание того зачем мы вообще настраивали эти связи (relationship).

Описание Pydantic моделей для работы с объектами SQLAlchemy

Опишем простую Pydantic модель. Там мы просто обозначим какие данные и в каком виде мы хотим получить.

Для удобства отдельно опишем модель для профиля (те данные, которые нас интересуют для просмотра) и модель users.

Для этого я создам файл schemas.py.

Выполню необходимые импорты.

from typing import List
from pydantic import BaseModel, ConfigDict
from sql_enums import GenderEnum, ProfessionEnum

Теперь опишем первую модель (схему), которая будет описывать полученные данные с таблицы profiles.

class ProfilePydantic(BaseModel):
    first_name: str
    last_name: str | None
    age: int | None
    gender: GenderEnum
    profession: ProfessionEnum
    interests: List[str] | None
    contacts: dict | None

    model_config = ConfigDict(from_attributes=True, use_enum_values=True)

Синтаксис чем-то похож на современное описание моделей таблиц в ORM SQLAlchemy, разве что тут нет Mapped и mapped_column.

Мы создаем Pydantic модель для того, чтобы валидировать и структурировать входящие данные в соответствии с заданными типами. Pydantic позволяет легко описывать схему данных и выполнять автоматическую проверку типов, а также трансформировать значения в нужный формат при необходимости (наш случай).

Описания полей достаточно понятны, а что заслуживает внимание так это model_config (переменная, в которой прописываются конфигурации конкретной модели). Разберемся с атрибутами и значениями.

  • from_attributes = True: это позволяет модели автоматически маппить атрибуты Python объектов на поля модели. Примерно то что мы делали в методе to_dict, но более расширенно.

  • use_enum_values = True: это указание преобразовывать значения перечислений в их фактические значения, а не в объекты перечислений. Просто для удобства восприятия человеком.

Теперь опишем схему с пользователями.

class UserPydantic(BaseModel):
    username: str
    email: str
    profile: ProfilePydantic | None

    model_config = ConfigDict(from_attributes=True, use_enum_values=True)

Из нового синтаксиса тут только то что мы добавили поле profile и в качестве описания этого поля добавили схему, описанную выше. Решил использовать название «схема», чтоб путаницы дальше с «моделями» SQLAlchemy не было.

Теперь нам необходимо на основании схемы Pydantic создать экземпляр Pydantic модели.

Выглядеть это будет так.

all_users = run(select_all_users())
for i in all_users:
    user_pydantic = UserPydantic.from_orm(i)
    print(user_pydantic)

Метод from_orm в Pydantic используется для создания экземпляра Pydantic модели на основе объекта ORM. Этот метод автоматически преобразует данные из ORM объекта в Pydantic модель, что позволяет легко работать с данными, извлеченными из базы данных через ORM (например, SQLAlchemy).

То есть, когда мы выполним преобразование, мы получим новую модель Pydantic, которая будет основана на полученных данных с SQLAlchemy. Это дает нам мощный инструмент для работы, так у объектов Pydantic-моделей много методов, в частности, методы, которые трансформируют объект Pydantic модели в JSON или dict.

Уже интереснее и мы видим, что отобразились контакты. А теперь давайте экземпляр Pydantic модели трансформируем в обычный питоновский словарь. Для этого мы можем использовать такую запись:

all_users = run(select_all_users())
for i in all_users:
    user_pydantic = UserPydantic.from_orm(i)
    print(user_pydantic.dict())

То есть, в Pydantic есть встроенный метод dict(), который трансформирует полученные значения в словарь. Проверим.

А вот теперь мы получили то, что действительно хотели. С такой вложенной структурой максимально удобно работать каждому, кто знаком с базовым Python синтаксисом.

Существуют библиотеки, которые автоматически трансформируют объекты SQLAlchemy в pydantic модели, но я, из опыта, скажу, что лучше описание делать самостоятельно. Тем более у вас появляется полный контроль над результатом.

Получение конкретных значений из нужных колонок

Хочу отметить важный момент. На данном этапе мы просто отсекли ненужные колонки, такие как update_at и createt_at в двух таблицах, но это не значит, что мы их не получили.

Чтобы извлекать только отдельные значения из колонок, например, username и id пользователя, необходимо использовать другие подходы при выполнении запросов. Методы scalar и scalars здесь не подходят, так как они возвращают либо одно значение, либо список значений одной колонки или модели, что ограничивает их применение для более сложных запросов с выборкой нескольких полей.

В таких случаях стоит использовать методы, которые позволяют получить именно те колонки, которые вас интересуют, и работать с ними напрямую.

Давайте опишем ситуацию, когда мы хотим получить значение user_name и id всех пользователей с таблицы users (модель User) и, для того чтоб вы увидели запрос мы отобразим его в консоли.

Добавим метод в UserDao.

@classmethod
async def get_username_id(cls, session: AsyncSession):
    # Создаем запрос для выборки id и username всех пользователей
    query = select(cls.model.id, cls.model.username)  # Указываем конкретные колонки
    print(query)  # Выводим запрос для отладки
    result = await session.execute(query)  # Выполняем асинхронный запрос
    records = result.all()  # Получаем все результаты
    return records  # Возвращаем список записей

Из нового мы просто указали явно в select какие поля хотим видеть при вызове метода. Давайте напишем на основании этого метода функцию.

@connection
async def select_username_id(session):
    return await UserDAO.get_username_id(session)

Вызовем

rez = run(select_username_id())
for i in rez:
    print(i)

Обратите внимание. Тут сформировался корректный SQL -запрос, который указал что мы явно хотим получить значение из двух колонок. Но, результат мы получили уже в виде кортежей не именованных. То есть, у нас сейчас нет возможности достать нужное значение по ключу и не сработает наш метод to_dict(), так как работает он только с объектом модели.

Можно сделать так.

rez = run(select_username_id())
for i in rez:
    data = {'user_id': i[0], 'username': i[1]}
    print(data)

И на небольших массивах данных это вполне рабочий подход, но, можно зайти со стороны Pydantic!

Давайте опишем новую Pydantic-схему, которая позволит корректно отобразить значения из эти последнего метода.

class UsernameIdPydantic(BaseModel):
    id: int
    username: str

    model_config = ConfigDict(from_attributes=True)

Это максимально простая схема в которой мы просто обозначаем ключи и значения, которые должны в них попасть.

rez = run(select_username_id())
for i in rez:
    rez = UsernameIdPydantic.from_orm(i)
    print(rez.dict())
 Согласитесь, это мощь!
Согласитесь, это мощь!

Запросы с использованием метода select позволяют не только выбирать конкретные колонки, но и применять агрегатные функции, а также создавать вычисляемые поля "на лету" с помощью специальных конструкций. Это значительно расширяет возможности манипуляции данными прямо в запросе. Однако, чтобы не перегружать информацией, на сегодня мы остановимся на том, что уже обсудили. Теперь кратко о фильтрации данных.

Фильтры для получения нужных данных

Метод select в SQLAlchemy указывает базе данных, какие именно колонки или модели нас интересуют. Чтобы задать условия выборки и получать только релевантные данные, используются фильтры.

Существует два основных подхода к фильтрации данных в SQLAlchemy:

  • Метод where (или filter) — гибкий и мощный инструмент для создания сложных условий фильтрации. Он позволяет использовать операторы сравнения и логические условия. С его помощью можно задавать несколько условий одновременно:

query = select(User).where(User.age > 18, User.active == True)

  • Метод filter_by — более удобный и "упрощённый" способ фильтрации, если нужно отфильтровать по конкретным значениям колонок. Он принимает именованные аргументы, соответствующие названиям колонок в модели, и автоматически создаёт условия для запроса:

query = select(User).filter_by(username="john_doe", active=True)

Хотя оба метода решают одну и ту же задачу, where (или filter) чаще используется для сложных условий фильтрации. Давайте рассмотрим, как эти методы работают на практике.

Пример: получение информации о пользователе

Создадим метод, который принимает user_id и возвращает полную информацию о пользователе:

@classmethod
async def get_user_info(cls, session: AsyncSession, user_id: int):
    query = select(cls.model).filter_by(id=user_id)
    # query = select(cls.model).filter(cls.model.id == user_id)
    result = await session.execute(query)
    user_info = result.scalar_one_or_none()
    return user_info

Этот метод достаточно прост. Он принимает user_id и возвращает либо полную информацию о пользователе, либо None. Эквивалентный запрос с использованием filter выглядел бы так:

query = select(cls.model).filter(cls.model.id == user_id)

Чуть больше кода, но результат остаётся тем же. Теперь добавим функцию, которая будет использовать этот метод:

@connection
async def select_full_user_info(session, user_id: int):
    rez = await UserDAO.get_user_info(session=session, user_id=user_id)
    if rez:
        return UserPydantic.from_orm(rez).dict()
    return {'message': f'Пользователь с ID {user_id} не найден!'}

Здесь мы сразу преобразуем результат в питоновский словарь через Pydantic-схему.

Пример использования:

info = run(select_full_user_info(user_id=1))
print(info)
# {'username': 'yakvenalex', 'email': 'example@example.com', 'profile': None}

info = run(select_full_user_info(user_id=3))
print(info)
# {'username': 'john_doe', 'email': 'john.doe@example.com', 'profile': {'first_name': 'John', 'last_name': 'Doe', 'age': 28, 'gender': 'мужчина', 'profession': 'инженер', 'interests': ['hiking', 'photography', 'coding'], 'contacts': {'phone': '+123456789', 'email': 'john.doe@example.com'}}}

info = run(select_full_user_info(user_id=1113))
print(info)
# {'message': 'Пользователь с ID 1113 не найден!'}

Как видите, метод прекрасно работает и является достаточно универсальным для получения информации о записи по её id.

Добавляем универсальный метод в базовый класс

Теперь добавим метод в базовый класс, который будет возвращать запись по её id:

@classmethod
async def find_one_or_none_by_id(cls, data_id: int, session: AsyncSession):
    query = select(cls.model).filter_by(id=data_id)
    result = await session.execute(query)
    record = result.scalar_one_or_none()
    return record

Проверим работу этого метода:

@connection
async def select_full_user_info(session, user_id: int):
    rez = await UserDAO.find_one_or_none_by_id(session=session, data_id=user_id)
    if rez:
        return UserPydantic.from_orm(rez).dict()
    return {'message': f'Пользователь с ID {user_id} не найден!'}

Метод отлично справляется со своей задачей. Также важно отметить, что filter_by может принимать несколько именованных аргументов, что позволяет использовать его для более сложных выборок.

Универсальный метод для получения одной записи по нескольким параметрам

@classmethod
async def find_one_or_none(cls, session: AsyncSession, **filter_by):
    query = select(cls.model).filter_by(**filter_by)
    result = await session.execute(query)
    record = result.scalar_one_or_none()
    return record

Этот метод принимает неограниченное количество аргументов (первым всегда передаётся session) и возвращает либо одну запись, либо None. Важно быть осторожным, так как если будет найдено больше одной записи, возникнет ошибка.

Попробуем найти пользователя по его электронной почте, используя этот метод:

@connection
async def select_full_user_info_email(session, user_id: int, email: str):
    rez = await UserDAO.find_one_or_none(session=session, id=user_id, email=email)
    if rez:
        return UserPydantic.from_orm(rez).dict()
    return {'message': f'Пользователь с ID {user_id} не найден!'}

Пример использования:

info = run(select_full_user_info_email(user_id=21, email='bob.smith@example.com'))
print(info)

# {'username': 'bob_smith', 'email': 'bob.smith@example.com', 'profile': {'first_name': 'Bob', 'last_name': 'Smith', 'age': 25, 'gender': 'мужчина', 'profession': 'дизайнер', 'interests': ['gaming', 'photography', 'traveling'], 'contacts': {'phone': '+987654321', 'email': 'bob.smith@example.com'}}}

Если передать неверный user_id или ошибиться в почте, получим сообщение:

# {'message': 'Пользователь с ID 22 не найден!'}

Универсальный метод для получения всех записей

Теперь создадим универсальный метод для получения нескольких записей:

@classmethod
async def find_all(cls, session: AsyncSession, **filter_by):
    query = select(cls.model).filter_by(**filter_by)
    result = await session.execute(query)
    records = result.scalars().all()
    return records

Логика работы остаётся той же, однако мы получаем данные с помощью scalars().all(), что возвращает список объектов модели. Если фильтры не будут переданы, метод вернёт все записи из таблицы.

Подгрузка данных в SQLAlchemy

SQLAlchemy предоставляет два основных подхода для подгрузки данных из связанных таблиц:

  1. Подгрузка через join (явный JOIN в запросе).

  2. Подгрузка через options (гибкая загрузка с использованием стратегий).

1. Подгрузка через join

Этот подход используется для объединения данных из нескольких таблиц через SQL-запрос с явным JOIN. Основное преимущество такого подхода — это возможность оптимизировать запросы и избегать проблемы "N+1 запросов", когда для каждого объекта выполняется отдельный запрос для связанных данных.

Пример использования join:

query = select(User).join(Profile).where(User.id == Profile.user_id)
result = await session.execute(query)
users = result.scalars().all()

В этом примере выполняется JOIN между таблицами User и Profile, что позволяет за один запрос получить информацию о пользователях и их профилях.

2. Подгрузка через options

SQLAlchemy также предоставляет более гибкий способ загрузки связанных данных через метод options(), который позволяет задавать стратегии подгрузки.

Основные стратегии подгрузки:

  • joinedload — подгружает связанные данные через LEFT OUTER JOIN. Эквивалентно использованию lazy="joined" в описании relationship.

    Пример использования joinedload:

query = select(User).options(joinedload(User.profile))
result = await session.execute(query)
users = result.scalars().all()
В этом случае данные профиля пользователя будут загружены через `JOIN`, но запрос будет выглядеть более "чисто", так как `JOIN` скрыт внутри стратегии загрузки.
  • subqueryload — загружает связанные данные через отдельный подзапрос. Полезно, когда JOIN неэффективен, например, при сложных или больших связанных данных.

    Пример использования subqueryload:

query = select(User).options(subqueryload(User.profile))
result = await session.execute(query)
users = result.scalars().all()
В этом случае связанные данные загружаются отдельным подзапросом, что может быть полезно для оптимизации производительности в некоторых сценариях.
  • selectinload — загружает связанные данные через запрос с оператором IN, что особенно эффективно для загрузки больших объёмов связанных данных.

    Пример использования selectinload:

query = select(User).options(selectinload(User.profile))
result = await session.execute(query)
users = result.scalars().all()

Эта стратегия подгрузки выполняет один основной запрос для всех пользователей, а затем дополнительный запрос для связанных данных с использованием оператора IN, что значительно улучшает производительность для больших наборов данных.

Сравнение двух подходов

  1. Подгрузка через join:

    • Полезно для явных объединений данных через несколько таблиц.

    • Особенно эффективно при работе с большими данными и сложными связями.

    • Используется, когда нет явных relationship между моделями или требуется гибкое объединение.

  2. Подгрузка через options:

    • Обеспечивает гибкость в выборе стратегии загрузки данных (joinedload, subqueryload, selectinload).

    • Полезно для автоматической подгрузки связанных данных через заранее настроенные связи в моделях.

    • Даёт больше контроля над производительностью запросов в зависимости от их структуры и объёма данных.

Использование lazy="joined"

Если в модели установлено lazy="joined", это эквивалентно использованию joinedload в запросах через options(). SQLAlchemy автоматически выполнит JOIN и подгрузит связанные данные при выборке основного объекта.

Пример использования lazy="joined:

# Связь один-к-одному с Profile
profile: Mapped["Profile"] = relationship(
    back_populates="user",
    uselist=False,
    lazy="joined"
)

Этот подход автоматически применяет JOIN при каждом запросе к таблице User, что позволяет избежать необходимости явно использовать joinedload при выборке.

Выводы

На данном этапе важно усвоить несколько ключевых моментов:

  1. Мы можем загружать данные из конкретной таблицы.

  2. Мы можем подгружать данные из связанных таблиц через различные стратегии.

  3. Автоматическая подгрузка данных может быть настроена через параметр lazy в модели.

Дальнейшие шаги

В следующей статье мы рассмотрим методы для обновления и удаления данных, а также углубимся в более специфические темы, такие как:

  • Автоматическое агрегирование колонок,

  • Триггеры,

  • Вычисляемые данные,

  • И другие возможности SQLAlchemy.

Подгрузка данных — это мощный инструмент, который поможет вам эффективно управлять связанными данными, избегать избыточных запросов и оптимизировать работу с базой данных. Убедитесь, что вы освоили текущий материал, прежде чем переходить к более сложным конструкциям.

Заключение

Друзья, сегодня мы рассмотрели достаточно большую и, можно сказать, фундаментальную тему. Теперь, если вы читали эту и предыдущую статью вы уже умеете не просто описывать модели таблиц, связи между ними и миграции, а полноценно работать с данными через SQLAlchemy, добавлять их и читать.

Сегодня мы научились добавлять данные в таблицу и разобрались как добавлять данные в связанные таблицы, где присутствует зависимость одной таблицы от другой. В этом контексте мы познакомились с понятием flush и поняли зачем он нам нужен.
Сегодня мы глубоко разобрали тему сессий и поняли, как работает фабрика сессий в SQLAlchemy.

Кроме того, большой блок в этой статье был посвящен получению данных из таблиц и трансформации этих данных в приятные для глаза пиитониста словари. Постарайтесь на практике закрепить связку Pydantic и ORM SQLalchemy.

Информации было очень много и, как вы понимаете, написание такого материала занимает очень много времени и сил. Поэтому, не жалейте добрых комментариев и лайков. Это мотивирует меня генерировать больше обучающего материала в таком формате и так я понимаю, что мои усилия не напрасны.

Полный исходный код этого проекта, как и прочий эксклюзивный материал, который я не публикую на Хабре, вы найдете в моем телеграмм канале «Легкий путь в Python».

До скорого!

Комментарии (14)


  1. perminoff
    15.10.2024 07:40

    Отличная статья!


    1. yakvenalex Автор
      15.10.2024 07:40

      Спасибо за обратную связь)


  1. Veritaris
    15.10.2024 07:40

    Статья вроде бы неплоха, но есть нюансы, которые в какой-то момент оказываются важны, но не рассмотрены, или просто использованы bad practice

    1. Декоратор def connection(method): – очень не хватает уровня изоляции для транзакции со значением по-умолчанию. Да, в большинстве случаев будет стандартный read commiеted, но когда нужно выбрать другой для всего UnitOfWork – хочется передать его в декоратор, а не писать в коде уродливое await session.execute(text("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ")), ещё и часто на уровне сервисного слоя, а не уровне репозитория. Ведь всё равно session будет закрыта по завершении вызова декорированного метода

    class BaseDAO:
        model = None  # Устанавливается в дочернем классе
    

    Вот этот момент прям боль. И я даже не про отсутствие типа у поля model. У нас же есть дженерики. Почему не использовать их?
    Python < 3.12:

    import typing
    
    T = typing.TypeVar("T", bound=Base) # мы можем задать границу типа, т.о. мы будем уверены при статическом анализе что использованы верные типы как минимум в иерархии
    
    class BaseDAO(typing.Generic[T]):
        model: type[T]
    

    Python >= 3.12:

    # точно так же можно задать границу дженерика 
    class BaseDAO[T: Base]:
        model: type[T]
    

    И используем наш T для типизации где хотим:

    class BaseDAO[T: Base]:
        model: type[T]
    
        @classmethod
        async def add(cls, session: AsyncSession, **values: dict[str, typing.Any]) -> T:
            new_instance = cls.model(**values)
            session.add(new_instance)
            try:
                await session.commit()
            except SQLAlchemyError:
                await session.rollback()
                raise
            return new_instance
    
        @classmethod
        async def add_many(cls, session: AsyncSession, instances: list[dict[str, typing.Any]]) -> list[T]:
            new_instances = [cls.model(**values) for values in instances]
            session.add_all(new_instances)
            try:
                await session.commit()
            except SQLAlchemyError:
                await session.rollback()
                raise
            return new_instances
    

    И сами dao-классы описываются лучше:

    class UserDAO(BaseDAO[User]):
        ...
    
    
    class ProfileDAO(BaseDAO[Profile]):
        ...
    

    Чем же лучше? А тем, что мы не можем не указать класс – статический анализ отвалится. А вот забыть написать model = XXX вполне можно, и mypy даже не ругнётся при model: Base

    1. async def find_all(cls, session: AsyncSession, **filter_by), async def find_one_or_none(cls, session: AsyncSession, **filter_by): и прочие, где передаются **kwargs – я бы сказал а-та-та, очень просто словить из-за опечатки ошибку в запросе из-за того, что передаётся ключ, ссылающийся на несуществующую колонку. Лучше сделать pydantic-модель со всеми нужными опциональными полями, передавать её и делать query = select(сls.model).filter_by(**filters.model_dump(exclude_unset=True))


    1. yakvenalex Автор
      15.10.2024 07:40

      Благодарю за конструктивную критику


    1. Haskir
      15.10.2024 07:40

      Попробовал переписать свой pet проект согласно вашему второму замечанию

      class BaseORMRepository[T: Base]:
          model: type[T]
      
      class UserManager(BaseORMRepository[User], Singleton):
          @classmethod
          async def create(cls, token_data: TokenMessage, session: AsyncSession):
              user_info = await cls.get_user_info(token_data)
              user = await cls.get("login", user_info.login, session)
              if not user:
                  user = cls.model().from_dict(user_info)
                  session.add(user)
              cls.__update_token(token=token_data, user=user, session=session)
              await session.commit()

      Но получаю ошибку (python 3.12)
      :query = Select(cls.model).filter_by(**{arg: value})
      ^^^^^^^^^
      AttributeError: type object 'UserManager' has no attribute 'model'


      1. Veritaris
        15.10.2024 07:40

        Добрый день!
        Да, вы абсолютно правы, я совершенно упустил момент что я у себя использую Dependency Injection, а в данной статье он не рассматривается (я не стал о нём писать, т.к. автор мог либо написать о нём в следующих статьях либо просто его не использовать, все фломастеры разные)
        Конечно же в коде, который я написал, у класса BaseDAO и его наследников не будет поля класса (в typing.ClassVar понимании) model, а будет поле экземпляра класса
        Судя по тому, что класс UserManager наследуется от Singleton, а session пробрасывается извне, то предположу что у вас user_manager создаётся где-то отдельно и используется заместо того самого DI. В таком случае я бы просто убрал декоратор, сделав методом экземпляра класса, а в методе инициализации сделал user_manager = UserManager(model=User, ...) либо просто переопределил __init__, вызвав super().__init__(model=User)
        Склеивая, верный вариант моего кода выше (и наиболее близкий к реальному) выглядит как-то так:

        import asyncio
        import dataclasses
        import typing
        
        import pydantic
        import sqlalchemy as sa
        from sqlalchemy.ext.asyncio import AsyncSession
        from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
        
        
        #  DTO для валидации входных данных
        class UserCreateSchema(pydantic.BaseModel):
            first_name: str
            last_name: str
        
        
        METADATA: typing.Final = sa.MetaData()
        
        
        class Base(DeclarativeBase):
            metadata = METADATA
        
        
        class User(Base):
            __tablename__ = "user"
        
            id: Mapped[typing.Annotated[int, mapped_column(sa.BigInteger, primary_key=True)]]
        
            first_name: Mapped[typing.Annotated[str, mapped_column(sa.String(length=64))]]
            last_name: Mapped[typing.Annotated[str, mapped_column(sa.String(length=64))]]
        
        
        @dataclasses.dataclass(kw_only=True)
        class BaseORMRepository[T: Base, V: pydantic.BaseModel]:  # T - класс модели-объекта, V - класс модели-валидатора
            model: type[T]
        
            async def create(self, session: AsyncSession, validated_value: V) -> T:
                user = self.model(**validated_value.model_dump())
                session.add(user)
                # await session.commit() #  закомментировано чтобы код мог запуститься без реального подключения к базе
                return user
        
        
        class UserManager(BaseORMRepository[User, UserCreateSchema]):
            def __init__(self) -> None:
                super().__init__(model=User)
        
        
        if __name__ == "__main__":
            user_manager = UserManager()
            session = AsyncSession(bind=None)  # представим что это настоящая сессия
            user_validated = UserCreateSchema(first_name="John", last_name="Doe")
            user_created = asyncio.run(user_manager.create(session, user_validated))
            print(user_created.first_name, user_created.last_name)
        

        Что выведет John Doe в терминал. Да, теряется краткость за счёт переопределения __init__, но приобретается другой момент – все CRUD-методы внутри BaseORMRepository можно правильно типизировать, уменьшив вероятность ошибок


        1. 3apa3ka3
          15.10.2024 07:40

          Не автор вопроса, но было интересно!

          При прочтении кода немного застопорился на моментне:

          user = self.model(**validated_value.model_dump())

          если заменить user на instance (как пример) будет более очевидно что общий метод.


          1. Veritaris
            15.10.2024 07:40

            Да, конечно, в реальном коде в общих методах лучше созданную сущность именовать общим именем. Я просто хотел сохранить связь с указанным автором комментария методом, но, пожалуй, приму на будущее что лучше написать код правильнее, ибо связь тогда будет точно ясна


  1. fobo
    15.10.2024 07:40

    очень полезные статьи. спасибо!

    один только вопрос: почему "получим С таблицы", "загрузим С базы" и т. п.? и это не разово, на протяжении 2 статей. вроде бы "ИЗ таблицы". может я чего-то не понимаю?

    для новичков полезно было бы добавить echo=True при создании сессии и можно было бы рассматривать какие SQL-запросы алхимия генерирует и выполняет.


    1. yakvenalex Автор
      15.10.2024 07:40

      Спасибо за обратную связь. Возможно просто неграмотно пишу) Казалось что так правильно. По поводу echo планирую в следующей статье описать вывод кастомный под логирование с параметрами. echo перегружает консоль


  1. comargo
    15.10.2024 07:40

    Знаком с алхимией поверхностно, пока нового нашёл мало, но все равно интересно.

    Но вот замечание: тема сисек скаляров не раскрыта совсем. В частности не ясно, чем отличается scalar() от scalar_one()


    1. yakvenalex Автор
      15.10.2024 07:40

      В статье блоку скаляров уделена полная глава.


    1. yakvenalex Автор
      15.10.2024 07:40

      Основные методы для работы с результатами запроса

      1) scalars()

      Метод scalars() используется, когда мы ожидаем получить одну колонку результата, а не несколько полей. Например, когда мы запрашиваем всю модель (как в нашем случае) или одно конкретное поле (например, только имена пользователей).

      И далее по тексту)


  1. Veritaris
    15.10.2024 07:40

    Подскажите, а планируется ли статья по миксинам и hybrid_property / hybrid_method? Первое пригождается когда появляются одинаковые поля (те же id, created_at, deleted и т.д.), а второе позволяет динамически вычислять некие параметры, связанные с объектом БД, без повторений (например посчитать кол-во children при one-to-many и прочее). Если про миксины ещё встречал статьи на хабре, то про hybrid_property как-то не сталкивался, а тема интересная и ИМХО довольно полезная