Дисклеймер: Эта статья описывает неочевидное решение неочевидной проблемы. Прежде чем бросаться яйцами применять его на практике, рекомендую дочитать статью до конца и дважды подумать.

but_why


Всем привет! При работе с кодом, нам часто приходится иметь дело с состоянием. Один из таких случаев — жизненный цикл объектов. Управление объектом с несколькими возможными состояниями может быть весьма нетривиальной задачей. Добавьте сюда асинхронное исполнение и задача усложняется на порядок. Есть эффективное и естественное решение. В этой статье я расскажу о событийном автомате и как его реализовать в Go.


Зачем управлять состоянием?


Для начала, определимся с самим понятием. Простейший пример состояния: файлы и различные соединения. Нельзя просто так взять и прочитать файл. Его необходимо сначала открыть, а в конце желательно обязательно закрыть. Получается, текущее действие зависит от результата предыдущего действия: чтение зависит от открытия. Сохраненный результат — и есть состояние.


Главная проблема с состоянием — это сложность. Любое состояние автоматически усложняет код. Приходится хранить результаты действий в памяти и добавлять различные проверки в логику. Именно поэтому stateless-архитектуры так привлекают программистов — никто не хочет неприятностей сложностей. Если результаты ваших действий не влияют на логику исполнения, состояние вам не нужно.


Однако, есть одно свойство, которое заставляет считаться со сложностями. Состояние требует следовать определенному порядку выполнения действий. В целом таких ситуаций надо избегать, но это не всегда возможно. Пример — жизненный цикл объектов программы. Благодаря хорошему управлению состоянием, можно получать предсказуемое поведение объектов со сложным жизненным циклом.


Теперь разберемся, как это сделать круто.


Автомат как способ решения проблем


AK74


Когда говорят о состояниях, на ум сразу приходят конечные автоматы. Логично, ведь автомат — это наиболее естественный способ управления состоянием.


Я не буду углубляться в теорию автоматов, информации в интернете более чем достаточно.

Если искать примеры конечных автоматов для Go, вам обязательно встретится лексер от Роба Пайка (англ. Rob Pike). Отличный пример автомата, в котором входным алфавитом являются обрабатываемые данные. Это значит, что переходы состояний вызывает текст, который обрабатывает лексер. Изящное решение конкретной задачи.


Главное, что нужно понять, автомат — это решение строго конкретной задачи. Поэтому, прежде чем рассматривать его, как средство от всех проблем, вы должны полностью понимать задачу. А конкретно — сущность, которой вы хотите управлять:


  • состояния — жизненный цикл;
  • события — что именно вызывает переход в каждое состояние;
  • результат работы — выходные данные;
  • режим исполнения (синхронный/асинхронный);
  • основные сценарии использования.

Лексер прекрасен, но он меняет состояние только из-за данных, которые сам же и обрабатывает. А как быть в ситуации, когда переходы вызывает пользователь? Вот здесь и может выручить событийный автомат.


Реальный пример


Чтобы было понятнее, я разберу пример из библиотеки phono.


Для полного погружения в контекст, можете ознакомиться с вводной статьёй. Это не обязательно для данной темы, но поможет лучше понять, чем мы управляем.

А чем управляем?


В основе phono лежит DSP-конвейер. Он состоит из трёх стадий обработки. Каждая стадия может включать в себя от одного до нескольких компонентов:


pipe_diagram


  1. pipe.Pump (англ. насос) — обязательная стадия получения звука, всегда только один компонент.
  2. pipe.Processor (англ. обработчик) — опциональная стадия обработки звука, от 0 до N компонентов.
  3. pipe.Sink (англ. раковина) — обязательная стадия передачи звука, от 1 до N компонентов.

Собственно жизненным циклом конвейера мы и будем управлять.


Жизненный цикл


Вот так выглядит диаграмма состояний pipe.Pipe.


pipe_lifecycle


Курсивом обозначены переходы, вызванные внутренней логикой исполнения. Жирным шрифтом — переходы, вызванные событиями. На диаграмме видно, что состояния делятся на 2 типа:


  • состояния покояready и paused, из них можно перейти только по событию
  • активные состоянияrunning и pausing, переходы по событию и из-за логики исполнения

Перед детальным разбором кода, наглядный пример использования всех состояний:


// PlayWav воспроизводит .wav файл с помощью portaudio устройства по-умолчанию.
func PlayWav(wavFile string) error {
    bufferSize := phono.BufferSize(512) // размер буфера для передачи данных

    w, err := wav.NewPump(wavFile, bufferSize) // создаем wav pump
    if err != nil {
        return err
    }
    pa := portaudio.NewSink( // создаем portaudio sink
        bufferSize,
        w.WavSampleRate(),
        w.WavNumChannels(),
    )
    p := pipe.New(          // создаем pipe.Pipe с исходным состоянием ready
        w.WavSampleRate(),
        pipe.WithPump(w),
        pipe.WithSinks(pa),
    )

    p.Run()                 // переход в состояние running с помощью p.Run()

    errc := p.Pause()       // переход в состояние pausing с помощью p.Pause()

    err = pipe.Wait(errc)   // ожидание перехода в состояние paused
    if err != nil {
        return err
    }

    errc = p.Resume()       // переход в состояние running с помощью p.Resume()

    err = pipe.Wait(errc)   // ожидание перехода в состояние ready
    if err != nil {
        return err
    }

    return pipe.Wait(p.Close()) // освобождение ресурсов по окончанию работы
}

Теперь обо всём по порядку.


Весь исходный код доступен в репозитории.

Состояния и события


Начнём с самого главного.


// state определяет одно из возможных состояний конвейера.
type state interface {
    listen(*Pipe, target) (state, target)           // ожидание нового состояния
    transition(*Pipe, eventMessage) (state, error)  // функция перехода 
}

// idleState состояние покоя. Из него можно выйти только с помощью события.
type idleState interface {
    state
}

// activeState активное состояние. Из него можно выйти с помощью события и 
// внутренней логики исполнения.
type activeState interface {
    state
    sendMessage(*Pipe) state    // отправка нового сообщения
}

// типы состояний.
type (
    idleReady     struct{}
    activeRunning struct{}
    activePausing struct{}
    idlePaused    struct{}
)

// переменные состояний.
var (
    ready   idleReady     
    running activeRunning 
    paused  idlePaused    
    pausing activePausing 
)

Благодаря отдельным типам, переходы тоже объявлены отдельно для каждого состояния. Это позволяет избежать огромной колбасы функции перехода с вложенными операторами switch. Состояния сами по себе не содержат никаких данных или логики. Для них можно объявить переменные на уровне пакета, чтобы не делать этого каждый раз. Интерфейс state нужен для полиморфизма. Про activeState и idleState поговорим чуть позже.


Вторая важнейшая часть нашего автомата — события.


// event тип события.
type event int

// переменные событий.
const (
    run event = iota
    pause
    resume
    push
    measure
    cancel
)

// target обозначает конечное состояние для вызванного события.
type target struct {
    state idleState  // целевое состояние
    errc  chan error // канал с ошибками, закрывается когда достигнуто состояние 
}

// eventMessage передается в автомат, когда пользователь вызывает событие.
type eventMessage struct {
    event               // тип события
    params    params    // новые параметры
    components []string // id компонентов
    target              // конечное состояние для этого события
}

Чтобы понять, зачем нужен тип target, рассмотрим простой пример. Мы создали новый конвейер, он в состоянии ready. Теперь запускаем его функцией p.Run(). В автомат посылается событие run, конвейер переходит в состояние running. Как узнать, когда конвейер закончит работу? Вот тут нам и поможет тип target. Он указывает, какое состояния покоя ждать после события. В нашем примере, после окончания работы конвейер опять перейдет в состояние ready. То же самое, на диаграмме:



Теперь подробнее о типах состояний. Точнее, об интерфейсах idleState и activeState. Посмотрим на функции listen(*Pipe, target) (state, target) для разных типов стадий:


// listen ждёт перехода из стадии ready.
func (s idleReady) listen(p *Pipe, t target) (state, target) {
    return p.idle(s, t)
}

// listen ждёт перехода из стадии running.
func (s activeRunning) listen(p *Pipe, t target) (state, target) {
    return p.active(s, t)
}

У pipe.Pipe есть разные функции для ожидания перехода! Что там?


// idle ждёт переход из стадии покоя. Слушает только канал событий.
func (p *Pipe) idle(s idleState, t target) (state, target) {
    if s == t.state || s == ready {
        t = t.dismiss()         // цель достигнута, освобождаем target
    }
    for {
        var newState state
        var err error
        select {
        case e := <-p.events:                   // ждём событие
            newState, err = s.transition(p, e)  // вызываем функцию перехода
            if err != nil {
                e.target.handle(err)
            } else if e.hasTarget() {
                t.dismiss()
                t = e.target
            }
        }
        if s != newState {  
            return newState, t  // выходим, если произошёл переход
        }
    }
}

// active ждёт перехода из активной стадии. Слушает канал событий и каналы, 
// используемые при исполнении.
func (p *Pipe) active(s activeState, t target) (state, target) {
    for {
        var newState state
        var err error
        select {
        case e := <-p.events:                   // ждём событие
            newState, err = s.transition(p, e)  // вызываем функцию перехода
            if err != nil {                     // успешный переход?
                e.target.handle(err)            // нет, передаем ошибку наружу
            } else if e.hasTarget() {           // да, замещаем target
                t.dismiss()                     // отменяем текущий
                t = e.target                    // замещаем новым 
            }
        case <-p.provide:                       // ждем запроса нового сообщения
            newState = s.sendMessage(p)         // отправляем новое сообщение
        case err, ok := <-p.errc:               // ждем ошибок
            if ok {                             // если ошибка получена, то
                interrupt(p.cancel)             // прерываем исполнение
                t.handle(err)                   // передаем ошибку наружу
            }                                   // если ошибок не получено, то
            return ready, t                     // переходим в состояние ready
        }
        if s != newState {
            return newState, t  // выходим, если произошёл переход
        }
    }
}

Таким образом, мы можем слушать разные каналы в разных состояниях. Например, это позволяет не отправлять сообщения во время паузы — мы просто не слушаем соответствующий канал.


Конструктор и старт автомата



// New создает новый конвейер и применяет опции.
// Новый конвейер находится в состоянии ready.
func New(sampleRate phono.SampleRate, options ...Option) *Pipe {
    p := &Pipe{
        UID:        phono.NewUID(),
        sampleRate: sampleRate,
        log:        log.GetLogger(),
        processors: make([]*processRunner, 0),
        sinks:      make([]*sinkRunner, 0),
        metrics:    make(map[string]measurable),
        params:     make(map[string][]phono.ParamFunc),
        feedback:   make(map[string][]phono.ParamFunc),
        events:     make(chan eventMessage, 1), // канал для событий
        cancel:     make(chan struct{}),        // канал для отмены выполнения
        provide:    make(chan struct{}),
        consume:    make(chan message),
    }
    for _, option := range options {            // применяем опции
        option(p)()
    }
    go p.loop()                                 // запускаем главный цикл
    return p
}

Кроме инициализации и функциональных опций, здесь есть старт отдельной горутины с главным циклом. Что ж, посмотрим и на него:


// loop выполняется, пока не перейдет в nil состояние.
func (p *Pipe) loop() {
    var s state = ready         // изначальное состояние
    t := target{}
    for s != nil {
        s, t = s.listen(p, t)   // ждём перехода в новое состояние
        p.log.Debug(fmt.Sprintf("%v is %T", p, s))
    }
    t.dismiss()
    close(p.events)             // закрываем канал событий
}

// listen ждёт перехода из стадии ready.
func (s idleReady) listen(p *Pipe, t target) (state, target) {
    return p.idle(s, t)
}

// transition совершает переход в зависимости от полученного события.
func (s idleReady) transition(p *Pipe, e eventMessage) (state, error) {
    switch e.event {
    case cancel:
        interrupt(p.cancel)
        return nil, nil
    case push:
        e.params.applyTo(p.ID())
        p.params = p.params.merge(e.params)
        return s, nil
    case measure:
        for _, id := range e.components {
            e.params.applyTo(id)
        }
        return s, nil
    case run:
        if err := p.start(); err != nil {
            return s, err
        }
        return running, nil
    }
    return s, ErrInvalidState
}

Конвейер создан и замер в ожидании событий.


Пора работать


Вызываем p.Run()!



// Run посылает событие run в конвейер.
// Выполнение этого метода после pipe.Close вызовет панику.
func (p *Pipe) Run() chan error {
    runEvent := eventMessage{
        event: run,
        target: target{
            state: ready,               // целевое состояние покоя
            errc:  make(chan error, 1), 
        },
    }
    p.events <- runEvent
    return runEvent.target.errc
}

// listen ждёт перехода из стадии running.
func (s activeRunning) listen(p *Pipe, t target) (state, target) {
    return p.active(s, t)
}

// transition совершает переход в зависимости от полученного события.
func (s activeRunning) transition(p *Pipe, e eventMessage) (state, error) {
    switch e.event {
    case cancel:
        interrupt(p.cancel)
        err := Wait(p.errc)
        return nil, err
    case measure:
        e.params.applyTo(p.ID())
        p.feedback = p.feedback.merge(e.params)
        return s, nil
    case push:
        e.params.applyTo(p.ID())
        p.params = p.params.merge(e.params)
        return s, nil
    case pause:
        return pausing, nil
    }
    return s, ErrInvalidState
}

// sendMessage генерирует новое сообщение.
func (s activeRunning) sendMessage(p *Pipe) state {
    p.consume <- p.newMessage()
    return s
}

running генерирует сообщения и выполняется до тех пор, пока конвейер не завершит работу.


Сделай паузу


Во время исполнения конвейера, мы можем поставить его на паузу. В этом состоянии конвейер не будет генерировать новые сообщения. Для этого нужно вызвать метод p.Pause().



// Pause посылает событие pause в конвейер.
// Выполнение этого метода после pipe.Close вызовет панику.
func (p *Pipe) Pause() chan error {
    pauseEvent := eventMessage{
        event: pause,
        target: target{
            state: paused,              // целевое состояние покоя
            errc:  make(chan error, 1),
        },
    }
    p.events <- pauseEvent
    return pauseEvent.target.errc
}

// listen ждёт перехода из стадии pausing.
func (s activePausing) listen(p *Pipe, t target) (state, target) {
    return p.active(s, t)
}

// transition совершает переход в зависимости от полученного события.
func (s activePausing) transition(p *Pipe, e eventMessage) (state, error) {
    switch e.event {
    case cancel:
        interrupt(p.cancel)
        err := Wait(p.errc)
        return nil, err
    case measure:
        e.params.applyTo(p.ID())
        p.feedback = p.feedback.merge(e.params)
        return s, nil
    case push:
        e.params.applyTo(p.ID())
        p.params = p.params.merge(e.params)
        return s, nil
    }
    return s, ErrInvalidState
}

// sendMessage генерирует новое сообщение. Сообщение содержит функцию-параметр,
// которая вызывается при получении сообщения адресатом. Эта функция блокирует
// исполнение до тех пор, пока все адресаты не получат сообщение. Таким образом,
// гарантируется, что пауза наступает, когда:
//  1. Генерация новых сообщений остановлена
//  2. Все компоненты обработали последнее сообщение
func (s activePausing) sendMessage(p *Pipe) state {
    m := p.newMessage()
    if len(m.feedback) == 0 {
        m.feedback = make(map[string][]phono.ParamFunc)
    }
    var wg sync.WaitGroup   // новая группа для ожидания
    wg.Add(len(p.sinks))    // добавляем все Sink
    for _, sink := range p.sinks {
        param := phono.ReceivedBy(&wg, sink.ID())   // функция-параметр
        m.feedback = m.feedback.add(param)          
    }
    p.consume <- m          // отправляем сообщения
    wg.Wait()               // ждем, когда все получат сообщения
    return paused
}

Как только все адресаты получат сообщение, конвейер перейдет в состояние paused. Если сообщение окажется последним, то произойдет переход в состояние ready.


Снова за работу!


Чтобы выйти из состояния paused, нужно вызвать p.Resume().



// Resume посылает событие resume в конвейер.
// Выполнение этого метода после pipe.Close вызовет панику.
func (p *Pipe) Resume() chan error {
    resumeEvent := eventMessage{
        event: resume,
        target: target{
            state: ready,
            errc:  make(chan error, 1),
        },
    }
    p.events <- resumeEvent
    return resumeEvent.target.errc
}

// listen ждёт перехода из стадии paused.
func (s idlePaused) listen(p *Pipe, t target) (state, target) {
    return p.idle(s, t)
}

// transition совершает переход в зависимости от полученного события.
func (s idlePaused) transition(p *Pipe, e eventMessage) (state, error) {
    switch e.event {
    case cancel:
        interrupt(p.cancel)
        err := Wait(p.errc)
        return nil, err
    case push:
        e.params.applyTo(p.ID())
        p.params = p.params.merge(e.params)
        return s, nil
    case measure:
        for _, id := range e.components {
            e.params.applyTo(id)
        }
        return s, nil
    case resume:
        return running, nil
    }
    return s, ErrInvalidState
}

Тут все тривиально, конвейер опять переходит в состояние running.


Сворачиваемся


Конвейер может быть остановлен из любого состояния. Для этого есть p.Close().



// Close посылает событие cancel в конвейер.
// Повторное выполнение этого метода вызовет панику.
// Обязательно вызвать в конце, чтобы освободить ресурсы.
func (p *Pipe) Close() chan error {
    resumeEvent := eventMessage{
        event: cancel,
        target: target{
            state: nil,                 // конечное состояние
            errc:  make(chan error, 1),
        },
    }
    p.events <- resumeEvent
    return resumeEvent.target.errc
}

Кому это надо?


Далеко не всем. Чтобы понять, как именно управлять состоянием, нужно понимать свою задачу. Есть ровно два обстоятельства, при которых можно использовать событийный асинхронный автомат:


  1. Сложный жизненный цикл — есть три и более состояния с нелинейными переходами.
  2. Используется асинхронное исполнение.

Событийный автомат хоть и решает проблему, но это довольно сложный паттерн. Поэтому, его стоит использовать с большой осторожностью и только после полного понимания всех плюсов и минусов.


Ссылки


Комментарии (0)