В работе над одним проектом в компании NUT.Tech нам понадобилась система событий, работа которой не влияла бы на основной поток выполнения программы. 

Требования к системе были довольно простыми:

  • Возможность подписываться на события,

  • Возможность уведомлять систему о событии,

  • Возможность передавать в обработчики событий дополнительную информацию,

  • Простая реализация обработчиков событий,

  • Выполнение обработчиков событий не должно никак аффектить основной поток программы.

Какое-то время мы пытались найти подходящую нам библиотеку в интернете. “Наверняка, мы не первые, кто столкнулся с необходимостью такого функционала у себя в приложении” - думали мы. И, действительно, мы нашли довольно много пакетов со схожей функциональностью. Ниже перечислю некоторые из рассмотренных нами библиотек:

  • https://github.com/ReactiveX/RxGo - популярная библиотека, обладающая нужной нам функциональностью, но эта функциональность только малая часть того, что это библиотека умеет. А нам очень не хотелось использовать что-то большое ради одной небольшой функции. Это похоже на забивание гвоздей микроскопом.

  • https://github.com/gookit/event - показалась нам переусложненной.

  • https://github.com/agoalofalife/event - не умеет запускать обработчики в отдельном потоке.

  • https://github.com/AlexanderGrom/go-event - также не умеет запускать обработчики в отдельном потоке. Зато нам понравилась легковесность библиотеки и простой интерфейс.

Остальные найденные нами библиотеки были или очень объемные, с большим количеством настроек (а нам хотелось что-то простое и легкое), либо давно не обновлялись, либо работали в том же потоке, что и основная программа.

В общем, так и не сумев найти отвечающую всем нашим требованиям библиотеку, мы решили, что проще и быстрее будет написать все самим (какая редкость для Golang, да?).

Сначала мы добавили нашу систему событий как часть основного приложения, но с требованием “в будущем должно быть легко выделить код в отдельный пакет”. На написание непосредственно кода и тестов к нему ушло несколько дней.

Ниже расскажу и покажу на примерах, что у нас получилось.

Библиотека написана на языке Go и представляет из себя простейшую систему событий, которая основана на выполнении обработчиков независимо от основного потока.

Особенностями библиотеки являются:

  • Нет зависимостей от сторонних библиотек,

  • Возможность добавить несколько обработчиков одного или нескольких событий,

  • Каждый обработчик события запускается в отдельной горутине, обработчики выполняются независимо от основного потока,

  • Возможность передавать любые пользовательские данные в обработчики событий,

  • Полное покрытие тестами.

Исходный код и примеры можно посмотреть по ссылке https://github.com/nuttech/bell.

Использование библиотеки

Для добавления пакета в приложение достаточно выполнить команду

go get -u github.com/nuttech/bell

и далее в нужном файле проимпортировать ее:

import "github.com/nuttech/bell"

Чтобы добавить обработчик того или иного события, вам нужно добавить следующий код:

bell.Listen("event_name", func(message bell.Message) {
    // здесь код вашего обработчика
})

Первый аргумент функции - это название события, им может быть любая строка. Например, для вызова события успешной авторизации пользователя можно использовать название “user_login_success”.

Второй аргумент функции Listen - это функция, которая на вход принимает структуру bell.Message. Это и есть ваша функция-обработчик события. В структуре bell.Message будет передана системная информация и пользовательские данные:

type Message struct {
	Event     string // название события
	Timestamp time.Time // время вызова события
	Value     interface{} // пользовательские данные
}

Так как Message.Value - это interface{}, туда можно передать что угодно: идентификатор, строку, структуру и т.д.

Обработчиков события можно добавлять сколько угодно много. Все они будут вызваны в отдельной горутине:

bell.Listen("event_name", func(message bell.Message) { 
	// первый обработчик
})

bell.Listen("event_name", func(message bell.Message) {
	// второй обработчик
})

Для того, чтобы вызвать событие и запустить обработчики, достаточно добавить примерно такой код:

bell.Call("event_name", "some data")

где первый параметр - это название события, а второй - пользовательские данные, которые вы захотите передать в обработчики.

Например, если у вас есть структура userStruct и вызов события выглядит следующим образом:

type userStruct struct {
	Name string
}

bell.Call("event_name", userStruct{Name: "Jon"})

То обработчик может может быть таким:

bell.Listen("event_name", func(message bell.Message) {
	user := message.Value.(userStruct)
	fmt.Printf("%#v\n", userStruct{Name: "Jon"})  // main.userStruct{Name:"Jon"}
})

Вспомогательные функции

Пакет также содержит еще несколько вспомогательных функций:

bell.List()

Функция вернет список всех событий, на которые добавлены подписчики.

bell.Has("event_name")

С помощью этой функции можно проверить, существуют ли обработчики для указанного события.

_ = bell.Remove()

А эта функция удалит все обработчики всех событий.

_ = bell.Remove("event_name")

Если передать в функцию bell.Remove название события, то будут удалены только обработчики переданного события.

Полный пример работы пакета

Напоследок приведу простой пример использования Bell. В коде для понимания происходящего добавлены комментарии.

Пример использования библиотеки Bell
package main

import (
  "fmt"
  "github.com/nuttech/bell"
  "log"
  "net/http"
  "net/url"
  "time"
)

const requestEvent = "request"
const loginSuccessEvent = "login_success"

type LoginRequest struct {
  Method    string
  Path      string
  UserAgent string

}

type User struct {
  Login string
}

func main() {
  // создаем обработчик события request, который будет выводить информацию о запросе
  bell.Listen(requestEvent, func(message bell.Message) {
     time.Sleep(time.Second * 2)
     r := message.Value.(LoginRequest)
     log.Printf("%s %s, %s", r.Method, r.Path, r.UserAgent)

  })

  // Создаем два обработчика события успешного логина
  // Первый будет писать локальный лог
  bell.Listen(loginSuccessEvent, func(message bell.Message) {
     data := message.Value.(User)
     log.Printf("%#v\n", data)
  })

  // Второй будет отправлять данные на какой-то сторонний сервис
  bell.Listen(loginSuccessEvent, func(message bell.Message) {
     userData := message.Value.(User)
     rData := url.Values{
        "login": {userData.Login},
     }

     // шлем запрос локально для упрощения примера
     ,  = http.PostForm("http://localhost:8888/log", rData)

  })

  // Создадим обработчик запроса на запись лога
  http.HandleFunc("/log", func(writer http.ResponseWriter, request *http.Request) {
     log.Println("Save login request")
     request.ParseForm()
     fmt.Printf("%#v\n", request.PostForm)
  })

  http.HandleFunc("/login", func(writer http.ResponseWriter, request *http.Request) {
     r := LoginRequest{
        Path:      request.RequestURI,
        Method:    request.Method,
        UserAgent: request.UserAgent(),
     }

     // Вызываем событие request и продолжаем работу обработчика
     _ = bell.Ring(requestEvent, r)

     // получаем логи и пароль
     request.ParseForm()
     login := request.FormValue("login")
     pass := request.FormValue("password")

     if login != "login" || pass != "pass" {
        writer.WriteHeader(http.StatusUnauthorized)
        return
     }

     // вызываем событие успешного логина
     _ = bell.Ring(loginSuccessEvent, User{Login: login})
     // и сразу отдаем клиенту 200 ответ
     writer.WriteHeader(http.StatusOK)
  })

  log.Fatal(http.ListenAndServe(":8888", nil))
}

Заключение

В итоге, у нас получилась легкая и простая библиотека, которая отвечает всем нашим требованиям. Теперь она используется в наших проектах. 

Мы будем рады любым замечаниям и предложениям по доработке данной библиотеки.

Комментарии (9)


  1. z0ic
    14.02.2022 16:32
    -3

    Люблю Go.


  1. paramtamtam
    14.02.2022 16:38
    +2

    Спасибо за библиотеку! Врятли её буду использовать, но чуть-чуть посмотрел её внутренности. Сразу несколько вопросов:

    • Почему используется глобальный стейт?

    • Если код порождает горутины - то почему нет возможности их из-вне прибить? Контекст там, или отдельный канал какой-нить для уведомления об этом.

    • Вот тут будет происходить блокировка до момента, пока предыдущие вызовы не будут обработаны, а если их будет много? Выглядит как узкое место

    • Структура вместо интерфейса для описания сообщения заставит код быть более связанным, что по идее не очень хорошо

    • Почему бы в дизайне не заложить то, что при подписке на события - просто передавать на вход канал, в который надо отправить сообщение, а при отписке наоборот - его принимать (пример можно глянуть тут)?


    1. alitvinenko Автор
      15.02.2022 14:24
      +2

      Спасибо за быстрое ревью!

      Почему используется глобальный стейт?

      Мы не можем понять как иначе поступить в данном случае? Может быть подскажете, направите?

      Если код порождает горутины - то почему нет возможности их из-вне прибить? Контекст там, или отдельный канал какой-нить для уведомления об этом.

      Вот тут будет происходить блокировка до момента, пока предыдущие вызовы не будут обработаны, а если их будет много? Выглядит как узкое место

      Структура вместо интерфейса для описания сообщения заставит код быть более связанным, что по идее не очень хорошо

      Спасибо! Поставили себе таски на доработки.

      Почему бы в дизайне не заложить то, что при подписке на события - просто передавать на вход канал, в который надо отправить сообщение, а при отписке наоборот - его принимать (пример можно глянуть тут)?

      А какое преимущество даст передача сообщений в каналы на ваш взгляд? Изначально мы решили не выбрасывать каналы наружу потому что не хотели чтобы они использовались напрямую в обход библиотеки.


      1. paramtamtam
        15.02.2022 14:33

        Мы не можем понять как иначе поступить в данном случае

        Да просто же:

        func New() *State {
        	return &State{m: make(map[string][]chan Message{})}
        }

        И уже на эту структуру состояния "навешивать" нужные функции (так и тестировать будет проще, и иметь возможность переиспользовать пакет в разных контекстах)

        Поставили себе таски на доработки

        Заметил :) Вам спасибо за то, что не игнорируете обратную связь!

        А какое преимущество даст передача сообщений в каналы на ваш взгляд

        Зона ответственности в первую очередь будет локализованной (тот, кто создаёт каналы и передаёт их вашей реализации "в работу"- занимается их обслуживаем и своевременным закрытием, вспомни signal.Notify(...); этот подход чем-то похож на Rust-овский), и это более "нативный" путь для Go, вспомни тот-же <-time.After(...)


        1. alitvinenko Автор
          16.02.2022 18:07

          По первой части стало понятно. Подумаем над реализацией, спасибо!

          По второй части, если честно, нужно какое-то время на осознание :)


    1. Devoter
      16.02.2022 17:10
      +1

      Тоже писал что-то похожее для своего проекта, для разрешения блокировок добавил возможность указать время ожидания, по истечении которого запись в конкретный канал слушателя будет пропущена и выплюнута ошибка в канал ошибок (до этого иногда все приложение "вставало колом", если буфер канала на слушающей стороне переполнялся или если просто горутина там подвисала).

      И каналы тоже передает подписчик, либо он может создаваться автоматически. В общем, интересное резюме для себя получил )) Спасибо за статью и за комментарий.


  1. KGeist
    14.02.2022 16:52
    +3

    Для каких целей используется данная библиотека? При падении сервиса операция останется висеть в полуобработанном состоянии, т.к. события никуда не персистируются. Т.е. для критичной бизнес-логики не подходит. Для логирования - ОК. Возможно, стоит прописать это явно?

    Имеет смысл ещё написать про реентрабельность. Напр., что будет, если я дёрну bell.Ring в обработчике?

    Как дела с graceful degradation? Т.е. у нас крутятся тяжеловесные обработчики, а процесс нужно срочно стопануть. Как поведёт себя библиотека?

    Хотелось бы подобных деталей.


    1. alitvinenko Автор
      15.02.2022 14:49

      Для каких целей используется данная библиотека? При падении сервиса операция останется висеть в полуобработанном состоянии, т.к. события никуда не персистируются. Т.е. для критичной бизнес-логики не подходит. Для логирования - ОК. Возможно, стоит прописать это явно?

      Вы совершенно правы! Добавим в осписание сервиса что у нас нет персистентного хранения сообщений.

      Имеет смысл ещё написать про реентрабельность. Напр., что будет, если я дёрну bell.Ring в обработчике?

      В данный момент если из обработчика вызывать то же событие, то ничего не произойдет. Если вызвать другое событие, то будет вызван обработчик этого события.

      Как дела с graceful degradation? Т.е. у нас крутятся тяжеловесные обработчики, а процесс нужно срочно стопануть. Как поведёт себя библиотека?

      Сейчас graceful degradation у нас нет. Деградируем некрасиво (все просто прекратится на середине выполнения) :) Спасибо за наводку. Поставили таску на доработку.


  1. THQSql
    15.02.2022 10:17
    +1

    Вот что бывает, когда разработчики приходят в Go и тащат свои привычные инструменты.

    Библиотека весьма удобная и выглядит аккуратно, но в в Go есть каналы!