Эта статья повествует об эволюции структур данных, которые мы использовали при выполнении запросов в исторической части базы данных временных рядов Agoda, и демонстрирует примеры реализации нескольких дисковых индексных баз данных.

Задача

Во время поглощения (ingestion) данных в реалтайм части нашей базы данных, мы интернируем (выдаем неизменяемый int идентификатор) все входящие строки. Результатом этого процесса является StringInt словарь. Позже, все еще во время выполнения запроса в нашей исторической части, чтобы работать с полученными данными, нам сначала нужно определить, какой идентификатор был присвоен каждой конкретной строке из запроса. Затем результаты нужно преобразовать обратно в строки.

Вот пример запроса:

{
    "metric": "whitefalcon.messages.consumed",
    "start": "2022-04-05 11:27:21",
    "end": "2022-04-05 11:29:21",
    "tags": {
        "dc": [ "HK", "SG" ],
        "cluster": [ "A", "B" ]
    },
    "groupBy" : [ "messageType" ]
}

Чтобы интернировать запрос, нам нужно выполнить обратный поиск / reverse(String->Int) lookup, то есть нам нужно найти идентификатор Int для конкретной строки. После интернирования запрос будет выглядеть следующим образом:

{
    "metric": 5962,
    "start": "2022-04-05 11:27:21",
    "end": "2022-04-05 11:29:21",
    "tags": {
        3: [ 12, 14 ],
        15: [ 52, 315 ]
    },
    "groupBy" : [ 411 ]
}

После выполнения запроса нам нужно выполнить прямой поиск / forward(Int->String) lookup, чтобы получить сами строки по их int’ам, прежде чем мы сможем представить ответ клиенту.

Для обработки каждого запроса нам может потребоваться объединить результаты запросов к нескольким разделам. У каждого раздела есть свой словарь, поскольку они были созданы в разное время и разными инстансами поглощения данных.

Следовательно, мы можем сформулировать задачу следующим образом — как организовать несколько словарей в нашем приложении таким образом, чтобы иметь наиболее эффективный доступ к прямому и обратному поиску?

Примечание: потенциально эту проблему можно решить с помощью некоторых KV-библиотек, таких как RocksDB/LevelDB, SQLite, или даже с помощью отдельно развернутых распределенных систем (хотя сетевая задержка резко снизит общую производительность). Сегодня же мы обсудим только самодельные решения.

На протяжении всей статьи будут использоваться следующие инструменты и языки:

Давайте создадим контракт для нашей структуры данных:

trait InternedStrings {
  override def lookup(id: Int): String
  override def lookup(v: String): Int 
}

Поскольку идентификаторы (id) выдаются последовательно, начиная с 0, наш словарь хранится в формате массива строк. Здесь в качестве идентификатора строки используется индекс ячейки:

+-------------------+----------------+---------------+------
| number of entries | string1 length | string1 bytes | ....
+-------------------+----------------+---------------+------

Количество строк в каждом словаре сильно разнится в зависимости от множества факторов (время раздела, размер раздела и т. д.), а также от содержимого самих строк. Для простоты давайте представим, что все словари имеют строки, содержащие 20 буквенно-цифровых символов, и в свою очередь содержат примерно по 100 тысяч строк, что в сумме составит около 2 МБ необработанных данных на диске под каждый словарь.

Давайте определим наши ограничения:

  • Размер инстанса приложения — 8 ядер процессора + 16 ГБ RAM.

  • Рабочая нагрузка — 1  миллион прямых и 3 тысячи обратных запросов в секунду.

  • Общее количество словарей ~ 10 тыс.

  • ~1,5 тысяч словарей требуется для 90% запросов

  • Все словари являются неизменяемыми

Хэш-map’ы в памяти

Начнем с простого подхода — полностью загрузить словарь с диска в память. Это ускоряет прямой поиск, поскольку нам просто нужно получить доступ к ячейке массива, а для обратного поиска просто агрегировать Map[String,Int].

class SimpleInternedStrings(private val strings: Array[String]) extends InternedStrings {
  private val reverseMapping = strings.zipWithIndex.toMap

  override def lookup(id: Int): String =
    if (id == NullId) null else strings(id)

  override def lookup(v: String): Int =
    reverseMapping.getOrElse(v, NullId)
}

Для тех, кто не знаком со Scala strings.zipWithIndex.toMap делает следующее:

  1. Отображает массив строк ["hello","world"] в последовательность кортежей [("hello",0), ("world",1)]

  2. Функция toMap агрегирует созданную последовательность с индексами в Map[String, Int]

Flamegraph для создания словаря
Flamegraph для создания словаря

Для каждого запроса будут выделены дополнительный массив из 100 тыс объектов и reverse map. Reverse  map будет содержать целые упакованные целые числа, то есть на 100 000 объектов больше (мы могли бы смягчить это, увеличив размер целочисленного кэша, но что, если бы наш словарь содержал 5 миллионов строк?). В общей сложности это составит несколько сотен КБ аллокаций только для преобразования нашего строкового массива в запрашиваемую структуру данных.

Мы уже загрузили процессор созданием нашего словаря, а позже GC (сборщик мусора) забьет остальную часть процессора. И это все только для одного инстанса. Что, если вам нужно прочитать около 100 из них для каждого запроса?

Мы могли бы улучшить ситуацию, внеся следующие изменения:

  1. Используем коллекции примитивов, например koloboke, чтобы избежать упаковки примитивов.

  2. Перепишим причудливую цепочку монад в старый добрый цикл for, который не потребует чрезмерных выделений памяти.

class ArrayBackedInternedStrings(private val strings: Array[String]) extends InternedStrings {
  private val reverseMapping = {
    val map: HashObjIntMap[String] = HashObjIntMaps
      .newUpdatableMap[String](strings.length)

    for (i <- strings.indices) {
      map.put(strings(i), i)
    }

    map
  }

  override def lookup(id: Int): String = 
    if (id == NullId) null else strings(id)

  override def lookup(v: String): Int = 
      reverseMapping.getOrDefault(v, NullId)
}

Хотя эти изменения улучшают нагрузку на ЦП, картину кардинально они не меняют. Посмотрев на 80% flamegraph’а, занятого созданием словарей, следующая идея, к которой мы приходим, проста — давайте их все кэшируем. Звучит неплохо; мы не будем загонять наш процессор, а вместо этого будем использовать нашу оперативную память.

Для кэширования будем использовать политику LRU, иначе рано или поздно у нас закончится память. Учитывая размер инстанса нашего приложения, мы можем выделить около 10% его для кэша. Затем мы начнем вытеснять наименее используемые элементы, если размер нашего кэша превышает 1,6 ГБ. Это поможет нам сохранить словари, которые используются для большинства запросов, и не тратить память на те, которые используются для наименее популярных запросов.

Для текущей реализации каждый словарь (с нашими ограничениями) будет иметь размер около 9 МБ (string инстансы, массив необработанных строк, хэш-map обратного поиска). Это означает, что у нас может быть чуть больше 150 словарей в каждом инстансе одновременно. Этого недостаточно — у нас будет много повторных инвалидаций кэша, так как нам нужно гораздо больше словарей для обслуживания большинства запросов.

К счастью для нас, мы заметили интересную закономерность — вам не нужны все строки в словаре при каждом запросе. Обычно вам требуется менее сотни различных поисковых запросов.

Это многое меняет. Теперь мы можем лениво создавать Map[String,Int], экономя около 60% оперативной памяти по сравнению с предыдущим решением.

Теперь мы можем повторно реализовать наш словарь следующим образом:

class RawInternedStrings(private val strings: Array[Array[Byte]]) extends InternedStrings {
  private val string2Id = new TrieMap[String, Int]()

  override def lookup(id: Int): String = {
    if (id == NullId) {
      null
    } else {
      new String(strings(id))
    }
  }

  override def lookup(word: String): Int = {
    string2Id.getOrElseUpdate(
      word, {
        val wordBytes = word.getBytes()
        var i         = 1
        var found     = false
        while (!found && i < strings.length) {
          if (util.Arrays.compare(wordBytes, strings(i)) == 0) {
            found = true
          } else {
            i += 1
          }
        }
        if (found) i else NullId
      }
    )
  }
}

Здесь мы видим несколько изменений:

  1. strings больше не содержит String-инстансы, вместо этого мы храним байты, прочитанные с диска, чтобы не выделять память для String инстансов

  2. При обратном поиске мы используем брутфорс, чтобы найти нужную строку. На самом деле нас не беспокоит сложность поиска O(N), потому что он не будет выполняться более одного раза для каждой строки.

  3. Используем TrieMap (lock-free scala аналог для java ConcurrentHashMap) для кэширования результатов поиска и сделаем класс потокобезопасным.

Теперь наша реализация требует ~4 МБ, хотя мы и храним необработанные байты, мы сохраняем внутренние массивы для каждой записи. Это стоит нам 1,5 МБ по сравнению с версией с необработанным диском. Теперь мы можем кэшировать до 400 словарей одновременно, что лучше, но все же недостаточно, так как скорость вытеснения все еще слишком высока.


Перевод материала подготовлен в преддверии старта курса «Архитектура и шаблоны проектирования». Пройдите вступительное тестирование, если интересно узнать свой уровень знаний для поступления на курс.

Комментарии (1)


  1. Al-Capona
    26.05.2022 03:42
    +1

    Ставьте изначально задачи правильно.

    Что значит "загрузить всё с диска в ОЗУ-кэш"?

    Если все ~10К словарей по 2 млн байт каждый (опять же, каких букв - весь английский алфавит, или из UTF-8?) то это 20 млрд байт ОЗУ без учёта индексной нагрузки в неассемблерных языках. Вы же указываете 16 ГБ оперативы, доступной в задаче.

    Если загружать частично, и, так понимаю, значительно меньше, чем фул база, то при чём тут вообще 16 ГБ RAM, из которой будет неиспользована большая часть, так как задержки на подгрузках сведут на "нет" 8-ми потоковую зависимость (при 8-ми ядрах) и при условии использования HHD (опять же, в условиях не указан тип постоянного носителя - SSD, или ещё что быстрее, или просто сетевое хранилище)

    Что значит "1 млн запросов в секунду"? Тоже "от балды" цифра?

    8 ядер реагируют, грубо, на 3 млрд тактов генератора в секунду при 3 ГГц процессоре.

    Итого, 24 млрд инструкций он способен исполнить в секунду.

    Делим на 1 млн запросов, получаем, что за 3 000 инструкций, нужно:

    0) Получить запрос (так понимаю, что из сетевого подключения, на который потребуется минимум 4 КБ на сокет и 10 КБ на обработку, примерно и минимально, итого 14 КБ * на 1 млн коннектов = 14 ГБ ОЗУ только на удержание соединений, и это теоретический минимум, на который, видимо, задействуется дополнительная память, необозначенная в "инстансе")

    1) Чекнуть базу, размером в ~20 ГБ

    2) Отработать с диском / хранилищем

    3) Выдать результат в сокет - здесь много зависит от драйвера сетевой карты, от операционной системы и её API, сколько она сожрёт тактов, я хз, кроме этого, должна происходить иная работа с другими службами OS фонового режима

    4) И всё это на одной машине, без горизонтального масштабирования.

    5) Здесь ещё используются различные общеизвестные базы данных, которые тоже захотят себе часть ресурсов системы.

    Крч, эта задача = херь собачья, отнимающая время просто так.

    Если говорить о безумии, что вообще такая задача может потребоваться для решения каких-то непридуманных и несуществующих задач, то я бы просто держал все данные в минимум (навскидку) 128 ГБ ОЗУ, особо структурировав их, и актуализируя на самые быстрые хранилища, которые только придуманы человечеством, а для обработки использовал бы только ассемблер с ТОПовыми инструкциями предоставленного процессора.

    Может моих знаний недостаточно, и на Хакатоне, каком-нибудь, норм пацаны делают такие проектики "на коленках" и за полчаса максимум, и на пеньках третьих, но, на мой взгляд, физику сложно обмануть, шаманов не существует.

    Сконкретизируйте более точно, иначе разговор ниочём, а я сижу и потею, как лошара.