Кадр из к/ф «Телефонистки»
В этой статье я опишу наш путь реализации глобальных индексов в шардированной базе данных. Расскажу обо всех проблемах, с которыми столкнулись, и решениях, которые приняли, чтобы их обойти. Мы поговорим про реализацию на основе базы данных Tarantool, но общий подход применим и к другим шардированным базам данных без встроенной поддержки таких индексов, да и встроенная реализация часто строится по похожим принципам. Эта статья поможет разобраться в деталях, компромиссах и ограничениях работы глобальных индексов. Но если нужно только в общих чертах познакомиться с концепциями глобальных индексов, советую почитать статью Алекса ДеБри How do distributed databases handle secondary indexes? A survey.
Для начала немного о себе. Я разработчик, за свою карьеру писал на разных языках, в стеке есть и питон, и гошка. Сейчас я лид одного из проектов в компании Picodata. Пишу на Rust и Lua. Наша компания специализируется на распределённых системах с высокими требованиями к производительности. Например, сервис, над которым я работаю сейчас, выдерживает 400 000 запросов в секунду при latency не больше 50 миллисекунд в 95 процентиле.
Зачем
Для того чтобы выдержать такие высокие требования к производительности, наш проект шардирован. Он состоит из нескольких тысяч инстансов (включая реплики и инстансы, отвечающие за выполнение той или иной логики) и должен иметь возможность при необходимости масштабироваться дальше. В такой системе мы не можем позволить себе иметь локальные вторичные индексы и делать мапредьюс по всему кластеру, поэтому запросы по обычным вторичным индексам в проекте запрещены совсем. Но пользователям они всё равно нужны, поэтому у нас появилась необходимость всё же сделать возможным поиск объектов не только по первичным ключам. Единственным вариантом стала реализация глобальных вторичных индексов.
Глоссарий
Тут список некоторых терминов, которые я буду использовать на протяжении статьи. Необязательно сейчас вникать в каждый, к этому списку всегда можно будет вернуться, к тому же смысл терминов вполне может быть понятен из контекста.
Инстанс — один запущенный процесс базы данных. Может быть предназначен как для хранения данных, так и для выполнения какой-то логики.
Бакет — единица балансировки. Некоторая виртуальная сущность, к которой однозначно привязаны данные.
Шард — инстанс базы данных, предназначенный для хранения данных. К одному шарду обычно привязано несколько сотен бакетов.
Роутер — инстанс базы данных, предназначенный для обработки поступающих запросов, получении запрошенных данных из шардов, и возвращении их в качестве результата выполнения запросов.
Ребалансировка — процесс физического перемещения бакетов (вместе со связанными записями) с одного шарда на другой. Например, для вывода какого-то шарда из эксплуатации.
Шардированная таблица — таблица, созданная на каждом шарде. Каждая запись в ней связана с каким-то бакетом, и, соответственно, эта запись хранится на том шарде, к которому привязан её бакет. Записи могут перемещаться с шарда на шард в процессе ребалансировки.
Кардинальность — степень уникальности значений. Высокая кардинальность означает, что в поле большинство значений уникальные, возможно все. Низкая кардинальность означает, что среди значений много дублей.
Мапредьюс — так мы называем опрос всех шардов кластера для получения данных и их какую-то последующую обработку, например объединение в общий список для отдачи результата пользователю.
Начало пути. Основная концепция
Глобальные индексы — штука не новая, есть даже их встроенная поддержка в некоторых базах данных, например: Amazon DynamoDB, ScyllaDB, Couchbase.
Общий концепт следующий. Нам нужно построить таблицу, которая будет хранить соответствие между значением вторичного ключа записи и его бакета или первичного ключа. Сама таблица, по-хорошему, тоже должна быть шардирована.
Дальше нам нужно как-то эту таблицу поддерживать в актуальном состоянии. Можно придумать синхронный способ её обновлять или сделать всё асинхронно.
Потом требуется реализовать на роутере логику получения по значению вторичного индекса связанных с ним бакетов, где физически лежат записи, и получить уже данные с шардов, где сейчас находятся эти бакеты.
Концептуальная схема запросов с глобальными индексами выглядит следующим образом:
Грабли первые. Хранение
Структура таблицы индексов выглядит примерно следующим образом:
{
{ name = "secondary_key", type = "string" },
{ name = "bucket_id", type = "unsigned" },
{ name = "refs", type = "array" },
}
В поле secondary_key
лежит значение вторичного ключа. Можно было бы использовать хеш от вторичного ключа, но тогда пришлось бы решать проблему коллизий хешей при удалении.
Для поля secondary_key
у меня для примера указан тип string
. Но тип этого поля должен быть таким же, как у значений вторичного индекса в таблице с данными.
Мы сразу сделали таблицы индексов шардироваными. Для Tarantool в шардированных таблицах нужно дополнительное поле bucket_id
, куда записывается идентификатор бакета для возможности делать ребалансировку при необходимости. Вычисляется бакет на основе значения вторичного ключа (поле secondary_key
).
В поле refs
лежит массив бакетов, в которых находятся записи с соответствующим значением вторичного ключа.
Таблица с записями глобального индекса будет выглядеть примерно следующим образом:
secondary_key |
bucket_id |
refs |
---|---|---|
Иерузалимски |
23414 |
[23123, 14235, 4123] |
Россум |
1001 |
[9022] |
Алгоритм выглядит в такой реализации достаточно простым: при получении запроса по вторичному ключу мы по переданному значению ищем в таблице индексов нужную запись. Берём из поля refs список бакетов исходной таблицы и дальше идём в них, чтобы по локальному вторичному индексу получить нужные записи.
Первая мина, с которой столкнулись, достаточно очевидна: в поле refs
может оказаться слишком много записей, и тогда мы либо упадём из-за того, что Lua в Tarantool не сможет вообще работать с этим полем, либо, если подкрутим лимиты, ухудшим производительность.
Изначально было оговорено, что вторичные индексы должны быть с высокой степенью кардинальности, то есть достаточно уникальными. Расчёт был на то, что будет не больше пары дублей на всю таблицу. Полагаться на это, конечно, было довольно наивно. На самом деле нам повезло, и эту проблему мы отловили ещё до продакшена благодаря тестировщику, который в своих нагрузочных тестах забыл сделать генерацию уникальных значений.
Решение проблемы нашлось достаточно быстро. Можно убрать поле refs
и вместо него добавить поле entry_bucket_id
, сделав его частью первичного ключа. В итоге в таблице глобального индекса теперь хранится не одна запись для каждого значения вторичного индекса, а столько записей, сколько уникальных строк с этим значением хранится в исходной таблице. При поиске мы из таблицы индекса выбираем все записи, соответствующие нужному вторичному ключу, и также собираем из них бакеты.
После этого изменения таблица индексов из примера выше превратилась примерно в такую:
secondary_key |
bucket_id |
entry_bucket_id |
---|---|---|
Иерузалимски |
23414 |
23123 |
Иерузалимски |
23414 |
14235 |
Иерузалимски |
23414 |
4123 |
Россум |
1001 |
9022 |
Грабли вторые. Хранение и производительность
Для приложения мы используем Tarantool. Tarantool — это однопоточная база данных. Это значит, что весь код, все запросы выполняются строго последовательно. Другими словами, максимум, что можно выжать из одного инстанса, ограничен одним ядром. Отсюда мы плавно подбираемся к проблеме: наш код, который создаёт и читает записи в таблице индекса, влияет на производительность всех остальных запросов. Решение этой проблемы достаточно очевидное: таблицу глобального индекса можно перенести на отдельные инстансы, которые будут использоваться только для работы с глобальным индексом.
Грабли третьи. Синхронизация
Пока я обходил стороной вопрос того, как вообще записи должны появляться в глобальном индексе. На самом деле есть, как обычно, два пути: надёжный и производительный.
Начнём с надёжного. Под надёжностью я подразумеваю, что индекс всегда полностью консистентен с данными. Чтобы этого добиться, нужно использовать синхронную запись в таблицу индекса. Но на этом пути встаёт главный вопрос, который придётся решить: как что-то синхронно записать в распределённой системе. Можно, наверное, воспользоваться двухфазным коммитом или вариацией паттерна Saga.
Сразу отмечу, что в каждом из случаев нужно не забыть учесть и обработать записи, которые могли повиснуть на половине процесса. Но решение этого вопроса выходит за пределы этой статьи.
Вариант второй: производительный. Это подход с асинхронным обновлением индекса. На самом деле он тоже вполне надёжный, но консистентен лишь в конечном счёте. То есть индекс и таблица записей всегда стремятся к тому, чтобы прийти в согласованное состояние. Однако в реальном мире это произойдёт, только если запись в кластер полностью остановить. Аналогичный подход использован, например, в DynamoDB от Амазона.
Мы пошли по второму пути. Для нас производительность записи очень важна, а появление созданных данных в API не сразу допустимо. Именно поэтому я расскажу об этом пути подробнее. Общая схема синхронизации выглядит примерно как-то так:
Для работы этого способа понадобится дополнительная таблица. Если говорить о Tarantool, таблица должна быть ещё и с таким же движком (специфичный для Tarantool тип таблиц: memtx
или vinyl
), как все остальные таблицы, на которых есть глобальные индексы. Если существуют таблицы двух типов, понадобится и две дополнительные таблицы событий обновлений индекса. Это нужно потому, что в Tarantool нет транзакций, работающих между разными типами таблиц (в одной транзакции нельзя записать одновременно и в memtx
, и в vinyl
таблицы).
Принцип такой: пользователь меняет какую-то запись в кластере (создаёт, удаляет, обновляет). Мы вместе с этим действием в одной транзакции создаём запись (или две записи в случае обновления, как на схеме выше) в отдельной таблице. Эта запись фиксирует изменение, которое нужно применить на глобальном индексе. Структура таблицы с записями будет примерно следующая:
{
{ name = "bucket_id", type = "unsigned" }, -- такой же бакет, как у исходной записи
{ name = "data_table_name", type = "string" }, -- имя таблицы с данными
{ name = "secondary_index_name", type = "string" }, -- имя вторичного индекса
{ name = "secondary_key", type = "string" }, -- значение вторичного ключа
{ name = "operation", type = "string" }, -- создание или удаление записи в индексе
}
Эта таблица тоже шардирована, и шардирована тем же бакетом, что и исходная запись, то есть от первичного ключа. Потом этот бакет будет использован при сохранении записи глобального индекса как значение поля entry_bucket_id
. Таким образом, все записи в этой таблице будут храниться на том же шарде, что и связанные данные.
У нас получается своего рода очередь событий, которые нужно последовательно вычитать и отправить на нужный инстанс индекса. Инстанс определяется по бакету, а бакет вычисляется из вторичного ключа. Дальше всё просто: вычитываем последовательно события из таблицы и отправляем в глобальный индекс.
И тут мы подошли к новым граблям. Я выше написал «вычитываем последовательно», но что значит последовательно — непонятно. У нас нет никакого поля, по которому можно было бы отсортировать наши события в таблице. Поле нам это нужно добавить. Первое, что обычно в таких случаях приходит на ум, — использовать время. После чего по нему можно будет спокойно сортировать. Но в распределённых системах время — это очень подлый друг. Об это сломано уже много копий и написано немало статей — например, можно окунуться в статью Лесли Лэмпорта Time Clocks and the Ordering of Events in a Distributed System. В общем, при использовании времени в распределённых системах всплывает сразу очень много проблем. К примеру, время может быть сейчас текущее, а через секунду, из-за сброса часов сервера, в следствии синхронизации по NTP, в прошлом. Или в одну единицу времени может произойти несколько событий (да, можно выбрать наносекунды и сказать, что вероятностью можно пренебречь). А ещё может произойти ребалансировка бакетов, и часть записей перетекут на другой инстанс, где время может отличаться, и хорошо если оно будет в будущем, но может оказаться и в прошлом. В кластерной системе нет точного времени (с точностью до миллисекунд), так как на каждом узле часы либо немного спешат, либо немного отстают. В общем, время использовать не стоит.
Второе очевидное решение — использовать какой-то монотонно возрастающий счётчик. Благо Tarantool одарил нас такой прекрасной функциональностью — к сожалению, полностью нерабочей в контексте распределённых систем. Счётчик монотонно возрастает в рамках одного узла, но у нас всегда есть вероятность ребалансировки бакетов. И после неё мы можем получить конфликт значений. Ниже я попробую чуть подробнее показать, как это выглядит.
До ребалансировки:
shard 1 |
shard 2 |
---|---|
row A (counter = 1) |
row X (counter = 1) |
row B (counter = 2) |
row Y (counter = 2) |
row C (counter = 3) |
row Z (counter = 3) |
После ребалансировки записи A и B переезжают на второй шард:
shard 1 |
shard 2 |
---|---|
row C (counter = 3) |
row X (counter = 1) |
row Y (counter = 2) |
|
row Z (counter = 3) |
|
row A (counter = 1) |
|
row B (counter = 2) |
Теперь, если мы начнём сортировать таблицу по полю counter на втором шарде, мы, вероятно, нарушим порядок событий.
Чтобы решить эту проблему, нам понадобится ещё одна шардированная таблица — таблица счётчиков. Фактически эта таблица будет хранить соответствие бакет — счётчик, для каждого бакета мы будем поддерживать отдельный счётчик и порядок событий. В таком случае мы будем гарантировать порядок только в рамках бакета. Но это не проблема, забегая вперёд скажу, что для предотвращения разных краевых случаев упорядочивания событий нам нужно сохранять порядок для событий, касающихся только одной и той же записи, а каждая запись всегда жёстко привязана к своему бакету, который никогда не меняется.
Теперь мы завели таблицу. Каждый раз при создании нового события, в одной транзакции с этим созданием мы самостоятельно увеличиваем на единицу счётчик соответствующего бакета и записываем новое значение этого счётчика в событие. В контексте Tarantool важно не забыть добавить поле со значением счётчика в первичный индекс таблицы событий, чтобы Tarantool автоматически сразу сортировал по нему записи. Всё, порядок восстановлен, теперь мы можем производить выборку из таблицы событий записи пачками по бакетам и аккуратно последовательно отправлять их в инстансы индекса.
Уже на инстансах индекса мы должны проверить, что значение счётчика больше, чем было в последнем обновлении индекса конкретной записи. Для этого значение счётчика, с которым выполняется обновление индекса, нужно положить в запись индекса.
Схема таблицы индекса теперь выглядит примерно так:
{
{ name = "secondary_key", type = "string" },
{ name = "bucket_id", type = "unsigned" },
{ name = "entry_bucket_id", type = "unsigned" },
{ name = "event_id", type = "unsigned" },
}
Так, в записи будет лежать счётчик в поле event_id
, с которым она была последний раз обновлена. А все обновления, которые придут в индекс со значением меньше, чем последний сохранённый, будут отброшены, потому что последний счётчик всегда содержит самое актуальное состояние записи.
Как это примерно выглядит:
Сейчас система почти полностью надёжна. Осталось учесть один кейс. Мы храним в записи индекса счётчик, с которым она была обновлена. Но что, если нам придут два следующих события:
Событие 1: создание записи <- Тут мы соответственно должны создать запись индекса и записать в неё счётчик.
Событие 2: удаление записи <- Тут мы должны удалить запись, которую создали на прошлом событии.
Но допустим, подвис инстанс, где изначально хранились события, подвис в момент отправки этих событий, фейловер автоматически переключил мастера. Новый мастер начал снова отправлять события. А старый проснулся и доотправил те, которые успел вычитать, но ещё не отправил. В итоге эти два события пришли в обратном порядке. Тогда мы на событие удаления не сделаем ничего, так как записи в индексе ещё нет. А на событие создания эту запись после создадим, так как счётчик сравнивать не с чем, записи-то нет.
Чтобы обработать этот кейс, нам следует использовать так называемые надгробия (Wiki). Если коротко, то надгробия — это механизм, применяемый в разных ситуациях, когда нам нужно сохранить информацию про удаление. Идея простая: если записи нет, мы её создаём, но помечаем как удалённую. Дополнительно, если запись есть, мы её всё равно не удаляем сразу, а помечаем для удаления. Эту идею мы реализовали достаточно просто, добавив в запись индекса ещё одно поле — delete_after
. В случае удаления записи заполняем это поле текущим временем, прибавив к нему час. Мы принимаем допущение, что события не могут перепутаться и отправиться с разницей больше часа. Но, конечно, тут всегда что-то может пойти не так. Можно использовать время больше часа, но нужно учесть, что индекс будет содержать записи, которые уже не используются, но занимают место.
Ещё один момент, о котором желательно не забыть, — при поиске записи в индексе для получения бакетов нужно отфильтровать все записи, у которых заполнено delete_after
поле.
Саму же очистку удалённых записей можно организовать достаточно просто, запустив отдельный фоновый процесс: он будет периодически выбирать записи из таблицы индекса, у которых delete_after
меньше текущего времени. Чтобы не фулсканить, на это поле неплохо будет повесить индекс.
Грабли четвёртые. Снова хранение
Тут речь даже не о граблях, а так, о грабельках. Мы можем захотеть повесить глобальный индекс на какое-нибудь опциональное поле, то есть на поле, значением которого может быть null. А null — это вполне себе такое настоящее значение. И под каждую отдельную запись данных, у которой будет null в значении вторичного индекса, мы будем вынуждены создавать отдельную запись в глобальном индексе. Об этом стоило бы подумать, и мы подумали, однако не обработали эту ситуацию сразу. Но тестировщик, который на этот раз вообще забыл сгенерировать значения для полей вторичных индексов, нас подтолкнул решить проблему сразу. После тестов он увидел что создано всего 200 записей глобального индекса, когда разных объектов было нагенерировано несколько миллионов. Дело в следующем: он смотрел количество записей только на одном инстансе глобального индекса. Если бы он посмотрел на другой, то увидел бы несколько миллионов записей там. Все записи попали на один инстанс. И это нормально, так как инстанс выбирается исходя из значения вторичного индекса, а значение у всех записей одинаковое — null. Вот записи индекса и легли все дружно в один инстанс.
Решение тут простое и эффективное, как топор: мы добавили функциональность, которая позволяет включить игнорирование нулов. То есть, если у записи в значении индексируемого поля null, эта запись не будет проиндексирована в глобальном индексе. Соответственно, эту запись и найти не получится, используя индекс. Но, если оставить индексирование нулевых значений, мы можем получить ситуацию, когда скатимся в запросы во все бакеты. В тот самый мапредьюс, который ударит по производительности всего кластера.
Включение индекса на уже существующих данных
Выше разобрались с тем, как поддерживать глобальный индекс консистентным с течением времени. Но в жизни не всегда заранее можно определить конечный список вторичных индексов, которые понадобятся в ходе эксплуатации.
Тут, в общем, подходы могут быть разными. Но главный момент будет общим: нам надо как-то построить индекс для данных, которые были сохранены до его включения. Решением стало создание специальной ручки для эксплуатации, которую можно дёрнуть, и индекс пойдёт перестраиваться для всей таблицы, используя тот же самый механизм синхронизации. Можно было бы сделать процесс автоматическим, но мы хотели получить больший контроль над началом процесса синхронизации.
Принцип работы следующий:
Администратор системы включает глобальный индекс на поле нужной таблицы. С этого момента все новые изменения будут синхронизированы с индексом автоматически. После запускает команду синхронизации индекса.
Команда первым делом создаёт отдельную таблицу, которая будет хранить прогресс выполнения синхронизации. Это нужно просто для того, чтобы персистентно хранить прогресс, чтобы в случае, например, рестарта или сбоя этот процесс не нужно было бы начинать сначала.
После того как таблица была создана, запускается фоновый процесс, который фулсканит всю таблицу с данными и для каждой записи из таблицы создаёт событие для обновления глобального индекса, сохраняя постоянно свой прогресс в таблицу прогресса.
Когда процесс дойдёт до конца таблицы с данными, он очистит таблицу прогресса и завершит работу.
Во время работы процесса администратор системы может получать прогресс выполнения через отдельный специальный API системы, также может прервать процесс и возобновить в случае сбоя. При возобновлении будет использована таблица прогресса, чтобы «вспомнить» последнюю обработанную позицию курсора в таблице.
Мониторинг
Мы написали кучу всякой машинерии, теперь это всё как-то нужно наблюдать. Для всех компонентов мы стараемся использовать методику REDS. Если коротко, эта методика подразумевает, что мониторится общее количество операций в системе (R), количество операций, завершённых с ошибками (E), время обработки операций (D) и насыщение системы (S). Поэтому мы выбрали следующие метрики: количество обработанных событий синхронизации за единицу времени, процент ошибок, тепловая карта скорости обработки, длина очереди необработанных событий.
Из перечисленных остановлюсь только на одной, самой интересной — тепловой карте. Она показывает время обработки от попадания в очередь события до создания записи в индексе. Чтобы эту метрику собрать, в запись события мы дополнительно добавили поле, в котором храним время создания события. При обработке этого события на инстансе глобального индекса вычисляется, сколько прошло времени с момента создания, и записывается в метрики.
Есть ещё две важные метрики, связанные с кардинальностью индексов: амплификация запросов по глобальному вторичному индексу и количество неуникальных записей.
Под амплификацией я имею в виду то, во сколько шардов нам понадобится сходить, чтобы собрать запрошенные пользователем данные. Эту метрику посчитать довольно просто: на роутере при запросе бакетов из глобального индекса мы можем посчитать количество бакетов и отдать его в метрику. Либо чуть умнее — посчитать количество шардов, на которых лежат эти бакеты, ведь некоторые бакеты могут лежать на одном и том же шарде, следовательно, несколько раз на один и тот же шард ходить не нужно. Можно всё вытащить одним запросом, а в метрике нам интересно именно количество запросов в шарды.
А вот последняя метрика — это боль. Чтобы как-то посчитать количество записей, привязанных к одному вторичному ключу, может быть много подходов:
Самый простой — в лоб фулсканить всю таблицу индекса и считать количество записей по каждому вторичному ключу. Но это будет работать только для очень маленьких таблиц.
Можно немного улучшить предыдущий вариант: создать фоновый процесс, который будет бесконечно фулсканить таблицу индекса по кругу и считать метрику. Чем актуальнее мы захотим от него результатов, тем чаще он должен выполнять сканирование. Следовательно, сильнее своей работой будет влиять на работу инстанса.
Можно зайти с другой стороны — создать таблицу, где для каждого значения вторичного ключа хранить количество связанных с ним записей в индексе и обновлять вместе с обновлением индекса. Но если вторичный индекс будет полностью уникальным, размер этой таблицы будет равен таблице индекса, как минимум по количеству записей.
Создать отдельную таблицу для хранения количества записей, но хранить количество по группам. Об этом способе дальше.
Чтобы не хранить таблицу счётчиков, равную таблице индекса, можно хранить в ней группы (на самом деле это никакие не группы, это я тут выдумал этот термин. В терминологии мониторинга это обычно называется бакетами, но у нас уже есть термин бакет в контексте шардирования, и, чтобы не путаться, я буду тут называть бакеты мониторинга группами). Чтобы было проще, начнём со структуры таблицы.
{
{ name = "le", type = "string" },
{ name = "bucket_id", type = "unsigned" },
{ name = "value", type = "unsigned" },
}
le
— можно понимать как less or equal. Тут указана та самая группа для мониторинга. Может принимать, например, одно из следующих значений:1
,3
,10
,40
,100
,+Inf
. Дальше будет понятнее, что это значит.bucket_id
— чтобы счётчики оставались правильными даже после ребалансировки, их надо считать в рамках бакета, а не шарда. Поэтому для каждого бакета будет создано по одной записи для каждой группы. Общее количество записей в таблице можно посчитать как количество групп, умноженное на количество бакетов на одном шарде. Это, конечно, если хранить группы с нулевыми значениями, а делать это необязательно.value
— это количество записей, которые относятся к этой группе (другими словами, если у нас группа1
, то в этом поле будет лежать количество записей, у которых полностью уникальное значение вторичного индекса).
Как это всё считать. Достаточно просто, но с потерей производительности. При обновлении глобального индекса мы должны сделать селект по вторичному ключу и посчитать количество вернувшихся записей. После чего вычислить правильную группу для метрики и увеличить на единицу значение value у нужной группы в таблице счётчиков (ну либо отнять, если запись удаляли, а не добавляли). Но ещё надо не забыть про переход между группами. Например, если до добавления запись формально принадлежала к группе 1
, то после добавления она уже будет принадлежать к следующей группе 3
. А это значит, что у группы 3
мы value должны увеличить, а у группы 1
уменьшить.
В общем, этот мониторинг — это, прямо говоря, непросто. Лично для себя мы решили, что не очень-то нам эта последняя метрика и нужна, и делать её не стали совсем, потому что для общего понимания кардинальности индекса можно посмотреть на размер таблицы индекса на разных шардах. Если там будут перекосы значит, скорее всего, в индексах много дублей. По метрике амплификации запросов мы можем ловить индексы с низкой кардинальностью, по которым выполняются запросы, но она не показывает неуникальные индексы, которые просто лежат в таблицах и по ним ничего не запрашивают. Тем не менее мы смогли принять эту жертву.
Принятие
Продолжая тему принятия, обобщу все ограничения, которые существуют у нашей реализации глобальных индексов.
Индекс строится асинхронно. Это значит, что данные после записи видны по вторичному индексу не сразу. При нормальной работе системы синхронизация происходит за несколько десятков миллисекунд. Но при сбоях время никак не ограничено сверху, всё зависит от характера сбоя.
При включении индекса на существующих данных все записи после включения будут попадать в индекс почти сразу (как я написал в предыдущем пункте), но данные, которые до этого содержались в базе, будут синхронизироваться в фоне и окажутся не видны до тех пор, пока не попадут в индекс. Этот процесс может занять продолжительное время, всё зависит от количества данных.
Процесс построения индекса для старых данных создаёт дополнительную нагрузку на систему. Если её запустить на нагруженном под завязку кластере, это замедлит и штатную синхронизацию индексов для новых записей.
Низкая кардинальность глобального индекса при самом худшем сценарии приведёт к запросам по всем шардам, то есть к мапредьюсу. Чтобы это предотвратить, в системе введён лимит: не больше 100 записей в запросе по глобальному индексу. В нашем случае такой лимит уместен, потому что индексы должны быть с высокой степенью кардинальности, иначе горизонтальное масштабирование кластера перестанет работать, а система — укладываться в SLA.
Каждая запись глобального индекса требует дополнительного места для хранения. И так как таблица индексов сама тоже проиндексирована локальным индексом, то и он требует места. В нашем конкретном кейсе одна запись данных весит около 6 Кб. А одна запись индекса для неё — около 0,1 Кб.
Для поддержания схемы нам нужны дополнительные таблицы: таблица счётчиков для поддержания порядка событий, таблица буфера (возможно, две, если используются разные движки), по одной отдельной таблице на каждый глобальный вторичный индекс, таблица прогресса синхронизации, если пользовались командой синхронизации.
Записи событий синхронизации индекса занимают место, пока процесс синхронизации их оттуда не удалит. Это может привести к тому, что память на узлах хранения закончится — например, в случае сбоя, в результате которого синхронизация перестанет работать.
Вместо заключения будет реклама
Мы в компании разрабатываем свой велосипед свою базу данных. Называется оригинально — Picodata. В отличие от Tarantool уже скоро там будет встроенная поддержка глобальных индексов, которые будут использоваться в распределённом SQL. О самой базе мы писали тут, а про реализацию SQL — тут. Может быть, когда глобальные индексы попадут в релиз, о них напишем тоже.
Спасибо всем, кто нашёл в себе силы дочитать эту статью до конца.
Комментарии (2)
yonesko
29.05.2024 19:17По поводу метрики о не уникальности значений бакета.
Это можно сделать приблизительно достаточно легко.
Можно поддерживать структу данных в памяти, типо "Find top k (or most frequent) numbers in a stream". Можно использовать вероятностную структуру которая не будет хранить все данные.
Дальше делим количество топ 10 на общее
Biga
Отличная статья!
Решения вроде как простые и известные, и грабли многие известные. Но опыт все равно интересный, и написано хорошо.