Этот паттерн не новый и относится еще к языку Eiffel. Но благодаря усилиями Greg Young-а и Martin Fowler-а получил реинкарнацию, особенно в .NET мире.
По своему личному опыту, могу сказать, что паттерн очень удобный, довольно легко позволяет разделить бизнес логику, очень хорошо поддерживаемый, легко «усваиваемый» новичками, а также хорошо позволяющий изолировать ошибки. О нем много написано информации на хабре и нет смысл еще раз перепечатывать это все в этой статье. Тут я решил полностью сфокусироваться на портировании этого паттерна в Go.
Итак начнем…
Сразу хотело бы сказать, что CQRS не обязательно подразумевает Event Sourcing, но часто используется в связке CQRS +DDD + Event Sourcing из-за простоты реализации последнего. Я пробовал проекты вместе с Event Sourcing и без в него, и в обоих случаях патерн CQRS хорошо ложился на бизнес логику, но стоит заметить, что использованиe Event Sourcing приносит свои преимущества при реализации денормализованной базы данных, скорости сохранения и чтения данных, но при этому усложняет понимание данных, миграцию и формирование отчетности.
Проще говоря для Event Sourcing необходимо использовать CQRS, но не наоборот.
Поэтому я не буду касаться Event Sourcing, чтобы не усложнять статью.
И так для обработки Command нам понадобится Bus. Определим ее базовый интерфейс
type EventBus interface {
Subscribe(handler handlers.Handler) error
Publish(eventName string, args ...interface{})
Unsubscribe(eventName string) error
}
Также нам нужно где-то хранить логику обработки команд. Для этого определим интерфейс обработчиков — Handler
type Handler interface {
Event() string
Execute(... interface{}) error
OnSubscribe()
OnUnsubscribe()
}
Во многих контрактных языках обычно добавляют еще интерфейс для Command, но в силу реализации интерфейсов в Go это не имеет смысла, и команда заменятся событием — Event, которое может быть обработано множеством Handler-ов.
Если вы уже заметили, то в метод Publish передается набор параметров — Publish(eventName string, args ...interface{}), а не определенный типа. В результате чего вы можете передать в метод любой тип или набор типов.
Полная реализация EventBus будет выглядеть так:
type eventBus struct {
mtx sync.RWMutex
handlers map[string][] handlers.Handler
}
// Execute appropriate handlers
func (b *eventBus) Publish(eventName string, args ...interface{}) {
b.mtx.RLock()
defer b.mtx.RUnlock()
if hs, ok := b.handlers[eventName]; ok {
rArgs := createArgs(args)
for _, h := range hs {
// In case of Closure "h" variable will be reassigned before ever executed by goroutine.
// Because if this you need to save value into variable and use this variable in closure.
h_in_goroutine := h
go func() {
//Handle Panic in Handler.Execute.
defer func() {
if err := recover(); err != nil {
log.Printf("Panic in EventBus.Publish: %s", err)
}
}()
h_in_goroutine.Execute(rArgs)
}()
}
}
}
// Subscribe Handler
func (b *eventBus) Subscribe(h handlers.Handler) error {
b.mtx.Lock()
//Handle Panic on adding new handler
defer func() {
b.mtx.Unlock()
if err := recover(); err != nil {
log.Printf("Panic in EventBus.Subscribe: %s", err)
}
}()
if h == nil {
return errors.New("Handler can not be nil.")
}
if len(h.Event()) == 0 {
return errors.New("Handlers with empty Event are not allowed.")
}
h.OnSubscribe()
b.handlers[h.Event()] = append(b.handlers[h.Event()], h)
return nil
}
// Unsubscribe Handler
func (b *eventBus) Unsubscribe(eventName string) error {
b.mtx.Lock()
//Handle Panic on adding new handler
defer func() {
b.mtx.Unlock()
if err := recover(); err != nil {
log.Printf("Panic in EventBus.Unsubscribe: %s", err)
}
}()
if _, ok := b.handlers[eventName]; ok {
for i, h := range b.handlers[eventName] {
if h != nil {
h.OnUnsubscribe()
b.handlers[eventName] = append(b.handlers[eventName][:i], b.handlers[eventName][i+1:]...)
}
}
return nil
}
return fmt.Errorf("event are not %s exist", eventName)
}
func createArgs(args []interface{}) []reflect.Value {
reflectedArgs := make([]reflect.Value, 0)
for _, arg := range args {
reflectedArgs = append(reflectedArgs, reflect.ValueOf(arg))
}
return reflectedArgs
}
// New creates new EventBus
func New() EventBus {
return &eventBus{
handlers: make(map[string][] handlers.Handler),
}
}
Внутри метода Publish вызывается метод обработчика событие обернутый в goroutine с обработкой panic, на случай непредвиденной ситуации.
Как видите реализация очень простая, гораздо проще, чем это можно реализовать на .NET или Java.
Полный код с примерами вы может скачать тут: github.
Комментарии (16)
Workanator
02.02.2018 22:19Также, вместо того, чтобы создавать переменную h_in_gorotine, её можно передать, как параметр функции.
GeckoGreen
02.02.2018 22:58А с какой целью аргументы в Publish заворачиваются в reflect.Value?
Так же стоит заметить, что recover повсюду не есть хорошая практика.BOOTLOADER Автор
02.02.2018 23:03Так же стоит заметить, что recover повсюду не есть хорошая практика.
Она не повсюда, понятно, что наиболее часто recover будет вызывать только в методе Publish, если вынести recover куда-то выше, то это приведет к краху всей программы. С удовольствием выслушаю совет.GeckoGreen
03.02.2018 11:42За то вызовет обработчик панику или нет, должны отвечать тот кто пишет сам обработчик и тот кто передает обработчик в шину. Так например, для сервера из net/http все обработчики тоже неродные, но никаких recover в нем нет.
BOOTLOADER Автор
03.02.2018 21:05Ну допустим у вас есть 6 обработчиком. Что лучше — один обработчик на уровне шин, или шесть recover на уровне каждого хандлера? И если скажем кто-то где-то забыл поставить recover вся шина упадет.
GeckoGreen
04.02.2018 20:28Лучше не паниковать вообще, а возвращать ошибки. А что если хэндлер решит запустить горутину и в ней паниковать? Вся программа упадет.
BOOTLOADER Автор
05.02.2018 09:19Да, вы правы — если внутри хендлера запустить goroutine и они уже выбросить панику, то все упадет.
kilgur
02.02.2018 23:05В данном случае это может быть библиотека и код обработчиков для неё чужероден, т.е. доверять ему нельзя. Обработчик может и панику выкинуть, но шина от этого падать не должна… На мой взгляд, recover здесь вполне уместны.
BOOTLOADER Автор
04.02.2018 10:35А с какой целью аргументы в Publish заворачиваются в reflect.Value?
В reflect.Value преобразовывыл потому, что все равно значение передаются как интерфейсы и нужно будет анализировать значение через рефлексию, но немного поразмыслив я понял, что я не совсем был прав и нужно оставить самому хендеру выяснить что за значение переданы и как с их обрабатывать, и теперь все параметры передаются также как interface{}
Спасибо. Вы также меня натолкнули на мысль, что будет если один их хендлеров изменит значение параметров и тут у меня оказалось ошибка. Я добавил тест (Test_Should_not_leak_args_changes_to_another_handler), чтобы проверить, что параметры от обработчика к обработчику не меняются. И если в методе Publish --> подменить строку h_in_goroutine.Execute(cArgs ...) на h_in_goroutine.Execute(args ...) тест упадет, потому что значение параметров из первого хендлера попадают во второй. К сожалению не могу вам добавить "+" из-за низкой кармы…
kilgur
Скажите, а где у вас тут CQRS? Я не придираюсь, мне термин CQRS до вашей статьи незнаком был, ходил в википедию почитать. Если правильно уловил суть, то должно быть разделение чтений и изменений. Т.е.
мухи и котлетызапросы и команды — отдельно. Дважды просмотрел ваш код, вижу только publish-subscribe паттерн. Я как-то неправильно понимаю эту «кухню»?zelenin
увы) в статье нет CQRS и есть реализация простейшей шины событий.
BOOTLOADER Автор
Вы правы — я не стал добавлять репозитории для чтения данных и в результате получилась реализация шины. Ценность это в стать, при отсутствии EventSourcing небольшая, но делает статью урезанной. Я тогда добавлю.
claygod
Недавно по CQRS была статья, думаю, пригодится не только вам (если предмет действительно интересен) но и автору статьи: habrahabr.ru/post/347908 Там довольно интересное обсуждение получилось в комментариях…
BOOTLOADER Автор
Спасибо. Почитаю.