image

Чтобы понять все преимущества Redis Stream, давайте бегло вспомним давно существующие структуры Redis, которые частично повторяют функциональность Redis Stream.

Redis PUB/SUB


Redis Pub/Sub — простая система сообщений, уже встроенная в ваше key-value хранилище. Однако за простоту приходится платить:

  • Если издатель по каким-либо причинам выходит из строя, то он теряет всех своих подписчиков
  • Издателю необходимо знать точный адрес всех его подписчиков
  • Издатель может перегрузить работой своих подписчиков, если данные публикуются быстрее, чем обрабатываются
  • Сообщение удаляется из буфера издателя сразу после публикации, не зависимо от того, какому числу подписчиков оно доставлено и как быстро те сумели обработать это сообщение.
  • Все подписчики получат сообщение одновременно. Подписчики сами должны как-то между собой согласовывать порядок обработки одно и того же сообщения.
  • Нет встроенного механизма подтверждения успешной обработки сообщения подписчиком. Если подписчик получил сообщение и свалился во время обработки, то издатель об этом не узнает.

Redis List


Redis List — структура данных, поддерживающая команды чтения с блокировкой. Вы можете добавлять и считывать сообщения из начала или конеца списка. На базей этой структуры можно сделать неплохой стек или очередь для вашей распределённой системы и этого в большинстве случаев будет достаточно. Основные отличия от Redis Pub/Sub:

  • Сообщение доставляется одному клиенту. Первый заблокированный чтением клиент получит данные первым.
  • Клинт должен сам инициировать операцию чтения каждого сообщения. List ничего не знает о клиентах.
  • Сообщения хранятся до тех пор, пока их кто-то не считает или не удалит явно. Если вы настроили Redis сервер, чтобы он сбрасывал данные на диск, то надёжность системы резко возрастает.



Введение в Stream


Redis Stream — новый абстрактный тип данных, представленный в Redis с выходом версии 5.0
Концептуально Redis Stream — это List, в который вы можете добавлять записи. Каждая запись имеет уникальный идентификатор. По умолчанию идентификатор генерируется автоматически и включает в себя временную метку. Поэтому вы можете запрашивать диапазоны записей по времени или получать новые данные по мере их поступления в поток, как Unix команда «tail -f» читает лог-файл и замирает в ожидании новых данных. Обратите внимание, что поток могут слушать одновременно несколько клиентов, как многие «tail -f» процессы могут одновременно читать файл, не конфликтуя друг с другом.

Добавление записи в поток


Команда XADD добавляет новую запись в поток. Запись — это не просто строка, она состоит из одной или нескольких пар ключ-значение. Таким образом, каждая запись уже структурирована и напоминает структуру CSV файла.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

В примере выше мы добавляем в поток с именем (ключом) «mystream» два поля: «sensor-id» и «temperature» со значениями «1234» и «19.8» соответственно. В качестве второго аргумента команда принимает идентификатор, который будет присвоен записи — этот идентификатор однозначно идентифицирует каждую запись в потоке. Однако в этом случае мы передали *, потому что мы хотим, чтобы Redis сгенерировал для нас новый идентификатор. Каждый новый идентификатор будет увеличиваться. Поэтому каждая новая запись будет иметь больший идентификатор по отношению к предыдущим записям.

Формат идентификатора


Идентификаор записи, возвращаемый командой XADD, состоит из двух частей:

{millisecondsTime}-{sequenceNumber}

millisecondsTime — Unix время в миллисекундах (время сервера Redis). Однако если текущее время оказывается таким же или меньшим, чем время предыдущей записи, то используется временная метка предыдущей записи. Поэтому если время сервера возвращается в прошлое, то новый индентификатор всё ещё будет сохранять свойство увеличения.

sequenceNumber используется для записей, созданных в одну и ту же миллисекунду. sequenceNumber будет увеличен на 1 относительно предыдущей записи. Поскольку sequenceNumber имеет размер 64 бита, то на практике вы не должны упереться в ограничение на количество записей, которые могут быть сгенерированы в течение одной миллисекунды.

Формат таких идентификаторов на первый взгляд может показаться странным. Недоверчивый читатель может задаться вопросом, почему время является частью идентификатора. Причина в том, что потоки Redis поддерживают запросы диапазона по идентификаторам. Поскольку идентификатор связан со временем создания записи, то это даёт возможность запрашивать диапазоны времени. Мы рассмотрим конкретный пример, когда перейдём к изучению команды XRANGE.

Если по какой-либо причине пользователю требуется указать свой собственный идентификатор, который, например, связан с какой-нибудь внешней системой, то мы можем его передать команде XADD вместо знака * как показано ниже:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Обратите внимание, что в этом случае вы должны сами следить за увеличением идентификатора. В нашем примере минимальный идентификатор равен «0-1», поэтому команда не примет ещё один идентификатор, который равен или меньше «0-1».

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Число записей в потоке


Можно получить количество записей в потоке, просто используя команду XLEN. Для нашего примера эта команда вернёт следующее значение:

> XLEN somestream
(integer) 2


Запросы по диапазону — XRANGE и XREVRANGE


Чтобы запросить данные по диапазону, нам нужно указать два идентификатора — начало и конец диапазона. Возвращаемый диапазон будет включать все элементы, включая границы. Также существуют два специальных идентификатора "-" и "+", соответственно означающие самый маленький (первая запись) и самый большой (последняя запись) идентификатор в потоке. Пример ниже выведет все записи потока.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Каждая возвращаемая запись представляет собой массив из двух элементов: идентификатор и список пар ключ-значение. Мы уже говорили, что идентификаторы записи имеют отношение ко времени. Поэтому мы можем запрашивать диапазон конкретного промежутка времени. Однако, мы можем указать в запросе не полный идентификатор, а только Unix время, опустив часть, относящуюся к sequenceNumber. Опущенная часть идентификатора автоматически приравняется к нулю в начале диапазона и к максимально возможному значению в конце диапазона. Ниже приведён пример, как можно запросить диапазон, равный двум миллисекундам.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

У нас есть только одна запись в этом диапазоне, однако в реальных наборах данных возвращаемый результат может быть огромным. По этой причине XRANGE поддерживает опцию COUNT. Указав количество, мы можем просто получить первые N записей. Если нам нужно получить следующие N записей (пагинация), мы можем использовать последний полученный идентификатор, увеличить у него sequenceNumber на единицу и запросить снова. Давайте посмотрим на это в следующем примере. Мы начинаем добавлять 10 элементов с помощью XADD (предположим, что поток mystream уже был заполнен 10 элементами). Чтобы начать итерацию, получая по 2 элемента на команду, мы начинаем с полного диапазона, но с COUNT равным 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Чтобы продолжить итерацию со следующими двумя элементами, нам нужно выбрать последний полученный идентификатор, то есть 1519073279157-0, и добавить 1 к sequenceNumber. Результирующий идентификатор, в данном случае 1519073279157-1, теперь его можно использовать в качестве нового аргумента начала диапазона для следующего вызова XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

И так далее. Поскольку сложность XRANGE составляет O(log (N)) для поиска, а затем O(M) для возврата M элементов, то каждый шаг итерации является быстрым. Таким образом, с помощью XRANGE можно эффективно итерировать потоки.

Команда XREVRANGE является эквивалентом XRANGE, но возвращает элементы в обратном порядке:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Обратите внимание, что команда XREVRANGE принимает аргументы диапазона start и stop в обратном порядке.

Чтение новых записей с помощью XREAD


Часто возникает задача подписаться на поток и получать только новые сообщения. Эта концепция может показаться похожей на Redis Pub/Sub или блокирующий Redis List, но есть принципиальные различия в том, как использовать Redis Stream:

  1. Каждое новое сообщение по умолчанию доставляется каждому подписчику. Это поведение отличается от блокирующего Redis List, где новое сообщение будет прочитано только каким-то одним подписчиком.
  2. В то время как в Redis Pub/Sub все сообщения забываются и никогда не сохраняются, в Stream все сообщения сохряняются на неопределенный срок (если клиент явно не вызовет удаление).
  3. Redis Stream позволяет разграничить доступ к сообщениям внутри одного потока. Конкретный подписчик может видеть только свою личную историю сообщений.

Вы можете подписаться на поток и получать новые сообщения, используя команду XREAD. Это немного сложнее, чем XRANGE, поэтому мы сперва начнем с примеров попроще.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

В примере выше указана неблокирующая форма XREAD. Обратите внимание, что опция COUNT не является обязательной. Фактически единственной обязательной опцией команды является опция STREAMS, которая задает список потоков вместе с соответствующим максимальным идентификатором. Мы написали «STREAMS mystream 0» — мы хотим получать все записи потока mystream с идентификатором больше чем «0-0». Как видно из примера, команда возвращает имя потока, потому что мы можем подписаться на несколько потоков одновременно. Мы могли бы написать, например, «STREAMS mystream otherstream 0 0». Обратите внимание, что после опции STREAMS нам нужно сперва предоставить имена всех нужных потоков и только затем список идентификаторов.

В этой простой форме команда не делает ничего особенного по сравнению с XRANGE. Однако интересно то, что мы можем легко превратить XREAD в блокирующую команду, указав аргумент BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

В приведенном выше примере, указана новая опцию BLOCK с временем ожидания 0 миллисекунд (это означает бесконечное ожидание). Более того, вместо передачи обычного идентификатора для потока mystream, был передан специальный идентификатор $. Этот специальный идентификатор означает, что XREAD должен использовать в качестве идентификатора максимальный идентификатор в потоке mystream. Так что мы будем получать только новые сообщения, начиная с момента, когда мы начали прослушивание. В некотором смысле это похоже на Unix команду «tail -f».

Обратите внимание, что при использовании опции BLOCK нам не обязательно нужно использовать специальный идентификатор $. Мы можем использовать любой существующий в потоке идентификатор. Если команда сможет обслужить наш запрос немедленно, без блокировки, она сделает это, в противном случае она заблокируется.

Блокирующий XREAD также может прослушивать сразу несколько потоков, просто нужно указать их имена. В этом случае команда вернет запись первого потока, в который поступили данные. Первый подписчик, заблокированный для данного потока, будет получать данные первым.

Consumer Groups


В определенных задачах мы хотим разграничить доступ подписчиков к сообщениям внутри одного потока. Примером, когда это может быть полезно — очередь сообщений с воркерами, которые будут получать разные сообщения потока, позволяя масштабировать обработку сообщений.

Если мы представим, что у нас есть три подписчика C1, C2, C3 и поток, который содержит сообщения 1, 2, 3, 4, 5, 6, 7, то обслуживание сообщений будет происходить как на диаграмме ниже:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Чтобы получить этот эффект, Redis Stream использует концепцию, называемую Consumer Group. Эта концепция подобна псевдо-подписчику, который получает данные из потока, но фактически обслуживается несколькими подписчиками внутри группы, предоставляя определенные гарантии:

  1. Каждое сообщение доставляется разным подписчикам внутри группы.
  2. В пределах группы подписчики идентифицируются по имени, которое представляет собой строку с учетом регистра. Если какой-то подписчик временно выпадет из группы, то он может восстановиться в группу по собственному уникальному имени.
  3. Каждая Consumer Group следует концепции «первое непрочитанное сообщение». Когда подписчик запрашивает новые сообщения, он может получить только те сообщения, которые никогда ранее не доставлялись ни одному подписчику внутри группы.
  4. Существует команда явного подтверждения успешной обработки сообщения подписчиком. Пока не будет вызвана эта команда, запрошенное сообщение будет оставаться в статусе «pending» .
  5. Внутри Consumer Group каждый подписчик может запрашивать историю сообщений, которые были доставлены именно ему, но ещё не были обработаны (в статусе «pending»)


В некотором смысле, состояние группы может быть преставлено так:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+


Теперь пришло время познакомиться с основными командами для Consumer Group, а именно:

  • XGROUP используется для создания, уничтожения и управления группами
  • XREADGROUP используется для чтения потока через группу
  • XACK — это команда позволяет подписчику пометить сообщение как успешно обработанное


Создание Consumer Group


Предположим, что поток mystream уже существует. Тогда команда создания группы будет иметь вид:

> XGROUP CREATE mystream mygroup $
OK

При создании группы мы должны передать идентификатор, начиная с которого группа будет получать сообщения. Если мы хотим просто получать все новые сообщения, то мы можем использовать специальный идентификатор $ (как в нашем примере выше). Если вместо специального идентификатора указать 0, то группе будут доступны все сообщения потока.

Теперь, когда группа создана, мы можем сразу же начать читать сообщения с помощью команды XREADGROUP. Эта команда очень похожа на XREAD и поддерживает необязательную опцию BLOCK. Однако есть обязательная опция GROUP, которая должна быть всегда указана c двумя аргументами: имя группы и имя подписчика. Опция COUNT поддеживается также.

Прежде чем читать поток, давайте поместим туда несколько сообщений:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

А теперь попробуем прочитать этот поток через группу:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Приведенная выше команда дословно гласит следующее:
«Я, Алиса-подписчик, член группы mygroup, хочу прочитать из потока mystream одно сообщение, которое никогда не было никому доставлено ранее».
Каждый раз, когда подписчик выполняет операцию с группой, он должен указать свое имя, однозначно идентифицируя себя внутри группы. В приведенной выше команде есть еще одна очень важная деталь — специальный идентификатор ">". Этот специальный идентификатор фильтрует сообщения, оставляя только те, которые до сих пор ни разу не доставлялись.

Также, в особых случаях, вы можете указать реальный идентификатор, такой как 0 или любой другой действительный идентификатор. В этом случае команда XREADGROUP вернёт вам историю сообщений со статусом «pending», которые были доставлены указанному подписчику (Alice), но ещё не были подтверждены с помощью команды XACK.

Мы можем проверить это поведение, сразу указав идентификатор 0, без опции COUNT. Мы просто увидим единственное ожидающее сообщение, то есть сообщение с яблоком:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Однако если мы подтвердим сообщение как успешно обработанное, то оно больше не будет отображаться:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)


Теперь настала очередь Боба что-то прочитать:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Боб, член группы mygroup, попросил не более двух сообщений. Команда сообщает только о недоставленных сообщениях из-за специального идентификатора ">". Как видите, сообщение «apple» не отображется, так как оно уже доставлено Алисе, поэтому Боб получает «orange» и «strawberry».

Таким образом, Алиса, Боб и любой другой подписчик группы могут читать разные сообщения из одного и того же потока. Также они могут читать свою историю необработанных сообщений или помечать сообщения как обработанные.

Есть несколько вещей, которые нужно иметь в виду:

  • Как только подписчик считает сообщение командой XREADGROUP, это сообщение переходит в состояние «pending» и закрепляется за этим конкретным подписчиком. Другие подписчики группы не смогут прочитать это сообщение.
  • Подписчики автоматически создаются при первом упоминании, нет необходимости в их явном создании.
  • С помощью XREADGROUP вы можете читать сообщения из нескольких разных потоков одновременно, однако, чтобы это работало, вам нужно предварительно создать группы с одинаковым именем для каждого потока с помощью XGROUP


Восстановление после сбоя


Подписчик может восстановиться после сбоя и перечитать свой список сообщений со статусом «pending». Однако в реальном мире подписчики могут окончательно потерпеть неудачу. Что происходит с подвисшими сообщениями подписчика, если он не смог восстановиться после сбоя?
Consumer Group предлагает функцию, которая используется именно для таких случаев — когда необходимо сменить владельца сообщений.

Первым делом необходимо вызвать команду XPENDING, которая отображает все сообщения группы со статусом «pending». В своей простейшей форме команда вызывается только с двумя аргументами: именем потока и именем группы:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Команда вывела количество необработанных сообщений для всей группы и для каждого подписчика. У нас есть только Боб с двумя необработанными сообщениями, потому что единственное сообщение, запрошенное Алисой, было подтверждено с помощью XACK.

Мы можем запросить дополнительную информацию, используя больше аргументов:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]

{start-id} {end-id} — диапазон идентификаторов (можно использовать "-" и "+")
{count} — количество попыток доставки
{consumer-name} — имя группы

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Теперь у нас есть детали для каждого сообщения: идентификатор, имя подписчика, время простоя в миллисекундах и, наконец, количество попыток доставки. У нас есть два сообщения от Боба, и они простаивают в течение 74170458 миллисекунд, около 20 часов.

Обратите внимание, что никто не мешает нам проверить, каким было содержание сообщения, просто используя XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Мы просто должны повторить один и тот же идентификатор дважды в аргументах. Теперь, когда у нас есть некоторая идея, Алиса может решить, что после 20 часов простоя Боб, вероятно, не восстановится, и пришло время запросить эти сообщения и возобновить их обработку вместо Боба. Для этого мы используем команду XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

С помощью этой команды мы можем получить «чужое» сообщение, которое ещё не было обработано, путём смены владельца на {consumer}. Однако мы также можем предоставить минимальное время простоя {min-idle-time}. Это помогает избежать ситуацию, когда два клиента пытаются одновременно сменить владельца у одних и тех же сообщений:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Первый клиент сбросит время простоя и увеличит счетчик количества доставок. Так что второй клиент не сможет его запросить.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Сообщение было успешно востребовано Алисой, которая теперь может обработать сообщение и подтвердить его.

Из приведенного выше примера видно, что успешное выполнение запроса возвращает содержимое самого сообщения. Однако это не обязательно. Опция JUSTID может использоваться для возврата только идентификаторов сообщения. Это полезно, если вам не интересны детали сообщения и вы хотите увеличить производительность системы.

Счетчик доставки


Счетчик, который вы наблюдаете в выводе XPENDING — это количество доставок каждого сообщения. Такой счетчик увеличивается двумя способами: когда сообщение успешно затребовано через XCLAIM или когда используется вызов XREADGROUP.

Это нормально, что некоторые сообщения доставляются по несколько раз. Главное, чтобы в итоге все сообщения были обработаны. Иногда при обработке сообщения возникают проблемы из-за повреждения самого сообщение или обработка сообщения вызывает ошибку в коде обработчика. В таком случае может оказаться, что это сообщение никто не будет в состоянии обработать. Поскольку у нас есть счетчик попыток доставки, мы можем использовать этот счетчик для обнаружения таких ситуаций. Поэтому, как только счетчик доставок достигнет заданного вами большого числа, вероятно, будет разумнее поместить такое сообщение в другой поток и отправить уведомление системному администратору.

Состояние потоков


Команда XINFO используется для запроса различной информации потока и его группах. Например, базовый вид команды выглядит следующим образом:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Команда выше отображает общую информацию по указанному потоку. Теперь чуть более сложный пример:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Команда выше отображает общую информацию по всем группам указанного потока

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Команда выше отображает информацию по всем подписчикам указанного потока и группы.
Если вы забудите синтаксис команды, просто обратитесь за помощью к самой команде:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.


Ограничение размера потока


Многие приложения не хотят собирать данные в поток вечно. Часто полезно иметь максимально допустимое количество сообщений в потоке. В остальных случаях полезно перенести все сообщения из потока в другое постоянное хранилище при достижении заданного размера потока. Ограничить размер потока можно с помощью параметра MAXLEN в команде XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

При использовании MAXLEN старые записи автоматически удаляются при достижении указанной длины, поэтому поток имеет постоянный размер. Однако обрезка в этом случае происходит не самым проиводительным способом в памяти Redis. Улучшить ситуацию можно следующим образом:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Аргумент ~ в примере выше означет, что нам не обязательно нужно ограничивать длинну потока конкретным значением. В нашем примере это может быть любое число больше или равно 1000 (например, 1000, 1010 или 1030). Просто мы явно указали, что хотим, чтобы наш поток хранил не менее 1000 записей. Это делает работу с памятью намного эффективнее внутри Redis.

Также существет отдельная команда XTRIM, выполняющая то же самое:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Постоянное хранение и репликация


Redis Stream асинхронно реплицируется на slave ноды и сохраняется в файлы типа AOF (снапшот всех данных) и RDB (лог всех операций записи). Также поддерживается репликация состояния Consumer Groups. Поэтому, если сообщение находится в статусе «pending» на master ноде, то на slave нодах это сообщение будет иметь такой же статус.

Удаление отдельных элементов из потока


Для удаления сообщений существует специальная команда XDEL. Команда получает имя потока, за которым следуют идентификаторы сообщений, который необходимо удалить:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

При использовании этой команды нужно учесть, что фактически память будет высвобождена не сразу.

Потоки нулевой длины


Разница между потоками и другими структурами данных Redis заключается в том, что когда другие структуры данных больше не имеют элементов внутри себя, в качестве побочного эффекта, сама структура данных будет удалена из памяти. Так, например, отсортированный набор будет полностью удален, когда вызов ZREM удалит последний элемент. Вместо этого потокам разрешается оставаться в памяти, даже не имея ни одного элемента внутри.

Заключение


Redis Stream идеально подходит для создания брокеров сообщений, очередей сообщений, унифицированных журналов и систем чата, хранящих историю.
Как однажды сказал Никлаус Вирт, программы — это алгоритмы плюс структуры данных, а Redis уже дает вам и то, и другое.

Комментарии (0)