ORM, или объектно-реляционное отображение — это программная технология, которая позволяет взаимодействовать с базами данных с использованием объектно-ориентированной парадигмы. Вместо того чтобы писать SQL-запросы напрямую для работы с данными в базе данных, можно использовать ORM, чтобы взаимодействовать с данными, как если бы они были объектами в вашем коде.

ORM позволяет абстрагироваться от сырых SQL запросов путем абстракций.

В этой статье мы и рассмотрим создание своей ORM на Python с документацией и публикацией на PyPI. Данный проект очень интересен со стороны реализации: ведь требуется изучить большую часть ООП, принципов и паттернов.

Мы создадим сессии, модели баз данных, различные поля, миграции и другой вспомогательный функционал. Мы разберем изнутри, как работает такая концепция и как достигается удобство работы.

Некоторые из вас могут подумать что мы изобретаем велосипед. А я в ответ скажу — сможете ли вы прямо сейчас, без подсказок, только по памяти, нарисовать велосипед без ошибок?


Репозиторий доступен по ссылке.

Ремарка: sqlite3 выбран из-за простоты, нетрудно заменить обращения к нему на обращения к любой удобной для вас базе данных. По синтаксису я ориентировался на джанго.

Базы данных - очень популярный метод хранения и организации доступа к данным.

Базы данных имеют следующие преимущества перед обычными таблицами или файлами:

  • Базы данных позволяют обрабатывать, хранить и структурировать намного большие объёмы информации, чем таблицы.

  • Удалённый доступ и система запросов позволяет множеству людей одновременно использовать базы данных. С электронными таблицами тоже можно работать онлайн всей командой, но системы управления базами данных делают этот процесс организованнее, быстрее и безопаснее.

  • Объём информации в базах данных может быть огромным и не влиять на скорость работы. А в Google Таблицах уже после нескольких сотен строк или тысяч символов страница будет загружаться очень медленно.

В основном работают с реляционными базами данных (также называют SQL). Записи и связи между ними организованы при помощи таблиц. В таблицах есть поле для внешнего ключа со ссылками на другие таблицы. Благодаря высокой организации и гибкости структуры реляционные БД применяются для многих типов данных.

Базы данных, где информация о реальных вещах представлена в виде объектов под уникальным идентификатором, называется ООБД. К ООБД обращаются на языке объектно-ориентированного программирования (ООП). Состояние объекта описывается атрибутами, а его поведение — набором методов. Объекты с одинаковыми атрибутами и методами образуют классы.
Объект в ООП создаётся как отдельная сущность со своими свойствами и методами работы. И как только объект создан, его можно вызвать по «имени», или коду, а не разрабатывать заново. То есть то что мы и будем создавать сегодня — ORM!

Свою библиотеку-orm я назвал SQLSymphony, так что вам иногда придется сменить название, или импорты в соответствии с вашей структурой.

❯ sqlite

SQLite3 — это простая реляционная база данных, созданная и поддерживаемая всего тремя людьми. Для работы с ней существует стандартная python-библиотека.

Почему SQLite?

SQLite - это компактная и легкая встраиваемая база данных, которая позволяет хранить и управлять данными прямо внутри вашего приложения. Её простота в использовании и широкая поддержка делают её прекрасным выбором для различных проектов, включая веб-приложения, мобильные приложения и многое другое.

Больше о нем можно прочитать здесь.

❯ Инициализация

ORM будет распространяться в виде python-модуля, поэтому создадим в директории проекта файл __init__.py:

import logging
from typing import Union, List
from rich.traceback import install

from loguru import logger

install(show_locals=True)


class InterceptHandler(logging.Handler):
	"""
	This class describes an intercept handler.
	"""

	def emit(self, record) -> None:
		"""
		Get corresponding Loguru level if it exists

		:param		record:	 The record
		:type		record:	 record

		:returns:	None
		:rtype:		None
		"""
		try:
			level = logger.level(record.levelname).name
		except ValueError:
			level = record.levelno

		frame, depth = logging.currentframe(), 2

		while frame.f_code.co_filename == logging.__file__:
			frame = frame.f_back
			depth += 1

		logger.opt(depth=depth, exception=record.exc_info).log(
			level, record.getMessage()
		)


def setup_logger(level: Union[str, int] = "DEBUG", ignored: List[str] = "") -> None:
	"""
	Setup logger

	:param		level:	  The level
	:type		level:	  str
	:param		ignored:  The ignored
	:type		ignored:  List[str]
	"""
	logging.basicConfig(
		handlers=[InterceptHandler()], level=logging.getLevelName(level)
	)

	for ignore in ignored:
		logger.disable(ignore)

	logger.add("sqlsymphony_orm.log")

	logger.info("Logging is successfully configured")


setup_logger()

Я буду использовать модуль loguru для логгирования и rich для более красивых и информативных исключений.

Для логгирования при помощи loguru можно использовать следующую конструкцию:

from loguru import logger

logger.info('info')
logger.warning('warning')
logger.error('error')
logger.debug('debug')

Типы данных

В sqlite3 существуют следующие типы данных: INTEGER — вещественное число с указанной точностью, TEXT — текст, BLOB — двоичные данные, REAL — число с плавающей запятой(float24), NUMERIC — то же, что и INTEGER, VarChar — текст с ограниченным количеством символов, BOOLEAN — логическое значение.

У типов данных есть параметры: NOT NULL, DEFAULT, UNIQUE.

fields
fields

Давайте реализуем базовые типы данных, создав модуль fields.py:

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


class BaseDataType(ABC):
	"""
	This class describes a base data type.
	"""

	def __init__(
		self,
		primary_key: bool = False,
		unique: bool = False,
		null: bool = True,
		default: Any = None,
	):
		"""
		Constructs a new instance.

		:param		primary_key:  The primary key
		:type		primary_key:  bool
		:param		unique:		  The unique
		:type		unique:		  bool
		:param		null:		  The null
		:type		null:		  bool
		:param		default:	  The default
		:type		default:	  Any
		"""
		self.primary_key: bool = primary_key
		self.unique: bool = unique
		self.null: bool = null
		self.default: Any = default

	@abstractmethod
	def validate(self, value: Any) -> bool:
		"""
		Validate value for current datatype

		:param		value:	The value
		:type		value:	Any

		:returns:	if the value is verified then True, otherwise False
		:rtype:		bool
		"""
		raise NotImplementedError()

	@abstractmethod
	def to_db_value(self, value: Any) -> Any:
		"""
		convert to db value

		:param		value:	The value
		:type		value:	Any

		:returns:	db value
		:rtype:		Any
		"""
		raise NotImplementedError()

	@abstractmethod
	def from_db_value(self, value: Any) -> Any:
		"""
		convert from db value

		:param		value:	The value
		:type		value:	Any

		:returns:	db value
		:rtype:		Any
		"""
		raise NotImplementedError()

	@abstractmethod
	def to_sql_type(self) -> str:
		"""
		Returns a sql type representation of the object.

		:returns:	Sql type representation of the object.
		:rtype:		str

		:raises		NotImplementedError:  abstract method
		"""
		raise NotImplementedError()

	def __str__(self):
		return "<BaseDataType>"

Теперь реализуем на основе этого абстрактного класса другие типы данных:

@dataclass
class IntegerField(BaseDataType):
	"""
	This class describes an integer field.
	"""

	def __init__(
		self,
		max_value: int = None,
		min_value: int = None,
		primary_key: bool = False,
		unique: bool = False,
		null: bool = True,
		default: int = None,
	):
		"""
		Constructs a new instance.

		:param		primary_key:  The primary key
		:type		primary_key:  bool
		:param		unique:		  The unique
		:type		unique:		  bool
		:param		null:		  The null
		:type		null:		  bool
		:param		default:	  The default
		:type		default:	  int
		"""
		self.primary_key = primary_key
		self.unique: bool = unique
		self.null: bool = null
		self.default: int = default

		self.min_value = min_value
		self.max_value = max_value

		if self.primary_key:
			if self.default:
				raise ValueError('The parameter "default" is not used for PrimaryKey')

			self.default = 1
			self.value = 1

	def validate(self, value: Any) -> bool:
		"""
		Validate value

		:param		value:	The value
		:type		value:	Any

		:returns:	if the value is verified then True, otherwise False
		:rtype:		bool
		"""
		if self.primary_key and value is None:
			return True
		if value is None and self.null:
			return True
		if self.min_value is not None and value < self.min_value:
			return False
		if self.max_value is not None and value > self.max_value:
			return False

		return True

	def to_db_value(self, value: Any) -> int:
		"""
		Convert to db value

		:param		value:	The value
		:type		value:	Any

		:returns:	db value
		:rtype:		int
		"""
		if self.primary_key and value is None:
			return 0

		return int(value) if value is not None else self.default

	def from_db_value(self, value: Any) -> int:
		"""
		Convert from db value

		:param		value:	The value
		:type		value:	Any

		:returns:	db value
		:rtype:		int
		"""
		return int(value) if value is not None else None

	def to_sql_type(self) -> str:
		return "INTEGER"

	def __str__(self):
		return "<IntegerField>"


@dataclass
class RealField(BaseDataType):
	"""
	This class describes an real field.
	"""

	def __init__(
		self,
		min_value: float = None,
		max_value: float = None,
		unique: bool = False,
		null: bool = True,
		default: float = None,
	):
		"""
		Constructs a new instance.

		:param		primary_key:  The primary key
		:type		primary_key:  bool
		:param		unique:		  The unique
		:type		unique:		  bool
		:param		null:		  The null
		:type		null:		  bool
		:param		default:	  The default
		:type		default:	  float
		"""
		self.primary_key = False
		self.unique: bool = unique
		self.null: bool = null
		self.default: float = default

		self.min_value = min_value
		self.max_value = max_value

	def validate(self, value: Any) -> bool:
		"""
		Validate value

		:param		value:	The value
		:type		value:	Any

		:returns:	if the value is verified then True, otherwise False
		:rtype:		bool
		"""
		if value is None and self.null:
			return True
		if not isinstance(value, float):
			return False
		if self.min_value is not None and value < self.min_value:
			return False
		if self.max_value is not None and value > self.max_value:
			return False

		return True

	def to_db_value(self, value: Any) -> float:
		"""
		Convert to db value

		:param		value:	The value
		:type		value:	Any

		:returns:	db value
		:rtype:		float
		"""
		return float(value) if value is not None else self.default

	def from_db_value(self, value: Any) -> float:
		"""
		Convert from db value

		:param		value:	The value
		:type		value:	Any

		:returns:	db value
		:rtype:		float
		"""
		return float(value) if value is not None else None

	def to_sql_type(self) -> str:
		return "REAL"

	def __str__(self):
		return "<RealField>"


class CharField(BaseDataType):
	"""
	This class describes a character field.
	"""

	def __init__(
		self,
		max_length: int = 64,
		unique: bool = False,
		null: bool = True,
		default: Any = None,
	):
		"""
		Constructs a new instance.

		:param		primary_key:  The primary key
		:type		primary_key:  bool
		:param		unique:		  The unique
		:type		unique:		  bool
		:param		null:		  The null
		:type		null:		  bool
		:param		default:	  The default
		:type		default:	  Any
		"""
		self.primary_key: bool = False
		self.unique: bool = unique
		self.null: bool = null
		self.default: Any = default

		self.max_length = max_length

	def to_sql_type(self) -> str:
		return f"VARCHAR({self.max_length})"

	def validate(self, value: Any) -> bool:
		"""
		Validate value

		:param		value:	The value
		:type		value:	Any

		:returns:	if the value is verified then True, otherwise False
		:rtype:		bool
		"""
		if value is None and self.null:
			return True

		if not isinstance(value, str):
			return False

		return len(value) <= self.max_length

	def to_db_value(self, value: Any) -> str:
		"""
		Convert value to db

		:param		value:	The value
		:type		value:	Any

		:returns:	db value
		:rtype:		str
		"""
		return str(value) if value is not None else self.default

	def from_db_value(self, value: Any) -> str:
		"""
		Convert value from db

		:param		value:	The value
		:type		value:	Any

		:returns:	db value
		:rtype:		str
		"""
		return str(value) if value is not None else self.default

	def __str__(self):
		return "<CharField>"


class BooleanField(BaseDataType):
	"""
	This class describes a boolean field.
	"""

	def __init__(
		self,
		unique: bool = False,
		null: bool = True,
		default: Any = None,
	):
		"""
		Constructs a new instance.

		:param		primary_key:  The primary key
		:type		primary_key:  bool
		:param		unique:		  The unique
		:type		unique:		  bool
		:param		null:		  The null
		:type		null:		  bool
		:param		default:	  The default
		:type		default:	  Any
		"""
		self.primary_key = False
		self.unique: bool = unique
		self.null: bool = null
		self.default: Any = default

	def to_sql_type(self) -> str:
		return "BOOLEAN"

	def validate(self, value: Any) -> bool:
		"""
		Validate value

		:param		value:	The value
		:type		value:	Any

		:returns:	if the value is verified then True, otherwise False
		:rtype:		bool
		"""
		if isinstance(value, bool):
			return True
		else:
			return False

	def to_db_value(self, value: Any) -> str:
		"""
		Convert to db value

		:param		value:	The value
		:type		value:	Any

		:returns:	db value
		:rtype:		str
		"""
		return str(value).upper() if value is not None else self.default

	def from_db_value(self, value: Any) -> str:
		"""
		Convert from db value

		:param		value:	The value
		:type		value:	Any

		:returns:	db value
		:rtype:		str
		"""
		return str(value).upper() if value is not None else self.default

	def __str__(self):
		return "<BooleanField>"


class TextField(BaseDataType):
	"""
	This class describes a character field.
	"""

	def __init__(
		self,
		unique: bool = False,
		null: bool = True,
		default: Any = None,
	):
		"""
		Constructs a new instance.

		:param		primary_key:  The primary key
		:type		primary_key:  bool
		:param		unique:		  The unique
		:type		unique:		  bool
		:param		null:		  The null
		:type		null:		  bool
		:param		default:	  The default
		:type		default:	  Any
		"""
		self.primary_key = False
		self.unique: bool = unique
		self.null: bool = null
		self.default: Any = default

	def to_sql_type(self) -> str:
		return "TEXT"

	def validate(self, value: Any) -> bool:
		"""
		Validate value

		:param		value:	The value
		:type		value:	Any

		:returns:	if the value is verified then True, otherwise False
		:rtype:		bool
		"""
		if value is None and self.null:
			return True

		return isinstance(value, str)

	def to_db_value(self, value: Any) -> str:
		"""
		Convert to db value

		:param		value:	The value
		:type		value:	Any

		:returns:	db value
		:rtype:		str
		"""
		return str(value) if value is not None else self.default

	def from_db_value(self, value: Any) -> str:
		"""
		Convert from db value

		:param		value:	The value
		:type		value:	Any

		:returns:	db value
		:rtype:		str
		"""
		return str(value) if value is not None else None

	def __str__(self):
		return "<TextField>"


@dataclass
class BlobField(BaseDataType):
	"""
	This class describes a blob field.
	"""

	def __init__(
		self,
		max_size_in_bytes: int = None,
		unique: bool = False,
		null: bool = True,
		default: Any = None,
	):
		"""
		Constructs a new instance.

		:param		primary_key:  The primary key
		:type		primary_key:  bool
		:param		unique:		  The unique
		:type		unique:		  bool
		:param		null:		  The null
		:type		null:		  bool
		:param		default:	  The default
		:type		default:	  Any
		"""
		self.primary_key = False
		self.unique: bool = unique
		self.null: bool = null
		self.default: Any = default

		self.max_size_in_bytes = max_size_in_bytes

	def to_sql_type(self) -> str:
		return "BLOB"

	def validate(self, value: Any) -> bool:
		"""
		Validate value

		:param		value:	The value
		:type		value:	Any

		:returns:	if the value is verified then True, otherwise False
		:rtype:		bool
		"""
		if value is None and self.null:
			return True

		if len(value) > self.max_size_in_bytes:
			return False

		return isinstance(value, bytes)

	def to_db_value(self, value: Any) -> bytes:
		"""
		Convert to db value

		:param		value:	The value
		:type		value:	Any

		:returns:	db value
		:rtype:		bytes
		"""
		return bytes(value) if value is not None else self.default

	def from_db_value(self, value: Any) -> bytes:
		"""
		Convert from db value

		:param		value:	The value
		:type		value:	Any

		:returns:	db value
		:rtype:		bytes
		"""
		return bytes(value) if value is not None else None

	def __str__(self):
		return "<BlobField>"


class FieldMeta(type):
	"""
	This class describes a field meta.
	"""

	def __new__(cls, name, bases, attrs):
		"""
		New 'magic' func

		:param		cls:	The cls
		:param		name:	The name
		:param		bases:	The bases
		:param		attrs:	The attributes

		:returns:	class
		"""
		fields = {}
		primary_key = None

		for key, value in attrs.items():
			if isinstance(value, BaseDataType):
				fields[key] = value

				if value.primary_key:
					if primary_key:
						raise ValueError("Multiple primary keys are not allowed")

					primary_key = key

					if value.auto_increment:
						value.default = 1

		attrs["_fields"] = fields
		attrs["_primary_key"] = primary_key

		return super().__new__(cls, name, bases, attrs)

	def __str__(self):
		return "<FieldMeta>"

У каждого поля есть следующие параметры: unique (должно ли поле быть уникальным), null (может ли быть NULL) и default (значение по умолчанию). Также некоторые поля имеют дополнительные параметры (например CharField, требуется задать максимальную длину в символах).

❯ Запросы (Query)

В будущем, для фильтрации и получения записей из БД, нам нужны будут запросы. Вместо сырых SQL-запросов мы будем использовать классы для выборки, фильтрации и получения записей.

Класс QueryBuilder как раз и будет отвественнен за это. Строковый вид класса будет возращать созданный SQL запрос:

from abc import ABC, abstractmethod
from rich.console import Console
from rich.table import Table
from loguru import logger

AND = "and"
OR = "or"


class Q:
	"""
	This class describes a Q.
	"""

	def __init__(self, exp_type: str = AND, **kwargs):
		"""
		Constructs a new instance.

		:param		exp_type:  The exponent type
		:type		exp_type:  str
		:param		kwargs:	   The keywords arguments
		:type		kwargs:	   dictionary
		"""
		self.separator = exp_type
		self._params = kwargs

	def __str__(self) -> str:
		"""
		Returns a string representation of the object.

		:returns:	String representation of the object.
		:rtype:		str
		"""
		kv_pairs = [f'{k} = "{v}"' for k, v in self._params.items()]
		return f" {self.separator} ".join(kv_pairs)

	def __bool__(self) -> bool:
		"""
		Returns a boolean representation of the object

		:returns:	Boolean representation of the object.
		:rtype:		bool
		"""
		return bool(self._params)


class BaseExp(ABC):
	"""
	This abstract class describes a base exponent.
	"""

	name = None

	@abstractmethod
	def add(self, *args, **kwargs):
		"""
		Add params

		:param		args:				  The arguments
		:type		args:				  list
		:param		kwargs:				  The keywords arguments
		:type		kwargs:				  dictionary
		"""
		raise NotImplementedError()

	def definition(self) -> str:
		"""
		Get the definition of query

		:returns:	sql query
		:rtype:		str
		"""
		return self.name + " " + self.line() + " "

	@abstractmethod
	def line(self):
		"""
		Get line
		"""
		raise NotImplementedError()

	@abstractmethod
	def __bool__(self):
		"""
		Boolean magic function
		"""
		raise NotImplementedError()


class Select(BaseExp):
	"""
	This class describes a select.
	"""

	name = "SELECT"

	def __init__(self):
		"""
		Constructs a new instance.
		"""
		self._params = []

	def add(self, *args, **kwargs):
		"""
		Add params

		:param		args:	 The arguments
		:type		args:	 list
		:param		kwargs:	 The keywords arguments
		:type		kwargs:	 dictionary
		"""
		self._params.extend(args)

	def line(self) -> str:
		"""
		Get line

		:returns:	line
		:rtype:		str
		"""
		separator = ","
		return separator.join(self._params)

	def __bool__(self):
		"""
		Boolean magic function

		:returns:	if self._params defined
		:rtype:		bool
		"""
		return bool(self._params)


class From(BaseExp):
	"""
	This class describes a from.
	"""

	name = "FROM"

	def __init__(self):
		"""
		Constructs a new instance.
		"""
		self._params = []

	def add(self, *args, **kwargs):
		"""
		Add params

		:param		args:	 The arguments
		:type		args:	 list
		:param		kwargs:	 The keywords arguments
		:type		kwargs:	 dictionary
		"""
		self._params.extend(args)

	def line(self) -> str:
		"""
		Get line

		:returns:	line
		:rtype:		str
		"""
		separator = ","
		return separator.join(self._params)

	def __bool__(self):
		"""
		Boolean magic function

		:returns:	if self._params defined
		:rtype:		bool
		"""
		return bool(self._params)


class Where(BaseExp):
	"""
	This class describes a SQL query `where`.
	"""

	name = "WHERE"

	def __init__(self, exp_type: str = AND, **kwargs):
		"""
		Constructs a new instance.

		:param		exp_type:  The exponent type
		:type		exp_type:  str
		:param		kwargs:	   The keywords arguments
		:type		kwargs:	   dictionary
		"""
		self._q = Q(exp_type, **kwargs)

	def add(self, exp_type: str = AND, **kwargs):
		"""
		Add params to sql query `where`

		:param		exp_type:  The exponent type
		:type		exp_type:  str
		:param		kwargs:	   The keywords arguments
		:type		kwargs:	   dictionary

		:returns:	Q class instance
		:rtype:		Q
		"""
		self._q = Q(exp_type, **kwargs)
		return self._q

	def line(self):
		"""
		Get line

		:returns:	line
		:rtype:		str
		"""
		return str(self._q)

	def __bool__(self):
		"""
		Boolean magic function

		:returns:	if self._q defined
		:rtype:		bool
		"""
		return bool(self._q)


class QueryBuilder:
	"""
	Front-end to create query objects step by step.
	"""

	def __init__(self):
		"""
		Constructs a new instance.
		"""
		self._data = {"select": Select(), "from": From(), "where": Where()}

	def SELECT(self, *args) -> "QueryBuilder":
		"""
		SQL query `select`

		:param		args:  The arguments
		:type		args:  list

		:returns:	Query Builder
		:rtype:		self
		"""
		self._data["select"].add(*args)
		return self

	def FROM(self, *args) -> "QueryBuilder":
		"""
		SQL query `from`

		:param		args:  The arguments
		:type		args:  list

		:returns:	Query Builder
		:rtype:		self
		"""
		self._data["from"].add(*args)
		return self

	def WHERE(self, exp_type: str = AND, **kwargs) -> "QueryBuilder":
		"""
		SQL query `where`

		:param		exp_type:  The exponent type
		:type		exp_type:  str
		:param		kwargs:	   The keywords arguments
		:type		kwargs:	   dictionary

		:returns:	Query Builder
		:rtype:		self
		"""
		self._data["where"].add(exp_type=exp_type, **kwargs)
		return self

	def _lines(self):
		"""
		Lines

		:returns:	Value definition
		:rtype:		yeild (str)
		"""
		for key, value in self._data.items():
			if value:
				yield value.definition()

	def __str__(self) -> str:
		"""
		Returns a string representation of the object.

		:returns:	String representation of the object.
		:rtype:		str
		"""
		return "".join(self._lines())

❯ Кастомные исключение

Для того, чтобы разработчику было более понятней разобраться в ошибках, создадим кастомные исключения:

from loguru import logger


class SQLSymphonyException(Exception):
	"""
	Exception for signaling sql symphony errors.
	"""

	def __init__(self, *args):
		"""
		Constructs a new instance.

		:param		args:  The arguments
		:type		args:  list
		"""
		if args:
			self.message = args[0]
		else:
			self.message = None

	def get_explanation(self) -> str:
		"""
		Gets the explanation.

		:returns:	The explanation.
		:rtype:		str
		"""
		return f"Basic SQLSymphony ORM exception. Message: {self.message if self.message else 'missing'}"

	def __str__(self):
		"""
		Returns a string representation of the object.

		:returns:	String representation of the object.
		:rtype:		str
		"""
		logger.error(f"{self.__class__.__name__}: {self.get_explanation()}")
		return f"SQLSymphonyException has been raised. {self.get_explanation()}"


class FieldNamingError(SQLSymphonyException):
	"""
	This class describes a field naming error.
	"""

	def __init__(self, *args):
		"""
		Constructs a new instance.

		:param		args:  The arguments
		:type		args:  list
		"""
		if args:
			self.message = args[0]
		else:
			self.message = None

	def get_explanation(self) -> str:
		"""
		Gets the explanation.

		:returns:	The explanation.
		:rtype:		str
		"""
		return f"SQLSymphony Field Naming Error. The field name is prohibited/unavailable to avoid naming errors. Message: {self.message if self.message else 'missing'}"

	def __str__(self):
		logger.error(f"{self.__class__.__name__}: {self.get_explanation()}")
		return f"Field Naming Error has been raised. {self.get_explanation()}"


class NullableFieldError(SQLSymphonyException):
	"""
	This class describes a nullable field error.
	"""

	def __init__(self, *args):
		"""
		Constructs a new instance.

		:param		args:  The arguments
		:type		args:  list
		"""
		if args:
			self.message = args[0]
		else:
			self.message = None

	def get_explanation(self) -> str:
		"""
		Gets the explanation.

		:returns:	The explanation.
		:rtype:		str
		"""
		return f"SQLSymphony Nullable Field Error. Field is set to NOT NULL, but it is empty. Message: {self.message if self.message else 'missing'}"

	def __str__(self):
		logger.error(f"{self.__class__.__name__}: {self.get_explanation()}")
		return f"Nullable Field Error has been raised. {self.get_explanation()}"


class FieldValidationError(SQLSymphonyException):
	"""
	This class describes a field validation error.
	"""

	def __init__(self, *args):
		"""
		Constructs a new instance.

		:param		args:  The arguments
		:type		args:  list
		"""
		if args:
			self.message = args[0]
		else:
			self.message = None

	def get_explanation(self) -> str:
		"""
		Gets the explanation.

		:returns:	The explanation.
		:rtype:		str
		"""
		return f"SQLSymphony Field Validation Error. Message: {self.message if self.message else 'missing'}"

	def __str__(self):
		logger.error(f"{self.__class__.__name__}: {self.get_explanation()}")
		return f"Field Validation Error has been raised. {self.get_explanation()}"


class PrimaryKeyError(SQLSymphonyException):
	"""
	This class describes a primary key error.
	"""

	def __init__(self, *args):
		"""
		Constructs a new instance.

		:param		args:  The arguments
		:type		args:  list
		"""
		if args:
			self.message = args[0]
		else:
			self.message = None

	def get_explanation(self) -> str:
		"""
		Gets the explanation.

		:returns:	The explanation.
		:rtype:		str
		"""
		return f"SQLSymphony Primary Key Error. According to database theory, each table should have only one PrimaryKey field, Message: {self.message if self.message else 'missing'}"

	def __str__(self):
		logger.error(f"{self.__class__.__name__}: {self.get_explanation()}")
		return f"Primary Key Error has been raised. {self.get_explanation()}"


class UniqueConstraintError(SQLSymphonyException):
	"""
	This class describes an unique constraint error.
	"""

	def __init__(self, *args):
		"""
		Constructs a new instance.

		:param		args:  The arguments
		:type		args:  list
		"""
		if args:
			self.message = args[0]
		else:
			self.message = None

	def get_explanation(self) -> str:
		"""
		Gets the explanation.

		:returns:	The explanation.
		:rtype:		str
		"""
		return f"SQLSymphony Unique Constraint Error. An exception occurred when executing an SQL query due to problems with UNIQUE fields. Message: {self.message if self.message else 'missing'}"

	def __str__(self):
		logger.error(f"{self.__class__.__name__}: {self.get_explanation()}")
		return f"Unique Constraint Error has been raised. {self.get_explanation()}"


class ModelHookError(SQLSymphonyException):
	"""
	This class describes a model hook error.
	"""

	def __init__(self, *args):
		"""
		Constructs a new instance.

		:param		args:  The arguments
		:type		args:  list
		"""
		if args:
			self.message = args[0]
		else:
			self.message = None

	def get_explanation(self) -> str:
		"""
		Gets the explanation.

		:returns:	The explanation.
		:rtype:		str
		"""
		return f"Model Hooks Error. An exception occurred when executing an hook due to problems with ORM. Message: {self.message if self.message else 'missing'}"

	def __str__(self):
		logger.error(f"{self.__class__.__name__}: {self.get_explanation()}")
		return f"Model Hook error has been raised. {self.get_explanation()}"


class MigrationError(SQLSymphonyException):
	"""
	This class describes a migration error.
	"""

	def __init__(self, *args):
		"""
		Constructs a new instance.

		:param		args:  The arguments
		:type		args:  list
		"""
		if args:
			self.message = args[0]
		else:
			self.message = None

	def get_explanation(self) -> str:
		"""
		Gets the explanation.

		:returns:	The explanation.
		:rtype:		str
		"""
		return f"Database Migration Error. An exception occurred when executing an hook due to problems with migration. Message: {self.message if self.message else 'missing'}"

	def __str__(self):
		logger.error(f"{self.__class__.__name__}: {self.get_explanation()}")
		return f"Migration Error has been raised. {self.get_explanation()}"

Мы создаем базовый класс, наследуемый от Exception и метод строкового обращения. А потом уже создаем более подробные исключения на основе базового класса.

❯ Пользовательские модели

По традициям, разработчик в нашей ORM должен создавать модели примерно следующим образом:

class User(SessionModel):
	__tablename__ = "Users"

	id = IntegerField(primary_key=True)
	name = TextField(null=False)
	cash = RealField(null=False, default=0.0)

	def __repr__(self):
		return f"<User {self.pk}>"


class User2(SessionModel):
	__tablename__ = "Users"

	id = IntegerField(primary_key=True)
	name = TextField(null=False)
	cash = RealField(null=False, default=0.0)
	password = TextField(default="password1234")

	def __repr__(self):
		return f"<User {self.pk}>"


class Comment(SessionModel):
	id = IntegerField(primary_key=True)
	name = TextField(null=False)
	user_id = IntegerField(null=False)

	def __repr__(self):
		return f"<Comment {self.pk}>"

Для этого нам нужно будет реализовать классы моделей (мета-класс и саму модель).

from pathlib import Path
from typing import List, Any, Union, Callable
from uuid import uuid4
from abc import ABC, abstractmethod
from collections import OrderedDict

from loguru import logger

from sqlsymphony_orm.datatypes.fields import BaseDataType, IntegerField # модуль типов данных
from sqlsymphony_orm.exceptions import (
	PrimaryKeyError,
	FieldValidationError,
	NullableFieldError,
	FieldNamingError,
)
from sqlsymphony_orm.queries import QueryBuilder


class MetaSessionModel(type):
	"""
	This class describes a meta session model.
	"""

	__tablename__ = None

	def __new__(cls, class_object: "SessionModel", parents: tuple, attributes: dict):
		"""
		Magic method for creating instances and classes

		:param		cls:		   The cls
		:type		cls:		   cls
		:param		class_object:  The class object
		:type		class_object:  Model
		:param		parents:	   The parents
		:type		parents:	   tuple
		:param		attributes:	   The attributes
		:type		attributes:	   dict

		:returns:	new class
		:rtype:		model
		"""
		new_class = super(MetaSessionModel, cls).__new__(
			cls, class_object, parents, attributes
		)
		fields = OrderedDict()

		setattr(new_class, "_model_name", attributes["__qualname__"].lower())

		if new_class.__tablename__ is None:
			setattr(new_class, "table_name", attributes["__qualname__"].lower())
		else:
			setattr(new_class, "table_name", new_class.__tablename__)

		for k, v in attributes.items():
			if isinstance(v, BaseDataType):
				fields[k] = v
				attributes[k] = None

				if isinstance(v, IntegerField):
					if v.primary_key:
						setattr(new_class, "_pk_name", k)

		setattr(new_class, "_original_fields", fields)

		return new_class


class SessionModel(metaclass=MetaSessionModel):
	"""
	This class describes a ORM model.
	"""

	__tablename__ = None
	_ids = 0

	def __init__(self, **kwargs):
		"""
		Constructs a new instance.

		:param		kwargs:	 The keywords arguments
		:type		kwargs:	 dictionary
		"""
		self.fields = {}
		self.hooks = {}

		self.unique_id = str(uuid4())

		for field_name, field in self._original_fields.items():
			value = kwargs.get(field_name, None)

			if not kwargs.get("manager", False):
				if not field.null and value is None and field.default is None:
					raise NullableFieldError(
						f"Field {field_name} is set to NOT NULL, but it is empty"
					)

			if value is not None and field.validate(value):
				setattr(self, field_name, field.to_db_value(value))
				self.fields[field_name] = getattr(self, field_name)
			else:
				if value is not None and not field.validate(value):
					raise FieldValidationError(
						f'Validate error: field {field}; field name "{field_name}"; value "{value}"'
					)

				if isinstance(field, IntegerField):
					if field.primary_key:
						self.__class__._ids += 1
						setattr(
							self,
							"_primary_key",
							{
								"field": field,
								"field_name": field_name,
								"value": self.__class__._ids,
							},
						)

				setattr(self, field_name, field.default)
				self.fields[field_name] = getattr(self, field_name)

		if not getattr(self, "_primary_key"):
			raise PrimaryKeyError()

		self._last_action = {}

	def add_hook(self, before_action: str, func: Callable, func_args: tuple = ()):
		"""
		Adds a hook.

		:param		before_action:	The before action
		:type		before_action:	str
		:param		func:			The function
		:type		func:			Callable
		:param		func_args:		The function arguments
		:type		func_args:		tuple

		:raises		ValueError:		unknown before action
		"""
		actions = ["save", "delete", "update"]

		if before_action.lower() not in actions:
			raise ValueError(
				f"Unknown action: {before_action}. Supported actions: {actions}"
			)

		logger.info(
			f"[{self.table_name}] Add Model Hook: before {before_action} execute {func.__name__}"
		)

		self.hooks[before_action.lower()] = {"function": func, "args": func_args}

	@property
	def pk(self) -> Any:
		"""
		Get primary key value

		:returns:	primary key value
		:rtype:		primary key
		"""
		return self._primary_key["value"]

	@classmethod
	def _class_get_formatted_sql_fields(cls, skip_primary_key: bool = False) -> dict:
		"""
		Gets the formatted sql fields.

		:returns:	The formatted sql fields.
		:rtype:		dict
		"""
		model_fields = {}

		for field_name, field in cls._original_fields.items():
			if field.primary_key and skip_primary_key:
				continue

			model_fields[field_name] = field.to_sql_type()

			if field.primary_key:
				model_fields[field_name] = f"{field.to_sql_type()} PRIMARY KEY"
			else:
				if not field.null:
					try:
						model_fields[field_name] += " NOT NULL"
					except KeyError:
						model_fields[field_name] = f"{field.to_sql_type()} NOT NULL"
				if field.unique:
					try:
						model_fields[field_name] += " UNIQUE"
					except KeyError:
						model_fields[field_name] = f"{field.to_sql_type()} UNIQUE"
				if field.default is not None:
					try:
						model_fields[field_name] += f" DEFAULT {field.default}"
					except KeyError:
						model_fields[field_name] = (
							f"{field.to_sql_type()} DEFAULT {field.default}"
						)

		return model_fields

	def get_formatted_sql_fields(self, skip_primary_key: bool = False) -> dict:
		"""
		Gets the formatted sql fields.

		:returns:	The formatted sql fields.
		:rtype:		dict
		"""
		model_fields = {}

		for field_name, field in self._original_fields.items():
			if field.primary_key and skip_primary_key:
				continue

			model_fields[field_name] = field.to_sql_type()

			if field.primary_key:
				model_fields[field_name] = f"{field.to_sql_type()} PRIMARY KEY"
			else:
				if not field.null:
					try:
						model_fields[field_name] += " NOT NULL"
					except KeyError:
						model_fields[field_name] = f"{field.to_sql_type()} NOT NULL"
				if field.unique:
					try:
						model_fields[field_name] += " UNIQUE"
					except KeyError:
						model_fields[field_name] = f"{field.to_sql_type()} UNIQUE"
				if field.default is not None:
					try:
						model_fields[field_name] += f" DEFAULT {field.default}"
					except KeyError:
						model_fields[field_name] = (
							f"{field.to_sql_type()} DEFAULT {field.default}"
						)

		return model_fields

В метаклассе мы читаем модель, добавляем поля, задаем базовые настройки. В классе модели мы все проверяем и задаем Primary Key.

Также есть classmethod-функция и обычная функция для получения форматированных полей для SQL.

У нас есть свойство для получения Primary Key, а также функция для добавления хуков. Хуки в контексте нашей ORM — это функции, выполняемые до определенной операции модели и базы данных.

❯ Подключение к БД

Давайте теперь возьмемся за основу работы с базой данных — класс подключения и класс менеджера.

Напишем код для подключения к бд:

import sqlite3
from abc import ABC, abstractmethod
from typing import Tuple

from rich import print

from loguru import logger


class DBConnector(ABC):
	"""
	This class describes a db connector.
	"""

	def __new__(cls, *args, **kwargs):
		"""
		New class

		:param		cls:	 The cls
		:type		cls:	 list
		:param		args:	 The arguments
		:type		args:	 list
		:param		kwargs:	 The keywords arguments
		:type		kwargs:	 dictionary

		:returns:	cls instance
		:rtype:		self
		"""
		if not hasattr(cls, "instance"):
			cls.instance = super(DBConnector, cls).__new__(cls, *args, **kwargs)

		return cls.instance

	@abstractmethod
	def connect(self, database_name: str):
		"""
		Connect to database

		:param		database_name:		  The database name
		:type		database_name:		  str

		:raises		NotImplementedError:  Abstract method
		"""
		raise NotImplementedError()

	@abstractmethod
	def commit(self):
		"""
		Commit changes to database

		:raises		NotImplementedError:  Abstract method
		"""
		raise NotImplementedError()

	@abstractmethod
	def fetch(self, query: str):
		"""
		Fetches the given query.

		:param		query:				  The query
		:type		query:				  str

		:raises		NotImplementedError:  Abstract method
		"""
		raise NotImplementedError()


class SQLiteDBConnector(DBConnector):
	"""
	This class describes a sqlite db connector.
	"""

	def __new__(cls, *args, **kwargs):
		"""
		New class

		:param		cls:	 The cls
		:type		cls:	 list
		:param		args:	 The arguments
		:type		args:	 list
		:param		kwargs:	 The keywords arguments
		:type		kwargs:	 dictionary

		:returns:	cls instance
		:rtype:		self
		"""
		if not hasattr(cls, "instance"):
			cls.instance = super(SQLiteDBConnector, cls).__new__(cls, *args, **kwargs)

		return cls.instance

	def close_connection(self):
		"""
		Closes a connection.
		"""
		self._connection.close()
		print("[bold]Connection has been closed[/bold]")
		logger.info("Close Database Connection")

	def connect(self, database_name: str = "database.db"):
		"""
		Connect to database

		:param		database_name:	The database name
		:type		database_name:	str
		"""
		pragmas = ["PRAGMA foreign_keys = 1"]
		self._connection = sqlite3.connect(database_name)
		self.database_name = database_name
		logger.info(f"[{database_name}] Connect database...")

		for pragma in pragmas:
			self._connection.execute(pragma)
			logger.debug(f"Set pragma: {pragma}")

	def commit(self):
		"""
		Commit changes to database
		"""
		logger.info("Commit changes to database")
		self._connection.commit()

	def fetch(self, query: str, values: Tuple = (), get_cursor: bool = False) -> list:
		"""
		Fetch SQL query

		:param		query:	 The query
		:type		query:	 str
		:param		values:	 The values
		:type		values:	 Tuple

		:returns:	list with fetched results
		:rtype:		list
		"""
		cursor = self._connection.cursor()
		self.commit()

		logger.debug(f"Fetch query: {query} {values}")

		try:
			cursor.execute(query, values)
		except Exception as ex:
			logger.error(f"An exception occurred while executing the request: {ex}")
			raise ex

		return [cursor, cursor.fetchall()] if get_cursor else cursor.fetchall()

Все просто — создаем абстрактный класс и класс SQLiteDBConnector, наследуемый от него. Если вам потребуется добавить поддержку других СУБД, такая структура сделает это более удобней.

Мы имеем метод __new__ для создания новых инстансов, метод подключения, отключения, коммита и выполнения запроса.

Перейдем теперь к более сложному — к менеджеру:

from abc import ABC, abstractmethod
from typing import Any
from loguru import logger

from sqlsymphony_orm.queries import QueryBuilder
from sqlsymphony_orm.database.connection import DBConnector, SQLiteDBConnector


class MultiManager(ABC):
	"""
	This class describes a multi manager.
	"""

	@abstractmethod
	def reconnect(self):
		"""
		reconnect to db

		:raises		NotImplementedError:  abstract method
		"""
		raise NotImplementedError()

	@abstractmethod
	def drop_table(self, table_name: str):
		"""
		Drop sql table

		:param		table_name:			  The table name
		:type		table_name:			  str

		:raises		NotImplementedError:  abstract method
		"""
		raise NotImplementedError()

	@abstractmethod
	def close_connection(self):
		"""
		Closes a connection.

		:raises		NotImplementedError:  abstract method
		"""
		raise NotImplementedError()

	@abstractmethod
	def insert(
		self,
		table_name: str,
		formatted_fields: dict,
		pk: int,
		model_class: "Model",
		ignore: bool = False,
	):
		"""
		insert new model to database

		:param		table_name:			  The table name
		:type		table_name:			  str
		:param		formatted_fields:	  The formatted fields
		:type		formatted_fields:	  dict
		:param		pk:					  primary key value
		:type		pk:					  int
		:param		model_class:		  The model class
		:type		model_class:		  Model
		:param		ignore:				  The ignore
		:type		ignore:				  bool

		:raises		NotImplementedError:  { exception_description }
		"""
		raise NotImplementedError()

	@abstractmethod
	def update(self, table_name: str, key: str, orig_field: str, new_value: str):
		"""
		update model

		:param		table_name:			  The table name
		:type		table_name:			  str
		:param		key:				  The key
		:type		key:				  str
		:param		orig_field:			  The original field
		:type		orig_field:			  str
		:param		new_value:			  The new value
		:type		new_value:			  str

		:raises		NotImplementedError:  abstract method
		"""
		raise NotImplementedError()

	@abstractmethod
	def filter(self, query: QueryBuilder):
		"""
		filter and get model by query

		:param		query:				  The query
		:type		query:				  QueryBuilder

		:raises		NotImplementedError:  abstract method
		"""
		raise NotImplementedError()

	@abstractmethod
	def commit(self):
		"""
		Commit changes

		:raises		NotImplementedError:  abstract method
		"""
		raise NotImplementedError()

	@abstractmethod
	def create_table(self, table_name: str, fields: dict):
		"""
		Creates a table.

		:param		table_name:			  The table name
		:type		table_name:			  str
		:param		fields:				  The fields
		:type		fields:				  dict

		:raises		NotImplementedError:  abstract method
		"""
		raise NotImplementedError()

	@abstractmethod
	def delete(self, table_name: str, field_name: str, field_value: Any):
		"""
		delete model

		:param		table_name:			  The table name
		:type		table_name:			  str
		:param		field_name:			  The field name
		:type		field_name:			  str
		:param		field_value:		  The field value
		:type		field_value:		  Any

		:raises		NotImplementedError:  abstract method
		"""
		raise NotImplementedError()


class SQLiteMultiManager(MultiManager):
	"""
	This class describes a sqlite multi manager.
	"""

	def __init__(self, database_name: str):
		"""
		Constructs a new instance.

		:param		database_name:	The database name
		:type		database_name:	str
		"""
		self._connector = SQLiteDBConnector()
		self.database_name = database_name
		self._connector.connect(self.database_name)

	def execute(self, raw_sql_query: str, values: tuple = (), get_cursor: bool = False):
		return self._connector.fetch(raw_sql_query, values, get_cursor)

	def reconnect(self, database_file: str = None):
		"""
		reconnect to database
		"""
		if database_file is not None:
			self.database_name = database_file
		self._connector.connect(self.database_name)

	def drop_table(self, table_name: str):
		"""
		drop table

		:param		table_name:	 The table name
		:type		table_name:	 str
		"""
		query = f"DROP TABLE IF EXISTS {table_name}"

		logger.warning(f"Drop table: {table_name}")

		self._connector.fetch(query)
		self._connector.commit()

	def close_connection(self):
		"""
		Closes a connection.
		"""
		self._connector.close_connection()

	def insert(
		self,
		table_name: str,
		formatted_fields: dict,
		pk: int,
		model_class: "Model",
		ignore: bool = False,
	):
		"""
		Insert a fields to database

		:param		table_name:	 The table name
		:type		table_name:	 str
		:param		columns:	 The columns
		:type		columns:	 str
		:param		count:		 The count
		:type		count:		 str
		:param		values:		 The values
		:type		values:		 tuple
		"""
		fields = []
		values = []

		for k, v in formatted_fields.items():
			fields.append(k)
			if "PRIMARY KEY" in v:
				values.append(pk)
			else:
				values.append(getattr(model_class, k))

		columns = ", ".join(fields)
		count = ", ".join(["?" for _ in values])

		query = "INSERT "

		if ignore:
			query += "OR IGNORE "

		query += f"INTO {table_name} ({columns}) VALUES ({count})"

		logger.info(
			f'[{table_name}] Insert {"(or ignore)" if ignore else ""} new model into database'
		)

		self._connector.fetch(query, values)

	def update(self, table_name: str, key: str, orig_field: str, new_value: str):
		"""
		Update fields in database table

		:param		table_name:	 The table name
		:type		table_name:	 str
		:param		key:		 The key
		:type		key:		 str
		:param		orig_field:	 The original field
		:type		orig_field:	 str
		:param		new_value:	 The new value
		:type		new_value:	 str
		"""
		query = f"UPDATE {table_name} SET {key} = ? WHERE {key} = ?"

		logger.info(f"[{table_name}] Update model: {key}={new_value}")

		self._connector.fetch(query, (new_value, orig_field))

	def filter(self, query: str) -> list:
		"""
		filter and get model by query

		:param		query:	The query
		:type		query:	str

		:returns:	models
		:rtype:		list
		"""
		result = self._connector.fetch(query)

		return result

	def commit(self):
		"""
		Commits changes.
		"""
		self._connector.commit()

	def create_table(self, table_name: str, fields: dict):
		"""
		Creates a table.

		:param		table_name:	 The table name
		:type		table_name:	 str
		:param		fields:		 The fields
		:type		fields:		 dict
		"""
		columns = [f"{k} {v}" for k, v in fields.items()]

		query = f"CREATE TABLE IF NOT EXISTS {table_name} ("

		for column in columns:
			query += f"{column},"

		query = query[:-1]
		query += ")"

		logger.info(f"Create new table: {table_name}")

		self._connector.fetch(query)
		self._connector.commit()

	def delete(self, table_name: str, field_name: str, field_value: Any):
		"""
		Delete model from database

		:param		table_name:	  The table name
		:type		table_name:	  str
		:param		field_name:	  The field name
		:type		field_name:	  str
		:param		field_value:  The field value
		:type		field_value:  Any
		"""
		query = f"DELETE FROM {table_name} WHERE {field_name} = ?"
		logger.info(f"[{table_name}] Delete model ({field_name}={field_value})")

		self._connector.fetch(query, (field_value,))

Это также относительно просто: при инициализации создаем инстанс класса подключения, подключаемся к БД, и создаем базовые методы: удаление, создание таблицы, коммит, фильтр, обновление, добавления, и прочие вспомогательные методы для работы с подключением и таблицами.

❯ Сессии

Но как мы будем добавлять модели в базу данных и работать с ними? Все просто — я решил что правильным способом будем создание класса сессии.

Давайте реализуем это:

from pathlib import Path
from typing import List, Any, Union, Callable
from uuid import uuid4
from abc import ABC, abstractmethod
from collections import OrderedDict

from loguru import logger

from sqlsymphony_orm.database.manager import SQLiteMultiManager # менеджер
from sqlsymphony_orm.datatypes.fields import BaseDataType, IntegerField # типы данныъ
from sqlsymphony_orm.exceptions import ( # исключения
	PrimaryKeyError,
	FieldValidationError,
	NullableFieldError,
	FieldNamingError,
)lsymphony_orm.queries import QueryBuilder # билдер запросов


class Session(ABC):
	"""
	This class describes a session.
	"""

	@abstractmethod
	def reconnect(self):
		"""
		reconnect to database
		"""
		raise NotImplementedError

	@abstractmethod
	def get_all(self):
		"""
		Gets all models
		"""
		raise NotImplementedError

	@abstractmethod
	def get_all_by_module(self, needed_model: SessionModel):
		"""
		Gets all models by module.

		:param		needed_model:  The needed model
		:type		needed_model:  SessionModel
		"""
		raise NotImplementedError

	@abstractmethod
	def drop_table(self, table_name: str):
		"""
		drop table

		:param		table_name:	 The table name
		:type		table_name:	 str
		"""
		raise NotImplementedError

	@abstractmethod
	def filter(self, query: "QueryBuilder", first: bool = False):
		"""
		Filter and get models by query

		:param		query:	The query
		:type		query:	QueryBuilder
		:param		first:	The first
		:type		first:	bool
		"""
		raise NotImplementedError

	@abstractmethod
	def update(self, model: SessionModel, **kwargs):
		"""
		Update model

		:param		model:	 The model
		:type		model:	 SessionModel
		:param		kwargs:	 The keywords arguments
		:type		kwargs:	 dictionary
		"""
		raise NotImplementedError

	@abstractmethod
	def add(self, model: SessionModel, ignore: bool = False):
		"""
		Add model

		:param		model:	 The model
		:type		model:	 SessionModel
		:param		ignore:	 The ignore
		:type		ignore:	 bool
		"""
		raise NotImplementedError

	@abstractmethod
	def delete(self, model: SessionModel):
		"""
		Deletes the given model.

		:param		model:	The model
		:type		model:	SessionModel
		"""
		raise NotImplementedError

	@abstractmethod
	def commit(self):
		"""
		Commit changes
		"""
		raise NotImplementedError

	@abstractmethod
	def close(self):
		"""
		Close connection
		"""
		raise NotImplementedError


class SQLiteSession(Session):
	"""
	This class describes a sqlite session.
	"""

	def __init__(self, database_file: str):
		"""
		Constructs a new instance.

		:param		database_file:	The database file
		:type		database_file:	str
		"""
		self.database_file = Path(database_file)
		self.models = {}
		self.manager = SQLiteMultiManager(self.database_file)

	def reconnect(self, database_file: str = None):
		"""
		Reconnecto to database
		"""
		if database_file is not None:
			self.database_file = Path(database_file)
		logger.info(f"Session {self.database_file}: reconnect")
		self.manager.reconnect(database_file)

	def execute(
		self, raw_sql_query: str, values: tuple = (), get_cursor: bool = False
	) -> list:
		"""
		Execute raw sql query

		:param		raw_sql_query:	The raw sql query
		:type		raw_sql_query:	str
		:param		values:			The values
		:type		values:			tuple
		:param		get_cursor:		The get cursor
		:type		get_cursor:		bool

		:returns:	list with output data
		:rtype:		list
		"""
		return self.manager.execute(raw_sql_query, values, get_cursor)

	def get_all(self) -> List[SessionModel]:
		"""
		Gets all.

		:returns:	All.
		:rtype:		List[SessionModel]
		"""
		models_instances = [self.models[model]["model"] for model in self.models.keys()]
		return models_instances

	def get_all_by_module(self, needed_model: SessionModel) -> List[SessionModel]:
		"""
		Gets all by module.

		:param		needed_model:  The needed model
		:type		needed_model:  SessionModel

		:returns:	All by module.
		:rtype:		List[SessionModel]
		"""
		all_instances = [self.models[model]["model"] for model in self.models.keys()]
		needed_instances = []
		model_name = needed_model._model_name

		for model in all_instances:
			if model._model_name == model_name:
				needed_instances.append(model)

		return needed_instances

	def drop_table(self, table_name: str):
		"""
		Drop table

		:param		table_name:	 The table name
		:type		table_name:	 str
		"""
		logger.info(f"Session {self.database_file}: drop table {table_name}")
		self.manager.drop_table(table_name)

	def filter(
		self, query: "QueryBuilder", first: bool = False
	) -> Union[List[SessionModel], SessionModel]:
		"""
		Filter and get model by query

		:param		query:	The query
		:type		query:	QueryBuilder
		:param		first:	The first
		:type		first:	bool

		:returns:	list with SessionModel or SessionModel
		:rtype:		Union[List[SessionModel], SessionModel]
		"""
		db_results = self.manager.filter(str(query))
		results = []
		fields = {}

		for unique_id, curr_model in self.models.items():
			model = curr_model["model"]
			fields[unique_id] = {
				"keys": model._original_fields.keys(),
				"values": [
					getattr(model, value)
					if model._primary_key["field_name"] != value
					else model.pk
					for value in model._original_fields.keys()
				],
			}

		for result in db_results:
			for unique_id, data in fields.items():
				if len(data["keys"]) == len(result):
					if tuple(data["values"]) == result:
						results.append(self.models[unique_id]["model"])

		if results:
			return results[0] if first else results
		else:
			return None

	def update(self, model: SessionModel, **kwargs):
		"""
		Update model

		:param		model:	 The model
		:type		model:	 SessionModel
		:param		kwargs:	 The keywords arguments
		:type		kwargs:	 dictionary
		"""
		current_model = self.models.get(model.unique_id, None)

		if current_model is None:
			self.add(model)

		logger.info(f"Session {self.database_file}: update model {model.unique_id}")

		if model.hooks:
			func = model.hooks["update"]["function"]
			logger.debug(f"Exec Model Hook[update]: {func.__name__}")
			func(*model.hooks["update"]["args"])

		for key, value in kwargs.items():
			if hasattr(model, key):
				if value is not None and model._original_fields[key].validate(value):
					orig_field = getattr(model, key)
					setattr(model, key, model._original_fields[key].to_db_value(value))
					logger.info(
						f"[{model.table_name}] Update {model._model_name}#{model.pk} {key}: {orig_field} -> {value}"
					)

		self.models[model.unique_id]["model"] = model

	def add(self, model: SessionModel, ignore: bool = False):
		"""
		Add new model

		:param		model:	 The model
		:type		model:	 SessionModel
		:param		ignore:	 The ignore
		:type		ignore:	 bool
		"""
		if self.models.get(model.unique_id, None) is not None:
			logger.warning(f"Model {model.unique_id} already added")
			return

		if model.hooks:
			func = model.hooks["save"]["function"]
			logger.debug(f"Exec Model Hook[save]: {func.__name__}")
			func(*model.hooks["save"]["args"])

		self.models[model.unique_id] = {"model": model}

		formatted_fields = model.get_formatted_sql_fields(skip_primary_key=True)

		self.manager.create_table(model.table_name, model.get_formatted_sql_fields())

		self.manager.insert(model.table_name, formatted_fields, model.pk, model, ignore)

		last_pk = self.execute(
			f'SELECT max({model._primary_key["field_name"]}) FROM {model.table_name}'
		)

		model._primary_key["value"] = int(last_pk[0][0])

		logger.info(
			f"Session {self.database_file}: insert new model: {model.unique_id}"
		)

	def delete(self, model: SessionModel):
		"""
		Deletes the given model.

		:param		model:	The model
		:type		model:	SessionModel
		"""
		current_model = self.models.get(model.unique_id, None)

		if current_model is None:
			logger.error(f"Model {model.unique_id} does not exists")
			return

		if model.hooks:
			func = model.hooks["delete"]["function"]
			logger.debug(f"Exec Model Hook[delete]: {func.__name__}")
			func(*model.hooks["delete"]["args"])

		self.manager.delete(
			current_model["model"].table_name,
			current_model["model"]._primary_key["field_name"],
			current_model["model"].pk,
		)

		logger.info(f"Session {self.database_file}: delete model: {model.unique_id}")

	def commit(self):
		"""
		Commit changes
		"""
		self.manager.commit()

	def close(self):
		"""
		Close connection
		"""
		self.manager.close_connection()

Такая же структура, как и в остальном коде — создаем абстрактный класс сессии и позже создаем класс-наследник нужной СУБД — в данном случае sqlite.

Класс сессии имеет методы для добавления модели, фильтра, получения всех моделей, получения моделей одного типа, удаление моделей, обновления и другие вспомогательные методы.

Через сессию можно работать так:

from sqlsymphony_orm.datatypes.fields import IntegerField, RealField, TextField
from sqlsymphony_orm.models.session_models import SessionModel
from sqlsymphony_orm.models.session_models import SQLiteSession
from sqlsymphony_orm.queries import QueryBuilder

session = SQLiteSession("example.db")


class User(SessionModel):
	__tablename__ = "Users"

	id = IntegerField(primary_key=True)
	name = TextField(null=False)
	cash = RealField(null=False, default=0.0)

	def __repr__(self):
		return f"<User {self.pk}>"


class User2(SessionModel):
	__tablename__ = "Users"

	id = IntegerField(primary_key=True)
	name = TextField(null=False)
	cash = RealField(null=False, default=0.0)
	password = TextField(default="password1234")

	def __repr__(self):
		return f"<User {self.pk}>"


class Comment(SessionModel):
	id = IntegerField(primary_key=True)
	name = TextField(null=False)
	user_id = IntegerField(null=False)


user = User(name="John")
user2 = User(name="Bob")
user3 = User(name="Ellie")
session.add(user)
session.commit()
session.add(user2)
session.commit()
session.add(user3)
session.commit()
session.delete(user3)
session.commit()
session.update(model=user2, name="Anna")
session.commit()

comment = Comment(name=user.name, user_id=user.pk)
session.add(comment)
session.commit()

print(
	session.filter(QueryBuilder().SELECT("*").FROM(User.table_name).WHERE(name="Anna"))
)
print(session.get_all())
print(session.get_all_by_module(User))
print(user.pk)

session.close()

❯ Вспомогательные модули

Давайте реализуем небольшой модуль хеширования. Это может потребоваться, когда в БД хранятся данные, которые должны быть засекречены.

import hashlib
from abc import ABC, abstractmethod
from enum import Enum, auto
from hmac import compare_digest
from typing import Union


class HashAlgorithm(Enum):
	"""
	This class describes a hash algorithms.
	"""

	SHA256 = auto()
	SHA512 = auto()
	MD5 = auto()
	BLAKE2B = auto()
	BLAKE2S = auto()


class HashingBase(ABC):
	"""
	This class describes a hashing base.
	"""

	@abstractmethod
	def hash(
		self, data: Union[bytes, str], hexdigest: bool = False
	) -> Union[bytes, str]:
		"""
		Hash

		:param		data:				  The data
		:type		data:				  Union[bytes, str]
		:param		hexdigest:			  The hexdigest
		:type		hexdigest:			  bool

		:returns:	hashing
		:rtype:		Union[bytes, str]

		:raises		NotImplementedError:  abstract method
		"""
		raise NotImplementedError()

	@abstractmethod
	def verify(self, data: Union[bytes, str], hashed_data: Union[bytes, str]) -> bool:
		"""
		Verify data and hashed data

		:param		data:				  The data
		:type		data:				  Union[bytes, str]
		:param		hashed_data:		  The hashed data
		:type		hashed_data:		  Union[bytes, str]

		:returns:	true if data=hashed_data
		:rtype:		bool

		:raises		NotImplementedError:  { exception_description }
		"""
		raise NotImplementedError()


class PlainHasher(HashingBase):
	"""
	This class describes a plain hasher.
	"""

	def __init__(self, algorithm: HashAlgorithm = HashAlgorithm.SHA256):
		"""
		Constructs a new instance.

		:param		algorithm:	The algorithm
		:type		algorithm:	HashAlgorithm
		"""
		self.algorithm = algorithm

	def hash(self, data: Union[bytes, str]) -> bytes:
		"""
		Generate hash

		:param		data:  The data
		:type		data:  Union[bytes, str]

		:returns:	hash
		:rtype:		bytes
		"""
		if isinstance(data, str):
			data = data.encode("utf-8")

		hasher = self.get_hasher()
		return hasher(data).digest()

	def verify(self, data: Union[bytes, str], hashed_data: Union[bytes, str]) -> bool:
		"""
		Verify data and hashed data

		:param		data:		  The data
		:type		data:		  Union[bytes, str]
		:param		hashed_data:  The hashed data
		:type		hashed_data:  Union[bytes, str]

		:returns:	true if data==hashed_data
		:rtype:		bool
		"""
		if isinstance(data, str):
			data = data.encode("utf-8")
		if isinstance(hashed_data, str):
			hashed_data = hashed_data.encode()

		expected_hash = self.hash(data)

		return compare_digest(expected_hash, hashed_data)

	def get_hasher(self) -> callable:
		"""
		Gets the hasher function.

		:returns:	The hasher.
		:rtype:		callable

		:raises		ValueError:	 unknown hash function.
		"""
		hash_functions = {
			HashAlgorithm.SHA256: hashlib.sha256,
			HashAlgorithm.SHA512: hashlib.sha512,
			HashAlgorithm.MD5: hashlib.md5,
			HashAlgorithm.BLAKE2B: hashlib.blake2b,
			HashAlgorithm.BLAKE2S: hashlib.blake2s,
		}

		hash_function = hash_functions.get(self.algorithm, None)

		if hash_function is None:
			raise ValueError(f"Unknown hash function type: {self.algorithm}")
		else:
			return hash_function


class SaltedHasher(HashingBase):
	"""
	This class describes a salted hasher.
	"""

	def __init__(
		self, algorithm: HashAlgorithm = HashAlgorithm.SHA256, salt: str = "SOMESALT"
	):
		"""
		Constructs a new instance.

		:param		algorithm:	The algorithm
		:type		algorithm:	HashAlgorithm
		:param		salt:		The salt
		:type		salt:		str
		"""
		self.algorithm = algorithm
		self.salt = salt

	def hash(self, data: Union[bytes, str]) -> bytes:
		"""
		Generate hash

		:param		data:  The data
		:type		data:  Union[bytes, str]

		:returns:	hash
		:rtype:		bytes
		"""
		salt = self.salt.encode("utf-8")

		if isinstance(data, str):
			data = data.encode("utf-8")

		hasher = self.get_hasher()
		value = f"{data}{salt}".encode("utf-8")

		return hasher(value).digest()

	def verify(self, data: str, hashed_data: Union[bytes, str]) -> bool:
		"""
		Verify data and hashed_data

		:param		data:		  The data
		:type		data:		  str
		:param		hashed_data:  The hashed data
		:type		hashed_data:  Union[bytes, str]

		:returns:	true if data==hashed_data
		:rtype:		bool
		"""
		if isinstance(hashed_data, str):
			print("convert")

		expected_hash = self.hash(data)

		return compare_digest(expected_hash, hashed_data)

	def get_hasher(self) -> callable:
		"""
		Gets the hasher function.

		:returns:	The hasher.
		:rtype:		callable

		:raises		ValueError:	 unknown hasher function
		"""
		hash_functions = {
			HashAlgorithm.SHA256: hashlib.sha256,
			HashAlgorithm.SHA512: hashlib.sha512,
			HashAlgorithm.MD5: hashlib.md5,
			HashAlgorithm.BLAKE2B: hashlib.blake2b,
			HashAlgorithm.BLAKE2S: hashlib.blake2s,
		}

		hash_function = hash_functions.get(self.algorithm, None)

		if hash_function is None:
			raise ValueError(f"Unknown hash function type: {self.algorithm}")
		else:
			return hash_function

Здесь есть также абстрактный класс хешера и два его наследника — обычный (plain) хешер и засоленный (salted) hasher. Salted hasher генерирует хеш с солью, то есть к значению для хеширования мы добавляем соль. Это позволит избежать нахождения хеш-коллизий.

❯ Миграции

Миграции в контексте нашей ORM — это действия для обновления старой модели на новую с апдейтом базы данных. То есть: если старая модель имела два поля: id и name, то мы можем создать новую модель с тремя полями (id, name и age), и во время миграции в базу данных добавится новое поле, без потери старых изменений. Но есть несколько ограничений:

  • Возможны проблемы с UNIQUE-полями.

  • Если поле с параметром NOT NULL, она должна иметь параметр DEFAULT, иначе будет ошибка.

Конечно, у нас миграции простые, но вы можете сделать лучше.

Также я сделал возможность бекапить БД, которые были до миграции. Во время инициализации менеджера миграций, создается директория migrations, туда помещается бекап текущей базы данных, и после этого уже идет работа менеджера. А сами миграции для восстановления хранятся в json-файле. Благодаря этому, если миграция прошла неудачно, можно ее отменить и все вернуть на место. Главное — чтобы существовал бекап.

Напишем код:

from typing import Optional, Union
from abc import ABC, abstractmethod
import os
import json
import shutil
from pathlib import Path
from datetime import datetime
from sqlsymphony_orm.models.session_models import SQLiteSession # сессия
from sqlsymphony_orm.exceptions import MigrationError # исключение
from loguru import logger


class MigrationManager(ABC):
	"""
	This class describes a migration manager.
	"""

	@abstractmethod
	def get_current_table_columns(self, table_name: str):
		"""
		Gets the current table columns.

		:param		table_name:			  The table name
		:type		table_name:			  str

		:raises		NotImplementedError:  abstract method
		"""
		raise NotImplementedError()

	@abstractmethod
	def get_table_columns_from_model(self, model: "Model"):
		"""
		Gets the table columns from model.

		:param		model:				  The model
		:type		model:				  Model

		:raises		NotImplementedError:  abstract method
		"""
		raise NotImplementedError()

	@abstractmethod
	def revert_migration(self, index_key: int = -1):
		"""
		Revert migration

		:param		index_key:			  The index key
		:type		index_key:			  int

		:raises		NotImplementedError:  abstract method
		"""
		raise NotImplementedError()


class SQLiteMigrationManager(MigrationManager):
	"""
	This class describes a sqlite migration manager.
	"""

	def __init__(self, session: SQLiteSession, migrations_dir: str = "migrations"):
		"""
		Constructs a new instance.

		:param		session:		 The session
		:type		session:		 SQLiteSession
		:param		migrations_dir:	 The migrations dir
		:type		migrations_dir:	 str
		"""
		self.session = session
		self.migrations_dir = migrations_dir
		os.makedirs(self.migrations_dir, exist_ok=True)
		self.migrations = {}
		self.migrations_file = "sqlsymphony_migrates.json"

	def get_current_table_columns(self, table_name: str) -> list:
		"""
		Gets the current table columns.

		:param		table_name:	 The table name
		:type		table_name:	 str

		:returns:	The current table columns.
		:rtype:		list
		"""
		data = self.session.execute(f"SELECT * FROM {table_name}", get_cursor=True)
		cursor = data[0]
		fieldnames = [field[0] for field in cursor.description]

		return fieldnames

	def get_table_columns_from_model(self, model: "Model") -> list:
		"""
		Gets the table columns from model.

		:param		model:	The model
		:type		model:	Model

		:returns:	The table columns from model.
		:rtype:		list
		"""
		return [key for key in model._original_fields.keys()]

	def upload_migrations_file(self):
		logger.debug(f"Load JSON migrations history file: {self.migrations_file}")
		with open(self.migrations_file, "r") as read_file:
			self.migrations = json.load(read_file)

	def update_migrations_file(self):
		logger.debug(f"Update JSON migrations history file: {self.migrations_file}")
		with open(self.migrations_file, "w") as write_file:
			json.dump(self.migrations, write_file, indent=4)

	def migrate_from_model(
		self,
		old_model: Union["SessionModel", "Model"],
		new_model: Union["SessionModel", "Model"],
		original_table_name: str,
		new_table_name: Optional[str] = None,
	):
		"""
		Migrate from old model to new model

		:param		old_model:			  The old model
		:type		old_model:			  Union["SessionModel", "Model"]
		:param		new_model:			  The new model
		:type		new_model:			  Union["SessionModel", "Model"]
		:param		original_table_name:  The original table name
		:type		original_table_name:  str
		:param		new_table_name:		  The new table name
		:type		new_table_name:		  Optional[str]

		:raises		MigrationError:		  fields error
		"""
		sql_queries = []

		logger.info("Start database migrating")

		if new_table_name is not None:
			sql_queries.append(
				f"ALTER TABLE {original_table_name} RENAME TO {new_table_name};"
			)
			logger.debug(
				f"[Migration] Change table name: {original_table_name} -> {new_table_name}"
			)
			new_model.table_name = new_table_name
			original_table_name = new_table_name

		old_fields = set(
			[
				f"{field_name} {field_params}"
				for field_name, field_params in old_model._class_get_formatted_sql_fields(
					skip_primary_key=False
				).items()
			]
		)
		new_fields = set(
			[
				f"{field_name} {field_params}"
				for field_name, field_params in new_model._class_get_formatted_sql_fields(
					skip_primary_key=False
				).items()
			]
		)
		added = new_fields - old_fields
		dropped = old_fields - new_fields

		for field_name in dropped:
			logger.debug(
				f'[Migration] Drop column {field_name.split(" ")[0]} from table {original_table_name}'
			)
			sql_queries.append(
				f"ALTER TABLE {original_table_name} DROP COLUMN {field_name.split(" ")[0]};"
			)

		for field in added:
			if "NOT NULL" in field and "DEFAULT" not in field:
				raise MigrationError(
					f'Cannot script a "not null" field without default value in field "{field}"'
				)
			logger.debug(f"[Migration] Add column {field} to {original_table_name}")
			sql_queries.append(f"ALTER TABLE {original_table_name} ADD COLUMN {field};")

		migrationfile = os.path.join(
			self.migrations_dir,
			f'{datetime.now().strftime("backup_%Y%m%d%H%M%S")}_{self.session.database_file}',
		)
		logger.debug(f"Create migraton file: {migrationfile}")
		shutil.copyfile(self.session.database_file, migrationfile)

		if Path(self.migrations_file).exists():
			self.upload_migrations_file()

		self.migrations[str(len(self.migrations) + 1)] = {
			"migrationfile": migrationfile,
			"tablename": original_table_name,
			"description": f"from {old_model._model_name} to {new_model._model_name}",
			"sql_queries": list(sql_queries),
			"fields": {
				"new": list(new_fields),
				"old": list(old_fields),
				"added": list(added),
				"dropped": list(dropped),
			},
		}

		self.update_migrations_file()

		for sql_query in sql_queries:
			logger.debug(f"[Migration] Execute sql query: {sql_query}")

			try:
				self.session.execute(sql_query)
			except Exception as ex:
				raise MigrationError(str(ex))

	def revert_migration(self, index_key: int = -1):
		"""
		Revert migration

		:param		index_key:	The index key
		:type		index_key:	int
		"""
		self.upload_migrations_file()

		if index_key == -1:
			index_key = [k for k in self.migrations.keys()][-1]

		try:
			migration = self.migrations[str(index_key)]
		except KeyError as ke:
			logger.error(f"Cannot get migration by index {index_key}: {ke}")

		logger.info("[Migration] Rollback database from new to old.")
		shutil.copyfile(migration["migrationfile"], self.session.database_file)

Весь основной код содержится в методе migrate_from_model, которая принимает на вход старую модель, новую модель, название таблицы и новое название таблицы (опционально). Мы получаем нужные поля, работая с ними через множества set, и создаем нужные бекапы.

Метод revert_migration же позволяет по индексу миграции вернуть старую версию БД. Индекс по умолчанию -1, то есть последний. После мы получаем нужную миграцию и замещаем новую БД старой БД.

❯ Пример работы

Вот полный код примера работы ORM:

from sqlsymphony_orm.datatypes.fields import IntegerField, RealField, TextField # поля
from sqlsymphony_orm.models.session_models import SessionModel # модель
from sqlsymphony_orm.models.session_models import SQLiteSession # сессия
from sqlsymphony_orm.queries import QueryBuilder # билдер запросов
from sqlsymphony_orm.migrations.migrations_manager import SQLiteMigrationManager # миграции

start = time()
session = SQLiteSession("example.db")


class User(SessionModel):
	__tablename__ = "Users"

	id = IntegerField(primary_key=True)
	name = TextField(null=False)
	cash = RealField(null=False, default=0.0)

	def __repr__(self):
		return f"<User {self.pk}>"


class User2(SessionModel):
	__tablename__ = "Users"

	id = IntegerField(primary_key=True)
	name = TextField(null=False)
	cash = RealField(null=False, default=0.0)
	password = TextField(default="password1234")

	def __repr__(self):
		return f"<User {self.pk}>"


class Comment(SessionModel):
	id = IntegerField(primary_key=True)
	name = TextField(null=False)
	user_id = IntegerField(null=False)


user = User(name="John")
user2 = User(name="Bob")
user3 = User(name="Ellie")
session.add(user)
session.commit()
session.add(user2)
session.commit()
session.add(user3)
session.commit()
session.delete(user3)
session.commit()
session.update(model=user2, name="Anna")
session.commit()

comment = Comment(name=user.name, user_id=user.pk)
session.add(comment)
session.commit()

print(
	session.filter(QueryBuilder().SELECT("*").FROM(User.table_name).WHERE(name="Anna"))
)
print(session.get_all())
print(session.get_all_by_module(User))
print(user.pk)

migrations_manager = SQLiteMigrationManager(session)
migrations_manager.migrate_from_model(User, User2, "Users", "UserAnd")
migrations_manager.revert_migration(-1)

session.close()

У нас есть три модели - модель юзера, новая модель юзера, модель комментария. Мы всех их добавляем, обновляем если надо. Потом в коде демонстрируется фильтрация и получение моделей, а в конце мы создаем миграцию и после сразу же возвращаем старую БД, и в конце закрываем сессию.

Для сохранения изменений после операции следует вызвать session.commit().

Итак, это все!

Мы смогли изучить многие сложные конструкции ООП в python на примере создания такой базовой вещи как ORM.

Да, наша ORM не идеальна, нет ForeignKey, некоторых других вещей. Значение PrimaryKey доступно только после добавления модели. Если у вас есть замечания по поводу статьи - пишите в комментариях. Разумная критика приветствуется!

Ссылка на github-репозиторий с примерами здесь.

Документация моей ORM доступна по этой ссылке.

А PyPI проект находится по этой ссылке.

❯ Заключение

Спасибо за внимание! Это был довольно интересный опыт для меня, т.к. это мой первый большой проект на python с продвинутым ООП, где я попытался изучить более подробно язык и инструментарии.

Если у вас есть замечания по статье или по коду - пишите, наверняка есть более опытный и профессиональный программист на Python, который может помочь как и читателям статьи, так и мне.

Ссылка на мой репозиторий реализации ORM здесь.

Буду рад, если вы присоединитесь к моему небольшому телеграм-блогу. Анонсы статей, новости из мира IT и полезные материалы для изучения программирования и смежных областей. Если конечно хотите :)

Новости, обзоры продуктов и конкурсы от команды Timeweb.Cloud - в нашем Telegram-канале

Перейти

? Читайте также:

❯ Источники

Комментарии (15)


  1. GoldGoblin
    21.11.2024 09:33

    А вы уверены что это статья?
    Мне кажется это просто код с отсебятиной автора в комментариях


    1. DrArgentum Автор
      21.11.2024 09:33

      Мне кажется что нет. Перед написанием этой статьи мне посоветовали поменьше объяснений и больше кода, т.к. все таки здесь продвинутое сообщество.


      1. Blumfontein
        21.11.2024 09:33

        т.к. все таки здесь продвинутое сообщество.

        Давно уже тут полно домохозяек


        1. GoldGoblin
          21.11.2024 09:33

          Домохозяйки тут не причем. Какая полезность этого кода? Вы бы стали использовать эту orm или взяли бы алхимию?


          1. DrArgentum Автор
            21.11.2024 09:33

            Я хочу сказать, что это больше проект для обучения, чтобы знать +- как устроены ORM. И как минимум, еще и немного можно изучить ООП в питоне.


        1. DrArgentum Автор
          21.11.2024 09:33

          Но все равно, костяк остается. Домохозяек много, но они в основном идут либо на популярные статьи, либо на простые. И то, часть из них не являются активными пользователями хабра. Но это мое мнение, если что.


  1. gideon-ul
    21.11.2024 09:33

    а зачем такое выкгадывать на хабре? для галочки?


    1. DrArgentum Автор
      21.11.2024 09:33

      Я сам когда то нуждался в подобной статье, т.к. хотелось создать что то такое, по типу библиотеки. Но не хватало общего понимания. Эта статья как раз и создана для таких как я.


  1. vagon333
    21.11.2024 09:33

    Пара замечаний:

    1. Исходники портянкой на хабре, наверное, перебор.
      Существует стандартная практика - ссылка на GitHub или любой другой публично доступный Version Control System ресурс.

    2. Если уж вы создаете ORM со своими типами данных, то наверное нужно расширить типы данных, добавив поддержку стандартных Date, DateTime и др. фишек, типа Enumerations.

    Это только у меня строковый тип поля "SlugField" вызывает подтормаживание - что за новое именование?


    1. DrArgentum Автор
      21.11.2024 09:33

      SlugField - небольшая фича, для генерации слагов. То есть это как бы текст, но он позволяет генерировать слаг из текста. Тость, из "Привет мир" в privet-mir.

      Спасибо за замечания!


  1. obabichev
    21.11.2024 09:33

    Выглядит хорошо, я тоже недавно задавался вопросом, а как сделать entity cache в orm (если что, еще не сделал, но я верю в свой путь), мне очень помогло чтение кода Hibernate (но там правда, очевидно, java) и неплохо этот вопрос описан у Фаулера (Patterns of Enterprise Application Architecture - Martin Fowler) в 11 главе. Там и про Unit of work и про Identity map по отдельности.

    По поводу кучи кода, я читал без проблем, некоторые замечания:

    • как по мне лучше вычистить для статьи комментарии вида commits changes для функции commit

    • можно поудалять бойлерплейт и заменить на комменты вида доказательство теоремы оставим как упраженение читателю =)

    • убрать импорты, если они не критически важны для понимания, т.к. они просто забивают пространство

    • убрать имплементацию длинных функций, если нет цели описать алгоритм внутри этих функций; если я правильно понял, одна из целей - сделать сложную OOP структуру, то для рассказа о такой структуре можно фокусироваться на классах и их связях (через поля, методы и т.п.. ну и запихать весь код в статью, можно наверно в начале проекта, но потом он перестанет туда умещаться >_<

    Будет здорово, если продолжите делать, когда дойдете до one-to-many, many-to-many, many-to-many with extra columns, multicolumn primary key, начнете чувствовать легкое жжение, не обращайте внимание =)


    1. DrArgentum Автор
      21.11.2024 09:33

      Большое спасибо за замечания! Планирую сделать вторую часть со всеми связами и foreign key. Может даже попробую сделать асинхронно.


  1. bondaran
    21.11.2024 09:33

    Не рекомендую следовать данному гайду, я так 6 лет назад тоже написал свою ORM (tortoise) - до сих пор приходится поддерживать и фиксить баги (:


    1. KonstantinTokar
      21.11.2024 09:33

      Это так и есть, но как обучающий материал вполне неплохо.


  1. manyakRus
    21.11.2024 09:33

    https://github.com/ManyakRus/crud_generator

    У меня ещё лучше - автоматически создаётся полностью готовый микросервис с функциями CRUD операций (Create, Read, Update, Delete), а также работа всех функций по протоколу GRPC.
    Генерируется исходный код для каждой таблицы и колонки в БД.
    Заполнить надо только настройки подключения к БД, больше не надо ничего заполнять(настраивать), можно и настраивать, изменять имена каталогов и др.