В конце прошлого года мы писали о сложном переезде нашего собственного сервиса в новый дата-центр в Детройте. Среди прочих задач мы переносили Clickhouse. Напомню, что речь идет о нагруженном сервисе, который обслуживает десятки серверов, принимающих сотни тысяч запросов с низкой latency в секунду.

В этой статье рассказываем, как мы переносили данные, не имея возможности отключить сервис или воспользоваться автоматической репликацией. 

Объем данных для Clickhouse у нас не такой уж и большой - процесс получился не столько объемный, сколько ресурсоемкий. Но в открытых источниках информации по использованным механизмам минимум, поэтому считайте это руководством к clickhouse-copier утилите (https://github.com/ClickHouse/copier) на конкретном примере со скриптами и командами для запуска.

Самый простой подход

Наш сервис функционирует 24/7, т.е. просто выключить и перенести все копированием нельзя. Мы спланировали переезд, но из-за некоторых ограничений самого Clickhouse (а точнее его схемы репликации) не смогли реализовать его в соответствии с планом.

Изначально мы предполагали, что сможем к каждой шарде в старом дата-центре подключить реплику из нового, подождем синхронизации, ну а потом просто отключим реплики в старом дата-центре. Но из-за географической удаленности мы получили слишком большую задержку передачи данных между дата-центрами, несмотря на скорость в 2-2,5 Гбита. В результате в ZooKeeper, который координировал репликацию, объем данных вырос на порядок. Так что процесс пришлось остановить, иначе он грозил затормозить продакшн. Пришлось подыскивать другие способы переезда.

В базе сохраняются все запросы, приходящие в наш сервис. В Clickhouse мы храним два типа данных:

  • “Сырые данные” за две недели. За это время накапливается около 6 Тб.

  • “Агрегаты” - важные для нас результаты обработки сырых данных. Агрегаты занимают порядка 300 Гб.

Нашим первым решением стало дождаться процесса агрегации данных и перенести в новый дата-центр только их; ну а пока это происходит, запускать новые шарды и переносить сервисы. Так что задача свелась к тому, что необходимо было найти способ перенести данные и не потерять ни байта из тех 300 Гб агрегатов.

Перечень возможных подходов мы нашли в статье от Альтинити: https://kb.altinity.com/altinity-kb-setup-and-maintenance/altinity-kb-data-migration/. В нашем случае было два варианта дальнейших действий: физический перенос или копирование с помощью clickhouse-copier. 

Далее поговорим про каждый из вариантов.

Физический перенос данных

Первый возможный вариант - низкоуровневый (физический) перенос данных.

Clickhouse хранит данные в виде Parts, которые можно физически скопировать с одного сервера на другой в виде файлов. Для этого на исходном сервере надо выполнить подобный запрос и отцепить все Parts последовательно для всех таблиц:

#DETACH PART/PARTITION
ALTER TABLE <DATABASE_NAME>.<TABLE_NAME> DETACH PARTITION|PART <PARTITION_EXPRESSION>

После чего скопировать файлы из директорий ClickHouse_data_dir/<DATABASE_NAME>/<TABLE_NAME>/detached на новый сервер, а потом выполнить обратный запрос:

#ATTACH PART/PARTITION
ALTER TABLE <DATABASE_NAME>.<TABLE_NAME> ATTACH PARTITION|PART <PARTITION_EXPRESSION>

Мы тестировали этот вариант, но в одном из экспериментов у нас не сошлось количество столбцов в таблицах на исходном и конечном серверах. Возможно, что-то некорректно смерджилось. Мы не стали разбираться, а тем более рисковать на проде, и решили использовать более безопасный метод.

Копирование с помощью clickhouse-copier

Второй вариант - копирование на более высоком уровне с помощью утилиты от Clickhouse. В БД на источнике выполняется SELECT, а затем вставляется INSERT-ом с другой стороны. При этом очередь всех своих заданий и прогресс утилита хранит в zookeeper, что исключает проблемы.

В открытых источниках информации по этому пути оказалось крайне мало. Пришлось разбираться с нуля.

Способ показался более надежным, хотя и не без особенностей. Например, оказалось, что если запустить утилиту на стороне источника, то данные в результате копирования не сходятся. Возможно, виноваты были сетевые проблемы - было ощущение, что некоторые фрагменты данных просто не достигают нового дата-центра и утилита это не фиксирует. А вот если запустить ее на стороне приемника, то данные сходились на 100%.

Сам процесс переноса данных одной шарды занимал 3-4 часа, еще около получаса требовалось на различные сопутствующие манипуляции, в частности копирование уже внутри сервера, т.к. данные мы изначально переносили во “временную” таблицу. Выполнить копирование напрямую в продакшн-таблицу мы не могли. Из-за того, что в процессе копирования кластер фактически состоит из машин в двух дата-центрах, в этот момент у нас происходило бы задвоение статистики. Так что мы копировали данные из Майами во временную таблицу в Детройте (преодолевали географическое расстояние), а затем уже внутри дата-центра в Детройте проводили слияние с продакшеном, выполняя вставки по 500-600 млн. столбцов.

Несмотря на предосторожности, мы все же столкнулись с парой инцидентов, когда клиенты вышли за установленные в админке лимиты, поскольку во время копирования у них отображалась неполная статистика. Но суммарно убытки были долларов 20.

Копирование на практике

Процесс копирования выглядит следующим образом.

Для начала работы нам нужно положить в zookeeper данные для утилиты.

zkCli.sh -server localhost:2181 create /clickhouse/copytasks ""

Далее нам нужно создать схему копирования - файл schema.xml.

<clickhouse>
    <!-- Configuration of clusters as in an ordinary server config -->
    <remote_servers>
        <source_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                    <replica>
                        <host>IP</host>
                        <port>9000</port>
                        <user>default</user>
                        <password></password>
                    </replica>
            </shard>
        </source_cluster>

        <destination_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                    <replica>
                        <host>IP</host>
                        <port>9000</port>
                        <user>default</user>
                        <password></password>
                    </replica>
            </shard>
        </destination_cluster>
    </remote_servers>

    <!-- How many simultaneously active workers are possible. If you run more workers superfluous workers will sleep. -->
    <max_workers>1</max_workers>

    <!-- Setting used to fetch (pull) data from source cluster tables -->
    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>

    <!-- Setting used to insert (push) data to destination cluster tables -->
    <settings_push>
        <readonly>0</readonly>
    </settings_push>

    <!-- Common setting for fetch (pull) and insert (push) operations. Also, copier process context uses it.
         They are overlaid by <settings_pull/> and <settings_push/> respectively. -->
    <settings>
        <connect_timeout>3</connect_timeout>
        <!-- Sync insert is set forcibly, leave it here just in case. -->
        <insert_distributed_sync>1</insert_distributed_sync>
    </settings>

    <!-- Copying tasks description.
         You could specify several table task in the same task description (in the same ZooKeeper node), they will be performed
         sequentially.
    -->
    <tables>
        <!-- A table task, copies one table. -->

        <table_hits>
            <cluster_pull>source_cluster</cluster_pull>
            <database_pull>database</database_pull>
            <table_pull>table_local</table_pull>
            <cluster_push>destination_cluster</cluster_push>
            <database_push>database</database_push>
            <table_push>table_local1</table_push>
            <engine>
            ENGINE=ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/ssp_report_common1', '{replica}')
            partition by toYYYYMMDD(sspRequestDate)
            order by (sspRequestDate, dspId, sspRequestCountry, endpointId)
            </engine>
            <sharding_key>rand()</sharding_key>
        </table_hits> 

    </tables>
</clickhouse>

Здесь:

  • секция clickhouse описывает сервера - откуда и куда мы производим копирование;

  • секция tables описывает какие таблицы мы копируем;

  • в table_hits находится само описание процесса.

После этого отправляем этот файл в zookeeper:

zkCli.sh -server localhost:2181 create /clickhouse/copytasks/description "`cat schema.xml`"

Далее переходим на сервер Clickhouse и создаем файл zookeeper.xml, необходимый для работы утилиты:

<clickhouse>
    <logger>
        <level>trace</level>
        <size>100M</size>
        <count>3</count>
    </logger>

    <zookeeper>
        <node index="1">
            <host>127.0.0.1</host>
            <port>2181</port>
        </node>
    </zookeeper>
</clickhouse>

Ну и запускаем командой:

clickhouse-copier --config-file=zookeeper.xml --task-path=/clickhouse/copytasks

После того как утилита отработает стоит удаляем данные из zookeeper:

zkCli.sh -server localhost:2181 deleteall /clickhouse/copytasks

После того, как мы проверили, что все данные совпали, остается выполнить SQL команду:  

INSERT INTO table_local SELECT * FROM table_local1;

В общей сложности переезд занял дней 10 (с учетом того, что параллельно мы переносили другие части сервиса). Надеемся, что эта статья поможет вам сэкономить время на поиске подхода к решению подобных задач.

Авторы статьи: Игорь Иванов и Денис Палагута, Максилект.

P.S. Мы публикуем наши статьи на нескольких площадках Рунета. Подписывайтесь на нашу страницу в VK или на Telegram-канал, чтобы узнавать обо всех публикациях и других новостях компании Maxilect.

Комментарии (1)


  1. Melonom
    19.04.2024 07:13
    +1

    То же недавно переносили кластер клика, но сделали по другому.
    У нас есть таблица с ретеншином в 50 дней и джобы которые в неё пишут. Мы подняли новый кластер, настроили джобы что бы они писали в два кластера, подождали пока данные будут одинаковые и потом выключили первый кластер.