Привет, Хабр!

Сегодня я расскажу, как с помощью Liquibase, GitLab и немного Python настроить прозрачный, безопасный и удобный процесс миграций для ClickHouse кластера.

  • Для Clickhouse кластера создать прозрачный инструмент для сборки, проверки и доставки миграций на окружения, исключив ручные изменения в БД.

  • Возможность отката миграций в один клик.

  • Простота: оставить разработчикам возможность писать миграции на привычном SQL.

Liquibase

Liquibase — универсальный инструмент для работы с миграциями, который поддерживает множество БД. Но есть только один маленький нюанс (как всегда) – из коробки Liquibase не умеет работать с ClickHouse кластером. Для решения этой проблемы был найден плагин Liquibase-ClickHouse от Arenadata. Плагин добавляет поддержку кластера, Liquibase создаст собственные таблицы как реплицируемые, добавляется возможность использовать ON CLUSTER в SQL скриптах.

Единственная проблема, которую не решает плагин – поддержка shared для системных таблиц Liquibase. Если ваш кластер, как и наш, настроен с несколькими shared, то имейте в виду, что системные таблицы Liquibase будут созданы лишь на том шарде и его нодах, к которому было создано подключение.

Давайте рассмотрим, как же работает процесс создания миграций с помощью Liquibase.

  1. Liquibase использует файл changelog.xml для управления миграциями. Каждая миграция записывается как отдельный changeSet.

  2. Плагин Liquibase-ClickHouse Arenadata обеспечивает поддержку SQL-операций с кластером, а путь до SQL-скриптов миграций можно описать changelog.xml с помощью конструкции sqlFile path="path/example_path". Таким образом, мы оставляем разработчикам возможность использовать знакомый SQL вместо изучения специфического формата Liquibase.

  3. С помощью конструкции <tagDatabase tag="1"> мы можем фиксировать версию изменений, а значит, в дальнейшем сможем до нее откатиться.

  4. Чтобы иметь возможность сделать rollback, после добавления скрипта создания миграций с помощью sqlFile добавляем в каждый changeSet блок rollback, в котором указываем путь до такого же sql скрипта, но уже с описанием процесса отката.

    Приведу пример changelog.xml файла, который обеспечивает все описанные выше пункты.

    <?xml version="1.0" encoding="UTF-8"?>
    <databaseChangeLog
            xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
                            http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.8.xsd">
        <changeSet id="1" author="ivanov">
            <tagDatabase tag="1"/>
        </changeSet>
        <changeSet id="2" author="petrov">
            <sqlFile path="migration/1_create_first_table.sql" relativeToChangelogFile="true"/>
            <rollback>
                <sqlFile path="rollback/1_delete_first_table.sql" relativeToChangelogFile="true"/>
            </rollback>
        </changeSet>        
        <changeSet id="3" author="petrov">
            <tagDatabase tag="2"/>
        </changeSet>        
    </databaseChangeLog>

GitLab CI/CD: автоматизация миграций

Весь процесс доставки миграций ложится на плечи через GitLab CI/CD. Полный flow Pipeline показывать не будем, остановимся подробнее на 3х stage, которые непосредственно относятся к теме статьи.

Validate ChangeLog.xml

Так как changelog.xml разработчики заполняют вручную, нужно предотвратить возможные ошибки.

Что валидируем? Разберем на примере changelog.xml, который я привел выше.

  1. Уникальность и правильную последовательность changeSet ID

  2. Уникальность и валидность (все теги целое число, без sevmer и т.д.) тегов tagDatabase

  3. Наличие блока <rollback> для каждого changeSet.

Так как за все сложные части CICD процессов у нас отвечает cli утилита на Python, то и валидацию changelog.xml было доверено проводить ей. Часть, которая отвечает за валидацию может быть полезна:

код туть
import logging
from enum import StrEnum
from functools import cached_property

from lxml import etree


class ValidateUniqIdsError(Exception):
    def __str__(self):
        return (
            "Валидация уникальности changeset ID не пройдена! "
            "Проверьте, что  все id changeset'ов являются уникальными."
        )


class ValidateChangesetIdsAscendingError(Exception):
    def __str__(self):
        return (
            "Валидация корректной последовательности ID changeset бд не пройдена! "
            "Проверьте, что все ID указаны в порядке возрастания."
        )


class ValidateTagVersionsAscendingError(Exception):
    def __str__(self):
        return (
            "Валидация правильной последовательности tag version схемы БД  не пройдена! Проверьте, что все tag "
            "version указаны в порядке возрастания."
        )


class ValidateUniqTagVersionsError(Exception):
    def __str__(self):
        return (
            "Валидация уникальности tag version схемы БД  не пройдена. Проверьте, что все tag "
            "version являются уникальными."
        )


class ValidateAreValuesIncrementingByOneError(Exception):
    def __init__(self, tag_id: int):
        self.tag_id = tag_id

    def __str__(self):
        return (
            f"TAG: {self.tag_id}  больше предыдущего TAG'а более чем на 1. "
            f"Из-за этого будет невозможно выполнить корректный rollback до предыдущей версии БД."
        )


class ValidateExistenceRollbackInChangesetError(Exception):
    def __init__(self, changeset_id: int):
        self.changeset_id = changeset_id

    def __str__(self):
        return (
            f"в ChangeSet c ID: {self.changeset_id} необходимо добавить <rollback></rollback> "
            f"блок с  SQL скриптом отката таблицы."
        )


class LiquiShit(StrEnum):
    common = "{http://www.liquibase.org/xml/ns/dbchangelog}"
    rollback = f"{common}rollback"
    sql_file = f"{common}sqlFile"
    changeset = f"{common}changeSet"
    tag_database = f"{common}tagDatabase"


class LiquibaseChangelogWorker:
    def __init__(self, changelog_filename: str):
        self.changelog_filename = changelog_filename

    @cached_property
    def xml_tree(self):
        return etree.parse(self.changelog_filename).getroot()

    @cached_property
    def all_changesets(self):
        return self.xml_tree.findall(LiquiShit.changeset)

    @cached_property
    def all_tags(self):
        return [
            int(changeset[0].get("tag"))
            for changeset in self.all_changesets
            if changeset.findall(LiquiShit.tag_database)
        ]

    @cached_property
    def all_sql_changesets(self):
        return [
            changeset
            for changeset in self.all_changesets
            if changeset.findall(LiquiShit.sql_file)
        ]

    @cached_property
    def all_changeset_ids(self) -> list:
        return [
            int(changeset.attrib.get("id"))
            for changeset in self.all_changesets
        ]

    def get_last_tag(self) -> str:
        return str(self.all_tags[-1])

    def validate_uniq_ids(self) -> None: 
        if len(self.all_changeset_ids) == len(set(self.all_changeset_ids)):
            logging.info("Валидация uniq changeset ID прошла успешно!")
            return
        raise ValidateUniqIdsError

    def validate_changeset_ids_ascending(self):
        if self.all_changeset_ids == sorted(self.all_changeset_ids):
            logging.info(
                "Валидация корректной последовательности ID changeset  прошла успешно!"
            )
            return
        raise ValidateChangesetIdsAscendingError

    def validate_tag_versions_ascending(self) -> None:
        if self.all_tags == sorted(self.all_tags):
            logging.info(
                "Валидация последовательности tag version схемы БД прошла успешно!"
            )
            return
        raise ValidateTagVersionsAscendingError

    def validate_uniq_tag_versions(self) -> None:
        if len(self.all_tags) == len(set(self.all_tags)):
            logging.info(
                "Валидация уникальности tag version схемы БД прошла успешно!"
            )
            return
        raise ValidateUniqTagVersionsError

    def validate_are_values_incrementing_by_one(self) -> None:
        for index, _tag in enumerate(self.all_tags[:-1]):
            if self.all_tags[index] + 1 != self.all_tags[index + 1]:
                raise ValidateAreValuesIncrementingByOneError(
                    self.all_tags[index + 1]
                )
        logging.info(
            "Валидация на корректное повышение tag version прошла успешно!"
        )

    def validate_existence_rollback_in_changeset(self) -> None:
        for sql_changeset in self.all_sql_changesets:
            if not sql_changeset.findall(LiquiShit.rollback):
                raise ValidateExistenceRollbackInChangesetError(
                    sql_changeset.attrib.get("id")
                )
        logging.info(
            "Валидация на наличие в ChangeSet блока с rollback прошла успешно!"
        ) 

Deploy Dev, Stage, Prod. Выкатываем изменения на контур dev/stg/prod

script:
  - liquibase update --changelog-file=changelog-master.xml --username=$CLICKHOUSE_USER --password=$CLICKHOUSE_PASSWORD --url=jdbc:clickhouse://$CLICKHOUSE_HOST:$CLICKHOUSE_PORT/$CLICKHOUSE_DB_NAME --driver=com.clickhouse.jdbc.ClickHouseDriver --defaultSchemaName=$CLICKHOUSE_DB_NAME  --logLevel=OFF

Rollback Dev, Stage, Prod

Благодаря добавленным блокам <rollback> , наличию и описанному разработчиком в SQL формате процессу отката, tagDatabase GitlabJob в случае необходимости будет откатывать примененные изменения до предыдущей версии (для этого нам и пригодились tagDatabase). Также благодаря жесткой валидации changelog.xml мы всегда уверены, что предыдущая версия БД на 1 меньше текущей, что значительно облегчает логику скрипта rollback

script:
  - ROLLBACK_TO_VERSION=$((LAST_TAG_DB-1))
  - liquibase rollback --tag=${ROLLBACK_TO_VERSION} --changelog-file=changelog-master.xml --username=$CLICKHOUSE_USER --password=$CLICKHOUSE_PASSWORD --url=jdbc:clickhouse://$CLICKHOUSE_HOST:$CLICKHOUSE_PORT/$CLICKHOUSE_DB_NAME --driver=com.clickhouse.jdbc.ClickHouseDriver --defaultSchemaName=$CLICKHOUSE_DB_NAME  --logLevel=OFF


P.S: откуда берется LAST_TAG_DB спросите Вы? В блоке кода есть метод get_last_tag, который вычисляет последний tagDatabase. После чего мы складываем build.env пайплайна так:

def save_last_tag(self):
  with open("build.env", "w") as f:
    f.write(
        f"LAST_TAG_DB={self.liquibase_changelog_validator.get_last_tag()}\n"
            )
  1. Полная автоматизация миграций, от проверки до применения.

  2. Поддержка кластерных миграций ClickHouse благодаря Liquibase + добрые люди.

  3. Минимизация ошибок благодаря валидации.

  4. Прозрачность: каждый changeSet документирован, а состояние БД можно откатить в случае необходимости с помощью нажатия кнопки в Gitlab Pipeline

Мы успешно применили этот подход для автоматизации миграций ClickHouse кластера. Хотелось бы услышать, как в других командах решают подобные задачи, делитесь в комментариях!

Комментарии (0)