Именно так я решил назвать эту статью, чтобы описать свои эмоции на прожаривании кода спонтанном публичном код-ревью моего open-source Portal от участников Go-чата, в котором я активно участвую. Главным ревьюером был Глеб Яльчик, что делало ситуацию ещё более волнительной для меня. Расскажу, как я на это решился, что мы обсудили, к чему пришли и какой эффект это дало. А внедрять Portal мы будем на примере мини-приложения по мотивам (всеми любимой) мобы Dota.

Примечание: это приложение будет что-то делать только в учебных целях, чтобы понять, как Portal сможет нам помочь передавать данные. Логика в нём будет достаточно простой, без ряда проверок и, возможно, с багами. Если станет достаточно интересно, то можем обсудить и его в комментариях!

Как всё случилось?

Всё случилось во время отпуска в Стамбуле. Нас с женой чуть не сбил автобус, а потом, почти сразу после этого, нас развели на улице на 50 баксов. Достаточно печально, не правда ли? Хорошо, что всё закончилось потерей лишь небольшой суммы, ведь могло быть и хуже. А всё дело было в том, что мы с участниками чата бурно обсуждали очередной событийный паттерн и я предложил свою библиотеку и опыт разработки, и не прекращал думать об этом.

Кульминацией было увидеть в чате «не самые лестные отзывы» о моём коде в предложенной библиотеке. Сразу скажу, что ни о чём не жалею, потому что это очень круто прокачало меня! Библиотека уже была рабочей и «зарабатывающей деньги», но далеко не идеальной.

Общение в чате

Начнём с самого главного: select в select. Я увидел эту конструкцию в сети и, не особо вдумываясь, решил её скопировать, прикидывая, что она решит проблему приоритета выбора в select stmt. На деле оно не взлетело, и пришлось ещё сдобрить мьютексом. Хотел бы ещё отметить, что, несмотря на то, достаточную дружелюбность нашего чата, критика залетала отменная (у всех свой стиль делать code-review).

Дальше было веселей! Как известно, в чате (даже в таком продвинутом, как в Telegram) иногда теряется нить обсуждения и ответные сообщения со ссылкой на цитату, из чего некоторые из участников подумали, что моей библиотекой предложил пользоваться не я.

Мьютекс собственной персоной. Выглядит достаточно нелепо, учитывая, что у канала есть своя реализация мьютекса. Как говорил Глеб, разработчики языка дали нам куда более изящные и эффективные инструменты, например, каналы и контекст. Спойлер: к ним я в конечном итоге и пришёл.
От некоторых комментариев сразу становилось понятно, что я не в своей «уютной» команде и с рецензентом не знаком. Были и такие, кто прямо рубил с плеча.

Честно говоря, я понимал, что там много говнокода, но рецензенты чаще выделяли микроучастки, не особо смотря на «макро» и выполняемые кодом задачи. Как мы выяснили в чате, сложно глубоко проверить более чем 400 строк кода (исключая кодогенерацию).

Несмотря на говнокод, хотелось понять, как сделать лучше. Многие комментаторы писали не особо информативные комментарии вроде: «это жесть», «что за ужас?», «вот же косяк» и всё такое прочее, на что я вполне логично задавал вопрос: «а почему?». Некоторые завсегдатаи ответили достаточно осмысленно и по пунктам. Потом организатор чата Николай Тузов предложил провести стрим с публичным ревью от Глеба.

Некоторые участники чата взглянули на код под другим углом, поднявшись выше: а что же код делает? Несмотря на косяки работы с примитивами синхронизации он был не так уж и плох. Это мне написал даже Глеб, что вдохновляло после прилётов из чата =)

Вот несколько главных вопросов от Глеба, по которым мы и начали выстраивать наше общение, и примерный план на ревью:

Давайте разбираться

Portal v1 — библиотека, позволяющая отправлять сообщения между модулями приложения, не имея прямых зависимостей от них, а только привязываясь к хабу (Portal). Это очень удобно, когда не хочется, чтобы функции разрастались по количеству строк логики, а структуры включали в себя всё больше и больше полей.

Как это работает?

Мы создаём Portal (основную сущность библиотеки) в самом начале приложения. Portal умеет N раз подписывать M различных обработчиков на обновления, реализующих его интерфейс portal.Handler. Возможны повторные подписки одного и того же, но Portal не управляет этим, тут всё зависит от пользователя.

func main() {
  portalGate := portal.New()
  defer portalGate.Close()

  heroTavern := tavern.New(allHeroes(), portalGate)
  battleResultHandler := tavern.NewBattlerResultHandler(heroTavern)

  battleArena := arena.New(portalGate)
  heroSelectedHandler := arena.NewHeroSelectedHandler(battleArena)
  heroRebornHandler := arena.NewHeroRebornHandler(battleArena)

  portalGate.Subscribe(heroSelectedHandler, battleResultHandler, heroRebornHandler)

  app := application.New(portalGate, heroTavern)
  http.HandleFunc("/hero/choose/", app.ChoseHero)
  http.HandleFunc("/hero/scores/", app.Scores)

  log.Fatal(http.ListenAndServe(port, nil))
}

Здесь мы создаем Portal, просим при выходе из приложения его закрыть (далее будет чуть подробнее про логику его закрытия), и создаём Таверну Героев, заранее определяя, какие из них будут доступны для выбора. Таверна будет использовать Portal для отправки события о возрождении героя и возврата его на Арену, не имея к ней прямого доступа.

go func() {
  <-time.After(respawn.CoolDown)
  t.portal.Send(dto.HeroReborn{Hero: respawn.Hero})
}()

В пакете Таверны есть обработчик события BattleResult — это событие, при котором живые герои встречаются на Арене случайным образом и также случайно определяется победитель. BattleResultHandler обновляет счёт, который хранится в Таверне, пишет результат в stdout и отправляет проигравшего битву на respawn в Таверну.

func (h *BattleResultHandler) Handle(msg any) {
  battlerResult, ok := msg.(dto.BattleResult)
  if !ok || battlerResult.Empty() {
     return
  }

  coolDown, err := h.tavern.UpdateScore(battlerResult.Winner, battlerResult.Looser)
  if err != nil {
    log.Printf("[battler result handler error]: %s", err)
    return
  }

  log.Printf(
     "%s killz %s! See you %s after %v\n",
     battlerResult.Winner,
     battlerResult.Looser,
     battlerResult.Looser.Name,
     coolDown,
  )

  respawn := &dto.Respawn{Hero: battlerResult.Looser, CoolDown: coolDown}
  h.tavern.Respawn(respawn)
}

Таверна же имеет запущенный слушатель цикла событий, связанных с HeroRespawn.

Вернемся в main.go. Для выяснения отношений нашим героям нужна Арена для битв, создадим её. Для Арены также нужен наш Portal, туда мы будем отправлять события с ранее упомянутым BattleResult, в котором указано, кто бился и кто победил. Эмуляция схваток героев также запускается под капотом Арены и будет работать в фоне. Стоит заметить, что если у нас будет только один герой, или герои ещё не выбраны, то Арена не будет ничего отправлять.
В пакете с ней лежат два обработчика, давайте взглянем, что они делают. Первый — HeroSelectedHandler; из названия очевидно, что при выборе персонажа он, получив и обработав нужное ему событие, отправит героя в баталии на Арену, ну или отправит его ждать соперников.

func (h *HeroSelectedHandler) Handle(msg any) {
  heroSelected, ok := msg.(dto.HeroSelected)
  if !ok || heroSelected.Hero == nil || heroSelected.Username == "" {
     return
  }

  log.Printf(
     "A new hero %s selected by %s. Let's see what will happend...\n",
     heroSelected.Hero.Name,
     heroSelected.Username,
  )

  h.arena.PushToBattle(heroSelected.Hero)
}

Второй обработчик, который лежит в пакете Арены, — это HeroRebornHandler. Делает он тоже не сказать что много, но его функциональность очень важна для игроков и их героев: он возвращает героя на Арену после возрождения в Таверне.

func (h *HeroRebornHandler) Handle(msg any) {
  heroSelected, ok := msg.(dto.HeroReborn)
  if !ok || heroSelected.Hero == nil {
     return
  }

  log.Printf("Welcome back %s and Goodluck!\n", heroSelected.Hero)
  h.arena.PushToBattle(heroSelected.Hero)
}

Самое важное — не забыть подписать на обновления все созданные обработчики, иначе они не будут оповещены, никакой магии (кроме магии героев на Арене, конечно).

portalGate.Subscribe(heroSelectedHandler, battleResultHandler, heroRebornHandler)

Далее мы запускаем наш httpServer, чтобы игроки могли выбрать себе героя и посмотреть текущий счёт. Заранее стоит учесть, что наружному слою нашего приложения тоже может потребоваться Portal (далее поясню, зачем), ну и Таверна для запросов счёта или выбора персонажа для пользователя.

app := application.New(portalGate, heroTavern)
http.HandleFunc("/hero/choose/", app.ChoseHero)
http.HandleFunc("/hero/scores/", app.Scores)
log.Fatal(http.ListenAndServe(port, nil))

При выборе героя приложение будет слать сигнал Арене с помощью события выбора персонажа.

a.portal.Send(dto.HeroSelected{Username: username, Hero: dto.NewHero(hero)})

Отлично! Теперь, когда мы поверхностно разобрали логику нашей «самоигралки», можно чуть больше узнать о том, как работает Portal, что в нём было плохо или даже ужасно, и как получилось сделать лучше.

Продюсеров сообщений по всему сервису может быть достаточно много, как и подписчиков. У Portal есть метод Send, позволяющий отправлять сообщения всем подписчикам, после чего обработчик сам решает, что делать с сообщением. То есть все сообщения дойдут до всех подписчиков, подписанных на обновления на момент отправки, а там уже они сами будут решать, что с этим делать. Ключевой момент: дойдут только все отправленные до закрытия Portal сообщения, но об этом ниже.

В целом, у Portal  было три основных задачи:

  • развязать модули (decoupling);

  • отложить вызов;

  • гарантировать доставку.

Также были мысли об очереди выполнения событий, но от этой идеи я отказался. Это потребовало бы добавить логику в очередь сообщений, а хотелось просто хаб для доменных событий в сервисе.

Что даёт развязывание модулей приложения?

Код в каждом из модулей становится более понятным, связным в хорошем смысле (cohesion), а его сопровождение либо отладка багов становится куда проще. Вместе с этим упрощается тестирование, но об этом можно почитать в статьях про архитектуры.

Зачем делать отложенный вызов?

Всё дело в производительности и в фоновых задачах, не требующих ожидания от пользователей сервиса. То есть N клиентов могут «ддосить» сервис «25/8», и с этим приходится жить, но чтобы они не отваливались, если им нужен доступный сервис, то достаточно записать нужные данные от них, отпустить их с сообщением, что всё будет сделано, и отложить выполнение остального в фоновом режиме в зависимости от контекста. Надеюсь, понятно, почему Portal должен иметь гарантированную доставку, ведь если гарантии не будет, то зачем он тогда нужен? Теряется смысл отложенного вызова и система не будет работать.

Немного истории

На прежней работе я увлёкся событийными моделями и сделал некоторые наработки в этой области, но я не предполагал что на Go можно будет сделать совсем иначе и куда производительнее. Всегда можно задать резонный вопрос, а почему нельзя было взять Redis и радоваться жизни, изучая его конфигурации? Вопрос и правда резонный, и я бы ответил, что мой инструмент просто легче, чем Redis, да и управлять им проще. Portal работает в рамках одного пода и ему неизвестно, в кластере он или нет, это надо иметь ввиду, если есть какая-то отложенная работа, требующая синхронизации в рамках кластера. К тому же в моём случае Redis был бы похож на стрельбу из пушки по воробьям или на экскаватор в клумбе. Но да, его можно было использовать, но зачем?

Код был плохо написан, но он работал

Первая проблема всплыла сразу после внедрения Portal: в его интерфейсе есть метод Close, который пытается дождаться всех начатых процессов, подписанных обработчиков, чтобы не потерять данные и не пустить новые события на отправку. Это реализация так называемого gracefull shutdown. Как я уже писал, в моей библиотеке не было и нет завязки на порядок вызова событий, но вполне логично, что событие может порождать другое событие. Это, на мой взгляд, чуть более сложная концепция, но Portal вполне может и это. Главное не попасть на циклическую зависимость в этом случае. 

Но вернёмся к очевидной проблеме: send on closed channel. Я начал изобретать костыли и велосипеды, что привело к одному из «удачных» патчей первой версии. Оно работало, но сами всё видите:

func (p *Portal) Close() {
  log.Print("stopping...")
  log.Print("waiting handler jobs to be done...")
  p.wg.Wait()
  p.input.close()

  log.Print("closing subscriber channels...")
  p.lock.Lock()
  defer p.lock.Unlock()
  for _, sub := range p.subs {
     sub.close()
  }
}

func (p *Portal) Send(msg Message) {
  subs := p.subscriptions()
  p.wg.Add(len(subs))
  go p.input.send(msg)
}

func (i *input) send(msg Message) {
  if i.closed.Load() {
     return
  }
  i.lock.Lock()
  i.hub <- msg
  i.lock.Unlock()
}

Достаточно странно использовать мьютекс при отправке в канал, но так было. Ужасно, просто омерзительно, слов нет, но работает.

На момент записи «прожарки кода» Portal работал и приносил деньги. Был ли он полезен? Да, без сомнений. Мы доверяли ему не самые ответственные задачи: например, библиотеку часто использовали для записи в лог. Но хотелось большего, настоящей ответственности. Как писали в чате: «не надо путать хороший код и полезный». И это действительно так. Что же делать дальше?

Рефакторинг

Первое, на что в хотелось обратить внимание в коде, это select в select. С этого мы и начала обсуждение ???? кода в чатике. Этим приёмом я хотел решить главную проблему — не отсылать сообщения в закрытый канал, имея приоритизацию на проверку, что контекст отменён. Получилось, правда, крайне так себе. Даже более того, конструкция была не только странной, но и не эффективной:

go func() {
  for {
     select {
     case <-ctx.Done():
        return
     default:
        select {
        case <-ctx.Done():
           return
        case msg, open := <-p.input.hub:
           if !open {
              return
           }
           for _, sub := range p.subscriptions() {
              sub.send(msg)
           }
        }
     }
  }
}()

Так делать не стоит, всегда есть варианты получше. Конечно, всегда нужно понимать контекст (не программную сущность, а бизнес-условия). Рассмотрим вариант с каналом done:

select {
case <-p.done:
  return
default:
}
select {
case <-p.done:
  return
case p.input <- msg:
  p.wg.Add(int(p.subsCount.Load()))
}

И мой фаворит — вариант с context.Done(). Он удобнее из-за того, что если хочется приоритета на этот case выхода, то есть неблокирующая операция ctx.Err(): ошибка будет только в случае отмены контекста по каким-то причинам. Также я предпочитаю использовать контекст, потому что разработчики языка нам его дали как раз для таких случаев:

if err := p.ctx.Err(); err != nil {
  return
}
select {
case <-p.ctx.Done():
  return
case p.input <- envelope:
}

Забавный факт: изначально контекст был извне и почти ничего не решал внутри Portal. В результате я пришёл к варианту с context.Err(), что не блокировало контекст выполнения и всегда гарантированно управляло в выражении if/select.

Разбираемся с интерфейсами

Какое-то время спустя я написал вторую версию Portal. В ней у меня руки дошли до интерфейса. Я сменил версию после рефакторинга, потому что часть интерфейса стала обратнонесовместимой, а некоторые сущности пакета вовсе пропали. Не то, чтобы у Portal было много пользователей, просто в контексте semver так правильно. 

От чего я решил избавиться? Тип portal.Message был почти бесполезен из-за метода передачи данных и только тратил время принятия решения у обработчика. Я могу объяснить эту конструкцию лишь своей зашоренностью и тем, что я уже примерно так же делал в старых наработках. Как раз в них это казалось «отличной идеей», от которой я отказался в Portal.

if handler.Support(msg) {
  handler.Handle(msg)
  p.wg.Done()
}

Проверка поддерживаемости сообщения, конечно, нужна, но зачем разбивать её на два метода? И правда странно. Это вполне выполнимо и в одном методе, а если нас что-то не устроит, то просто выйдем из него. Для примера, вот реализация portal.Handler.v1:

func (h *createAccountForNewUser) Support(msg portal.Message) bool {
  _, ok := msg.(*user.CreatedMessage)
  return ok
}

const startBalance = 100

func (h *createAccountForNewUser) Handle(msg portal.Message) {
  data := msg.Data()
  u, ok := data.(user.User)
  if !ok {
     return
  }
  _, err := h.storage.Add(u, startBalance)
  if err != nil {
     fmt.Printf("an error occurred on creating for user %s", u)
  }
}

Отказ от portal.Message потянул за собой изменение интерфейса portal.Send() и интерфейса portal.Handler. Теперь туда отправлялось any вместо portal.Message. Структура Message был удален за ненадобностью.

Возможно, в следующей версии я вернусь к portal.Message с новой системой определения поддержки сообщения. Скорее всего, для этого воспользуюсь вариантами отправки структуры в метод с последующей сериализацией сообщения в неё или предопределёнными маркерами Enums.

В Portal v2 больше нет внутренней транспортной структуры portal.input, которая также использовалась для каждого подписчика. Дело в том, что когда начинаешь понимать паттерны конкурентного программирования на Go, отпадает необходимость в куче костылей и велосипедов, которые чаще всего ведут к багам, и самые неприятные из них — это те, о которых сложно узнать. В ряде моментов казалось, что сам Portal попросту не нужен, но это не так, и надо было просто доделать его. Только посмотрите на начальную реализацию :)

func (i *input) send(msg Message) {
  if i.closed.Load() {
     return
  }

  i.lock.Lock()
  i.hub <- msg
  i.lock.Unlock()
}

Не вижу особого смысла писать про логи, просто замечу, что на стороне обработчика всё достаточно просто, в нем можно определить логгер на месте (смотри примеры использования Portal). При необходимости в будущих версиях планирую дополнить транспортный механизм Portal’а логгером с опциональным включением.
Самой важной деталью рефакторинга и исправления ошибок является поле input. Отправка сообщения и закрытие input должны быть в одном месте, и это метод Send. Пишущая горутина должна понимать, можно ли отправлять сообщения в это поле, и если нельзя, то закрывать его. Для начала я сделал версию с каналом done, который закрывался в portal.Close() и тем самым сообщал select в portal.Send(), что больше отправлять сообщения не получится.

func (p *Portal) Close() {
  p.wg.Wait()
  close(p.done)
}
select {
case <-p.done:
  p.closeInput()
  return
default:
}

select {
case <-p.done:
  p.closeInput()
  return
case p.input <- msg:
  p.wg.Add(int(p.subsCount.Load()))
}

Если мы для сигнала просто запишем значение в done, то не все select, ждущие этот сигнал, получат его. И в этом большой смысл! Значение будет прочитано только в одном месте, а во всех остальных этот сигнал будет потерян. Поэтому, закрывая канал в одном месте, мы гарантированно оповестим всех его слушателей.

После закрытия канала done надо бы закрыть input… но как бы это так сделать, чтобы не уткнуться в проблему close of closed channel? Да всё просто, надо применить sync.Once. Причём изначально я определял каждый sync.Once отдельными свойствами Portal для input и подписчиков. Но можно куда проще, подглядел в исходниках пакета time:

var inputOnce = sync.Once{}
func (p *Portal) closeInput() {
  inputOnce.Do(func() {
     close(p.input)
  })
}

От идеи использовать канал done я тоже отказался, так как хотелось меньше select’ов, и пришёл к ctx with cancel (активно обсуждали с Глебом на прожарке). На мой взгляд, так куда удобнее, и, в зависимости от вашей бизнес-логики, можно обойтись одной проверкой на ctx.Err() в некоторых местах, без дополнительного select’а.

func (p *Portal) Send(msg any) {
  destinations := p.destinations()
  p.wg.Add(len(destinations))
  envelope := envelopeMsg{msg: msg, destinations: destinations}
  if err := p.ctx.Err(); err != nil {
     p.closeInput(len(destinations))
     return
  }
  p.input <- envelope
}

func (p *Portal) destinations() []chan any {
  p.lock.RLock()
  subs := make([]chan any, len(p.subs))
  copy(subs, p.subs)
  p.lock.RUnlock()
  return subs
}

Далее возникла очень интересная проблема: тестируя работу Portal, я заметил, что если подписываю на обновления самые простые обработчики (которые просто что-то печатают), причём делаю это (подписываю их) поэтапно — например, сначала три, потом ещё четыре и так далее, — то возникает расхождение по переданным данным. То есть на момент отправки первого сообщения некоторых подписчиков ещё не было в списке на эту рассылку! Достаточно интересный баг, на мой взгляд, причём в реальных приложениях я ни разу не дёргал метод Subscribe более одного раза, да и трафик был не такой жёсткий, а обработчики  не такие простые. В любом случае, я решил попробовать решить эту проблему, и решение оказалось простым и понятным: отправляя сообщение, я копировал текущее состояние подписчиков в список на рассылку и уже в input шло сообщение с этим списком. Своеобразный конверт для сообщения. Потом в методе, который слушает канал input, это сообщение рассылалось по конкретным адресатам из конверта, если ещё не было сигнала на закрытие… Ничего не настораживает? А вот меня тут осенило =)

func (p *Portal) Send(msg any) {
  if err := p.ctx.Err(); err != nil {
     p.closeSubs()
     return
  }
  subscriptions := p.subscriptions()
  p.wg.Add(len(subscriptions))
  for _, sub := range subscriptions {
     sub := sub
     go func() {
        sub <- msg
     }()
  }
}

var subsOnce = sync.Once{}

func (p *Portal) closeSubs() {
  subsOnce.Do(func() {
     p.lock.Lock()
     for _, sub := range p.subs {
        close(sub)
     }
     p.lock.Unlock()
  })
}

func (p *Portal) subscriptions() []chan any {
  p.lock.RLock()
  subs := make([]chan any, len(p.subs))
  copy(subs, p.subs)
  p.lock.RUnlock()
  return subs
}

Самое забавное, что при написании черновика статьи я нашёл способ сделать это взаимодействие ещё проще (листинг выше). Дело в том, что модель chan fan-out тут как бы и не нужна совсем. Референсами библиотеки был паттерн «хореограф» и работа моего коллеги Ивана Плотникова в одном из pull-request’ов. Оттуда я и взял идею общего хаба для input с последующим fan-out на подписчиков, но в целом это ненужное перекладывание данных и можно обойтись без него. Очень забавно, что в предложенных вариантах решения моей задачи ребятам тоже понравилась модель fun-out, и хотя они исправляли мои недочёты, но тоже придерживались этого подхода =). Хотелось бы добавить, что проверка «а открыт ли ещё канал?» справедлива только в тех местах, где мы взаимодействуем с ним только как читатель.

Выводы

Был код, он работал. Написан был, конечно, ужасно, почти без понимания инструментария. Но была проблема и идея её решения. Идея и двигала разработку. Недостатки были известны, но руки не доходили, если бы не воля случая. Без взгляда извне бывает очень сложно, особенно при недостатке опыта. Зовите помощников, не стесняйтесь! Людям бывает достаточно интересно общаться в контексте чего-то конкретно, вроде Portal. Главное, соблюдайте правила общения, не оскорбляйте, не переходите на личности, говорите и пишите по-существу. В равном обсуждении, как правило, рождается истина, и результат не заставит себя долго ждать. Для этого достаточно, например, посмотреть на релиз теги Portal’а и временные метки по ним. Отличный рефакторинг не заставил себя долго ждать.
В моём случае «взгляд извне» был стихийной случайностью, получилось достаточно нервно, но главное — очень продуктивно. Страшно ли делать плохо? Страшновато, да. Но без ошибок вряд ли получится чего-то добиться. Я стараюсь предположить самое страшное, что может случиться, и из этого уже делать вывод, а стоит ли пробовать?

Комментарии (2)


  1. GreyCheshire
    18.04.2023 18:54

    А чем реализация Pub/Sub от watermill не понравилась?


    1. BagarDavidyan Автор
      18.04.2023 18:54
      +2

      По-правде, я не смотрел этот репозиторий у ThreeDotsLabs. И не было задачи заюзать что-то, больше хотелось попробовать сделать своё. Ещё заметил что у них крайний CI на test race падает