Привет, Хабр!
Сегодня я расскажу, как с помощью Liquibase, GitLab и немного Python настроить прозрачный, безопасный и удобный процесс миграций для ClickHouse кластера.


Для Clickhouse кластера создать прозрачный инструмент для сборки, проверки и доставки миграций на окружения, исключив ручные изменения в БД.
Возможность отката миграций в один клик.
Простота: оставить разработчикам возможность писать миграции на привычном SQL.
Liquibase
Liquibase — универсальный инструмент для работы с миграциями, который поддерживает множество БД. Но есть только один маленький нюанс (как всегда) – из коробки Liquibase не умеет работать с ClickHouse кластером. Для решения этой проблемы был найден плагин Liquibase-ClickHouse от Arenadata. Плагин добавляет поддержку кластера, Liquibase создаст собственные таблицы как реплицируемые, добавляется возможность использовать ON CLUSTER в SQL скриптах.
Единственная проблема, которую не решает плагин – поддержка shared для системных таблиц Liquibase. Если ваш кластер, как и наш, настроен с несколькими shared, то имейте в виду, что системные таблицы Liquibase будут созданы лишь на том шарде и его нодах, к которому было создано подключение.
Давайте рассмотрим, как же работает процесс создания миграций с помощью Liquibase.
Liquibase использует файл changelog.xml для управления миграциями. Каждая миграция записывается как отдельный changeSet.
Плагин Liquibase-ClickHouse Arenadata обеспечивает поддержку SQL-операций с кластером, а путь до SQL-скриптов миграций можно описать changelog.xml с помощью конструкции sqlFile path="path/example_path". Таким образом, мы оставляем разработчикам возможность использовать знакомый SQL вместо изучения специфического формата Liquibase.
С помощью конструкции <tagDatabase tag="1"> мы можем фиксировать версию изменений, а значит, в дальнейшем сможем до нее откатиться.
-
Чтобы иметь возможность сделать rollback, после добавления скрипта создания миграций с помощью sqlFile добавляем в каждый changeSet блок rollback, в котором указываем путь до такого же sql скрипта, но уже с описанием процесса отката.
Приведу пример changelog.xml файла, который обеспечивает все описанные выше пункты.
<?xml version="1.0" encoding="UTF-8"?> <databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.8.xsd"> <changeSet id="1" author="ivanov"> <tagDatabase tag="1"/> </changeSet> <changeSet id="2" author="petrov"> <sqlFile path="migration/1_create_first_table.sql" relativeToChangelogFile="true"/> <rollback> <sqlFile path="rollback/1_delete_first_table.sql" relativeToChangelogFile="true"/> </rollback> </changeSet> <changeSet id="3" author="petrov"> <tagDatabase tag="2"/> </changeSet> </databaseChangeLog>
GitLab CI/CD: автоматизация миграций
Весь процесс доставки миграций ложится на плечи через GitLab CI/CD. Полный flow Pipeline показывать не будем, остановимся подробнее на 3х stage, которые непосредственно относятся к теме статьи.
Validate ChangeLog.xml
Так как changelog.xml разработчики заполняют вручную, нужно предотвратить возможные ошибки.
Что валидируем? Разберем на примере changelog.xml, который я привел выше.
-
Уникальность и правильную последовательность changeSet ID
-
Уникальность и валидность (все теги целое число, без sevmer и т.д.) тегов tagDatabase
-
Наличие блока
<rollback>
для каждого changeSet.
Так как за все сложные части CICD процессов у нас отвечает cli утилита на Python, то и валидацию changelog.xml было доверено проводить ей. Часть, которая отвечает за валидацию может быть полезна:
код туть
import logging
from enum import StrEnum
from functools import cached_property
from lxml import etree
class ValidateUniqIdsError(Exception):
def __str__(self):
return (
"Валидация уникальности changeset ID не пройдена! "
"Проверьте, что все id changeset'ов являются уникальными."
)
class ValidateChangesetIdsAscendingError(Exception):
def __str__(self):
return (
"Валидация корректной последовательности ID changeset бд не пройдена! "
"Проверьте, что все ID указаны в порядке возрастания."
)
class ValidateTagVersionsAscendingError(Exception):
def __str__(self):
return (
"Валидация правильной последовательности tag version схемы БД не пройдена! Проверьте, что все tag "
"version указаны в порядке возрастания."
)
class ValidateUniqTagVersionsError(Exception):
def __str__(self):
return (
"Валидация уникальности tag version схемы БД не пройдена. Проверьте, что все tag "
"version являются уникальными."
)
class ValidateAreValuesIncrementingByOneError(Exception):
def __init__(self, tag_id: int):
self.tag_id = tag_id
def __str__(self):
return (
f"TAG: {self.tag_id} больше предыдущего TAG'а более чем на 1. "
f"Из-за этого будет невозможно выполнить корректный rollback до предыдущей версии БД."
)
class ValidateExistenceRollbackInChangesetError(Exception):
def __init__(self, changeset_id: int):
self.changeset_id = changeset_id
def __str__(self):
return (
f"в ChangeSet c ID: {self.changeset_id} необходимо добавить <rollback></rollback> "
f"блок с SQL скриптом отката таблицы."
)
class LiquiShit(StrEnum):
common = "{http://www.liquibase.org/xml/ns/dbchangelog}"
rollback = f"{common}rollback"
sql_file = f"{common}sqlFile"
changeset = f"{common}changeSet"
tag_database = f"{common}tagDatabase"
class LiquibaseChangelogWorker:
def __init__(self, changelog_filename: str):
self.changelog_filename = changelog_filename
@cached_property
def xml_tree(self):
return etree.parse(self.changelog_filename).getroot()
@cached_property
def all_changesets(self):
return self.xml_tree.findall(LiquiShit.changeset)
@cached_property
def all_tags(self):
return [
int(changeset[0].get("tag"))
for changeset in self.all_changesets
if changeset.findall(LiquiShit.tag_database)
]
@cached_property
def all_sql_changesets(self):
return [
changeset
for changeset in self.all_changesets
if changeset.findall(LiquiShit.sql_file)
]
@cached_property
def all_changeset_ids(self) -> list:
return [
int(changeset.attrib.get("id"))
for changeset in self.all_changesets
]
def get_last_tag(self) -> str:
return str(self.all_tags[-1])
def validate_uniq_ids(self) -> None:
if len(self.all_changeset_ids) == len(set(self.all_changeset_ids)):
logging.info("Валидация uniq changeset ID прошла успешно!")
return
raise ValidateUniqIdsError
def validate_changeset_ids_ascending(self):
if self.all_changeset_ids == sorted(self.all_changeset_ids):
logging.info(
"Валидация корректной последовательности ID changeset прошла успешно!"
)
return
raise ValidateChangesetIdsAscendingError
def validate_tag_versions_ascending(self) -> None:
if self.all_tags == sorted(self.all_tags):
logging.info(
"Валидация последовательности tag version схемы БД прошла успешно!"
)
return
raise ValidateTagVersionsAscendingError
def validate_uniq_tag_versions(self) -> None:
if len(self.all_tags) == len(set(self.all_tags)):
logging.info(
"Валидация уникальности tag version схемы БД прошла успешно!"
)
return
raise ValidateUniqTagVersionsError
def validate_are_values_incrementing_by_one(self) -> None:
for index, _tag in enumerate(self.all_tags[:-1]):
if self.all_tags[index] + 1 != self.all_tags[index + 1]:
raise ValidateAreValuesIncrementingByOneError(
self.all_tags[index + 1]
)
logging.info(
"Валидация на корректное повышение tag version прошла успешно!"
)
def validate_existence_rollback_in_changeset(self) -> None:
for sql_changeset in self.all_sql_changesets:
if not sql_changeset.findall(LiquiShit.rollback):
raise ValidateExistenceRollbackInChangesetError(
sql_changeset.attrib.get("id")
)
logging.info(
"Валидация на наличие в ChangeSet блока с rollback прошла успешно!"
)
Deploy Dev, Stage, Prod. Выкатываем изменения на контур dev/stg/prod
script:
- liquibase update --changelog-file=changelog-master.xml --username=$CLICKHOUSE_USER --password=$CLICKHOUSE_PASSWORD --url=jdbc:clickhouse://$CLICKHOUSE_HOST:$CLICKHOUSE_PORT/$CLICKHOUSE_DB_NAME --driver=com.clickhouse.jdbc.ClickHouseDriver --defaultSchemaName=$CLICKHOUSE_DB_NAME --logLevel=OFF
Rollback Dev, Stage, Prod
Благодаря добавленным блокам <rollback> , наличию и описанному разработчиком в SQL формате процессу отката, tagDatabase GitlabJob в случае необходимости будет откатывать примененные изменения до предыдущей версии (для этого нам и пригодились tagDatabase). Также благодаря жесткой валидации changelog.xml мы всегда уверены, что предыдущая версия БД на 1 меньше текущей, что значительно облегчает логику скрипта rollback
script:
- ROLLBACK_TO_VERSION=$((LAST_TAG_DB-1))
- liquibase rollback --tag=${ROLLBACK_TO_VERSION} --changelog-file=changelog-master.xml --username=$CLICKHOUSE_USER --password=$CLICKHOUSE_PASSWORD --url=jdbc:clickhouse://$CLICKHOUSE_HOST:$CLICKHOUSE_PORT/$CLICKHOUSE_DB_NAME --driver=com.clickhouse.jdbc.ClickHouseDriver --defaultSchemaName=$CLICKHOUSE_DB_NAME --logLevel=OFF
P.S: откуда берется LAST_TAG_DB спросите Вы? В блоке кода есть метод get_last_tag, который вычисляет последний tagDatabase. После чего мы складываем build.env пайплайна так:
def save_last_tag(self):
with open("build.env", "w") as f:
f.write(
f"LAST_TAG_DB={self.liquibase_changelog_validator.get_last_tag()}\n"
)

Полная автоматизация миграций, от проверки до применения.
Поддержка кластерных миграций ClickHouse благодаря Liquibase + добрые люди.
Минимизация ошибок благодаря валидации.
Прозрачность: каждый changeSet документирован, а состояние БД можно откатить в случае необходимости с помощью нажатия кнопки в Gitlab Pipeline
Мы успешно применили этот подход для автоматизации миграций ClickHouse кластера. Хотелось бы услышать, как в других командах решают подобные задачи, делитесь в комментариях!