Продолжаем тему потери данных в ClickHouse. Рекомендую прочитать первую статью цикла, в которой раскрывается проблема атомарности INSERT и дедупликация (и, конечно же, решение проблемы потери данных). А тема текущего обсуждения - буферные таблицы и то, как они теряют данные.

P.S. читать каждую ссылку из статьи совсем не обязательно, основные тезисы из ссылок изложены в статье.

1. Введение

Как известно, ClickHouse не любит, когда в него делают 1 000 000 вставок по одной записи - он любит, когда делают одну вставку на 1 000 000 записей. Об этом сказано в официальной документации, а еще об этом рассказывал лично Алексей Миловидов (основатель ClickHouse) на одной из конференций, стенограмму которой можно почитать в статье на Хабре. Краткая выжимка из статьи: все кейсы (кроме Kafka), которые в ней описаны - ненадежные способы решения проблемы батчинга данных перед вставкой. Как правило, все эти методы сводятся к сбору информации от 10 000 операций INSERT в некий буфер, который будет раз в секунду делать INSERT в ClickHouse. Нас же из всего этого интересует именно то, как сам ClickHouse может потерять данные и как этого избежать, поэтому рассматривать будем именно способ с буферными таблицами.

Итак, описываем кейс: у вас есть стримы, генерирующие большое количество вставок данных в ClickHouse в секунду (условно 10 000 операций INSERT в секунду). Вы натравили стрим на обычную MergeTree и получаете следующее (цитата из выступления Алексея Миловидова): "Что будет, если делать плохо? Вставляем по одной строке в таблицу MergeTree и получается 59 строк в секунду. Это в 10 000 раз медленнее. В ReplicatedMergeTree – 6 строк в секунду. А если еще кворум включится, то получается 2 строки в секунду. По-моему, это какой-то кромешный отстой. Как можно так тормозить? У меня даже на футболке написано, что ClickHouse не должен тормозить. Но тем не менее бывает иногда."

Помимо невероятного торможения, конечно же, получим еще и перегруженность железа: под капотом отрабатывает слияние кусков данных (собственно поэтому движки таблиц и имеют в названии Merge, где есть Merge - там присутствует процесс слияния данных). Проще говоря: каждая операция INSERT в таблицу, содержащую в названии движка *Merge*, вставляет данные не в файл целевой таблицы, а в небольшие кусочки данных, хранящиеся рядом с основным файлом таблицы. И под капотом с определенным интервалом, задающимся настройкой max_merge_selecting_sleep_ms, запускается процесс слияния всех кусков данных в один файл данных (как правило, есть крупный файл таблицы/партиции и мелкие файлики от кучи инсертов, которые мержатся с этим крупным файлом и удаляются). Именно процесс слияния и будет убивать производительность вашего ClickHouse.

Пытаемся решить проблему. Нам стало очевидно, что данные надо как-то собирать в крупные пачки перед вставкой. Находим простое решение из коробки - буферные таблицы (это ведь даже звучит ровно как то, что нужно).

2. Буферные таблицы

Идея буферной таблицы очень простая - она пишет данные для записи в оперативную память, периодически сбрасывая их в целевую таблицу. Условия для сброса данных вы пишите сами, например: либо по истечении 100 секунд с момента предыдущего сброса данных на диск, либо по накоплению миллиона записей, либо по накоплению 1ГБ данных. Очень удобно + из коробки. 10 000 операций INSERT попадут не в файлы на диске (которые потом еще и слить надо с основной таблицей), а в оперативную память, и затем в один-единственный файл на диске вместо кучи файлов, что существенно снизит нагрузку при операции слияния.

Потеря данных произойдет, например, при падении ClickHouse. Все, что было в оперативной памяти, будет утеряно, включая те данные в буферных таблицах, которые еще не успели сброситься на диск.

Как же не потерять данные? Самый правильный, на мой взгляд, способ - использовать Kafka (как бы не хотелось добавлять еще одну распределенную систему) в связке с Kafka Engine. Данный способ хорошо зарекомендовал себя на рынке.

3. Kafka Engine

В рамках статьи, конечно, не получится рассказать обо всем. Поэтому оставлю ссылки на полезные статьи, которые позволят теоретически освоить как Kafka, так и Kafka Engine.

визуализация пайплайна данных
визуализация пайплайна данных

А мы же сконцентрируемся именно на решении проблемы потери данных. Нам нужно доказательство атомарности технологии Kafka Engine. Для этого подготовил демонстрационный кейс, рассмотрим его.

Конфигурация максимально простая: докер, одна нода Kafka, одна нода Zookeper, одна нода ClickHouse

В Kafka создан тестовый топик с названием test_topic_json, в котором сообщение в формате json с одним полем message и строковым значением (на изображении плохо видно, продублирую текстом)

{"message": "qq"}

{"message": "bb"}

В ClickHouse создан классический пайплайн Kafka Engine:

drop table if exists test_kafka_json_parse;
create table test_kafka_json_parse
(
    message String
)
    engine = Kafka
    SETTINGS kafka_broker_list = 'kafka:29092'
        , kafka_topic_list = 'test_topic_json'
        , kafka_group_name = 'my-group4'
        , kafka_format = 'JSONEachRow'
        , kafka_max_block_size = 2097152
;

drop table if exists message_json_parse;
create table message_json_parse
(
    message String
)
engine = MergeTree
order by message
;


drop view if exists test_kafka_json_parse_view;
create materialized view test_kafka_json_parse_view
TO default.message_json_parse
            (
             `message`  String
                )
AS
SELECT message
FROM test_kafka_json_parse
;

Что этот пайплайн делает: читает свежие данные из Kafka из топика test_topic_json . Коннект осуществляется благодаря таблице test_kafka_json_parse, материализованное представление test_kafka_json_parse_view является консьюмером, message_json_parse - конечная таблица, в которую мат вьюха (консьюмер) пишет данные.

Реализовав все выше изложенное, получаем следующий результат: в конечной таблице видим оба сообщения, присутствующие в Kafka

Также видим следующе значение offset для консьюмера my-group4 (продублирую текстом - всего сообщений 2, offset тоже равен 2):

Получается, что консьюмер my-group4 вычитал полностью топик test_topic_json, что абсолютно верно.

Теперь необходимо доказать атомарность. Для этого пересоздадим конечную таблицу message_json_parse таким образом:

drop table if exists message_json_parse;
create table message_json_parse
(
    message UInt64
)
engine = MergeTree
order by message
;

message UInt64 - ключевой момент (UInt64 - строго положительное число). Мы специально ломаем конечную таблицу, чтобы мат вьюха не могла записать в нее данные. Затем закидываем новое сообщение в топик test_topic_json, пускай будет

{"message": "gg"}

При всем при этом мы НЕ трогаем ни Kafka-таблицу, ни мат вьюху. И вот что получаем в таком случае:

Видим, что теперь для консьюмера my-group4 Offset не равен End. Сообщений в топике 3, но смещение у консьюмера равно 2. Очевидно, что мат вьюха не может "gg" записать в колонку с типом UInt64. Посмотрим на лог ошибок клика, который даст ответы на все вопросы:

Продублирую текстом самое важное и очевидное: Cannot parse string 'gg' as UInt64. А о чем же суммарно с несмещенным offset нам это говорит? - это говорит о том, что offset коммитится только при успешной отработке материализованного представления, что и гарантирует атомарность.

Из кейса выше очевидно, что ClickHouse прочитал сообщение "gg" из топика. Это было важно доказать в контексте потери данных. Мы понимаем, что операция чтения из Kafka была осуществлена, данные физически залетели в ClickHouse (в RAM). Затем происходят бесконечные попытки записать сообщение в таблицу. К новым сообщениям консьюмер, конечно же, не перейдет (хотя и есть настройки, конфигурирующие это - на практике не встречал и не советую).

Далее можно делать что угодно: положить ClickHouse, рестартнуть докер/машину, да хоть удалить ClickHouse и проделать все манипуляции заново, главное - не трогать Kafka, так как она бережно хранит в себе offset для нашего консьюмера.

Ну а мы же поправим ошибку, чтобы убедиться, что после этого сообщение будет успешно прочитано:

drop table if exists message_json_parse;
create table message_json_parse
(
    message String
)
engine = MergeTree
order by message
;

И закономерный результат:

Также видим закономерный результат и в Kafka (End=3, Offset=3):

Получается, что после решения проблемы чтение продолжилось, очевидно, с последнего offset, и корректная работа была восстановлена БЕЗ потерь данных. Тоже самое будет и в любой другой ситуации: падение сервера, ошибки коллег, отклонение от контракта данных в Kafka и т.д.

P.S. Если есть желание, то можно посмотреть исходный код консьюмера.

4. Заключение

Далеко не факт, что вы столкнетесь с ситуацией "10000 INSERT Per Second". Но всегда помните следующее:

  • Автор большой фанат ClickHouse и хочет, чтобы ClickHouse не только не тормозил, но и не терял данные.

  • ClickHouse не любит, когда в него делают 1 000 000 вставок по одной записи - он любит, когда делают одну вставку на 1 000 000 записей.

  • Все способы буферизации (включая асинхронные вставки) чреваты потерей данных.

  • Kafka Engine - проверенная временем технология, не теряющая данные.

  • Если есть возможность формировать крупные пачки данных надежным способом, отличным от Kafka Engine - то это тоже решение проблемы.

Пользуйтесь на здоровье!

Комментарии (2)


  1. ondister
    06.08.2025 05:23

    А что делать, если нужно читать из кафки, изменить данные и сохранить в clickhouse? Что еще можно предпринять кроме накопления батча, fetch/commit?


    1. select_zvezdo4ka_from Автор
      06.08.2025 05:23

      За изменение данных отвечает мат вьюха (консьюмер), в ней можно накручивать любую логику изменений, а не просто переливку данных. Но учтите, что так делать не желательно, первичный слой (raw, stage, history, как угодно называйте его) желательно должен соответствовать контракту данных (что в кафке лежит - в таком же виде и у вас в клике). Иначе можно оказаться в неприятной ситуации, например: потеря данных, если решили мат вьюхой фильтровать; потеря данных из-за изменения самих данных; данные в вашем клике будут отличаться от данных других потребителей кафки. Заливайте данные в клик, а внутри него уже преобразуйте, как вам нужно.