Мечтал ли я когда-либо ворочать миллиардами? Честно признаюсь, да. И нельзя сказать, что Вселенная меня не услышала. Вот только я никак не имел в виду миллиарды записей в базе данных...

Ранее я уже писал о нашем опыте использования ScyllaDB в качестве архивного хранилища. Разумеется, исследования и открытия, связанные с новой базой данных, для нас на этом не закончились... Создавая архив для данных, вероятность обращения к которым близка к нулю, мы, конечно, допускали, что время от времени клиенты будут просить вернуть данные обратно в оперативное хранилище. Но запрос на извлечение из архива сразу всех записей стал для нас неожиданностью. Хорошо ещё, что клиента интересовал только ключ записи, а не вся запись целиком. Тем не менее достать 10 млрд. ключей из ScyllaDB за приемлемое время звучало как челлендж. Ну надо так надо.

Постановка задачи

Информацию по ключам из архива предполагалось использовать в аналитических целях. Способ предоставления этой информации также был задан клиентом: файлы, много файлов с ключами. При этом к алгоритму выгрузки предъявлялись определенные требования:

  • Алгоритм должен быть как можно более быстрым. В этих целях стоило рассмотреть возможность распараллеливания алгоритма.

  • Алгоритм должен уметь сохранять своё состояние между вызовами, чтоб в случае остановки (как управляемой, так и аварийной) и перезапуска, не начинать всю работу заново.

Таблица, из которой планировалось извлекать ключи, выглядела так (рис. 1):

Рис. 1
Рис. 1

Готовые решения

Казалось бы, чего мудрить? Открываешь cqlsh и пишешь:

SELECT key FROM archive.products;

Но есть нюанс. Ключей-то в таблице — миллиарды. Запрос такой будет исполняться не один день. Ну как, отвалится сервер на половине решенной задачи, и начинай всё заново. Да и с результатом такой выборки что делать? В файл бы его, да лучше не в один большой, а во много маленьких, чтоб читать-передавать проще было. В общем, чистый CQL — Cassandra Query Language — не вариант.

Из готовых решений нашлось одно — утилита dsbulk — написанная на Java надстройка над CQL. dsbulk из коробки включает распараллеливание и выгрузку. Количество потоков для распараллеливания определяется как N * C, где С — количество ядер на рабочей машине, а N — конфигурируемый параметр, по умолчанию равный 0.5. Каждый поток пишет данные в свой файл. Правда, есть нюанс: параметр N и максимальное количество записей в файле (по умолчанию, -1 — безлимит) можно поменять через опции -maxConcurrentFiles и -maxRecords соответственно, но только при запуске в режиме коннектора. Также распараллеливание включается не для всех типов запросов. Так, например, запрос вида:

./dsbulk unload -query "select key from archive.products"

выполняется в однопоточном режиме. Для задействования многопоточности укажем выгрузку по диапазону токенов, как указано в примере из официальной документации:

./dsbulk unload -query "select key from archive.products where token(key) >= -9223372036854775808  and token(key) <= 9223372036854775807" -url results -delim "\t" -maxErrors 1%

Для такого запроса на 16-ядерной машине утилита отработала без проблем, выгрузив ключи в 8 отдельных файлов Но обнаружился и существенный минус: если операция прерывается, например, из-за недоступности сервера Сциллы, выгрузку придётся начинать сначала.

Кстати, ещё одна, казалось бы, более специализированная для целевой базы данных утилита — scylla bulk loader, к сожалению, умеет только загружать данные в Сциллу, но никак не выгружать.

Безальтернативность dsbulk и отсутствие возможности приостанавливать выгрузку и возобновлять её не с начала, а с какого-то фиксированного состояния, побудила нас написать своё решение. Хотя бы чтоб было с чем сравнивать производительность dsbulk.

Какие особенности Сциллы следовало при этом учесть, чтоб решение получилось как можно ближе к оптимальному? Давайте разберёмся.

Виртуальные узлы Сциллы

Ключи в Сцилле, а точнее — соответствующие этим ключам токены, группируются по диапазонам (рейнджам), так что каждый диапазон лежит на своём узле в кластере (всего у нас 5 узлов). Получается, если выделить работу с отдельным диапазоном в отдельный поток, то каждый поток будет нагружать отдельный узел кластера... Да, но нет. Дело в том, что Сцилла реализует концепцию виртуальных узлов (VNodes). И как раз между этими виртуальными узлами и распределяются диапазоны токенов. При этом виртуальные узлы в свою очередь распределяются между физическими узлами так, что, например, смежные диапазоны токенов могут оказаться как на одном, так и на разных физических узлах. Да, и не стоит забывать про фактор репликации, в нашем случае равный трём. Получается такая схема узлов (по мотивам оригинала) (рис. 2)

Рис. 2
Рис. 2

Чтобы увидеть, как это распределение выглядит в реальности, выполняем на одном из серверов кластера команду:

nodetool describering <keyspace>

Получаем набор диапазонов, где отдельный диапазон может выглядеть, например, так:

TokenRange(start_token:-8955022471896900761, end_token:-8926088324259165954, endpoints:[192.168.1.5, 192.168.1.3, 192.168.1.7], rpc_endpoints:[192.168.1.5, 192.168.1.3, 192.168.1.7], endpoint_details:[EndpointDetails(host:192.168.1.5, datacenter:datacenter1, rack:rack1), EndpointDetails(host:192.168.1.3, datacenter:datacenter1, rack:rack1), EndpointDetails(host:192.168.1.7, datacenter:datacenter1, rack:rack1)])

Вообще, количество виртуальных узлов на одном физическом определяется параметром num_tokens в конфигурационном файле scylla.yaml. По умолчанию значение num_tokens равно 256. В кластере из 5 серверов (физических узлов с 64 CPU на каждом) получаем 256 * 5 = 1280 виртуальных узлов, т.е. 1280 диапазонов токенов. Поскольку для расчёта значения токена по ключу Сцилла использует функцию MurmurHash3 с 64-битными значениями хэша, то возможные значения токенов варьируются от -2^63 до 2^63 - 1. Небольшую неприятность может доставить тот факт, что диапазоны токенов формируют цикл, так что границы одного из диапазонов будут выглядеть так:

start_token:9213098081152187938, end_token:-9206933700428716886

Если эти границы с переходом через максимальное 64-битное целое указать в типовом cql-запросе:

select key from archive.products where token(key) >= 9213098081152187938 and token(key) <= -9206933700428716886;

естественно, по нему ничего не вернётся. И правда, какое число может быть одновременно больше некоторого положительного, но меньше некоторого отрицательного целого. CQL эту вашу кольцевую архитектуру не понимает. Поэтому разделим такой диапазон на два: от -9223372036854775808 до -9206933700428716886 (-2^63) и от 9213098081152187938 до 9223372036854775807 (2^63 - 1), – и  каждый будем рассматривать отдельно.

Учитывая, что Сцилла реализует shard-per-core архитектуру, а виртуальный узел по факту и является шардом, адресуя в каждом запросе конкретный диапазон токенов, мы будем загружать этим запросом конкретное ядро. Такая точечная нагрузка при отправке множества параллельных запросов по диапазонам позволит как можно более равномерно задействовать все физические ядра кластера. Поскольку виртуальных узлов на сервере всё равно больше, чем физических ядер, часть запросов будет приходиться на одни и те же ядра, но с этим уже придётся смириться. 

Своё решение

Определившись, по какому принципу проводить распараллеливание, приступаем к реализации алгоритма выгрузки. На входе у нас будет файл с 1281-м диапазоном токенов. Этот файл мы получим,  применив команду nodetool describering archive, сформировав на основе её выходных данных список из 1280 пар start_token:end_token и разделив пару 9213098081152187938:-9206933700428716886 на две: -9223372036854775808:-9206933700428716886 и 9213098081152187938:9223372036854775807.

Алгоритм реализуем на Python, поскольку ранее использовали именно python-драйвер для Сциллы и знаем, что он вполне может обеспечить необходимую производительность, не усложняя саму разработку. Распараллеливание осуществим посредством процессов. А как иначе, если это Python? Создаём пул процессов обработчиков на базе multiprocessing.Pool. В главном процессе идём по набору диапазонов и закидываем каждый диапазон в пул. Обработчик в пуле принимает уникальный диапазон, отправляет запрос к Сцилле на получение ключей из этого диапазона и начинает итерироваться по полученным ключам, записывая их в файл. Этот файл сначала создается с меткой временного (например,  в виде префикса tmp_) и указанием одной из границ диапазона (например, верхней) в имени. Когда все ключи из диапазона успешно записываются в файл, убираем метку временного из названия файла, фактически обозначая его как законченный. Схема алгоритма приведена на рис. 3.

Рис. 3
Рис. 3

Такой порядок позволит нам даже в условиях распараллеливания отличать успешно обработанные диапазоны от тех, чья обработка завершилась с ошибкой и не была доведена до конца. Если вдруг программа аварийно завершится, мы сможем её перезапустить, предварительно исключив из исходного файла с диапазонами те диапазоны, для которых существуют законченные файлы. И нам не придётся повторно вытаскивать одни и те же ключи. Правда, точность здесь — до диапазона.

При реализации этого алгоритма следует учесть ряд нюансов:

  1. Для оперативного задействования освобождающихся процессов мы закидываем диапазоны в пул при помощи метода apply_async(). Чтоб при этом не загружать RAM, пока все процессы заняты (а это может быть надолго, т.к. диапазоны большие), мы приостанавливаем чтение диапазонов из файла, если размер очереди задач в пуле превышает заданное ограничение (должно быть не меньше размера пула). Размер очереди задач можно проверять по длине атрибута _cache у объекта пула.

  2. При записи в файл, чтоб его размер не вырастал до неимоверного, вводим ограничение на количество записей (ключей) в файле. Если количество записей достигает ограничения, создаём новый файл с таким же именем + номер файла, начиная с 1, и далее продолжаем запись уже в этот новый файл. Главное, не забыть убрать метки временных со всех файлов, относящихся к обрабатываемому диапазону. Также стоит предусмотреть свободное место на диске, достаточное для хранения всех выгруженных ключей.

  3. Выборка ключей, принадлежащих заданному диапазону — операция длительная. А таймаут по умолчанию на неё — range_request_timeout_in_ms — всего-то 10000. Стоит скорректировать его значение в scylla.yml — конфигурационном файле Сциллы — на всех машинах и перезагрузить сервис. Поскольку мы не собирались параллельно с выгрузкой нагружать Сциллу другими конкурирующими задачами, то смело указали 3600000 (подобрав значение эмпирическим путём), чего нам и хватило с запасом.

Далее, помещаем код главного процесса и код обработчиков в класс для удобного пользования общими параметрами. И в итоге получаем примерно такой скрипт:

import multiprocessing.pool as mp_pool
from pathlib import Path

from cassandra.cluster import Cluster


class LimitedMultiprocessingPool(mp_pool.Pool):
    def get_pool_cache_size(self):
        return len(self._cache)


class ScyllaKeysLoader:
    TMP_PREFIX = "TMP"

    scylla_cluster = None
    scylla_session = None

    def __init__(
        self,
        scylla_servers,
        scylla_keyspace,
        in_file_name,
        out_dir_name,
        max_lines_per_file,
    ):
        self._scylla_servers = scylla_servers
        self._scylla_keyspace = scylla_keyspace
        self._in_file_name = in_file_name
        self._out_dir_name = out_dir_name
        self._max_lines_per_file = max_lines_per_file

    def worker(self, tokens_range):
        # В каждом из форкнутых процессов будет своё подключение к Сцилле
        # на всё время существования процесса
        if ScyllaKeysLoader.scylla_cluster is None:
            ScyllaKeysLoader.scylla_cluster = Cluster(self._scylla_servers)
            ScyllaKeysLoader.scylla_session = (
                ScyllaKeysLoader.scylla_cluster.connect(
                    self._scylla_keyspace
                )
            )
        scylla_session = ScyllaKeysLoader.scylla_session

        token_start, token_end = tokens_range
        out_file_name = f"{self.TMP_PREFIX}_{token_start}"
        file_num = 0

        # Открываем для записи новый файл с префиксом TMP
        out_file = open(
            Path(self._out_dir_name, f"{out_file_name}_{file_num}"),
            "w",
            buffering=1024,
        )
        file_cur_line_num = 0

        prepared = scylla_session.prepare(
            "SELECT key FROM products WHERE token(key) > ? and token(key) <= ?"
        )
        cursor = scylla_session.execute(
            prepared, (token_start, token_end)
        )
        for record in cursor:
            # Проверяем количество строк в файле на превышение лимита
            if file_cur_line_num >= self._max_lines_per_file:
                # Текущий файл закрываем
                out_file.close()
                # Новый файл открываем, увеличивая номер в имени файла
                file_num += 1
                out_file = open(
                    Path(self._out_dir_name, f"{out_file_name}_{file_num}"),
                    "w",
                    buffering=1024,
                )
                file_cur_line_num = 0
            # Записываем ключ в файл
            out_file.write(f"{getattr(record, 'key')}\n")
            file_cur_line_num += 1
        out_file.close()

        # Убраем из имени у всех успешно записанных файлов
        # префикс TMP - отметку временного файла
        num = 0
        while num <= file_num:
            file_name = f"{token_start}_{num}"
            tmp_file_path = Path(
                self._out_dir_name, f"{self.TMP_PREFIX}_{file_name}"
            )
            tmp_file_path.rename(tmp_file_path.with_name(file_name))
            num += 1

    def load_keys(self, pool_size):
        queue_size = pool_size * 2

        with LimitedMultiprocessingPool(processes=pool_size) as pool:
            with open(self._in_file_name) as in_file:
                for line in in_file:
                    while pool.get_pool_cache_size() >= queue_size:
                        # Здесь можно предусмотреть sleep
                        pass

                    token_start_str, token_end_str = line.strip("\n").split(":")
                    tokens_range = (int(token_start_str), int(token_end_str))
                    pool.apply(self.worker, (tokens_range,))

Для краткости из этого примера убрана обработка всевозможных исключительных ситуаций. Также здесь не предусмотрена возможность переподключения и повторной отправки запроса к Сцилле на случай разрыва соединения с сервером. Но логику нашего алгоритма этот код вполне отражает.

Сравнение с dsbulk: кто быстрее

В тесте на 8-ми процессах для 70 млн. ключей наш алгоритм отработал за ~5,6 часа, показав скорость выгрузки ~12 млн. ключей в час.

dsbulk на 8-ми потоках для тех же 70 млн. ключей отработал за ~12,7 часа, показав скорость выгрузки ~5,4 млн. ключей в час.

Детали сравнения приведены в таблице (таб. 1):

Таб. 1
Таб. 1

Таким образом, обойдя в тесте dsbulk больше чем в 2 раза, наш алгоритм оказался наиболее быстрым и управляемым решением для поставленной задачи.

Что ж? Пора на Прод.

Для чистоты эксперимента все прочие операции со Сциллой (архивация, автокомпактификация, repair) были приостановлены.

Запустили! Скрипт выгрузил 10 млрд. ключей за 14 дней. К нашему великому удивлению Сцилла достойно выдержала нагрузку, даже ни разу не оборвав соединение. Алгоритм отработал безостановочно.

Положительный результат — это тоже результат

Ещё только начиная применять Сциллу в качестве архивного хранилища, мы неоднократно натыкались на многочисленные подводные камни её непредсказуемого поведения, отбив на них всё что только можно. Так что в этот раз к решению новой масштабной задачи, связанной со Сциллой, мы подготовились основательно. Перестраховались, подстелили соломки по всему датацентру, чтоб Сцилле (и нам заодно) не было больно падать. И даже изобрели велосипед, оказавшийся на удивление управляемым и быстрым. В итоге всё сработало без сучков, задоринок и фейлов, о которых так интересно рассказывать. Но, как нам думается, наше мини-исследование кольцевой архитектуры Сциллы и успешное практическое применение его результатов может-таки пригодиться всем рискнувшим доверить свои данные этой своенравной, но весьма любопытной СУБД.

Комментарии (2)


  1. WVitek
    23.06.2023 16:05
    +1

    Какой, интересно, фактический размер выгруженных данных по ключам?
    (интересна скорость выгрузки в байт/сек)


    1. avtozavodetz Автор
      23.06.2023 16:05

      Фактический размер получился небольшой - всего ~260 Гбайт.
      А скорость выгрузки на Проде составила примерно 231481 байт/сек.