Привет, хабровчане!
Я Дима, Python-разработчик из 21YARD, сервиса поиска строительных подрядчиков.
Это вторая часть цикла о DDD. В ней расскажу, как добавить к проекту событийно-ориентированную архитектуру.
Оглавление
Код подопытного приложения ищите в репозитории по ссылке. Подробнее о DDD и паттернах Repository и Unit of Work читайте в первой части по ссылке.
CQRS
Первый шаг к реализации событийно-ориентированной архитектуры - внедрение CQRS (The Command and Query Responsibility Segregation). Применим этот принцип и разделим чтение данных и их создание, обновление, удаление.
Чтением данных займется класс-представление (src/users/entrypoints/views.py):
class UsersViews:
"""
Views related to users, which purpose is to return information upon read requests,
due to the fact that write requests (represented by commands) are different from read requests.
# TODO At current moment uses repositories pattern to retrieve data. In future can be changed on raw SQL
# TODO for speed-up purpose
"""
def __init__(self, uow: UsersUnitOfWork) -> None:
self._uow: UsersUnitOfWork = uow
async def get_user_account(self, user_id: int) -> UserModel:
users_service: UsersService = UsersService(self._uow)
user: UserModel = await users_service.get_user_by_id(id=user_id)
return user
async def get_user_statistics(self, user_id: int) -> UserStatisticsModel:
users_service: UsersService = UsersService(self._uow)
user_statistics: UserStatisticsModel = await users_service.get_user_statistics_by_user_id(user_id=user_id)
return user_statistics
async def get_all_users(self) -> List[UserModel]:
users_service: UsersService = UsersService(self._uow)
users: List[UserModel] = await users_service.get_all_users()
return users
Реализация представления использует сервисный слой из первой статьи. Если нужно ускорить запросы к базе данных, используйте чистый SQL вместо сервисного слоя, абстракций и ORM.
Представления вызываются из зависимостей практически без изменений (src/users/entrypoints/dependencies.py):
async def get_my_account(token: str = Depends(oauth2_scheme)) -> UserModel:
jwt_data: JWTDataModel = await parse_jwt_token(token=token)
users_views: UsersViews = UsersViews(uow=SQLAlchemyUsersUnitOfWork())
return await users_views.get_user_account(user_id=jwt_data.user_id)
Запросы на изменение данных от пользователя будут представлены отдельными объектами-командами.
Напишем абстрактную команду (src/core/interfaces/commands.py). Другие команды в проекте будут наследоваться от нее.
@dataclass(frozen=True)
class AbstractCommand(ABC):
"""
Base command, from which any domain command should be inherited.
Commands represents external operations, which must be executed.
"""
async def to_dict(
self,
exclude: Optional[Set[str]] = None,
include: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Create a dictionary representation of the model.
exclude: set of model fields, which should be excluded from dictionary representation.
include: set of model fields, which should be included into dictionary representation.
"""
data: Dict[str, Any] = asdict(self)
if exclude:
for key in exclude:
try:
del data[key]
except KeyError:
pass
if include:
data.update(include)
return data
В ходе обработки команд от пользователя могут возникнуть внутренние события. Внутренние события отличаются от внешних причиной возникновения и целями. Выразим их через объект-ивент.
Введем абстрактный ивент (src/core/interfaces/events.py), от которого отнаследуются прочие внутренние события:
@dataclass(frozen=True)
class AbstractEvent(ABC):
"""
Base event, from which any domain event should be inherited.
Events represents internal operations, which may be executed.
"""
async def to_dict(
self,
exclude: Optional[Set[str]] = None,
include: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Create a dictionary representation of the model.
exclude: set of model fields, which should be excluded from dictionary representation.
include: set of model fields, which should be included into dictionary representation.
"""
data: Dict[str, Any] = asdict(self)
if exclude:
for key in exclude:
try:
del data[key]
except KeyError:
pass
if include:
data.update(include)
return data
Событийно-ориентированная архитектура
В основе событийно-ориентированная архитектуры лежит действие, которое изменяет состояние приложения. Событие может быть инициализировано внешним или внутренним источником: быть командой или ивентом.
События могут каскадно порождать друг друга. Все события должны быть обработаны.
Рассмотрим ситуацию, когда пользователь ставит оценку другому пользователю. Последний должен получить уведомление. Это реализуется через создание ивента.
Функционал приложения можно прописать построчно. Но тогда будет сложно масштабировать проект, менять реализации. Команды и ивенты помогают этого избежать.
Рассмотрим команду голосования за пользователя (src/users/domain/commands.py):
@dataclass(frozen=True)
class VoteForUserCommand(AbstractCommand):
voted_for_user_id: int
voting_user_id: int
liked: bool
disliked: bool
Событие является DTO (Data Transfer Object) - объектом, который хранит данные или состояние, но не имеет методов для их изменения. DTO передается между слоями приложения.
Для обработки DTO используются хендлеры или обработчики событий. Введем абстрактные классы обработчиков ивентов и команд (src/core/interfaces/handlers.py):
class AbstractHandler(ABC):
@abstractmethod
def __init__(self, uow: AbstractUnitOfWork) -> None:
raise NotImplementedError
class AbstractEventHandler(AbstractHandler, ABC):
"""
Abstract event handler class, from which every event handler should be inherited from.
"""
@abstractmethod
async def __call__(self, event: AbstractEvent) -> None:
raise NotImplementedError
class AbstractCommandHandler(AbstractHandler, ABC):
"""
Abstract command handler class, from which every command handler should be inherited from.
"""
@abstractmethod
async def __call__(self, command: AbstractCommand) -> Any:
raise NotImplementedError
При инициализации хендлер получает Unit of Work. Он нужен по двум причинам:
чтобы инициализировать сервис, которым хендлер будет пользоваться;
чтобы хранить ивенты.
Добавим обработку ивентов в базовый класс Unit of Work (src/core/interfaces/units_of_work.py):
class AbstractUnitOfWork(ABC):
"""
Interface for any units of work, which would be used for transaction atomicity, according DDD.
"""
def __init__(self):
# Creating events storage for retrieve them in MessageBus:
self._events: List[AbstractEvent] = []
...
async def add_event(self, event: AbstractEvent) -> None:
self._events.append(event)
def get_events(self) -> Generator[AbstractEvent, None, None]:
"""
Using generator to get elements only when they needed.
Also can not use self._events directly, not to run events endlessly.
"""
while self._events:
yield self._events.pop(0)
Рассмотрим обработчик команды выставления оценки пользователю (src/users/service_layer/handlers/command_handlers.py):
class VoteForUserCommandHandler(UsersCommandHandler):
async def __call__(self, command: VoteForUserCommand) -> UserStatisticsModel:
"""
1) Checks, if a vote is appropriate.
2) Likes or dislikes user, depends on from command data.
3) Creates event, signaling that user has voted.
"""
async with self._uow as uow:
if command.voting_user_id == command.voted_for_user_id:
raise UserCanNotVoteForHimSelf
users_service: UsersService = UsersService(uow=uow)
if await users_service.check_if_user_already_voted(
voting_user_id=command.voting_user_id,
voted_for_user_id=command.voted_for_user_id
):
raise UserAlreadyVotedError
user_statistics: UserStatisticsModel
if command.liked:
user_statistics = await users_service.like_user(
voting_user_id=command.voting_user_id,
voted_for_user_id=command.voted_for_user_id
)
else:
user_statistics = await users_service.dislike_user(
voting_user_id=command.voting_user_id,
voted_for_user_id=command.voted_for_user_id
)
voted_for_user: UserModel = await users_service.get_user_by_id(id=command.voted_for_user_id)
voting_user: UserModel = await users_service.get_user_by_id(id=command.voting_user_id)
await uow.add_event(
UserVotedEvent(
liked=command.liked,
disliked=command.disliked,
voted_for_user_email=voted_for_user.email,
voted_for_user_username=voted_for_user.username,
voting_user_username=voting_user.username,
voting_user_email=voting_user.email,
)
)
return user_statistics
Часть логики обработчика перенесена из зависимости (dependency), которая была в первой статье. После добавили создание ивента уведомления (src/users/domain/events.py):
@dataclass(frozen=True)
class UserVotedEvent(AbstractEvent):
voted_for_user_email: str
voted_for_user_username: str
voting_user_email: str
voting_user_username: str
liked: bool
disliked: bool
Шина сообщений
Создание ивента требует его обработки. Unit of Work не может этим заниматься, так как его задача - сохранять атомарность транзакций. Если он будет дополнительно обрабатывать ивенты, нарушится принцип единственной ответственности по SOLID.
Обработкой ивентов займется шина сообщений (src/core/messagebus.py):
class MessageBus:
def __init__(
self,
uow: AbstractUnitOfWork,
event_handlers: Dict[Type[AbstractEvent], List[AbstractEventHandler]],
command_handlers: Dict[Type[AbstractCommand], AbstractCommandHandler],
) -> None:
self._uow: AbstractUnitOfWork = uow
self._event_handlers: Dict[Type[AbstractEvent], List[AbstractEventHandler]] = event_handlers
self._command_handlers: Dict[Type[AbstractCommand], AbstractCommandHandler] = command_handlers
self._queue: Queue = Queue()
self._command_result: Any = None
async def handle(self, message: Message) -> None:
self._queue.put(message)
while not self._queue.empty():
message = self._queue.get()
if isinstance(message, AbstractEvent):
await self._handle_event(event=message)
elif isinstance(message, AbstractCommand):
await self._handle_command(command=message)
else:
raise MessageBusMessageError
async def _handle_event(self, event: AbstractEvent) -> None:
handler: AbstractEventHandler
for handler in self._event_handlers[type(event)]:
await handler(event)
for event in self._uow.get_events():
self._queue.put_nowait(event)
async def _handle_command(self, command: AbstractCommand) -> None:
handler: AbstractCommandHandler = self._command_handlers[type(command)]
self._command_result = await handler(command)
for event in self._uow.get_events():
self._queue.put_nowait(event)
@property
def command_result(self) -> Any:
return self._command_result
При инициализации шина сообщений получает замапленные типы событий с экземплярами обработчиков.
У ивента может быть несколько обработчиков. Каждый обработчик выполняет одно действие. Например, когда пользователю поставят оценку, уведомления на почту и на телефон отправят разные обработчики. У команд всегда один обработчик.
Главный метод шины сообщений handle() получает объект Message - объединение внешних и внутренних событий приложения (src/core/interfaces/messages.py):
Message = Union[AbstractEvent, AbstractCommand]
Ивенты попадают в очередь и обрабатываются в порядке возникновения. Если ивенты не равноценны, можно использовать очередь с приоритетами.
Внедрение зависимостей
Инициализацией шины сообщений мог бы заниматься слой зависимостей (dependency), но тогда было бы много дублирующегося кода, что нарушает принцип DRY (Don't Repeat Yourself).
Создадим класс, который предоставит шину сообщений, а значит проинициализирует ее подготовленными обработчиками (src/core/bootstrap.py):
class Bootstrap:
"""
Bootstrap class for Dependencies Injection purposes.
"""
def __init__(
self,
uow: AbstractUnitOfWork,
events_handlers_for_injection: Dict[Type[AbstractEvent], List[Type[AbstractEventHandler]]],
commands_handlers_for_injection: Dict[Type[AbstractCommand], Type[AbstractCommandHandler]],
dependencies: Optional[Dict[str, Any]] = None
) -> None:
self._uow: AbstractUnitOfWork = uow
self._dependencies: Dict[str, Any] = {'uow': self._uow}
self._events_handlers_for_injection: Dict[Type[AbstractEvent], List[Type[AbstractEventHandler]]] = (
events_handlers_for_injection
)
self._commands_handlers_for_injection: Dict[Type[AbstractCommand], Type[AbstractCommandHandler]] = (
commands_handlers_for_injection
)
if dependencies:
self._dependencies.update(dependencies)
async def get_messagebus(self) -> MessageBus:
"""
Makes necessary injections to commands handlers and events handlers for creating appropriate messagebus,
after which returns messagebus instance.
"""
injected_event_handlers: Dict[Type[AbstractEvent], List[AbstractEventHandler]] = {
event_type: [
await self._inject_dependencies(handler=handler)
for handler in event_handlers
]
for event_type, event_handlers in self._events_handlers_for_injection.items()
}
injected_command_handlers: Dict[Type[AbstractCommand], AbstractCommandHandler] = {
command_type: await self._inject_dependencies(handler=handler)
for command_type, handler in self._commands_handlers_for_injection.items()
}
return MessageBus(
uow=self._uow,
event_handlers=injected_event_handlers,
command_handlers=injected_command_handlers,
)
async def _inject_dependencies(
self,
handler: Union[Type[AbstractEventHandler], Type[AbstractCommandHandler]]
) -> Union[AbstractEventHandler, AbstractCommandHandler]:
"""
Inspecting handler to know its signature and init params, after which only necessary dependencies will be
injected to the handler.
"""
params: MappingProxyType[str, inspect.Parameter] = inspect.signature(handler).parameters
handler_dependencies: Dict[str, Any] = {
name: dependency
for name, dependency in self._dependencies.items()
if name in params
}
return handler(**handler_dependencies)
Метод внедрения зависимостей _inject_dependencies() с помощью модуля inspect проверяет сигнатуру каждого обработчика. Обработчик получает только те зависимости, которые соответствуют его сигнатуре.
Соберем все воедино в зависимостях (src/users/entrypoints/dependencies.py):
async def like_user(user_id: int, token: str = Depends(oauth2_scheme)) -> UserStatisticsModel:
jwt_data: JWTDataModel = await parse_jwt_token(token=token)
bootstrap: Bootstrap = Bootstrap(
uow=SQLAlchemyUsersUnitOfWork(),
events_handlers_for_injection=EVENTS_HANDLERS_FOR_INJECTION,
commands_handlers_for_injection=COMMANDS_HANDLERS_FOR_INJECTION
)
messagebus: MessageBus = await bootstrap.get_messagebus()
await messagebus.handle(
VoteForUserCommand(
voted_for_user_id=user_id,
voting_user_id=jwt_data.user_id,
liked=True,
disliked=False
)
)
return messagebus.command_result
При изменениях во внутренних слоях приложения не потребуется вносить правки в зависимости. Если поменяется бизнес-логика, можно внести новый обработчик для события или добавить новое событие в конфигурационном файле событий (src/users/service_layer/handlers/init.py):
EVENTS_HANDLERS_FOR_INJECTION: Dict[Type[AbstractEvent], List[Type[AbstractEventHandler]]] = {
UserVotedEvent: [SendVoteNotificationMessageEventHandler],
}
COMMANDS_HANDLERS_FOR_INJECTION: Dict[Type[AbstractCommand], Type[AbstractCommandHandler]] = {
RegisterUserCommand: RegisterUserCommandHandler,
VoteForUserCommand: VoteForUserCommandHandler,
VerifyUserCredentialsCommand: VerifyUserCredentialsCommandHandler,
}
Выводы
Внедрение событийно-ориентированной архитектуры повышает гибкость и масштабируемость приложения. Новый функционал не требует больших изменений в коде, достаточно создать новое событие и обработчики для него.
Минус внедрения событийно-ориентированной архитектуры - повышение сложности приложения. Из-за этого новые люди сложнее погружаются в проект. Поэтому использование событийно-ориентированной архитектуры должно быть оправдано сложностью приложения, его потенциалом роста и склонностью к изменениям.
Изучить подробнее описанные в статье паттерны и идеомы можно в книге Персиваля Г., Грегори Б. - "Паттерны разработки на Python: TDD, DDD и событийно-ориентированная архитектура".
Услышимся в следующих выпусках.
Буду благодарен поддержке и конструктивной критике!
milinsky
Безусловно, возлагать обработку событий на Unit of Work - это было бы ошибкой проектирования. Но и к Single Responsibility Principle из SOLID это имеет весьма слабое отношение.