Добрый день, дорогие друзья.


В данной статье хотел бы максимально просто и кратко описать механизм redux-saga каналов, на примерах приближенных к реальным кейсам, надеюсь у меня это вышло.


Итак, начнем.


Проблема модели watch-and-fork


Предположим что у нас обычная watch-and-fork модель, следующего вида:


import { take, fork } from 'redux-saga/effects'

function* watchRequest() {
  while (true) {
    const {payload} = yield take('INIT_REQUEST');
    // заметим, что вызов не блокирующий
    yield fork(makeRequest, payload);
  }
}

function* makeRequest(payload) { 
    // код саги
}

Этот подход плох тем, что при отлове нескольких событий INIT_REQUEST, идущих друг за другом, будет запущено, соответственно, несколько исполнений makeRequest. Что в свою очередь может вызвать их “гонку”.


И тут нам на помощь приходит механизм каналов.


Каналы обладают буферами, тем самым помогают выстраивать в очередь приходящие события (к примеру, INIT_REQUEST), и организуют их последовательное исполнение (к примеру, makeRequest исполнится последовательно несколько раз).


Грубо говоря, каналы образуют FIFO очередь на последовательное исполнение.


Класифируются по источнику событий:


  • channel — события ставятся в очередь вручную при помощи put;
  • actionChannel — события ловятся около redux store;
  • eventChannel — внешний источник событий, чаще всего web socket;

Итак, разберем вкратце каждый.


Подробнее о channel


Такие каналы обычно решают задачу общения между сагами. Используются очень редко. Например, если вам необходимо согласовать несколько запросов, начинающихся одновременно.


channel([buffer])

Имеет единственный аргумент buffer — аккумулирующий буфер (подробнее буфферы разберем ниже).


Подробнее об actionChannel


Чаще всего используется в случае необходимости реагирования на события из redux store.


actionChannel(pattern, [buffer])

Принимает два аргумента:


  • pattern — паттерн искомых событий, так же как и take;
  • buffer — аккумулирующий буфер;

Краткий пример использования:


import { take, actionChannel, call } from 'redux-saga/effects'

function* watchRequest() {
  const requestChannel = yield actionChannel('INIT_REQUEST')
  while (true) {
    const {payload} = yield take(requestChannel);
    // заметим что вызов теперь блокирующий
    yield call(makeRequest, payload);
  }
}

function* makeRequest(payload) {
    // код саги
}

Подробнее об eventChannel


Обычно через него решают задачу общения через web socket.


eventChannel(subscribe, [buffer], [matcher])

Принимает три аргумента:


  • subscribe — функция инициализирующая внешний источник событий (в примере ниже, setTimeout). В аргументах callback, называемый emitter, который будет вызываться при необходимости отправки данных в канал от источника. Вернуть должна функцию отписки;
  • buffer — аккумулирующий буфер;
  • matcher — функция для фильтрации входящих сообщений.

Краткий пример использования:


import { eventChannel, END } from 'redux-saga'
import { take, put, call } from 'redux-saga/effects'

function initSocketChannel(query) {
  return eventChannel(emitter => {
      // эмулируем получение данных через web socket
      const handshakeTimeoutId = setTimeout(() => {
          emitter('handshake - ok');
      }, 100);

      const messageTimeoutId = setTimeout(() => {
          emitter('message');
      }, 500);

      const endTimeoutId = setTimeout(() => {
          emitter(END);
      }, 1000);

      // функция отписки от канала
      return () => {
        clearTimeout(handshakeTimeoutId);
        clearTimeout(messageTimeoutId);
        clearTimeout(endTimeoutId);
      }
    }
  )
}

export function* saga() {
  const chan = yield call(initSocketChannel, query)
  try {    
    while (true) {
      const message = yield take(chan);
      // при возвращении каналом END сработает обычный brake
      console.log(`socket : ${message}`)
    }
  } finally {
    console.log('socket terminated')
  }
}

Наверняка вами было замечено присутствие константы END — это action, означающий окончание общения с каналом.


В исходном коде redux-saga представлен следующим образом:


var CHANNEL_END_TYPE = '@@redux-saga/CHANNEL_END';
var END = { type: CHANNEL_END_TYPE };
var isEnd = function isEnd(a) {
  return a && a.type === CHANNEL_END_TYPE;
};

и в исходном коде eventChannel видим следующий сценарий


function eventChannel(subscribe) {
    …
    if (isEnd(input)) {
        close();
        return;
    }
    ...
}

Что же такое buffer?


Заслуживает внимания, так как каждый канал имеет базовый буфер, при помощи него и формируется очередь на обработку.


Пример создания буфера:


import { buffers } from 'redux-saga'

const buffer = buffers.sliding(5);

buffers — это instance фабрики по созданию буферов с различными стратегиями.


Всего 5 стратегий, им соответствуют методы:


  • buffers.none() — отсутствие буферизации, новые сообщения будут потеряны, если нет ожидающих участников;
  • buffers.fixed(limit) — новые сообщения будут буферизированы до предела. Ошибка переполнения приведет к ошибке (exeption). По умолчанию limit равен 10;
  • buffers.expanding(initialSize) — как и fixed, но переполнение приведет к тому, что буфер будет расширяться динамически;
  • buffers.dropping(limit) — то же, что и fixed, но переполнение будет тихо отбрасывать сообщения;
  • buffers.sliding(limit) — то же, что и fixed, но переполнение добавит новое сообщение в конец и удалит самое старое сообщение в буфере.

Вместо зключения


Итак, мы разобрали зачем был придуман механизм каналов, и в каких практических задачах используются.


Надеюсь после прочтения складывается общее представление и мир стал чуточку проще.

Комментарии (2)


  1. anttoshka
    05.12.2018 13:03

    А не проще просто использовать takeLatest?


    1. shammasov
      05.12.2018 21:12

      Каналы — это способ накапливать события, чтобы потом их все обработать. Например по очереди.
      Хороший юзкейс — накапливать запросы с оптимистичным апдейтом.
      Или сообщения для отправки в сокет, на случай дисконнекта.
      takeLatest — пропустит все предыдущие сообщения.