ORM, или объектно-реляционное отображение — это программная технология, которая позволяет взаимодействовать с базами данных с использованием объектно-ориентированной парадигмы. Вместо того чтобы писать SQL-запросы напрямую для работы с данными в базе данных, можно использовать ORM, чтобы взаимодействовать с данными, как если бы они были объектами в вашем коде.
ORM позволяет абстрагироваться от сырых SQL запросов путем абстракций.
В этой статье мы и рассмотрим создание своей ORM на Python с документацией и публикацией на PyPI. Данный проект очень интересен со стороны реализации: ведь требуется изучить большую часть ООП, принципов и паттернов.
Мы создадим сессии, модели баз данных, различные поля, миграции и другой вспомогательный функционал. Мы разберем изнутри, как работает такая концепция и как достигается удобство работы.
Некоторые из вас могут подумать что мы изобретаем велосипед. А я в ответ скажу — сможете ли вы прямо сейчас, без подсказок, только по памяти, нарисовать велосипед без ошибок?
Репозиторий доступен по ссылке.
Ремарка: sqlite3 выбран из-за простоты, нетрудно заменить обращения к нему на обращения к любой удобной для вас базе данных. По синтаксису я ориентировался на джанго.
Базы данных - очень популярный метод хранения и организации доступа к данным.
Базы данных имеют следующие преимущества перед обычными таблицами или файлами:
Базы данных позволяют обрабатывать, хранить и структурировать намного большие объёмы информации, чем таблицы.
Удалённый доступ и система запросов позволяет множеству людей одновременно использовать базы данных. С электронными таблицами тоже можно работать онлайн всей командой, но системы управления базами данных делают этот процесс организованнее, быстрее и безопаснее.
Объём информации в базах данных может быть огромным и не влиять на скорость работы. А в Google Таблицах уже после нескольких сотен строк или тысяч символов страница будет загружаться очень медленно.
В основном работают с реляционными базами данных (также называют SQL). Записи и связи между ними организованы при помощи таблиц. В таблицах есть поле для внешнего ключа со ссылками на другие таблицы. Благодаря высокой организации и гибкости структуры реляционные БД применяются для многих типов данных.
Базы данных, где информация о реальных вещах представлена в виде объектов под уникальным идентификатором, называется ООБД. К ООБД обращаются на языке объектно-ориентированного программирования (ООП). Состояние объекта описывается атрибутами, а его поведение — набором методов. Объекты с одинаковыми атрибутами и методами образуют классы.
Объект в ООП создаётся как отдельная сущность со своими свойствами и методами работы. И как только объект создан, его можно вызвать по «имени», или коду, а не разрабатывать заново. То есть то что мы и будем создавать сегодня — ORM!
Свою библиотеку-orm я назвал SQLSymphony, так что вам иногда придется сменить название, или импорты в соответствии с вашей структурой.
❯ sqlite
SQLite3 — это простая реляционная база данных, созданная и поддерживаемая всего тремя людьми. Для работы с ней существует стандартная python-библиотека.
Почему SQLite?
SQLite - это компактная и легкая встраиваемая база данных, которая позволяет хранить и управлять данными прямо внутри вашего приложения. Её простота в использовании и широкая поддержка делают её прекрасным выбором для различных проектов, включая веб-приложения, мобильные приложения и многое другое.
Больше о нем можно прочитать здесь.
❯ Инициализация
ORM будет распространяться в виде python-модуля, поэтому создадим в директории проекта файл __init__.py
:
import logging
from typing import Union, List
from rich.traceback import install
from loguru import logger
install(show_locals=True)
class InterceptHandler(logging.Handler):
"""
This class describes an intercept handler.
"""
def emit(self, record) -> None:
"""
Get corresponding Loguru level if it exists
:param record: The record
:type record: record
:returns: None
:rtype: None
"""
try:
level = logger.level(record.levelname).name
except ValueError:
level = record.levelno
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
logger.opt(depth=depth, exception=record.exc_info).log(
level, record.getMessage()
)
def setup_logger(level: Union[str, int] = "DEBUG", ignored: List[str] = "") -> None:
"""
Setup logger
:param level: The level
:type level: str
:param ignored: The ignored
:type ignored: List[str]
"""
logging.basicConfig(
handlers=[InterceptHandler()], level=logging.getLevelName(level)
)
for ignore in ignored:
logger.disable(ignore)
logger.add("sqlsymphony_orm.log")
logger.info("Logging is successfully configured")
setup_logger()
Я буду использовать модуль loguru для логгирования и rich для более красивых и информативных исключений.
Для логгирования при помощи loguru можно использовать следующую конструкцию:
from loguru import logger
logger.info('info')
logger.warning('warning')
logger.error('error')
logger.debug('debug')
Типы данных
В sqlite3 существуют следующие типы данных: INTEGER — вещественное число с указанной точностью, TEXT — текст, BLOB — двоичные данные, REAL — число с плавающей запятой(float24), NUMERIC — то же, что и INTEGER, VarChar — текст с ограниченным количеством символов, BOOLEAN — логическое значение.
У типов данных есть параметры: NOT NULL, DEFAULT, UNIQUE.
Давайте реализуем базовые типы данных, создав модуль fields.py:
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any
class BaseDataType(ABC):
"""
This class describes a base data type.
"""
def __init__(
self,
primary_key: bool = False,
unique: bool = False,
null: bool = True,
default: Any = None,
):
"""
Constructs a new instance.
:param primary_key: The primary key
:type primary_key: bool
:param unique: The unique
:type unique: bool
:param null: The null
:type null: bool
:param default: The default
:type default: Any
"""
self.primary_key: bool = primary_key
self.unique: bool = unique
self.null: bool = null
self.default: Any = default
@abstractmethod
def validate(self, value: Any) -> bool:
"""
Validate value for current datatype
:param value: The value
:type value: Any
:returns: if the value is verified then True, otherwise False
:rtype: bool
"""
raise NotImplementedError()
@abstractmethod
def to_db_value(self, value: Any) -> Any:
"""
convert to db value
:param value: The value
:type value: Any
:returns: db value
:rtype: Any
"""
raise NotImplementedError()
@abstractmethod
def from_db_value(self, value: Any) -> Any:
"""
convert from db value
:param value: The value
:type value: Any
:returns: db value
:rtype: Any
"""
raise NotImplementedError()
@abstractmethod
def to_sql_type(self) -> str:
"""
Returns a sql type representation of the object.
:returns: Sql type representation of the object.
:rtype: str
:raises NotImplementedError: abstract method
"""
raise NotImplementedError()
def __str__(self):
return "<BaseDataType>"
Теперь реализуем на основе этого абстрактного класса другие типы данных:
@dataclass
class IntegerField(BaseDataType):
"""
This class describes an integer field.
"""
def __init__(
self,
max_value: int = None,
min_value: int = None,
primary_key: bool = False,
unique: bool = False,
null: bool = True,
default: int = None,
):
"""
Constructs a new instance.
:param primary_key: The primary key
:type primary_key: bool
:param unique: The unique
:type unique: bool
:param null: The null
:type null: bool
:param default: The default
:type default: int
"""
self.primary_key = primary_key
self.unique: bool = unique
self.null: bool = null
self.default: int = default
self.min_value = min_value
self.max_value = max_value
if self.primary_key:
if self.default:
raise ValueError('The parameter "default" is not used for PrimaryKey')
self.default = 1
self.value = 1
def validate(self, value: Any) -> bool:
"""
Validate value
:param value: The value
:type value: Any
:returns: if the value is verified then True, otherwise False
:rtype: bool
"""
if self.primary_key and value is None:
return True
if value is None and self.null:
return True
if self.min_value is not None and value < self.min_value:
return False
if self.max_value is not None and value > self.max_value:
return False
return True
def to_db_value(self, value: Any) -> int:
"""
Convert to db value
:param value: The value
:type value: Any
:returns: db value
:rtype: int
"""
if self.primary_key and value is None:
return 0
return int(value) if value is not None else self.default
def from_db_value(self, value: Any) -> int:
"""
Convert from db value
:param value: The value
:type value: Any
:returns: db value
:rtype: int
"""
return int(value) if value is not None else None
def to_sql_type(self) -> str:
return "INTEGER"
def __str__(self):
return "<IntegerField>"
@dataclass
class RealField(BaseDataType):
"""
This class describes an real field.
"""
def __init__(
self,
min_value: float = None,
max_value: float = None,
unique: bool = False,
null: bool = True,
default: float = None,
):
"""
Constructs a new instance.
:param primary_key: The primary key
:type primary_key: bool
:param unique: The unique
:type unique: bool
:param null: The null
:type null: bool
:param default: The default
:type default: float
"""
self.primary_key = False
self.unique: bool = unique
self.null: bool = null
self.default: float = default
self.min_value = min_value
self.max_value = max_value
def validate(self, value: Any) -> bool:
"""
Validate value
:param value: The value
:type value: Any
:returns: if the value is verified then True, otherwise False
:rtype: bool
"""
if value is None and self.null:
return True
if not isinstance(value, float):
return False
if self.min_value is not None and value < self.min_value:
return False
if self.max_value is not None and value > self.max_value:
return False
return True
def to_db_value(self, value: Any) -> float:
"""
Convert to db value
:param value: The value
:type value: Any
:returns: db value
:rtype: float
"""
return float(value) if value is not None else self.default
def from_db_value(self, value: Any) -> float:
"""
Convert from db value
:param value: The value
:type value: Any
:returns: db value
:rtype: float
"""
return float(value) if value is not None else None
def to_sql_type(self) -> str:
return "REAL"
def __str__(self):
return "<RealField>"
class CharField(BaseDataType):
"""
This class describes a character field.
"""
def __init__(
self,
max_length: int = 64,
unique: bool = False,
null: bool = True,
default: Any = None,
):
"""
Constructs a new instance.
:param primary_key: The primary key
:type primary_key: bool
:param unique: The unique
:type unique: bool
:param null: The null
:type null: bool
:param default: The default
:type default: Any
"""
self.primary_key: bool = False
self.unique: bool = unique
self.null: bool = null
self.default: Any = default
self.max_length = max_length
def to_sql_type(self) -> str:
return f"VARCHAR({self.max_length})"
def validate(self, value: Any) -> bool:
"""
Validate value
:param value: The value
:type value: Any
:returns: if the value is verified then True, otherwise False
:rtype: bool
"""
if value is None and self.null:
return True
if not isinstance(value, str):
return False
return len(value) <= self.max_length
def to_db_value(self, value: Any) -> str:
"""
Convert value to db
:param value: The value
:type value: Any
:returns: db value
:rtype: str
"""
return str(value) if value is not None else self.default
def from_db_value(self, value: Any) -> str:
"""
Convert value from db
:param value: The value
:type value: Any
:returns: db value
:rtype: str
"""
return str(value) if value is not None else self.default
def __str__(self):
return "<CharField>"
class BooleanField(BaseDataType):
"""
This class describes a boolean field.
"""
def __init__(
self,
unique: bool = False,
null: bool = True,
default: Any = None,
):
"""
Constructs a new instance.
:param primary_key: The primary key
:type primary_key: bool
:param unique: The unique
:type unique: bool
:param null: The null
:type null: bool
:param default: The default
:type default: Any
"""
self.primary_key = False
self.unique: bool = unique
self.null: bool = null
self.default: Any = default
def to_sql_type(self) -> str:
return "BOOLEAN"
def validate(self, value: Any) -> bool:
"""
Validate value
:param value: The value
:type value: Any
:returns: if the value is verified then True, otherwise False
:rtype: bool
"""
if isinstance(value, bool):
return True
else:
return False
def to_db_value(self, value: Any) -> str:
"""
Convert to db value
:param value: The value
:type value: Any
:returns: db value
:rtype: str
"""
return str(value).upper() if value is not None else self.default
def from_db_value(self, value: Any) -> str:
"""
Convert from db value
:param value: The value
:type value: Any
:returns: db value
:rtype: str
"""
return str(value).upper() if value is not None else self.default
def __str__(self):
return "<BooleanField>"
class TextField(BaseDataType):
"""
This class describes a character field.
"""
def __init__(
self,
unique: bool = False,
null: bool = True,
default: Any = None,
):
"""
Constructs a new instance.
:param primary_key: The primary key
:type primary_key: bool
:param unique: The unique
:type unique: bool
:param null: The null
:type null: bool
:param default: The default
:type default: Any
"""
self.primary_key = False
self.unique: bool = unique
self.null: bool = null
self.default: Any = default
def to_sql_type(self) -> str:
return "TEXT"
def validate(self, value: Any) -> bool:
"""
Validate value
:param value: The value
:type value: Any
:returns: if the value is verified then True, otherwise False
:rtype: bool
"""
if value is None and self.null:
return True
return isinstance(value, str)
def to_db_value(self, value: Any) -> str:
"""
Convert to db value
:param value: The value
:type value: Any
:returns: db value
:rtype: str
"""
return str(value) if value is not None else self.default
def from_db_value(self, value: Any) -> str:
"""
Convert from db value
:param value: The value
:type value: Any
:returns: db value
:rtype: str
"""
return str(value) if value is not None else None
def __str__(self):
return "<TextField>"
@dataclass
class BlobField(BaseDataType):
"""
This class describes a blob field.
"""
def __init__(
self,
max_size_in_bytes: int = None,
unique: bool = False,
null: bool = True,
default: Any = None,
):
"""
Constructs a new instance.
:param primary_key: The primary key
:type primary_key: bool
:param unique: The unique
:type unique: bool
:param null: The null
:type null: bool
:param default: The default
:type default: Any
"""
self.primary_key = False
self.unique: bool = unique
self.null: bool = null
self.default: Any = default
self.max_size_in_bytes = max_size_in_bytes
def to_sql_type(self) -> str:
return "BLOB"
def validate(self, value: Any) -> bool:
"""
Validate value
:param value: The value
:type value: Any
:returns: if the value is verified then True, otherwise False
:rtype: bool
"""
if value is None and self.null:
return True
if len(value) > self.max_size_in_bytes:
return False
return isinstance(value, bytes)
def to_db_value(self, value: Any) -> bytes:
"""
Convert to db value
:param value: The value
:type value: Any
:returns: db value
:rtype: bytes
"""
return bytes(value) if value is not None else self.default
def from_db_value(self, value: Any) -> bytes:
"""
Convert from db value
:param value: The value
:type value: Any
:returns: db value
:rtype: bytes
"""
return bytes(value) if value is not None else None
def __str__(self):
return "<BlobField>"
class FieldMeta(type):
"""
This class describes a field meta.
"""
def __new__(cls, name, bases, attrs):
"""
New 'magic' func
:param cls: The cls
:param name: The name
:param bases: The bases
:param attrs: The attributes
:returns: class
"""
fields = {}
primary_key = None
for key, value in attrs.items():
if isinstance(value, BaseDataType):
fields[key] = value
if value.primary_key:
if primary_key:
raise ValueError("Multiple primary keys are not allowed")
primary_key = key
if value.auto_increment:
value.default = 1
attrs["_fields"] = fields
attrs["_primary_key"] = primary_key
return super().__new__(cls, name, bases, attrs)
def __str__(self):
return "<FieldMeta>"
У каждого поля есть следующие параметры: unique (должно ли поле быть уникальным), null (может ли быть NULL) и default (значение по умолчанию). Также некоторые поля имеют дополнительные параметры (например CharField, требуется задать максимальную длину в символах).
❯ Запросы (Query)
В будущем, для фильтрации и получения записей из БД, нам нужны будут запросы. Вместо сырых SQL-запросов мы будем использовать классы для выборки, фильтрации и получения записей.
Класс QueryBuilder как раз и будет отвественнен за это. Строковый вид класса будет возращать созданный SQL запрос:
from abc import ABC, abstractmethod
from rich.console import Console
from rich.table import Table
from loguru import logger
AND = "and"
OR = "or"
class Q:
"""
This class describes a Q.
"""
def __init__(self, exp_type: str = AND, **kwargs):
"""
Constructs a new instance.
:param exp_type: The exponent type
:type exp_type: str
:param kwargs: The keywords arguments
:type kwargs: dictionary
"""
self.separator = exp_type
self._params = kwargs
def __str__(self) -> str:
"""
Returns a string representation of the object.
:returns: String representation of the object.
:rtype: str
"""
kv_pairs = [f'{k} = "{v}"' for k, v in self._params.items()]
return f" {self.separator} ".join(kv_pairs)
def __bool__(self) -> bool:
"""
Returns a boolean representation of the object
:returns: Boolean representation of the object.
:rtype: bool
"""
return bool(self._params)
class BaseExp(ABC):
"""
This abstract class describes a base exponent.
"""
name = None
@abstractmethod
def add(self, *args, **kwargs):
"""
Add params
:param args: The arguments
:type args: list
:param kwargs: The keywords arguments
:type kwargs: dictionary
"""
raise NotImplementedError()
def definition(self) -> str:
"""
Get the definition of query
:returns: sql query
:rtype: str
"""
return self.name + " " + self.line() + " "
@abstractmethod
def line(self):
"""
Get line
"""
raise NotImplementedError()
@abstractmethod
def __bool__(self):
"""
Boolean magic function
"""
raise NotImplementedError()
class Select(BaseExp):
"""
This class describes a select.
"""
name = "SELECT"
def __init__(self):
"""
Constructs a new instance.
"""
self._params = []
def add(self, *args, **kwargs):
"""
Add params
:param args: The arguments
:type args: list
:param kwargs: The keywords arguments
:type kwargs: dictionary
"""
self._params.extend(args)
def line(self) -> str:
"""
Get line
:returns: line
:rtype: str
"""
separator = ","
return separator.join(self._params)
def __bool__(self):
"""
Boolean magic function
:returns: if self._params defined
:rtype: bool
"""
return bool(self._params)
class From(BaseExp):
"""
This class describes a from.
"""
name = "FROM"
def __init__(self):
"""
Constructs a new instance.
"""
self._params = []
def add(self, *args, **kwargs):
"""
Add params
:param args: The arguments
:type args: list
:param kwargs: The keywords arguments
:type kwargs: dictionary
"""
self._params.extend(args)
def line(self) -> str:
"""
Get line
:returns: line
:rtype: str
"""
separator = ","
return separator.join(self._params)
def __bool__(self):
"""
Boolean magic function
:returns: if self._params defined
:rtype: bool
"""
return bool(self._params)
class Where(BaseExp):
"""
This class describes a SQL query `where`.
"""
name = "WHERE"
def __init__(self, exp_type: str = AND, **kwargs):
"""
Constructs a new instance.
:param exp_type: The exponent type
:type exp_type: str
:param kwargs: The keywords arguments
:type kwargs: dictionary
"""
self._q = Q(exp_type, **kwargs)
def add(self, exp_type: str = AND, **kwargs):
"""
Add params to sql query `where`
:param exp_type: The exponent type
:type exp_type: str
:param kwargs: The keywords arguments
:type kwargs: dictionary
:returns: Q class instance
:rtype: Q
"""
self._q = Q(exp_type, **kwargs)
return self._q
def line(self):
"""
Get line
:returns: line
:rtype: str
"""
return str(self._q)
def __bool__(self):
"""
Boolean magic function
:returns: if self._q defined
:rtype: bool
"""
return bool(self._q)
class QueryBuilder:
"""
Front-end to create query objects step by step.
"""
def __init__(self):
"""
Constructs a new instance.
"""
self._data = {"select": Select(), "from": From(), "where": Where()}
def SELECT(self, *args) -> "QueryBuilder":
"""
SQL query `select`
:param args: The arguments
:type args: list
:returns: Query Builder
:rtype: self
"""
self._data["select"].add(*args)
return self
def FROM(self, *args) -> "QueryBuilder":
"""
SQL query `from`
:param args: The arguments
:type args: list
:returns: Query Builder
:rtype: self
"""
self._data["from"].add(*args)
return self
def WHERE(self, exp_type: str = AND, **kwargs) -> "QueryBuilder":
"""
SQL query `where`
:param exp_type: The exponent type
:type exp_type: str
:param kwargs: The keywords arguments
:type kwargs: dictionary
:returns: Query Builder
:rtype: self
"""
self._data["where"].add(exp_type=exp_type, **kwargs)
return self
def _lines(self):
"""
Lines
:returns: Value definition
:rtype: yeild (str)
"""
for key, value in self._data.items():
if value:
yield value.definition()
def __str__(self) -> str:
"""
Returns a string representation of the object.
:returns: String representation of the object.
:rtype: str
"""
return "".join(self._lines())
❯ Кастомные исключение
Для того, чтобы разработчику было более понятней разобраться в ошибках, создадим кастомные исключения:
from loguru import logger
class SQLSymphonyException(Exception):
"""
Exception for signaling sql symphony errors.
"""
def __init__(self, *args):
"""
Constructs a new instance.
:param args: The arguments
:type args: list
"""
if args:
self.message = args[0]
else:
self.message = None
def get_explanation(self) -> str:
"""
Gets the explanation.
:returns: The explanation.
:rtype: str
"""
return f"Basic SQLSymphony ORM exception. Message: {self.message if self.message else 'missing'}"
def __str__(self):
"""
Returns a string representation of the object.
:returns: String representation of the object.
:rtype: str
"""
logger.error(f"{self.__class__.__name__}: {self.get_explanation()}")
return f"SQLSymphonyException has been raised. {self.get_explanation()}"
class FieldNamingError(SQLSymphonyException):
"""
This class describes a field naming error.
"""
def __init__(self, *args):
"""
Constructs a new instance.
:param args: The arguments
:type args: list
"""
if args:
self.message = args[0]
else:
self.message = None
def get_explanation(self) -> str:
"""
Gets the explanation.
:returns: The explanation.
:rtype: str
"""
return f"SQLSymphony Field Naming Error. The field name is prohibited/unavailable to avoid naming errors. Message: {self.message if self.message else 'missing'}"
def __str__(self):
logger.error(f"{self.__class__.__name__}: {self.get_explanation()}")
return f"Field Naming Error has been raised. {self.get_explanation()}"
class NullableFieldError(SQLSymphonyException):
"""
This class describes a nullable field error.
"""
def __init__(self, *args):
"""
Constructs a new instance.
:param args: The arguments
:type args: list
"""
if args:
self.message = args[0]
else:
self.message = None
def get_explanation(self) -> str:
"""
Gets the explanation.
:returns: The explanation.
:rtype: str
"""
return f"SQLSymphony Nullable Field Error. Field is set to NOT NULL, but it is empty. Message: {self.message if self.message else 'missing'}"
def __str__(self):
logger.error(f"{self.__class__.__name__}: {self.get_explanation()}")
return f"Nullable Field Error has been raised. {self.get_explanation()}"
class FieldValidationError(SQLSymphonyException):
"""
This class describes a field validation error.
"""
def __init__(self, *args):
"""
Constructs a new instance.
:param args: The arguments
:type args: list
"""
if args:
self.message = args[0]
else:
self.message = None
def get_explanation(self) -> str:
"""
Gets the explanation.
:returns: The explanation.
:rtype: str
"""
return f"SQLSymphony Field Validation Error. Message: {self.message if self.message else 'missing'}"
def __str__(self):
logger.error(f"{self.__class__.__name__}: {self.get_explanation()}")
return f"Field Validation Error has been raised. {self.get_explanation()}"
class PrimaryKeyError(SQLSymphonyException):
"""
This class describes a primary key error.
"""
def __init__(self, *args):
"""
Constructs a new instance.
:param args: The arguments
:type args: list
"""
if args:
self.message = args[0]
else:
self.message = None
def get_explanation(self) -> str:
"""
Gets the explanation.
:returns: The explanation.
:rtype: str
"""
return f"SQLSymphony Primary Key Error. According to database theory, each table should have only one PrimaryKey field, Message: {self.message if self.message else 'missing'}"
def __str__(self):
logger.error(f"{self.__class__.__name__}: {self.get_explanation()}")
return f"Primary Key Error has been raised. {self.get_explanation()}"
class UniqueConstraintError(SQLSymphonyException):
"""
This class describes an unique constraint error.
"""
def __init__(self, *args):
"""
Constructs a new instance.
:param args: The arguments
:type args: list
"""
if args:
self.message = args[0]
else:
self.message = None
def get_explanation(self) -> str:
"""
Gets the explanation.
:returns: The explanation.
:rtype: str
"""
return f"SQLSymphony Unique Constraint Error. An exception occurred when executing an SQL query due to problems with UNIQUE fields. Message: {self.message if self.message else 'missing'}"
def __str__(self):
logger.error(f"{self.__class__.__name__}: {self.get_explanation()}")
return f"Unique Constraint Error has been raised. {self.get_explanation()}"
class ModelHookError(SQLSymphonyException):
"""
This class describes a model hook error.
"""
def __init__(self, *args):
"""
Constructs a new instance.
:param args: The arguments
:type args: list
"""
if args:
self.message = args[0]
else:
self.message = None
def get_explanation(self) -> str:
"""
Gets the explanation.
:returns: The explanation.
:rtype: str
"""
return f"Model Hooks Error. An exception occurred when executing an hook due to problems with ORM. Message: {self.message if self.message else 'missing'}"
def __str__(self):
logger.error(f"{self.__class__.__name__}: {self.get_explanation()}")
return f"Model Hook error has been raised. {self.get_explanation()}"
class MigrationError(SQLSymphonyException):
"""
This class describes a migration error.
"""
def __init__(self, *args):
"""
Constructs a new instance.
:param args: The arguments
:type args: list
"""
if args:
self.message = args[0]
else:
self.message = None
def get_explanation(self) -> str:
"""
Gets the explanation.
:returns: The explanation.
:rtype: str
"""
return f"Database Migration Error. An exception occurred when executing an hook due to problems with migration. Message: {self.message if self.message else 'missing'}"
def __str__(self):
logger.error(f"{self.__class__.__name__}: {self.get_explanation()}")
return f"Migration Error has been raised. {self.get_explanation()}"
Мы создаем базовый класс, наследуемый от Exception и метод строкового обращения. А потом уже создаем более подробные исключения на основе базового класса.
❯ Пользовательские модели
По традициям, разработчик в нашей ORM должен создавать модели примерно следующим образом:
class User(SessionModel):
__tablename__ = "Users"
id = IntegerField(primary_key=True)
name = TextField(null=False)
cash = RealField(null=False, default=0.0)
def __repr__(self):
return f"<User {self.pk}>"
class User2(SessionModel):
__tablename__ = "Users"
id = IntegerField(primary_key=True)
name = TextField(null=False)
cash = RealField(null=False, default=0.0)
password = TextField(default="password1234")
def __repr__(self):
return f"<User {self.pk}>"
class Comment(SessionModel):
id = IntegerField(primary_key=True)
name = TextField(null=False)
user_id = IntegerField(null=False)
def __repr__(self):
return f"<Comment {self.pk}>"
Для этого нам нужно будет реализовать классы моделей (мета-класс и саму модель).
from pathlib import Path
from typing import List, Any, Union, Callable
from uuid import uuid4
from abc import ABC, abstractmethod
from collections import OrderedDict
from loguru import logger
from sqlsymphony_orm.datatypes.fields import BaseDataType, IntegerField # модуль типов данных
from sqlsymphony_orm.exceptions import (
PrimaryKeyError,
FieldValidationError,
NullableFieldError,
FieldNamingError,
)
from sqlsymphony_orm.queries import QueryBuilder
class MetaSessionModel(type):
"""
This class describes a meta session model.
"""
__tablename__ = None
def __new__(cls, class_object: "SessionModel", parents: tuple, attributes: dict):
"""
Magic method for creating instances and classes
:param cls: The cls
:type cls: cls
:param class_object: The class object
:type class_object: Model
:param parents: The parents
:type parents: tuple
:param attributes: The attributes
:type attributes: dict
:returns: new class
:rtype: model
"""
new_class = super(MetaSessionModel, cls).__new__(
cls, class_object, parents, attributes
)
fields = OrderedDict()
setattr(new_class, "_model_name", attributes["__qualname__"].lower())
if new_class.__tablename__ is None:
setattr(new_class, "table_name", attributes["__qualname__"].lower())
else:
setattr(new_class, "table_name", new_class.__tablename__)
for k, v in attributes.items():
if isinstance(v, BaseDataType):
fields[k] = v
attributes[k] = None
if isinstance(v, IntegerField):
if v.primary_key:
setattr(new_class, "_pk_name", k)
setattr(new_class, "_original_fields", fields)
return new_class
class SessionModel(metaclass=MetaSessionModel):
"""
This class describes a ORM model.
"""
__tablename__ = None
_ids = 0
def __init__(self, **kwargs):
"""
Constructs a new instance.
:param kwargs: The keywords arguments
:type kwargs: dictionary
"""
self.fields = {}
self.hooks = {}
self.unique_id = str(uuid4())
for field_name, field in self._original_fields.items():
value = kwargs.get(field_name, None)
if not kwargs.get("manager", False):
if not field.null and value is None and field.default is None:
raise NullableFieldError(
f"Field {field_name} is set to NOT NULL, but it is empty"
)
if value is not None and field.validate(value):
setattr(self, field_name, field.to_db_value(value))
self.fields[field_name] = getattr(self, field_name)
else:
if value is not None and not field.validate(value):
raise FieldValidationError(
f'Validate error: field {field}; field name "{field_name}"; value "{value}"'
)
if isinstance(field, IntegerField):
if field.primary_key:
self.__class__._ids += 1
setattr(
self,
"_primary_key",
{
"field": field,
"field_name": field_name,
"value": self.__class__._ids,
},
)
setattr(self, field_name, field.default)
self.fields[field_name] = getattr(self, field_name)
if not getattr(self, "_primary_key"):
raise PrimaryKeyError()
self._last_action = {}
def add_hook(self, before_action: str, func: Callable, func_args: tuple = ()):
"""
Adds a hook.
:param before_action: The before action
:type before_action: str
:param func: The function
:type func: Callable
:param func_args: The function arguments
:type func_args: tuple
:raises ValueError: unknown before action
"""
actions = ["save", "delete", "update"]
if before_action.lower() not in actions:
raise ValueError(
f"Unknown action: {before_action}. Supported actions: {actions}"
)
logger.info(
f"[{self.table_name}] Add Model Hook: before {before_action} execute {func.__name__}"
)
self.hooks[before_action.lower()] = {"function": func, "args": func_args}
@property
def pk(self) -> Any:
"""
Get primary key value
:returns: primary key value
:rtype: primary key
"""
return self._primary_key["value"]
@classmethod
def _class_get_formatted_sql_fields(cls, skip_primary_key: bool = False) -> dict:
"""
Gets the formatted sql fields.
:returns: The formatted sql fields.
:rtype: dict
"""
model_fields = {}
for field_name, field in cls._original_fields.items():
if field.primary_key and skip_primary_key:
continue
model_fields[field_name] = field.to_sql_type()
if field.primary_key:
model_fields[field_name] = f"{field.to_sql_type()} PRIMARY KEY"
else:
if not field.null:
try:
model_fields[field_name] += " NOT NULL"
except KeyError:
model_fields[field_name] = f"{field.to_sql_type()} NOT NULL"
if field.unique:
try:
model_fields[field_name] += " UNIQUE"
except KeyError:
model_fields[field_name] = f"{field.to_sql_type()} UNIQUE"
if field.default is not None:
try:
model_fields[field_name] += f" DEFAULT {field.default}"
except KeyError:
model_fields[field_name] = (
f"{field.to_sql_type()} DEFAULT {field.default}"
)
return model_fields
def get_formatted_sql_fields(self, skip_primary_key: bool = False) -> dict:
"""
Gets the formatted sql fields.
:returns: The formatted sql fields.
:rtype: dict
"""
model_fields = {}
for field_name, field in self._original_fields.items():
if field.primary_key and skip_primary_key:
continue
model_fields[field_name] = field.to_sql_type()
if field.primary_key:
model_fields[field_name] = f"{field.to_sql_type()} PRIMARY KEY"
else:
if not field.null:
try:
model_fields[field_name] += " NOT NULL"
except KeyError:
model_fields[field_name] = f"{field.to_sql_type()} NOT NULL"
if field.unique:
try:
model_fields[field_name] += " UNIQUE"
except KeyError:
model_fields[field_name] = f"{field.to_sql_type()} UNIQUE"
if field.default is not None:
try:
model_fields[field_name] += f" DEFAULT {field.default}"
except KeyError:
model_fields[field_name] = (
f"{field.to_sql_type()} DEFAULT {field.default}"
)
return model_fields
В метаклассе мы читаем модель, добавляем поля, задаем базовые настройки. В классе модели мы все проверяем и задаем Primary Key.
Также есть classmethod-функция и обычная функция для получения форматированных полей для SQL.
У нас есть свойство для получения Primary Key, а также функция для добавления хуков. Хуки в контексте нашей ORM — это функции, выполняемые до определенной операции модели и базы данных.
❯ Подключение к БД
Давайте теперь возьмемся за основу работы с базой данных — класс подключения и класс менеджера.
Напишем код для подключения к бд:
import sqlite3
from abc import ABC, abstractmethod
from typing import Tuple
from rich import print
from loguru import logger
class DBConnector(ABC):
"""
This class describes a db connector.
"""
def __new__(cls, *args, **kwargs):
"""
New class
:param cls: The cls
:type cls: list
:param args: The arguments
:type args: list
:param kwargs: The keywords arguments
:type kwargs: dictionary
:returns: cls instance
:rtype: self
"""
if not hasattr(cls, "instance"):
cls.instance = super(DBConnector, cls).__new__(cls, *args, **kwargs)
return cls.instance
@abstractmethod
def connect(self, database_name: str):
"""
Connect to database
:param database_name: The database name
:type database_name: str
:raises NotImplementedError: Abstract method
"""
raise NotImplementedError()
@abstractmethod
def commit(self):
"""
Commit changes to database
:raises NotImplementedError: Abstract method
"""
raise NotImplementedError()
@abstractmethod
def fetch(self, query: str):
"""
Fetches the given query.
:param query: The query
:type query: str
:raises NotImplementedError: Abstract method
"""
raise NotImplementedError()
class SQLiteDBConnector(DBConnector):
"""
This class describes a sqlite db connector.
"""
def __new__(cls, *args, **kwargs):
"""
New class
:param cls: The cls
:type cls: list
:param args: The arguments
:type args: list
:param kwargs: The keywords arguments
:type kwargs: dictionary
:returns: cls instance
:rtype: self
"""
if not hasattr(cls, "instance"):
cls.instance = super(SQLiteDBConnector, cls).__new__(cls, *args, **kwargs)
return cls.instance
def close_connection(self):
"""
Closes a connection.
"""
self._connection.close()
print("[bold]Connection has been closed[/bold]")
logger.info("Close Database Connection")
def connect(self, database_name: str = "database.db"):
"""
Connect to database
:param database_name: The database name
:type database_name: str
"""
pragmas = ["PRAGMA foreign_keys = 1"]
self._connection = sqlite3.connect(database_name)
self.database_name = database_name
logger.info(f"[{database_name}] Connect database...")
for pragma in pragmas:
self._connection.execute(pragma)
logger.debug(f"Set pragma: {pragma}")
def commit(self):
"""
Commit changes to database
"""
logger.info("Commit changes to database")
self._connection.commit()
def fetch(self, query: str, values: Tuple = (), get_cursor: bool = False) -> list:
"""
Fetch SQL query
:param query: The query
:type query: str
:param values: The values
:type values: Tuple
:returns: list with fetched results
:rtype: list
"""
cursor = self._connection.cursor()
self.commit()
logger.debug(f"Fetch query: {query} {values}")
try:
cursor.execute(query, values)
except Exception as ex:
logger.error(f"An exception occurred while executing the request: {ex}")
raise ex
return [cursor, cursor.fetchall()] if get_cursor else cursor.fetchall()
Все просто — создаем абстрактный класс и класс SQLiteDBConnector, наследуемый от него. Если вам потребуется добавить поддержку других СУБД, такая структура сделает это более удобней.
Мы имеем метод __new__
для создания новых инстансов, метод подключения, отключения, коммита и выполнения запроса.
Перейдем теперь к более сложному — к менеджеру:
from abc import ABC, abstractmethod
from typing import Any
from loguru import logger
from sqlsymphony_orm.queries import QueryBuilder
from sqlsymphony_orm.database.connection import DBConnector, SQLiteDBConnector
class MultiManager(ABC):
"""
This class describes a multi manager.
"""
@abstractmethod
def reconnect(self):
"""
reconnect to db
:raises NotImplementedError: abstract method
"""
raise NotImplementedError()
@abstractmethod
def drop_table(self, table_name: str):
"""
Drop sql table
:param table_name: The table name
:type table_name: str
:raises NotImplementedError: abstract method
"""
raise NotImplementedError()
@abstractmethod
def close_connection(self):
"""
Closes a connection.
:raises NotImplementedError: abstract method
"""
raise NotImplementedError()
@abstractmethod
def insert(
self,
table_name: str,
formatted_fields: dict,
pk: int,
model_class: "Model",
ignore: bool = False,
):
"""
insert new model to database
:param table_name: The table name
:type table_name: str
:param formatted_fields: The formatted fields
:type formatted_fields: dict
:param pk: primary key value
:type pk: int
:param model_class: The model class
:type model_class: Model
:param ignore: The ignore
:type ignore: bool
:raises NotImplementedError: { exception_description }
"""
raise NotImplementedError()
@abstractmethod
def update(self, table_name: str, key: str, orig_field: str, new_value: str):
"""
update model
:param table_name: The table name
:type table_name: str
:param key: The key
:type key: str
:param orig_field: The original field
:type orig_field: str
:param new_value: The new value
:type new_value: str
:raises NotImplementedError: abstract method
"""
raise NotImplementedError()
@abstractmethod
def filter(self, query: QueryBuilder):
"""
filter and get model by query
:param query: The query
:type query: QueryBuilder
:raises NotImplementedError: abstract method
"""
raise NotImplementedError()
@abstractmethod
def commit(self):
"""
Commit changes
:raises NotImplementedError: abstract method
"""
raise NotImplementedError()
@abstractmethod
def create_table(self, table_name: str, fields: dict):
"""
Creates a table.
:param table_name: The table name
:type table_name: str
:param fields: The fields
:type fields: dict
:raises NotImplementedError: abstract method
"""
raise NotImplementedError()
@abstractmethod
def delete(self, table_name: str, field_name: str, field_value: Any):
"""
delete model
:param table_name: The table name
:type table_name: str
:param field_name: The field name
:type field_name: str
:param field_value: The field value
:type field_value: Any
:raises NotImplementedError: abstract method
"""
raise NotImplementedError()
class SQLiteMultiManager(MultiManager):
"""
This class describes a sqlite multi manager.
"""
def __init__(self, database_name: str):
"""
Constructs a new instance.
:param database_name: The database name
:type database_name: str
"""
self._connector = SQLiteDBConnector()
self.database_name = database_name
self._connector.connect(self.database_name)
def execute(self, raw_sql_query: str, values: tuple = (), get_cursor: bool = False):
return self._connector.fetch(raw_sql_query, values, get_cursor)
def reconnect(self, database_file: str = None):
"""
reconnect to database
"""
if database_file is not None:
self.database_name = database_file
self._connector.connect(self.database_name)
def drop_table(self, table_name: str):
"""
drop table
:param table_name: The table name
:type table_name: str
"""
query = f"DROP TABLE IF EXISTS {table_name}"
logger.warning(f"Drop table: {table_name}")
self._connector.fetch(query)
self._connector.commit()
def close_connection(self):
"""
Closes a connection.
"""
self._connector.close_connection()
def insert(
self,
table_name: str,
formatted_fields: dict,
pk: int,
model_class: "Model",
ignore: bool = False,
):
"""
Insert a fields to database
:param table_name: The table name
:type table_name: str
:param columns: The columns
:type columns: str
:param count: The count
:type count: str
:param values: The values
:type values: tuple
"""
fields = []
values = []
for k, v in formatted_fields.items():
fields.append(k)
if "PRIMARY KEY" in v:
values.append(pk)
else:
values.append(getattr(model_class, k))
columns = ", ".join(fields)
count = ", ".join(["?" for _ in values])
query = "INSERT "
if ignore:
query += "OR IGNORE "
query += f"INTO {table_name} ({columns}) VALUES ({count})"
logger.info(
f'[{table_name}] Insert {"(or ignore)" if ignore else ""} new model into database'
)
self._connector.fetch(query, values)
def update(self, table_name: str, key: str, orig_field: str, new_value: str):
"""
Update fields in database table
:param table_name: The table name
:type table_name: str
:param key: The key
:type key: str
:param orig_field: The original field
:type orig_field: str
:param new_value: The new value
:type new_value: str
"""
query = f"UPDATE {table_name} SET {key} = ? WHERE {key} = ?"
logger.info(f"[{table_name}] Update model: {key}={new_value}")
self._connector.fetch(query, (new_value, orig_field))
def filter(self, query: str) -> list:
"""
filter and get model by query
:param query: The query
:type query: str
:returns: models
:rtype: list
"""
result = self._connector.fetch(query)
return result
def commit(self):
"""
Commits changes.
"""
self._connector.commit()
def create_table(self, table_name: str, fields: dict):
"""
Creates a table.
:param table_name: The table name
:type table_name: str
:param fields: The fields
:type fields: dict
"""
columns = [f"{k} {v}" for k, v in fields.items()]
query = f"CREATE TABLE IF NOT EXISTS {table_name} ("
for column in columns:
query += f"{column},"
query = query[:-1]
query += ")"
logger.info(f"Create new table: {table_name}")
self._connector.fetch(query)
self._connector.commit()
def delete(self, table_name: str, field_name: str, field_value: Any):
"""
Delete model from database
:param table_name: The table name
:type table_name: str
:param field_name: The field name
:type field_name: str
:param field_value: The field value
:type field_value: Any
"""
query = f"DELETE FROM {table_name} WHERE {field_name} = ?"
logger.info(f"[{table_name}] Delete model ({field_name}={field_value})")
self._connector.fetch(query, (field_value,))
Это также относительно просто: при инициализации создаем инстанс класса подключения, подключаемся к БД, и создаем базовые методы: удаление, создание таблицы, коммит, фильтр, обновление, добавления, и прочие вспомогательные методы для работы с подключением и таблицами.
❯ Сессии
Но как мы будем добавлять модели в базу данных и работать с ними? Все просто — я решил что правильным способом будем создание класса сессии.
Давайте реализуем это:
from pathlib import Path
from typing import List, Any, Union, Callable
from uuid import uuid4
from abc import ABC, abstractmethod
from collections import OrderedDict
from loguru import logger
from sqlsymphony_orm.database.manager import SQLiteMultiManager # менеджер
from sqlsymphony_orm.datatypes.fields import BaseDataType, IntegerField # типы данныъ
from sqlsymphony_orm.exceptions import ( # исключения
PrimaryKeyError,
FieldValidationError,
NullableFieldError,
FieldNamingError,
)lsymphony_orm.queries import QueryBuilder # билдер запросов
class Session(ABC):
"""
This class describes a session.
"""
@abstractmethod
def reconnect(self):
"""
reconnect to database
"""
raise NotImplementedError
@abstractmethod
def get_all(self):
"""
Gets all models
"""
raise NotImplementedError
@abstractmethod
def get_all_by_module(self, needed_model: SessionModel):
"""
Gets all models by module.
:param needed_model: The needed model
:type needed_model: SessionModel
"""
raise NotImplementedError
@abstractmethod
def drop_table(self, table_name: str):
"""
drop table
:param table_name: The table name
:type table_name: str
"""
raise NotImplementedError
@abstractmethod
def filter(self, query: "QueryBuilder", first: bool = False):
"""
Filter and get models by query
:param query: The query
:type query: QueryBuilder
:param first: The first
:type first: bool
"""
raise NotImplementedError
@abstractmethod
def update(self, model: SessionModel, **kwargs):
"""
Update model
:param model: The model
:type model: SessionModel
:param kwargs: The keywords arguments
:type kwargs: dictionary
"""
raise NotImplementedError
@abstractmethod
def add(self, model: SessionModel, ignore: bool = False):
"""
Add model
:param model: The model
:type model: SessionModel
:param ignore: The ignore
:type ignore: bool
"""
raise NotImplementedError
@abstractmethod
def delete(self, model: SessionModel):
"""
Deletes the given model.
:param model: The model
:type model: SessionModel
"""
raise NotImplementedError
@abstractmethod
def commit(self):
"""
Commit changes
"""
raise NotImplementedError
@abstractmethod
def close(self):
"""
Close connection
"""
raise NotImplementedError
class SQLiteSession(Session):
"""
This class describes a sqlite session.
"""
def __init__(self, database_file: str):
"""
Constructs a new instance.
:param database_file: The database file
:type database_file: str
"""
self.database_file = Path(database_file)
self.models = {}
self.manager = SQLiteMultiManager(self.database_file)
def reconnect(self, database_file: str = None):
"""
Reconnecto to database
"""
if database_file is not None:
self.database_file = Path(database_file)
logger.info(f"Session {self.database_file}: reconnect")
self.manager.reconnect(database_file)
def execute(
self, raw_sql_query: str, values: tuple = (), get_cursor: bool = False
) -> list:
"""
Execute raw sql query
:param raw_sql_query: The raw sql query
:type raw_sql_query: str
:param values: The values
:type values: tuple
:param get_cursor: The get cursor
:type get_cursor: bool
:returns: list with output data
:rtype: list
"""
return self.manager.execute(raw_sql_query, values, get_cursor)
def get_all(self) -> List[SessionModel]:
"""
Gets all.
:returns: All.
:rtype: List[SessionModel]
"""
models_instances = [self.models[model]["model"] for model in self.models.keys()]
return models_instances
def get_all_by_module(self, needed_model: SessionModel) -> List[SessionModel]:
"""
Gets all by module.
:param needed_model: The needed model
:type needed_model: SessionModel
:returns: All by module.
:rtype: List[SessionModel]
"""
all_instances = [self.models[model]["model"] for model in self.models.keys()]
needed_instances = []
model_name = needed_model._model_name
for model in all_instances:
if model._model_name == model_name:
needed_instances.append(model)
return needed_instances
def drop_table(self, table_name: str):
"""
Drop table
:param table_name: The table name
:type table_name: str
"""
logger.info(f"Session {self.database_file}: drop table {table_name}")
self.manager.drop_table(table_name)
def filter(
self, query: "QueryBuilder", first: bool = False
) -> Union[List[SessionModel], SessionModel]:
"""
Filter and get model by query
:param query: The query
:type query: QueryBuilder
:param first: The first
:type first: bool
:returns: list with SessionModel or SessionModel
:rtype: Union[List[SessionModel], SessionModel]
"""
db_results = self.manager.filter(str(query))
results = []
fields = {}
for unique_id, curr_model in self.models.items():
model = curr_model["model"]
fields[unique_id] = {
"keys": model._original_fields.keys(),
"values": [
getattr(model, value)
if model._primary_key["field_name"] != value
else model.pk
for value in model._original_fields.keys()
],
}
for result in db_results:
for unique_id, data in fields.items():
if len(data["keys"]) == len(result):
if tuple(data["values"]) == result:
results.append(self.models[unique_id]["model"])
if results:
return results[0] if first else results
else:
return None
def update(self, model: SessionModel, **kwargs):
"""
Update model
:param model: The model
:type model: SessionModel
:param kwargs: The keywords arguments
:type kwargs: dictionary
"""
current_model = self.models.get(model.unique_id, None)
if current_model is None:
self.add(model)
logger.info(f"Session {self.database_file}: update model {model.unique_id}")
if model.hooks:
func = model.hooks["update"]["function"]
logger.debug(f"Exec Model Hook[update]: {func.__name__}")
func(*model.hooks["update"]["args"])
for key, value in kwargs.items():
if hasattr(model, key):
if value is not None and model._original_fields[key].validate(value):
orig_field = getattr(model, key)
setattr(model, key, model._original_fields[key].to_db_value(value))
logger.info(
f"[{model.table_name}] Update {model._model_name}#{model.pk} {key}: {orig_field} -> {value}"
)
self.models[model.unique_id]["model"] = model
def add(self, model: SessionModel, ignore: bool = False):
"""
Add new model
:param model: The model
:type model: SessionModel
:param ignore: The ignore
:type ignore: bool
"""
if self.models.get(model.unique_id, None) is not None:
logger.warning(f"Model {model.unique_id} already added")
return
if model.hooks:
func = model.hooks["save"]["function"]
logger.debug(f"Exec Model Hook[save]: {func.__name__}")
func(*model.hooks["save"]["args"])
self.models[model.unique_id] = {"model": model}
formatted_fields = model.get_formatted_sql_fields(skip_primary_key=True)
self.manager.create_table(model.table_name, model.get_formatted_sql_fields())
self.manager.insert(model.table_name, formatted_fields, model.pk, model, ignore)
last_pk = self.execute(
f'SELECT max({model._primary_key["field_name"]}) FROM {model.table_name}'
)
model._primary_key["value"] = int(last_pk[0][0])
logger.info(
f"Session {self.database_file}: insert new model: {model.unique_id}"
)
def delete(self, model: SessionModel):
"""
Deletes the given model.
:param model: The model
:type model: SessionModel
"""
current_model = self.models.get(model.unique_id, None)
if current_model is None:
logger.error(f"Model {model.unique_id} does not exists")
return
if model.hooks:
func = model.hooks["delete"]["function"]
logger.debug(f"Exec Model Hook[delete]: {func.__name__}")
func(*model.hooks["delete"]["args"])
self.manager.delete(
current_model["model"].table_name,
current_model["model"]._primary_key["field_name"],
current_model["model"].pk,
)
logger.info(f"Session {self.database_file}: delete model: {model.unique_id}")
def commit(self):
"""
Commit changes
"""
self.manager.commit()
def close(self):
"""
Close connection
"""
self.manager.close_connection()
Такая же структура, как и в остальном коде — создаем абстрактный класс сессии и позже создаем класс-наследник нужной СУБД — в данном случае sqlite.
Класс сессии имеет методы для добавления модели, фильтра, получения всех моделей, получения моделей одного типа, удаление моделей, обновления и другие вспомогательные методы.
Через сессию можно работать так:
from sqlsymphony_orm.datatypes.fields import IntegerField, RealField, TextField
from sqlsymphony_orm.models.session_models import SessionModel
from sqlsymphony_orm.models.session_models import SQLiteSession
from sqlsymphony_orm.queries import QueryBuilder
session = SQLiteSession("example.db")
class User(SessionModel):
__tablename__ = "Users"
id = IntegerField(primary_key=True)
name = TextField(null=False)
cash = RealField(null=False, default=0.0)
def __repr__(self):
return f"<User {self.pk}>"
class User2(SessionModel):
__tablename__ = "Users"
id = IntegerField(primary_key=True)
name = TextField(null=False)
cash = RealField(null=False, default=0.0)
password = TextField(default="password1234")
def __repr__(self):
return f"<User {self.pk}>"
class Comment(SessionModel):
id = IntegerField(primary_key=True)
name = TextField(null=False)
user_id = IntegerField(null=False)
user = User(name="John")
user2 = User(name="Bob")
user3 = User(name="Ellie")
session.add(user)
session.commit()
session.add(user2)
session.commit()
session.add(user3)
session.commit()
session.delete(user3)
session.commit()
session.update(model=user2, name="Anna")
session.commit()
comment = Comment(name=user.name, user_id=user.pk)
session.add(comment)
session.commit()
print(
session.filter(QueryBuilder().SELECT("*").FROM(User.table_name).WHERE(name="Anna"))
)
print(session.get_all())
print(session.get_all_by_module(User))
print(user.pk)
session.close()
❯ Вспомогательные модули
Давайте реализуем небольшой модуль хеширования. Это может потребоваться, когда в БД хранятся данные, которые должны быть засекречены.
import hashlib
from abc import ABC, abstractmethod
from enum import Enum, auto
from hmac import compare_digest
from typing import Union
class HashAlgorithm(Enum):
"""
This class describes a hash algorithms.
"""
SHA256 = auto()
SHA512 = auto()
MD5 = auto()
BLAKE2B = auto()
BLAKE2S = auto()
class HashingBase(ABC):
"""
This class describes a hashing base.
"""
@abstractmethod
def hash(
self, data: Union[bytes, str], hexdigest: bool = False
) -> Union[bytes, str]:
"""
Hash
:param data: The data
:type data: Union[bytes, str]
:param hexdigest: The hexdigest
:type hexdigest: bool
:returns: hashing
:rtype: Union[bytes, str]
:raises NotImplementedError: abstract method
"""
raise NotImplementedError()
@abstractmethod
def verify(self, data: Union[bytes, str], hashed_data: Union[bytes, str]) -> bool:
"""
Verify data and hashed data
:param data: The data
:type data: Union[bytes, str]
:param hashed_data: The hashed data
:type hashed_data: Union[bytes, str]
:returns: true if data=hashed_data
:rtype: bool
:raises NotImplementedError: { exception_description }
"""
raise NotImplementedError()
class PlainHasher(HashingBase):
"""
This class describes a plain hasher.
"""
def __init__(self, algorithm: HashAlgorithm = HashAlgorithm.SHA256):
"""
Constructs a new instance.
:param algorithm: The algorithm
:type algorithm: HashAlgorithm
"""
self.algorithm = algorithm
def hash(self, data: Union[bytes, str]) -> bytes:
"""
Generate hash
:param data: The data
:type data: Union[bytes, str]
:returns: hash
:rtype: bytes
"""
if isinstance(data, str):
data = data.encode("utf-8")
hasher = self.get_hasher()
return hasher(data).digest()
def verify(self, data: Union[bytes, str], hashed_data: Union[bytes, str]) -> bool:
"""
Verify data and hashed data
:param data: The data
:type data: Union[bytes, str]
:param hashed_data: The hashed data
:type hashed_data: Union[bytes, str]
:returns: true if data==hashed_data
:rtype: bool
"""
if isinstance(data, str):
data = data.encode("utf-8")
if isinstance(hashed_data, str):
hashed_data = hashed_data.encode()
expected_hash = self.hash(data)
return compare_digest(expected_hash, hashed_data)
def get_hasher(self) -> callable:
"""
Gets the hasher function.
:returns: The hasher.
:rtype: callable
:raises ValueError: unknown hash function.
"""
hash_functions = {
HashAlgorithm.SHA256: hashlib.sha256,
HashAlgorithm.SHA512: hashlib.sha512,
HashAlgorithm.MD5: hashlib.md5,
HashAlgorithm.BLAKE2B: hashlib.blake2b,
HashAlgorithm.BLAKE2S: hashlib.blake2s,
}
hash_function = hash_functions.get(self.algorithm, None)
if hash_function is None:
raise ValueError(f"Unknown hash function type: {self.algorithm}")
else:
return hash_function
class SaltedHasher(HashingBase):
"""
This class describes a salted hasher.
"""
def __init__(
self, algorithm: HashAlgorithm = HashAlgorithm.SHA256, salt: str = "SOMESALT"
):
"""
Constructs a new instance.
:param algorithm: The algorithm
:type algorithm: HashAlgorithm
:param salt: The salt
:type salt: str
"""
self.algorithm = algorithm
self.salt = salt
def hash(self, data: Union[bytes, str]) -> bytes:
"""
Generate hash
:param data: The data
:type data: Union[bytes, str]
:returns: hash
:rtype: bytes
"""
salt = self.salt.encode("utf-8")
if isinstance(data, str):
data = data.encode("utf-8")
hasher = self.get_hasher()
value = f"{data}{salt}".encode("utf-8")
return hasher(value).digest()
def verify(self, data: str, hashed_data: Union[bytes, str]) -> bool:
"""
Verify data and hashed_data
:param data: The data
:type data: str
:param hashed_data: The hashed data
:type hashed_data: Union[bytes, str]
:returns: true if data==hashed_data
:rtype: bool
"""
if isinstance(hashed_data, str):
print("convert")
expected_hash = self.hash(data)
return compare_digest(expected_hash, hashed_data)
def get_hasher(self) -> callable:
"""
Gets the hasher function.
:returns: The hasher.
:rtype: callable
:raises ValueError: unknown hasher function
"""
hash_functions = {
HashAlgorithm.SHA256: hashlib.sha256,
HashAlgorithm.SHA512: hashlib.sha512,
HashAlgorithm.MD5: hashlib.md5,
HashAlgorithm.BLAKE2B: hashlib.blake2b,
HashAlgorithm.BLAKE2S: hashlib.blake2s,
}
hash_function = hash_functions.get(self.algorithm, None)
if hash_function is None:
raise ValueError(f"Unknown hash function type: {self.algorithm}")
else:
return hash_function
Здесь есть также абстрактный класс хешера и два его наследника — обычный (plain) хешер и засоленный (salted) hasher. Salted hasher генерирует хеш с солью, то есть к значению для хеширования мы добавляем соль. Это позволит избежать нахождения хеш-коллизий.
❯ Миграции
Миграции в контексте нашей ORM — это действия для обновления старой модели на новую с апдейтом базы данных. То есть: если старая модель имела два поля: id и name, то мы можем создать новую модель с тремя полями (id, name и age), и во время миграции в базу данных добавится новое поле, без потери старых изменений. Но есть несколько ограничений:
Возможны проблемы с UNIQUE-полями.
Если поле с параметром NOT NULL, она должна иметь параметр DEFAULT, иначе будет ошибка.
Конечно, у нас миграции простые, но вы можете сделать лучше.
Также я сделал возможность бекапить БД, которые были до миграции. Во время инициализации менеджера миграций, создается директория migrations, туда помещается бекап текущей базы данных, и после этого уже идет работа менеджера. А сами миграции для восстановления хранятся в json-файле. Благодаря этому, если миграция прошла неудачно, можно ее отменить и все вернуть на место. Главное — чтобы существовал бекап.
Напишем код:
from typing import Optional, Union
from abc import ABC, abstractmethod
import os
import json
import shutil
from pathlib import Path
from datetime import datetime
from sqlsymphony_orm.models.session_models import SQLiteSession # сессия
from sqlsymphony_orm.exceptions import MigrationError # исключение
from loguru import logger
class MigrationManager(ABC):
"""
This class describes a migration manager.
"""
@abstractmethod
def get_current_table_columns(self, table_name: str):
"""
Gets the current table columns.
:param table_name: The table name
:type table_name: str
:raises NotImplementedError: abstract method
"""
raise NotImplementedError()
@abstractmethod
def get_table_columns_from_model(self, model: "Model"):
"""
Gets the table columns from model.
:param model: The model
:type model: Model
:raises NotImplementedError: abstract method
"""
raise NotImplementedError()
@abstractmethod
def revert_migration(self, index_key: int = -1):
"""
Revert migration
:param index_key: The index key
:type index_key: int
:raises NotImplementedError: abstract method
"""
raise NotImplementedError()
class SQLiteMigrationManager(MigrationManager):
"""
This class describes a sqlite migration manager.
"""
def __init__(self, session: SQLiteSession, migrations_dir: str = "migrations"):
"""
Constructs a new instance.
:param session: The session
:type session: SQLiteSession
:param migrations_dir: The migrations dir
:type migrations_dir: str
"""
self.session = session
self.migrations_dir = migrations_dir
os.makedirs(self.migrations_dir, exist_ok=True)
self.migrations = {}
self.migrations_file = "sqlsymphony_migrates.json"
def get_current_table_columns(self, table_name: str) -> list:
"""
Gets the current table columns.
:param table_name: The table name
:type table_name: str
:returns: The current table columns.
:rtype: list
"""
data = self.session.execute(f"SELECT * FROM {table_name}", get_cursor=True)
cursor = data[0]
fieldnames = [field[0] for field in cursor.description]
return fieldnames
def get_table_columns_from_model(self, model: "Model") -> list:
"""
Gets the table columns from model.
:param model: The model
:type model: Model
:returns: The table columns from model.
:rtype: list
"""
return [key for key in model._original_fields.keys()]
def upload_migrations_file(self):
logger.debug(f"Load JSON migrations history file: {self.migrations_file}")
with open(self.migrations_file, "r") as read_file:
self.migrations = json.load(read_file)
def update_migrations_file(self):
logger.debug(f"Update JSON migrations history file: {self.migrations_file}")
with open(self.migrations_file, "w") as write_file:
json.dump(self.migrations, write_file, indent=4)
def migrate_from_model(
self,
old_model: Union["SessionModel", "Model"],
new_model: Union["SessionModel", "Model"],
original_table_name: str,
new_table_name: Optional[str] = None,
):
"""
Migrate from old model to new model
:param old_model: The old model
:type old_model: Union["SessionModel", "Model"]
:param new_model: The new model
:type new_model: Union["SessionModel", "Model"]
:param original_table_name: The original table name
:type original_table_name: str
:param new_table_name: The new table name
:type new_table_name: Optional[str]
:raises MigrationError: fields error
"""
sql_queries = []
logger.info("Start database migrating")
if new_table_name is not None:
sql_queries.append(
f"ALTER TABLE {original_table_name} RENAME TO {new_table_name};"
)
logger.debug(
f"[Migration] Change table name: {original_table_name} -> {new_table_name}"
)
new_model.table_name = new_table_name
original_table_name = new_table_name
old_fields = set(
[
f"{field_name} {field_params}"
for field_name, field_params in old_model._class_get_formatted_sql_fields(
skip_primary_key=False
).items()
]
)
new_fields = set(
[
f"{field_name} {field_params}"
for field_name, field_params in new_model._class_get_formatted_sql_fields(
skip_primary_key=False
).items()
]
)
added = new_fields - old_fields
dropped = old_fields - new_fields
for field_name in dropped:
logger.debug(
f'[Migration] Drop column {field_name.split(" ")[0]} from table {original_table_name}'
)
sql_queries.append(
f"ALTER TABLE {original_table_name} DROP COLUMN {field_name.split(" ")[0]};"
)
for field in added:
if "NOT NULL" in field and "DEFAULT" not in field:
raise MigrationError(
f'Cannot script a "not null" field without default value in field "{field}"'
)
logger.debug(f"[Migration] Add column {field} to {original_table_name}")
sql_queries.append(f"ALTER TABLE {original_table_name} ADD COLUMN {field};")
migrationfile = os.path.join(
self.migrations_dir,
f'{datetime.now().strftime("backup_%Y%m%d%H%M%S")}_{self.session.database_file}',
)
logger.debug(f"Create migraton file: {migrationfile}")
shutil.copyfile(self.session.database_file, migrationfile)
if Path(self.migrations_file).exists():
self.upload_migrations_file()
self.migrations[str(len(self.migrations) + 1)] = {
"migrationfile": migrationfile,
"tablename": original_table_name,
"description": f"from {old_model._model_name} to {new_model._model_name}",
"sql_queries": list(sql_queries),
"fields": {
"new": list(new_fields),
"old": list(old_fields),
"added": list(added),
"dropped": list(dropped),
},
}
self.update_migrations_file()
for sql_query in sql_queries:
logger.debug(f"[Migration] Execute sql query: {sql_query}")
try:
self.session.execute(sql_query)
except Exception as ex:
raise MigrationError(str(ex))
def revert_migration(self, index_key: int = -1):
"""
Revert migration
:param index_key: The index key
:type index_key: int
"""
self.upload_migrations_file()
if index_key == -1:
index_key = [k for k in self.migrations.keys()][-1]
try:
migration = self.migrations[str(index_key)]
except KeyError as ke:
logger.error(f"Cannot get migration by index {index_key}: {ke}")
logger.info("[Migration] Rollback database from new to old.")
shutil.copyfile(migration["migrationfile"], self.session.database_file)
Весь основной код содержится в методе migrate_from_model, которая принимает на вход старую модель, новую модель, название таблицы и новое название таблицы (опционально). Мы получаем нужные поля, работая с ними через множества set, и создаем нужные бекапы.
Метод revert_migration же позволяет по индексу миграции вернуть старую версию БД. Индекс по умолчанию -1, то есть последний. После мы получаем нужную миграцию и замещаем новую БД старой БД.
❯ Пример работы
Вот полный код примера работы ORM:
from sqlsymphony_orm.datatypes.fields import IntegerField, RealField, TextField # поля
from sqlsymphony_orm.models.session_models import SessionModel # модель
from sqlsymphony_orm.models.session_models import SQLiteSession # сессия
from sqlsymphony_orm.queries import QueryBuilder # билдер запросов
from sqlsymphony_orm.migrations.migrations_manager import SQLiteMigrationManager # миграции
start = time()
session = SQLiteSession("example.db")
class User(SessionModel):
__tablename__ = "Users"
id = IntegerField(primary_key=True)
name = TextField(null=False)
cash = RealField(null=False, default=0.0)
def __repr__(self):
return f"<User {self.pk}>"
class User2(SessionModel):
__tablename__ = "Users"
id = IntegerField(primary_key=True)
name = TextField(null=False)
cash = RealField(null=False, default=0.0)
password = TextField(default="password1234")
def __repr__(self):
return f"<User {self.pk}>"
class Comment(SessionModel):
id = IntegerField(primary_key=True)
name = TextField(null=False)
user_id = IntegerField(null=False)
user = User(name="John")
user2 = User(name="Bob")
user3 = User(name="Ellie")
session.add(user)
session.commit()
session.add(user2)
session.commit()
session.add(user3)
session.commit()
session.delete(user3)
session.commit()
session.update(model=user2, name="Anna")
session.commit()
comment = Comment(name=user.name, user_id=user.pk)
session.add(comment)
session.commit()
print(
session.filter(QueryBuilder().SELECT("*").FROM(User.table_name).WHERE(name="Anna"))
)
print(session.get_all())
print(session.get_all_by_module(User))
print(user.pk)
migrations_manager = SQLiteMigrationManager(session)
migrations_manager.migrate_from_model(User, User2, "Users", "UserAnd")
migrations_manager.revert_migration(-1)
session.close()
У нас есть три модели - модель юзера, новая модель юзера, модель комментария. Мы всех их добавляем, обновляем если надо. Потом в коде демонстрируется фильтрация и получение моделей, а в конце мы создаем миграцию и после сразу же возвращаем старую БД, и в конце закрываем сессию.
Для сохранения изменений после операции следует вызвать session.commit().
Итак, это все!
Мы смогли изучить многие сложные конструкции ООП в python на примере создания такой базовой вещи как ORM.
Да, наша ORM не идеальна, нет ForeignKey, некоторых других вещей. Значение PrimaryKey доступно только после добавления модели. Если у вас есть замечания по поводу статьи - пишите в комментариях. Разумная критика приветствуется!
Ссылка на github-репозиторий с примерами здесь.
Документация моей ORM доступна по этой ссылке.
А PyPI проект находится по этой ссылке.
❯ Заключение
Спасибо за внимание! Это был довольно интересный опыт для меня, т.к. это мой первый большой проект на python с продвинутым ООП, где я попытался изучить более подробно язык и инструментарии.
Если у вас есть замечания по статье или по коду - пишите, наверняка есть более опытный и профессиональный программист на Python, который может помочь как и читателям статьи, так и мне.
Ссылка на мой репозиторий реализации ORM здесь.
Буду рад, если вы присоединитесь к моему небольшому телеграм-блогу. Анонсы статей, новости из мира IT и полезные материалы для изучения программирования и смежных областей. Если конечно хотите :)
Новости, обзоры продуктов и конкурсы от команды Timeweb.Cloud - в нашем Telegram-канале ↩
Перейти ↩
? Читайте также:
❯ Источники
Комментарии (15)
gideon-ul
21.11.2024 09:33а зачем такое выкгадывать на хабре? для галочки?
DrArgentum Автор
21.11.2024 09:33Я сам когда то нуждался в подобной статье, т.к. хотелось создать что то такое, по типу библиотеки. Но не хватало общего понимания. Эта статья как раз и создана для таких как я.
vagon333
21.11.2024 09:33Пара замечаний:
Исходники портянкой на хабре, наверное, перебор.
Существует стандартная практика - ссылка на GitHub или любой другой публично доступный Version Control System ресурс.Если уж вы создаете ORM со своими типами данных, то наверное нужно расширить типы данных, добавив поддержку стандартных Date, DateTime и др. фишек, типа Enumerations.
Это только у меня строковый тип поля "SlugField" вызывает подтормаживание - что за новое именование?
DrArgentum Автор
21.11.2024 09:33SlugField - небольшая фича, для генерации слагов. То есть это как бы текст, но он позволяет генерировать слаг из текста. Тость, из "Привет мир" в privet-mir.
Спасибо за замечания!
obabichev
21.11.2024 09:33Выглядит хорошо, я тоже недавно задавался вопросом, а как сделать entity cache в orm (если что, еще не сделал, но я верю в свой путь), мне очень помогло чтение кода Hibernate (но там правда, очевидно, java) и неплохо этот вопрос описан у Фаулера (Patterns of Enterprise Application Architecture - Martin Fowler) в 11 главе. Там и про Unit of work и про Identity map по отдельности.
По поводу кучи кода, я читал без проблем, некоторые замечания:
как по мне лучше вычистить для статьи комментарии вида
commits changes
для функцииcommit
можно поудалять бойлерплейт и заменить на комменты вида
доказательство теоремы оставим как упраженение читателю
=)убрать импорты, если они не критически важны для понимания, т.к. они просто забивают пространство
убрать имплементацию длинных функций, если нет цели описать алгоритм внутри этих функций; если я правильно понял, одна из целей - сделать сложную OOP структуру, то для рассказа о такой структуре можно фокусироваться на классах и их связях (через поля, методы и т.п.. ну и запихать весь код в статью, можно наверно в начале проекта, но потом он перестанет туда умещаться >_<
Будет здорово, если продолжите делать, когда дойдете до one-to-many, many-to-many, many-to-many with extra columns, multicolumn primary key, начнете чувствовать легкое жжение, не обращайте внимание =)
DrArgentum Автор
21.11.2024 09:33Большое спасибо за замечания! Планирую сделать вторую часть со всеми связами и foreign key. Может даже попробую сделать асинхронно.
bondaran
21.11.2024 09:33Не рекомендую следовать данному гайду, я так 6 лет назад тоже написал свою ORM (tortoise) - до сих пор приходится поддерживать и фиксить баги (:
manyakRus
21.11.2024 09:33https://github.com/ManyakRus/crud_generator
У меня ещё лучше - автоматически создаётся полностью готовый микросервис с функциями CRUD операций (Create, Read, Update, Delete), а также работа всех функций по протоколу GRPC.
Генерируется исходный код для каждой таблицы и колонки в БД.
Заполнить надо только настройки подключения к БД, больше не надо ничего заполнять(настраивать), можно и настраивать, изменять имена каталогов и др.
GoldGoblin
А вы уверены что это статья?
Мне кажется это просто код с отсебятиной автора в комментариях
DrArgentum Автор
Мне кажется что нет. Перед написанием этой статьи мне посоветовали поменьше объяснений и больше кода, т.к. все таки здесь продвинутое сообщество.
Blumfontein
Давно уже тут полно домохозяек
GoldGoblin
Домохозяйки тут не причем. Какая полезность этого кода? Вы бы стали использовать эту orm или взяли бы алхимию?
DrArgentum Автор
Я хочу сказать, что это больше проект для обучения, чтобы знать +- как устроены ORM. И как минимум, еще и немного можно изучить ООП в питоне.
DrArgentum Автор
Но все равно, костяк остается. Домохозяек много, но они в основном идут либо на популярные статьи, либо на простые. И то, часть из них не являются активными пользователями хабра. Но это мое мнение, если что.