Паттерн (CQRS — Command and Query Responsibility Segregation) разделяющей в своей основе команды по чтению данных от команд по их модификации или добавлению. Это позволяет достичь максимальную производительность, масштабируемость и безопасность, а также позволяет увеличить гибкость системы к модификациям с течением времени и снизить количество ошибок при усложнении логики системы, причиной которых обычно является обработка данных на доменном уровне.

Этот паттерн не новый и относится еще к языку Eiffel. Но благодаря усилиями Greg Young-а и Martin Fowler-а получил реинкарнацию, особенно в .NET мире.

По своему личному опыту, могу сказать, что паттерн очень удобный, довольно легко позволяет разделить бизнес логику, очень хорошо поддерживаемый, легко «усваиваемый» новичками, а также хорошо позволяющий изолировать ошибки. О нем много написано информации на хабре и нет смысл еще раз перепечатывать это все в этой статье. Тут я решил полностью сфокусироваться на портировании этого паттерна в Go.

Итак начнем…

Сразу хотело бы сказать, что CQRS не обязательно подразумевает Event Sourcing, но часто используется в связке CQRS +DDD + Event Sourcing из-за простоты реализации последнего. Я пробовал проекты вместе с Event Sourcing и без в него, и в обоих случаях патерн CQRS хорошо ложился на бизнес логику, но стоит заметить, что использованиe Event Sourcing приносит свои преимущества при реализации денормализованной базы данных, скорости сохранения и чтения данных, но при этому усложняет понимание данных, миграцию и формирование отчетности.
Проще говоря для Event Sourcing необходимо использовать CQRS, но не наоборот.
Поэтому я не буду касаться Event Sourcing, чтобы не усложнять статью.

И так для обработки Command нам понадобится Bus. Определим ее базовый интерфейс

type EventBus interface {
	Subscribe(handler handlers.Handler) error
	Publish(eventName string, args ...interface{})
	Unsubscribe(eventName string) error
}

Также нам нужно где-то хранить логику обработки команд. Для этого определим интерфейс обработчиков — Handler

type Handler interface {
	Event() string
	Execute(... interface{}) error
	OnSubscribe()
	OnUnsubscribe()
}

Во многих контрактных языках обычно добавляют еще интерфейс для Command, но в силу реализации интерфейсов в Go это не имеет смысла, и команда заменятся событием — Event, которое может быть обработано множеством Handler-ов.

Если вы уже заметили, то в метод Publish передается набор параметров — Publish(eventName string, args ...interface{}), а не определенный типа. В результате чего вы можете передать в метод любой тип или набор типов.

Полная реализация EventBus будет выглядеть так:

type eventBus struct {
	mtx      sync.RWMutex
	handlers map[string][] handlers.Handler
}

// Execute appropriate handlers
func (b *eventBus) Publish(eventName string, args ...interface{}) {
	b.mtx.RLock()
	defer b.mtx.RUnlock()

	if hs, ok := b.handlers[eventName]; ok {
		rArgs := createArgs(args)

		for _, h := range hs {

			// In case of Closure "h" variable will be reassigned before ever executed by goroutine.
			// Because if this you need to save value into variable and use this variable in closure.
			h_in_goroutine := h

			go func() {
				//Handle Panic in Handler.Execute.
				defer func() {
					if err := recover(); err != nil {
						log.Printf("Panic in EventBus.Publish: %s", err)
					}
				}()
				h_in_goroutine.Execute(rArgs)
			}()
		}
	}
}

// Subscribe Handler
func (b *eventBus) Subscribe(h handlers.Handler) error {
	b.mtx.Lock()
	//Handle Panic on adding new handler
	defer func() {
		b.mtx.Unlock()
		if err := recover(); err != nil {
			log.Printf("Panic in EventBus.Subscribe: %s", err)
		}
	}()

	if h == nil {
		return errors.New("Handler can not be nil.")
	}

	if len(h.Event()) == 0 {
		return errors.New("Handlers with empty Event are not allowed.")
	}

	h.OnSubscribe()
	b.handlers[h.Event()] = append(b.handlers[h.Event()], h)

	return nil
}

// Unsubscribe Handler
func (b *eventBus) Unsubscribe(eventName string) error {
	b.mtx.Lock()
	//Handle Panic on adding new handler
	defer func() {
		b.mtx.Unlock()
		if err := recover(); err != nil {
			log.Printf("Panic in EventBus.Unsubscribe: %s", err)
		}
	}()

	if _, ok := b.handlers[eventName]; ok {

		for i, h := range b.handlers[eventName] {
			if h != nil {
				h.OnUnsubscribe()
				b.handlers[eventName] = append(b.handlers[eventName][:i], b.handlers[eventName][i+1:]...)
			}
		}

		return nil
	}

	return fmt.Errorf("event are not %s exist", eventName)
}

func createArgs(args []interface{}) []reflect.Value {
	reflectedArgs := make([]reflect.Value, 0)

	for _, arg := range args {
		reflectedArgs = append(reflectedArgs, reflect.ValueOf(arg))
	}

	return reflectedArgs
}

// New creates new EventBus
func New() EventBus {
	return &eventBus{
		handlers: make(map[string][] handlers.Handler),
	}
}

Внутри метода Publish вызывается метод обработчика событие обернутый в goroutine с обработкой panic, на случай непредвиденной ситуации.

Как видите реализация очень простая, гораздо проще, чем это можно реализовать на .NET или Java.

Полный код с примерами вы может скачать тут: github.

Комментарии (16)


  1. kilgur
    02.02.2018 17:02
    +2

    Скажите, а где у вас тут CQRS? Я не придираюсь, мне термин CQRS до вашей статьи незнаком был, ходил в википедию почитать. Если правильно уловил суть, то должно быть разделение чтений и изменений. Т.е. мухи и котлеты запросы и команды — отдельно. Дважды просмотрел ваш код, вижу только publish-subscribe паттерн. Я как-то неправильно понимаю эту «кухню»?


    1. zelenin
      02.02.2018 19:12
      +1

      увы) в статье нет CQRS и есть реализация простейшей шины событий.


      1. BOOTLOADER Автор
        02.02.2018 21:32

        Вы правы — я не стал добавлять репозитории для чтения данных и в результате получилась реализация шины. Ценность это в стать, при отсутствии EventSourcing небольшая, но делает статью урезанной. Я тогда добавлю.


    1. claygod
      02.02.2018 21:03
      +1

      Недавно по CQRS была статья, думаю, пригодится не только вам (если предмет действительно интересен) но и автору статьи: habrahabr.ru/post/347908 Там довольно интересное обсуждение получилось в комментариях…


      1. BOOTLOADER Автор
        02.02.2018 21:32

        Спасибо. Почитаю.


  1. Workanator
    02.02.2018 22:19

    Также, вместо того, чтобы создавать переменную h_in_gorotine, её можно передать, как параметр функции.


  1. GeckoGreen
    02.02.2018 22:58

    А с какой целью аргументы в Publish заворачиваются в reflect.Value?
    Так же стоит заметить, что recover повсюду не есть хорошая практика.


    1. BOOTLOADER Автор
      02.02.2018 23:03

      Так же стоит заметить, что recover повсюду не есть хорошая практика.

      Она не повсюда, понятно, что наиболее часто recover будет вызывать только в методе Publish, если вынести recover куда-то выше, то это приведет к краху всей программы. С удовольствием выслушаю совет.


      1. GeckoGreen
        03.02.2018 11:42

        За то вызовет обработчик панику или нет, должны отвечать тот кто пишет сам обработчик и тот кто передает обработчик в шину. Так например, для сервера из net/http все обработчики тоже неродные, но никаких recover в нем нет.


        1. BOOTLOADER Автор
          03.02.2018 21:05

          Ну допустим у вас есть 6 обработчиком. Что лучше — один обработчик на уровне шин, или шесть recover на уровне каждого хандлера? И если скажем кто-то где-то забыл поставить recover вся шина упадет.


          1. GeckoGreen
            04.02.2018 20:28

            Лучше не паниковать вообще, а возвращать ошибки. А что если хэндлер решит запустить горутину и в ней паниковать? Вся программа упадет.


            1. BOOTLOADER Автор
              05.02.2018 09:19

              Да, вы правы — если внутри хендлера запустить goroutine и они уже выбросить панику, то все упадет.


    1. kilgur
      02.02.2018 23:05

      В данном случае это может быть библиотека и код обработчиков для неё чужероден, т.е. доверять ему нельзя. Обработчик может и панику выкинуть, но шина от этого падать не должна… На мой взгляд, recover здесь вполне уместны.


    1. BOOTLOADER Автор
      04.02.2018 10:35

      А с какой целью аргументы в Publish заворачиваются в reflect.Value?

      В reflect.Value преобразовывыл потому, что все равно значение передаются как интерфейсы и нужно будет анализировать значение через рефлексию, но немного поразмыслив я понял, что я не совсем был прав и нужно оставить самому хендеру выяснить что за значение переданы и как с их обрабатывать, и теперь все параметры передаются также как interface{}

      Спасибо. Вы также меня натолкнули на мысль, что будет если один их хендлеров изменит значение параметров и тут у меня оказалось ошибка. Я добавил тест (Test_Should_not_leak_args_changes_to_another_handler), чтобы проверить, что параметры от обработчика к обработчику не меняются. И если в методе Publish --> подменить строку h_in_goroutine.Execute(cArgs ...) на h_in_goroutine.Execute(args ...) тест упадет, потому что значение параметров из первого хендлера попадают во второй. К сожалению не могу вам добавить "+" из-за низкой кармы…


  1. stranger_shaman
    02.02.2018 23:07

    добавлению по средствам различных интерфейсов
    поправьте чтоли


    1. BOOTLOADER Автор
      02.02.2018 23:07

      Изменил, спасибо.