Привет, Хабр!

Данная задача в разных вариациях мне давалась на нескольких собеседованиях несколько лет назад. Хоть мой дизайн и проходил, мне стало интересно реализовать это в коде с нуля. Сыроватый и сильно урезанный по функционалу MVP готов, ссылка на github будет под катом. Пока что мной запланировано 3 статьи - эта, по бэкенду и по фронту. Будет много кода на scala, много котов (cats effect), стримов (fs2), пара lock-free техник, scala js, и постараюсь сделать так, чтобы мозг от всего этого не взорвался.

Все, кому интересно - добро пожаловать под кат.


Постановка задачи

Для начала, определимся с вводными, а именно - что мы проектируем.

  1. Первое довольно очевидное вводное - распределенное горизонтально масштабируемое web-приложение с api, по которому может достучаться браузер (то есть UDP не берем)

  2. Хотим at least once гарантию доставки сообщений - сообщения не теряются, но могут дублироваться

  3. Доставка сообщений за разумное время - то есть, если клиент онлайн, сообщение ему должно приходить по возможности быстрее

  4. Сообщения хранятся. Клиент может получить историю сообщений с момента своего последнего полученного сообщения

  5. Чаты на произвольное количество участников, каждый из которых должен иметь возможность получить сообщение. Произвольное количество чатов.

Что ж, теперь разберем требования по пунктам.

Пункт 1 дает нам на выбор несколько вариантов архитектур:

  1. stateless-приложение. AppServer-ы не хранят состояние, все состояние хранится в БД и опционально в распределенном кэше. AppServer-ы ничего друг про друга не знают. Балансер простой, запрос может прилететь на любой appServer. Добавление/удаление инстансов простое, происходит быстро и с минимальными последствиями. Накладные расходы на пересылку сообщений между appServer-ами, на взаимодействие с БД. Это рекомендуемая по умолчанию архитектура, и именно ее я выбрал в качестве целевой. Не подходит для большого трафика и минимального latency - видео- аудио-звонки, онлайн игры.

  2. stateful-приложение, в котором клиенты, относящиеся к одному чату, обслуживаются одним appServer-ом. В данном случае обеспечивается минимальный latency, отсутствует внутренний трафик на пересылку сообщений. Подходит для видео- аудио-звонков, онлайн-игр. Большое кол-во клиентских соединений, так как клиент в общем случае подключается к нескольким чатам. Балансер сложный, на основе consistent hashing по id чата. При добавлении-удалении инстансов нужно обеспечить миграцию клиентов, возможна ситуация раздвоения чатов (проблема, похожая на split brain).

  3. stateful-приложение на Akka. Комбинация первых двух подходов. Один чат обслуживается одним актором, минимальные затраты на синхронизацию с БД. Есть внутренний трафик, в общем случае 2 пересылки сообщения. Проблемы типа split brain решаются кластером самостоятельно (вроде там разновидность gossip, точно не помню). Нужен ack при обмене сообщениями, т.к. у Akka гарантии at most once (сообщения могут теряться). Второй рабочий вариант, и я планировал немного отдохнуть и реализовать его тоже.

Пункт 2 в пояснении не нуждается.

Пункт 3 требований подразумевает асинхронную доставку. В случае с web-клиентом это практически безальтернативно webSocket. Есть вариант с long polling и с server side events, но наиболее удобен webSocket, так что возьмем его.

Пункт 4 нам намекает на то, что нужно использовать БД web-масштабов, да еще и такую, которая умеет сортировать.

Пункт 5 скорее про организацию бизнес-логики, но требование "произвольное количество чатов" нам отсекает kafk-у, у нее вроде как были с этим проблемы.

Еще одно требование-пожелание, которое явно не вытекает из озвученных, но дает очень весомый плюс. Это - асинхронный ввод/вывод. При синхронном вводе/выводе мы ограничены количеством тредов, которые на одной не топовой машине имеют порядок тысяч, часто 2048 или 4096. Да, это пресловутая C10K-problem. При асинхронном вводе/выводе с использованием механизмов epoll/kqueue/iocp мы на одной машине ограничены количеством сокетов, которые имеют порядок сотни тысяч. Требование асинхронного ввода/вывода распространяется также и на коннекты к БД, и на саму БД в частности, и ставит под сомнение postgres или mysql. Также это требование снова отсекает kafk-у, у нее клиент по-моему только синхронный (если есть нормальный асинхронный клиент кафки, который не является оберткой над синхронным, просьба поделиться).

Бизнес-логика

Теперь немного поговорим про бизнес-логику. Она довольно простая в принципе: клиент подключается webSocket-ом к инстансу приложения и посылает сообщение о том, что присоединился к чату. Инстанс идет в БД и записывает информацию о том, что сообщения данному клиенту с clientId можно доставлять по host:port:clientId, где host:port - это адрес самого инстанса, на котором он слушает входящие от других инстансов. (UPD:) Только после этого клиент делает запрос истории и передает timestamp последнего полученного сообщения, или 0, если их еще не было. Отправитель делает запрос к инстансу, к которому он подключен, тот идет в БД, читает список кортежей host:port:clientId, группирует их по host:port и отправляет сообщение на все нужные инстансы. Далее собирает со всех список не ответивших клиентов, обновляет записи об этом в БД и отдает ответ отправителю, что все хорошо. Для внутренней коммуникации по host:port используем grpc.

Забегая немного вперед в выборе БД, накидалась вот такая картинка:

Архитектура глазами автора
Архитектура глазами автора

И тут вылезает целый ворох проблем синхронизации!

  1. Пока одно сообщение отправляется адресатам, подключается новый адресат, и он может не успеть сообщение получить. Нам нужно предоставить гарантию того, что либо он сообщение получит, либо сможет прочитать его в логе, передав timestamp последнего имеющегося у него сообщения.

  2. Проблема в самих timestamp-ах. Дело в том, что нормального времени у нас нет. Точнее, есть, но у каждой машины оно свое. Насколько это критично? Ну, я видел своими глазами сервер, который думал, что живет на 10 минут в будущем. Добавить еще такого же пришельца, но из прошлого, и мы получим разброс в 20 минут, который порвет нам любую синхронизацию. Приходит такой клиент со своим timestamp-ом из будущего, и часть сообщений ему не придет - те, которые были отправлены тем, кто застрял в прошлом. Есть, конечно, сервисы типа TrueTime, которые отдают интервал [low, high], в котором точно находится точное время, и который довольно мал, но для наших задач это избыточно. Есть векторные часы, но с ними сложно. Нам подойдет механизм эпох.

  3. Механизм эпох работает следующим образом. У нас есть неубывающий номер эпохи типа Long, который увеличивается на 1 всегда, когда изменяется список адресатов - на добавление или удаление. Внутри одной эпохи может быть много сообщений, и они маркируются обычным timestamp-ом (можно взять ts из записи в бд). И мы гарантируем, что сообщение будет записано в лог с текущим номером эпохи. Таким образом мы гарантируем, что все сообщения будут доставлены. Попробуем доказать. Клиент может не получить сообщение только если оно добавится в лог с той же эпохой, но меньшим timestamp-ом, который уже есть у клиента. Если он его не получил, то его удалят из списка рассылки, увеличат номер эпохи и запишут это сообщение с увеличенным номером эпохи - таким образом, получаем противоречие. У не доставленного напрямую сообщения всегда номер эпохи будет больше, чем последний сохраненный номер у отключенного клиента, и это наш главный инвариант, который нам предстоит обеспечить.

  4. С использованием механизма эпох возникает трудность в том, что запись в лог и изменение эпохи должны происходить взаимоисключающе. Конкурирующие изменения эпохи также должны быть синхронизированы. У нас вырисовывается перспектива использования механизма распределенных блокировок, а это дорого. Есть вариант с транзакциями в БД. Есть вариант с zookeeper-ом, в curator-е есть соответствующий рецепт, но беда в том, что zookeeper - это система класса replicated state machine, и все его состояние должно умещаться на одной машине, что нарушает условие "произвольное количество чатов". По-моему, у kafk-и были похожие проблемы в свое время. В общем, вариант с блокировками - не вариант.

  5. Остается еще одна возможность - использование lock-free техник. Когда-то давно, больше 10 лет назад, я увлекался lock-free алгоритмами. Если у нас есть CAS (compare-and-swap, compare-and-set, в общем, атомарное сравнение с обменом), то мы можем написать любую синхронизацию. Из распространенных БД CAS нам предоставляют mongodb и hbase. Я выбрал HBase, так как она хранит сами данные сортированными, тогда как в mong-е для этого нужен индекс.

Наконец-то код

Итак, нам нужно написать 2 алгоритма - отправки сообщения и подключения/отключения клиента. Опишем их в терминах нескольких функций, оперирующих состоянием и записью/отправкой сообщений. Функции описаны в терминах эффекта cats.effect.IO.

Отправка сообщения:

trait MessagingLogic[A, M] {                // A - address type, M - message type
  type State = MessagingLogic.State[A]      // state type
  type MsgData = MessagingLogic.MsgData[M]
  val algo = MessagingLogic.Algo.apply[M] _
  val msg = MessagingLogic.Msg.apply[M] _

  // Yes, I know about "interface segregation principle", but here I'd prefer to keep
  // all functions in one list for understandability (we'll really need it!)
  
  // sends message to all addresses, return failed addresses
  protected def sendToAll(addresses: Set[A], epoch: Long, message: M): IO[Set[A]]
  // state logic
  protected def readState(recipient: String): IO[State]
  protected def casState(recipient: String, expect: State, newState: State): IO[Boolean]
  // log logic
  protected def readMessages(recipient: String, fromEpoch: Long, fromTS: Long, untilEpoch: Long): FStream[IO, M]
  protected def writeMessage(key: String, message: M): IO[Long] // returns timestamp
  protected def logMessage(recipient: String, epoch: Long, timestamp: Long, key: String): IO[Unit]

  // sends message with known last state
  private def sendMsgs(
                        msgData: MsgData,
                        lastState: State,
                        sent: Set[A] = Set.empty,
                        fails: Set[A] = Set.empty): FStream[IO, Algo[M]] =
  {
    val sendTo = lastState.addresses -- sent -- fails
    val MsgData(recipient, key, message, timestamp) = msgData

    FStream.eval(logMessage(recipient, lastState.epoch, timestamp, key)) >>
    FStream(algo(lastState.epoch, -1, s"start send to ${sendTo.size}, sent=${sent.size}, last failures=${fails.size}")) ++
    FStream.eval(sendToAll(sendTo, lastState.epoch, message)).flatMap { failures =>
      if (failures.isEmpty && fails.isEmpty) {
        // no failures and no previous fails - just check epoch
        // if epoch don't match - re-send
        FStream.eval(readState(recipient)).flatMap { state =>
          if (lastState.epoch == state.epoch)
            FStream(algo(lastState.epoch, state.epoch, "OK"))
          else
            FStream(algo(lastState.epoch, state.epoch, s"retry, no failures")) ++
            sendMsgs(msgData, state, sendTo)
        }
      } else {
        val allFails = failures ++ fails
        // failures or previous fails, changes addresses set => changes epoch
        val newState = State(lastState.epoch + 1, lastState.addresses -- allFails)
        FStream.eval(casState(recipient, lastState, newState)).flatMap { result =>
          if (result)
            FStream(algo(lastState.epoch, newState.epoch, s"failures=${failures.size}, epoch change OK")) ++
            sendMsgs(msgData, newState, sendTo ++ sent -- failures)
          else
            FStream(algo(lastState.epoch, -1, s"failures=${failures.size}, sendTo=${sendTo.size}, sent=${sent.size}, lastFails=${fails.size} epoch change failed, retry")) ++
            sendKey(msgData, sendTo ++ sent -- failures, allFails)
        }
      }
    }
  }

  // send function, where we read current state and then perform logic with this
  // last observed state
  private def sendKey(msgData: MsgData, sent: Set[A] = Set.empty, fails: Set[A] = Set.empty): FStream[IO, Algo[M]] = {
    val MsgData(recipient, key, message, timestamp) = msgData
    FStream.eval(readState(recipient)).flatMap { state =>
      FStream(algo(-1, state.epoch, s"start send")) ++
      sendMsgs(msgData, state, sent, fails)
    }
  }

  // main send function, where we just write message to 'messages' table and
  // perform subsequent steps
  def send(recipient: String, key: String, message: M): FStream[IO, Algo[M]] = {
    FStream(algo(-1, -1, s"start write message key=$key")) ++
    FStream.eval(writeMessage(key, message)).flatMap { timestamp =>
      sendKey(MsgData(recipient, key, message, timestamp))
    }
  }

Код написан в терминах fs2.Stream, который тут переименован в FStream. Функция send просто возвращает стрим шагов, которые произвел алгоритм. Забегая вперед, это было не самое лучшее решение, но тогда оно мне казалось удачным - не нужно дополнительно вводить логирование, и все шаги можно получить и записать в тот же лог в одну строку, что в теории должно облегчать в будущем отладку. Итак, функция send просто записывает сообщение в таблицу messages, и передает управление дальше. SendKey читает текущее состояние и передает управление в sendMsgs. Там сообщение пишется в лог с текущей эпохой, затем пересылается всем адресатам, кому оно еще не было отправлено, и собираются ошибки. Если ошибок не было - перечитывается состояние, если эпоха не поменялась - успех, выходим. Если поменялось - рекурсивно вызываемся с новым состоянием. Если были ошибки - накидываем номер эпохи, формируем новый список адресатов и пробуем поменять состояние, если успех - вызываемся от нового состояния, если нет - значит, кто-то успел состояние поменять, возвращаемся на этап sendKey.

Функция подключения клиентов проще:

private def addClientCurrent(recipient: String, lastState: State, addresses: Set[A]): FStream[IO, Algo[M]] = {
  FStream(algo(lastState.epoch, -1, s"set ${addresses.size} addresses")) ++
  FStream.eval(casState(recipient, lastState, State(lastState.epoch + 1, lastState.addresses ++ addresses))).flatMap { result =>
    if (result) {
      FStream(algo(lastState.epoch, lastState.epoch + 1, s"cas OK"))
    } else {
      FStream(algo(lastState.epoch, -1, s"cas failed")) ++
      addClient(recipient, addresses)
    }
  }
}

def addClient(recipient: String, addresses: Set[A]): FStream[IO, Algo[M]] = {
  FStream(algo(-1, -1, s"read state")) ++
  FStream.eval(readState(recipient)).flatMap { state =>
    addClientCurrent(recipient, state, addresses)
  }
}

Здесь все примерно так же, как в случае ошибок при отправке сообщения: читаем состояние, накидываем эпоху, докидываем адресов, пытаемся изменить состояние, если получилось - выходим, если нет - возвращаемся с чтения состояния.

И здесь есть 2 проблемы.

  1. Состояние хранится сериализованным, список адресов вместе с эпохой. А это значит, что любое добавление адреса - это O(N), соответственно, добавление N адресов - это O(N^2), что плохо. Эта проблема решается в лоб делением списка адресов на chunk-и и связыванием их в односвязный список с записью головного чанка в состояние. Функция send, раз ей все равно все адреса нужны, разматывает этот список и производит compaction. Это решение с одной стороны не очень сложное, с другой - мне хотелось написать MVP побыстрее. Так что проблема решаема, но на данном этапе мне не хотелось ее решать. Тем более, есть проблема похуже, и это

  2. Contention. Если приглядеться, то увидим, что функция добавления адресата - это типичный CAS-loop, в котором тот, кому не удалось произвести изменение, уходит на второй круг. Так вот, в случае высокого contention lock-free алгоритм проигрывает алгоритму с блокировками именно тем, что большое количество потоков будут раз за разом делать бесполезную работу, пытаясь успеть внести изменения, но изменения всегда вносит только один.

Если пошла жара...

Представим ситуацию - в наш мессенджер пришла популярная модель с целью выкладывать фотки своей пятой точки и рубить деньги на рекламе, и - о ужас! - открыла доступ к комментариям с целью типа создать видимость работы с аудиторией (нет). И нам начинают сыпаться как из рога изобилия запросы на добавление в чат и сообщения (комментарии), которым, по-хорошему, место в /dev/null, но бизнес есть бизнес, и у нас вырисовывается довольно сильный contention.

На помощь приходит буферизация. Идея - собрать побольше запросов на добавление и сообщений вместе, и обработать все скопом. Заодно и проблема O(N^2) при добавлении адресов решится - если мешаем эти запросы с сообщениями в кучу, то нам так и так все сохраненные адреса читать.

Такой буфер довольно несложно сделать. Заводим таблицу, в которую будем скидывать запросы на добавления и сообщения вместе, как было с сообщениями в предыдущем параграфе, в сортированном виде, и заведем еще один регистр с еще одной эпохой, и сделаем так: поток читает эпоху, пишет в таблицу запрос на добавление или сообщение, перечитывает эпоху, если изменилась - начинаем сначала, если нет - отлично, ждем несколько секунд, накидываем номер эпохи, пытаемся эпоху изменить, если неудача - просто выходим, дело сделано. Если текущий поток был тем "счастливчиком", который первый изменил эпоху - тогда он собирает все запросы предыдущей эпохи и закидывает их в алгоритм обработки (тот, что в следующем коде назван очень говорящим названием "out"):

trait BufferLogic[M, E] { // M - message type, E - event type (returns from out)
  type BufferState = BufferLogic.State
  type Msg = BufferLogic.Msg[M]
  // delay
  protected def timeout: Duration
  // state stuff
  protected def readBufferState(recipient: String): IO[BufferState]
  protected def casBufferState(recipient: String, expect: BufferState, newState: BufferState): IO[Boolean]
  // messages stuff
  protected def addMessage(recipient: String, msg: Msg): IO[Unit]
  protected def changeMessageEpoch(recipient: String, epoch: Long, timestamp: Long, key: String, value: M, toEpoch: Long): IO[Unit]
  protected def readMessages(recipient: String, epoch: Long): FStream[IO, Msg]
  // output - processing of whole messages batch
  protected def out(recipient: String, msgs: Seq[Msg]): FStream[IO, E]

  // adds 'msg' to db and checks if it's epoch matches with current epoch
  private def consistentAdd(recipient: String, msg: Msg, lastState: BufferState, prevState: Option[BufferState] = None): IO[Long] = {
    async[IO] {
      prevState.fold(addMessage(recipient, msg.copy(epoch = lastState.epoch))) { ps =>
        changeMessageEpoch(recipient, ps.epoch, msg.timestamp, msg.key, msg.msg, lastState.epoch)
      }.await

      val st = readBufferState(recipient).await

      if (st.epoch == lastState.epoch) lastState.epoch
      else consistentAdd(recipient, msg, st, Some(lastState)).await
    }
  }

  def bufferSend(recipient: String, key: String, timestamp: Long, msg: M): FStream[IO, E] = {
    val io = async[IO] {
      val state = readBufferState(recipient).await
      val lastEpoch = consistentAdd(recipient, Msg(state.epoch, key, timestamp, msg), state).await
      // sleep for timeout to guarantee "timeout" period between epochs
      IO.sleep(timeout).await
      val casRes = casBufferState(recipient, State(lastEpoch), State(lastEpoch + 1)).await
      // if we are "lucky", we are the first who moved epoch, so we should
      // gather all epoch messages and move them to "out";
      // otherwise do nothing
      if (casRes) readMessages(recipient, lastEpoch).compile.fold(IndexedSeq.empty[Msg])(_ :+ _).await
      else IndexedSeq.empty[Msg]
    }

    FStream.eval(io).flatMap { seq =>
      out(recipient, seq)
    }
  }

  def initBufferState(recipient: String): IO[Unit] = IO(())
}

object BufferLogic {
  case class State(epoch: Long)

  case class Msg[M](epoch: Long, key: String, timestamp: Long, msg: M)
}

Здесь преимущество в том, что у нас отсутствует contention - потоки, которые не смогли изменить состояние (а изменить состояние может только один) не уходят на второй круг, а просто выходят.

А недостаток - теряется гарантия at least once) Теперь наш алгоритм уязвим к сбоям вида "поменял эпоху и умер". К счастью, эта проблема решается добавлением к сообщению адреса колбэка, но на данном этапе я не хочу ее решать. Поэтому просто констатируем - запросы на добавление мы будем буферизовать, а для сообщений оставим на выбор - буферизовать, теряя гарантии, или нет. Отправка сообщений не дает большого contention-а, там на второй круг логика уходит только в случае отключения клиентов.

Теперь нам надо написать алгоритм обработки пачки перемешанных запросов на добавление и сообщений. Буферизованные запросы, приходящие пачками, остаются, тем не менее, конкурентными. К счастью, наш send-алгоритм от этого почти не поменяется (обещаю, этот код - последний, больше на сегодня не будет):

// TODO: refactor
  private def sendMsgs(
                        msgData: MsgData,
                        newAddresses: Set[A],
                        lastState: State,
                        sent: Set[A] = Set.empty,
                        fails: Set[A] = Set.empty): FStream[IO, Algo[M]] =
  {
    val allAddrs = lastState.addresses ++ newAddresses -- fails
    val sendTo = allAddrs -- sent
    val MsgData(recipient, messages, timestamp) = msgData

    // we log messages only when we add all 'newAddresses', because
    // if we have 'newAddresses' to add, we are guaranteed that there
    // will be new epoch, and we add messages to log only then
    val log =
      if (newAddresses.isEmpty)
        FStream.eval(logMessages(recipient, lastState.epoch, timestamp, messages.keySet))
      else FStream(())

    log >>
    FStream(algo(lastState.epoch, -1, s"start send to ${sendTo.size}, sent=${sent.size}, last failures=${fails.size}")) ++
    FStream.eval(sendToAll(recipient, sendTo, lastState.epoch, messages.map { case (id, m) =>
      id -> FullMessage(id, lastState.epoch, timestamp, m)
    })).flatMap { failures =>
      // it's important here that only if we have no new addresses,
      // no current failures and no previous failures,
      // we can go without epoch increment
      if (newAddresses.isEmpty && failures.isEmpty && fails.isEmpty) {
        // no failures and no previous fails - just check epoch
        // if epoch don't match - re-send
        FStream.eval(readState(recipient)).flatMap { state =>
          if (lastState.epoch == state.epoch)
            FStream(algo(lastState.epoch, state.epoch, "OK"))
          else
            FStream(algo(lastState.epoch, state.epoch, s"retry, no failures")) ++
            sendMsgs(msgData, newAddresses, state, allAddrs)
        }
      } else {
        // we have failures, or prev fails, or new addresses -
        // in any case, there is address set change, so - epoch movement
        val allFails = failures ++ fails
        val newAllAddrs = allAddrs -- failures
        // failures or previous fails, changes addresses set => changes epoch
        val newState = State(lastState.epoch + 1, newAllAddrs)
        FStream.eval(casState(recipient, lastState, newState)).flatMap { result =>
          if (result)
            FStream(algo(lastState.epoch, newState.epoch, s"failures=${failures.size}, epoch change OK")) ++
            sendMsgs(msgData, Set.empty, newState, newAllAddrs)
          else
            FStream(algo(lastState.epoch, -1, s"failures=${failures.size}, sendTo=${sendTo.size}, sent=${sent.size}, lastFails=${fails.size} epoch change failed, retry")) ++
            sendKeys(msgData, newAddresses, newAllAddrs, allFails)
        }
      }
    }
  }

  def sendKeys(msgData: MsgData, newAddresses: Set[A], sent: Set[A] = Set.empty, fails: Set[A] = Set.empty): FStream[IO, Algo[M]] = {
    FStream.eval(readState(msgData.recipient)).flatMap { state =>
      FStream(algo(-1, state.epoch, s"start send")) ++
      sendMsgs(msgData, newAddresses, state, sent, fails)
    }
  }

  def send(recipient: String, messages: Map[String, M], newAddresses: Set[A]): FStream[IO, Algo[M]] = {
    FStream(algo(-1, -1, s"start write ${messages.size} messages")) ++
    FStream.eval(writeMessages(messages)).flatMap { timestamp =>
      sendKeys(MsgData(recipient, messages, timestamp), newAddresses)
    }
  }

Тут просто разбираемся, кому, что и куда отправлять, куда, что и с какой эпохой сохранять, а общий алгоритм не поменялся.

Итог

Если честно, я думал, статья будет раза в 3 короче, но у меня пока нет идей, как ее ощутимо подсократить (и это я еще тесты не описывал). Зато была проделана основная работа - определен вариант архитектуры, выбраны БД, технология, и описана бизнес-логика.

В следующей статье напишем обвязку - те самые функции, которые сегодня остались абстрактными и в терминах которых работает логика. Напишем бинды к HBase, хендлеры webSocket, grpc, свяжем вместе буфер и логику отправки, и попробуем сохранить мозг целым, а нервы - расслабленными (хотя я свои примерно все там и оставил).

Код к статье тут

Спасибо за внимание!

Комментарии (3)


  1. fuCtor
    20.06.2023 18:56

    А смотрели для БД на что-нибудь ещё? По описанию не плохо подходит сюда FoundationDB, есть и сортировка, и внутренние "часы", она же версия, и полностью асинхронный.


    1. iboltaev Автор
      20.06.2023 18:56

      Foundation DB я посмотрел, спасибо. Вроде как на первый взгляд у них нет CAS (https://github.com/apple/foundationdb/wiki/Proposal:-compare_and_set-mutation), а есть распределенные транзакции. Это в общем случае дороговато, тогда как compareAndSet в HBase по производительности не отличается от put.


      1. fuCtor
        20.06.2023 18:56

        Тут вопрос, а нужны ли они, если исходить из описанных пунктов, то:

        Проблема в самих timestamp-ах. Дело в том, что нормального времени у нас нет.

        Есть атомарные операции https://apple.github.io/foundationdb/developer-guide.html#atomic-operations
        И если нам нужно писать сообщения, то получаем следующую структуру:
        префикс/<channel>/<version>/<ts>/<user> = <payload>

        version - может проставляться самой БД в момент коммита, что гарантирует всегда растущую последовательность
        тогда чтение лога будет выглядеть как getRange(префикс/<channel>/<last_version>, префикс/<channel>/\xff)

        В целом, не обязательно использовать CAS, если можно использовать особенности и возможности самой БД.