В конце прошлого года мы писали о сложном переезде нашего собственного сервиса в новый дата-центр в Детройте. Среди прочих задач мы переносили Clickhouse. Напомню, что речь идет о нагруженном сервисе, который обслуживает десятки серверов, принимающих сотни тысяч запросов с низкой latency в секунду.
В этой статье рассказываем, как мы переносили данные, не имея возможности отключить сервис или воспользоваться автоматической репликацией.
Объем данных для Clickhouse у нас не такой уж и большой - процесс получился не столько объемный, сколько ресурсоемкий. Но в открытых источниках информации по использованным механизмам минимум, поэтому считайте это руководством к clickhouse-copier утилите (https://github.com/ClickHouse/copier) на конкретном примере со скриптами и командами для запуска.
Самый простой подход
Наш сервис функционирует 24/7, т.е. просто выключить и перенести все копированием нельзя. Мы спланировали переезд, но из-за некоторых ограничений самого Clickhouse (а точнее его схемы репликации) не смогли реализовать его в соответствии с планом.
Изначально мы предполагали, что сможем к каждой шарде в старом дата-центре подключить реплику из нового, подождем синхронизации, ну а потом просто отключим реплики в старом дата-центре. Но из-за географической удаленности мы получили слишком большую задержку передачи данных между дата-центрами, несмотря на скорость в 2-2,5 Гбита. В результате в ZooKeeper, который координировал репликацию, объем данных вырос на порядок. Так что процесс пришлось остановить, иначе он грозил затормозить продакшн. Пришлось подыскивать другие способы переезда.
В базе сохраняются все запросы, приходящие в наш сервис. В Clickhouse мы храним два типа данных:
“Сырые данные” за две недели. За это время накапливается около 6 Тб.
“Агрегаты” - важные для нас результаты обработки сырых данных. Агрегаты занимают порядка 300 Гб.
Нашим первым решением стало дождаться процесса агрегации данных и перенести в новый дата-центр только их; ну а пока это происходит, запускать новые шарды и переносить сервисы. Так что задача свелась к тому, что необходимо было найти способ перенести данные и не потерять ни байта из тех 300 Гб агрегатов.
Перечень возможных подходов мы нашли в статье от Альтинити: https://kb.altinity.com/altinity-kb-setup-and-maintenance/altinity-kb-data-migration/. В нашем случае было два варианта дальнейших действий: физический перенос или копирование с помощью clickhouse-copier.
Далее поговорим про каждый из вариантов.
Физический перенос данных
Первый возможный вариант - низкоуровневый (физический) перенос данных.
Clickhouse хранит данные в виде Parts, которые можно физически скопировать с одного сервера на другой в виде файлов. Для этого на исходном сервере надо выполнить подобный запрос и отцепить все Parts последовательно для всех таблиц:
#DETACH PART/PARTITION
ALTER TABLE <DATABASE_NAME>.<TABLE_NAME> DETACH PARTITION|PART <PARTITION_EXPRESSION>
После чего скопировать файлы из директорий ClickHouse_data_dir/<DATABASE_NAME>/<TABLE_NAME>/detached
на новый сервер, а потом выполнить обратный запрос:
#ATTACH PART/PARTITION
ALTER TABLE <DATABASE_NAME>.<TABLE_NAME> ATTACH PARTITION|PART <PARTITION_EXPRESSION>
Мы тестировали этот вариант, но в одном из экспериментов у нас не сошлось количество столбцов в таблицах на исходном и конечном серверах. Возможно, что-то некорректно смерджилось. Мы не стали разбираться, а тем более рисковать на проде, и решили использовать более безопасный метод.
Копирование с помощью clickhouse-copier
Второй вариант - копирование на более высоком уровне с помощью утилиты от Clickhouse. В БД на источнике выполняется SELECT, а затем вставляется INSERT-ом с другой стороны. При этом очередь всех своих заданий и прогресс утилита хранит в zookeeper, что исключает проблемы.
В открытых источниках информации по этому пути оказалось крайне мало. Пришлось разбираться с нуля.
Способ показался более надежным, хотя и не без особенностей. Например, оказалось, что если запустить утилиту на стороне источника, то данные в результате копирования не сходятся. Возможно, виноваты были сетевые проблемы - было ощущение, что некоторые фрагменты данных просто не достигают нового дата-центра и утилита это не фиксирует. А вот если запустить ее на стороне приемника, то данные сходились на 100%.
Сам процесс переноса данных одной шарды занимал 3-4 часа, еще около получаса требовалось на различные сопутствующие манипуляции, в частности копирование уже внутри сервера, т.к. данные мы изначально переносили во “временную” таблицу. Выполнить копирование напрямую в продакшн-таблицу мы не могли. Из-за того, что в процессе копирования кластер фактически состоит из машин в двух дата-центрах, в этот момент у нас происходило бы задвоение статистики. Так что мы копировали данные из Майами во временную таблицу в Детройте (преодолевали географическое расстояние), а затем уже внутри дата-центра в Детройте проводили слияние с продакшеном, выполняя вставки по 500-600 млн. столбцов.
Несмотря на предосторожности, мы все же столкнулись с парой инцидентов, когда клиенты вышли за установленные в админке лимиты, поскольку во время копирования у них отображалась неполная статистика. Но суммарно убытки были долларов 20.
Копирование на практике
Процесс копирования выглядит следующим образом.
Для начала работы нам нужно положить в zookeeper данные для утилиты.
zkCli.sh -server localhost:2181 create /clickhouse/copytasks ""
Далее нам нужно создать схему копирования - файл schema.xml.
<clickhouse>
<!-- Configuration of clusters as in an ordinary server config -->
<remote_servers>
<source_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>IP</host>
<port>9000</port>
<user>default</user>
<password></password>
</replica>
</shard>
</source_cluster>
<destination_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>IP</host>
<port>9000</port>
<user>default</user>
<password></password>
</replica>
</shard>
</destination_cluster>
</remote_servers>
<!-- How many simultaneously active workers are possible. If you run more workers superfluous workers will sleep. -->
<max_workers>1</max_workers>
<!-- Setting used to fetch (pull) data from source cluster tables -->
<settings_pull>
<readonly>1</readonly>
</settings_pull>
<!-- Setting used to insert (push) data to destination cluster tables -->
<settings_push>
<readonly>0</readonly>
</settings_push>
<!-- Common setting for fetch (pull) and insert (push) operations. Also, copier process context uses it.
They are overlaid by <settings_pull/> and <settings_push/> respectively. -->
<settings>
<connect_timeout>3</connect_timeout>
<!-- Sync insert is set forcibly, leave it here just in case. -->
<insert_distributed_sync>1</insert_distributed_sync>
</settings>
<!-- Copying tasks description.
You could specify several table task in the same task description (in the same ZooKeeper node), they will be performed
sequentially.
-->
<tables>
<!-- A table task, copies one table. -->
<table_hits>
<cluster_pull>source_cluster</cluster_pull>
<database_pull>database</database_pull>
<table_pull>table_local</table_pull>
<cluster_push>destination_cluster</cluster_push>
<database_push>database</database_push>
<table_push>table_local1</table_push>
<engine>
ENGINE=ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/ssp_report_common1', '{replica}')
partition by toYYYYMMDD(sspRequestDate)
order by (sspRequestDate, dspId, sspRequestCountry, endpointId)
</engine>
<sharding_key>rand()</sharding_key>
</table_hits>
</tables>
</clickhouse>
Здесь:
секция
clickhouse
описывает сервера - откуда и куда мы производим копирование;секция
tables
описывает какие таблицы мы копируем;в
table_hits
находится само описание процесса.
После этого отправляем этот файл в zookeeper:
zkCli.sh -server localhost:2181 create /clickhouse/copytasks/description "`cat schema.xml`"
Далее переходим на сервер Clickhouse и создаем файл zookeeper.xml, необходимый для работы утилиты:
<clickhouse>
<logger>
<level>trace</level>
<size>100M</size>
<count>3</count>
</logger>
<zookeeper>
<node index="1">
<host>127.0.0.1</host>
<port>2181</port>
</node>
</zookeeper>
</clickhouse>
Ну и запускаем командой:
clickhouse-copier --config-file=zookeeper.xml --task-path=/clickhouse/copytasks
После того как утилита отработает стоит удаляем данные из zookeeper:
zkCli.sh -server localhost:2181 deleteall /clickhouse/copytasks
После того, как мы проверили, что все данные совпали, остается выполнить SQL команду:
INSERT INTO table_local SELECT * FROM table_local1;
В общей сложности переезд занял дней 10 (с учетом того, что параллельно мы переносили другие части сервиса). Надеемся, что эта статья поможет вам сэкономить время на поиске подхода к решению подобных задач.
Авторы статьи: Игорь Иванов и Денис Палагута, Максилект.
P.S. Мы публикуем наши статьи на нескольких площадках Рунета. Подписывайтесь на нашу страницу в VK или на Telegram-канал, чтобы узнавать обо всех публикациях и других новостях компании Maxilect.
Melonom
То же недавно переносили кластер клика, но сделали по другому.
У нас есть таблица с ретеншином в 50 дней и джобы которые в неё пишут. Мы подняли новый кластер, настроили джобы что бы они писали в два кластера, подождали пока данные будут одинаковые и потом выключили первый кластер.