Для асинхронного Python существует мало полноценных ORM, и им далеко до таких монстров-комбайнов, как DjangoOrm и SQLAlchemy.ORM. Бедность ORM-инструментария для асинхронного программирования заставила многих программистов отказаться от зачастую непонятной им работы с ORM и перейти к более прозрачному взаимодействию с БД. Решение в лоб — написание raw SQL, но в этом случае запросы не будут защищены от инъекций, а запросы, составляемые по бизнес логике с опциональными параметрами, превратятся в конкатенацию строк. Важно найти баланс между прозрачностью выполнения кода, скоростью его написания и читаемостью.

Ниже я предлагаю реализацию такого баланса c использованием SQLAlchemy Core.

Сравнение с остальными ORM

Прежде чем рассматривать работу SQLAlchemy Core, для расширения инженерного кругозора давайте познакомимся с альтернативными решениями и их недостатками.

  • Django ORM и ponyORM не подходят из-за невозможности работать в асинхронном режиме.

  • SQLAlchemy.ORM: поддержку полноценной асинхронной ORM в альфа-режиме добавили в версии 1.4, пока не подходит для production-решений.

  • Peewee: умеет работать в асинхронном режиме, но только с Core-функциями. Хорошая библиотека, но я предлагаю выбрать SQLAlchemy из-за её большей популярности и поддерживаемости. Также были жалобы на ленивую загрузку объектов в Peewee — иногда она намертво вешает цикл (возможно, эту ошибку уже поправили в новых версиях).

  • Gino: простая ORM на основе SQLAlchemy Core. Работать с этой библиотекой можно без знаний об SQLAlchemy.

  • tortoise-orm: молодой проект с похожим на Django ORM способом доступа к данным.

Работа с SQLAlchemy Core

Алхимия состоит из двух частей. Первая — это абстракция над SQL-базой данных, которая называется SQLAlchemy Сore. Вторая — это ORM, собственно mapping между реляционной БД и объектным представлением. Соответственно, SQLAlchemy Сore почти один к одному совпадает с SQL — если вы знаете последний, то проблем с Core, как правило, не возникает. Благодаря этому использование SQLAlchemy Сore имеет наименьший оверхэд при обращении к БД.

SQLAlchemy Core предоставляет всю основную функциональность для построения запросов. Но в крупном проекте при обращении к базовому API SQLAlchemy будет много дублирующихся участков, поэтому логично написать библиотеку-обёртку для реализации наиболее популярных сценариев использования.

Примеры основных запросов к Алхимии:

tbl = cls.__table__
select_sql = select([tbl]).where(tbl.c.user_type == user_type) 
cursor = await conn.execute(select_sql) 
rows = await cursor.fetchall()
tbl = cls.__table__
insert_sql = tbl.insert().values(val='abc').returning(*tbl.c)
cursor = await conn.execute(insert_sql)
row = await cursor.fetchone()

Как видите, участки кода с передачей параметров повторяются, а сам код содержит много параметров, связанных с внутренней реализацией SQLAlchemy Core, что ухудшает читаемость. Хотелось бы получать данные из БД с помощью таких запросов:

user = await User.select().where(User.user_type == user_type).get(conn)
_ = await User.insert().values(val='abc').execute(conn)

Чтобы этого добиться, необходимо реализовать обработчики для операций insert, select, update и delete.

Пишем «обёртку»

Напишем для функции select полноценную обёртку, которая даст возможность с минимумом кода использовать все виды join, сортировки и остальные часто используемые операции:

class SelectQuery:
    def __init__(self, model, fields):
        self._model = model
        self._from = None
        _fields = []
        self._relations = {field.class_ if isinstance(field, QueryableAttribute) else field for field in fields}
        if model in self._relations:
            self._relations.remove(model)
        for field in fields:
            if isinstance(field, QueryableAttribute):
                _fields.append(getattr(field.class_.__table__.c, field.key))
            elif isinstance(field, sa.Column):
                _fields.append(field)
            elif hasattr(field, '__table__'):
                for f in field.__table__.c:
                    _fields.append(f)
            else:
                _fields.append(field)

        self._stmt = sa.select([model, *self._relations]).with_only_columns(_fields)

    def join(self, right, on):
        if self._from is None:
            self._from = sa.join(self._model, right, on)
        else:
            self._from = self._from.join(right, on)
        return self

    def outerjoin(self, right, on):
        if self._from is None:
            self._from = sa.outerjoin(self._model, right, on)
        else:
            self._from = self._from.outerjoin(right, on)
        return self

    def where(self, condition):
        self._stmt = self._stmt.where(condition)
        return self

    def group_by(self, column):
        self._stmt = self._stmt.group_by(column)
        return self

    def order_by(self, *fields):
        self._stmt = self._stmt.order_by(*fields)
        return self

    def distinct(self, *fields):
        self._stmt = self._stmt.distinct()
        return self

    def limit(self, limit):
        self._stmt = self._stmt.limit(limit)
        return self

    def offset(self, offset):
        self._stmt = self._stmt.offset(offset)
        return self

    async def get(self, conn):
        if self._from is not None:
            self._stmt = self._stmt.select_from(self._from)

        result = await conn.execute(self._stmt)
        result = await result.first()
        return result

    async def all(self, conn):
        if self._from is not None:
            self._stmt = self._stmt.select_from(self._from)

        result = await conn.execute(self._stmt)
        return await result.fetchall()

    def get_query(self):
        if self._from is not None:
            return self._stmt.select_from(self._from)
        return self._stmt

    @property
    def raw_sql(self):
        stmt = self._stmt
        if self._from is not None:
            stmt = stmt.select_from(self._from)
        return stmt.compile(compile_kwargs={'literal_binds': True})

    def _create_model(self, model_class, data):
        model_fields = {
            getattr(field, 'name'): data[str(field)] for field in model_class.__table__.c if str(field) in data
        }
        model = model_class(**model_fields)
        for relation in model_class.__mapper__.relationships:
            if relation.mapper.class_ in self._relations:
                relation_model = self._create_model(relation.mapper.class_, data)
                setattr(model, relation.key, relation_model)

        return model

Также специфическими операциями обладают запросы на удаление:

class DeleteQuery:
    def __init__(self, model, stmt):
        self._model = model
        self._stmt = stmt

    def where(self, condition):
        self._stmt = self._stmt.where(condition)
        return self

    def returning(self, *cols):
        self._stmt = self._stmt.returning(*cols)
        return self

    async def execute(self, conn):
        return await conn.execute(self._stmt)

    def get_query(self):
        return self._stmt

Для insert и update подойдёт одинаковый набор операций:

class StatementQuery:
    def __init__(self, model, stmt):
        self._model = model
        self._stmt = stmt
        self._values = None

    def values(self, **values):
        self._values = self._filter_values(values)
        return self

    def where(self, condition):
        self._stmt = self._stmt.where(condition)
        return self

    def returning(self, *cols):
        self._stmt = self._stmt.returning(*cols)
        return self

    async def execute(self, conn):
        self._values = self._filter_values(self._values)
        if not self._values:
            return
        self._stmt = self._stmt.values(self._values)
        result = await conn.execute(self._stmt)
        return result

    def get_query(self):
        values = self._filter_values(self._values)
        return self._stmt.values(values)

    def _filter_values(self, values):
        return {
            f.key: values[f.key] for f in self._model.__table__.c if f.key in values
        }
 

Обернём описанный выше код в класс, от которого можно унаследовать модели SQLAlchemy:

class DB:
    @classmethod
    def select(cls, fields=['*']):
        return SelectQuery(cls, fields)

    @classmethod
    def update(cls):
        return StatementQuery(cls, sa.update(cls))

    @classmethod
    def insert(cls):
        return StatementQuery(cls, sa.insert(cls))

    @classmethod
    def delete(cls):
        return DeleteQuery(cls, sa.delete(cls))

    @staticmethod
    def before_save(data):
        pass

    @staticmethod
    def after_save(data):
        pass

    @classmethod
    def model_fields(cls):
        return list(filter(lambda x: not x.startswith('_'), dir(cls)))

Примеры использования:

from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class User(Base, DB):
    __tablename__ = 'user'

    id = Column(Integer, primary_key=True)
    login = Column(String(255))
    active = Column(Boolean, default=True)
    
    @staticmethod
    async def get_by_login(login, conn):
        user = await User.select()             .where(User.login == login)             .get(conn)
        return user
        
    @staticmethod
    async def get_all_active(conn):
        users = await User.select()             .where(User.active.is_(True))             .all(conn)
        return users
      
    @staticmethod
    async def create_user(login, conn):
      _ = await User.insert().values(
        login=login
      ).execute(conn)
      return True
    
    @staticmethod
    async def delete_user(login, conn):
      _ = await User.update().values(
        active=False
      ).execute(conn)
      return True

Выводы

  • Большинство существующих ORM на Python не позволяют работать с БД в асинхронном режиме.

  • При работе с SQLAlchemy Core генерируется понятный SQL с наименьшим оверхедом.

  • Для удобства работы с популярными инструментами можно написать «обёртку», которая повысит читаемость кода.