Продолжаем тему потери данных в ClickHouse. Рекомендую прочитать первую статью цикла, в которой раскрывается проблема атомарности INSERT и дедупликация (и, конечно же, решение проблемы потери данных). А тема текущего обсуждения - буферные таблицы и то, как они теряют данные.
P.S. читать каждую ссылку из статьи совсем не обязательно, основные тезисы из ссылок изложены в статье.
1. Введение
Как известно, ClickHouse не любит, когда в него делают 1 000 000 вставок по одной записи - он любит, когда делают одну вставку на 1 000 000 записей. Об этом сказано в официальной документации, а еще об этом рассказывал лично Алексей Миловидов (основатель ClickHouse) на одной из конференций, стенограмму которой можно почитать в статье на Хабре. Краткая выжимка из статьи: все кейсы (кроме Kafka), которые в ней описаны - ненадежные способы решения проблемы батчинга данных перед вставкой. Как правило, все эти методы сводятся к сбору информации от 10 000 операций INSERT в некий буфер, который будет раз в секунду делать INSERT в ClickHouse. Нас же из всего этого интересует именно то, как сам ClickHouse может потерять данные и как этого избежать, поэтому рассматривать будем именно способ с буферными таблицами.
Итак, описываем кейс: у вас есть стримы, генерирующие большое количество вставок данных в ClickHouse в секунду (условно 10 000 операций INSERT в секунду). Вы натравили стрим на обычную MergeTree и получаете следующее (цитата из выступления Алексея Миловидова): "Что будет, если делать плохо? Вставляем по одной строке в таблицу MergeTree и получается 59 строк в секунду. Это в 10 000 раз медленнее. В ReplicatedMergeTree – 6 строк в секунду. А если еще кворум включится, то получается 2 строки в секунду. По-моему, это какой-то кромешный отстой. Как можно так тормозить? У меня даже на футболке написано, что ClickHouse не должен тормозить. Но тем не менее бывает иногда."
Помимо невероятного торможения, конечно же, получим еще и перегруженность железа: под капотом отрабатывает слияние кусков данных (собственно поэтому движки таблиц и имеют в названии Merge, где есть Merge - там присутствует процесс слияния данных). Проще говоря: каждая операция INSERT в таблицу, содержащую в названии движка *Merge*, вставляет данные не в файл целевой таблицы, а в небольшие кусочки данных, хранящиеся рядом с основным файлом таблицы. И под капотом с определенным интервалом, задающимся настройкой max_merge_selecting_sleep_ms, запускается процесс слияния всех кусков данных в один файл данных (как правило, есть крупный файл таблицы/партиции и мелкие файлики от кучи инсертов, которые мержатся с этим крупным файлом и удаляются). Именно процесс слияния и будет убивать производительность вашего ClickHouse.
Пытаемся решить проблему. Нам стало очевидно, что данные надо как-то собирать в крупные пачки перед вставкой. Находим простое решение из коробки - буферные таблицы (это ведь даже звучит ровно как то, что нужно).
2. Буферные таблицы
Идея буферной таблицы очень простая - она пишет данные для записи в оперативную память, периодически сбрасывая их в целевую таблицу. Условия для сброса данных вы пишите сами, например: либо по истечении 100 секунд с момента предыдущего сброса данных на диск, либо по накоплению миллиона записей, либо по накоплению 1ГБ данных. Очень удобно + из коробки. 10 000 операций INSERT попадут не в файлы на диске (которые потом еще и слить надо с основной таблицей), а в оперативную память, и затем в один-единственный файл на диске вместо кучи файлов, что существенно снизит нагрузку при операции слияния.
Потеря данных произойдет, например, при падении ClickHouse. Все, что было в оперативной памяти, будет утеряно, включая те данные в буферных таблицах, которые еще не успели сброситься на диск.
Как же не потерять данные? Самый правильный, на мой взгляд, способ - использовать Kafka (как бы не хотелось добавлять еще одну распределенную систему) в связке с Kafka Engine. Данный способ хорошо зарекомендовал себя на рынке.
3. Kafka Engine
В рамках статьи, конечно, не получится рассказать обо всем. Поэтому оставлю ссылки на полезные статьи, которые позволят теоретически освоить как Kafka, так и Kafka Engine.
ClickHouse® Kafka Engine Tutorial - Altinity | Run open source ClickHouse® better
ClickHouse® Kafka Engine FAQ - Altinity | Run open source ClickHouse® better

А мы же сконцентрируемся именно на решении проблемы потери данных. Нам нужно доказательство атомарности технологии Kafka Engine. Для этого подготовил демонстрационный кейс, рассмотрим его.
Конфигурация максимально простая: докер, одна нода Kafka, одна нода Zookeper, одна нода ClickHouse

В Kafka создан тестовый топик с названием test_topic_json
, в котором сообщение в формате json с одним полем message и строковым значением (на изображении плохо видно, продублирую текстом)
{"message": "qq"} |
{"message": "bb"} |

В ClickHouse создан классический пайплайн Kafka Engine:
drop table if exists test_kafka_json_parse;
create table test_kafka_json_parse
(
message String
)
engine = Kafka
SETTINGS kafka_broker_list = 'kafka:29092'
, kafka_topic_list = 'test_topic_json'
, kafka_group_name = 'my-group4'
, kafka_format = 'JSONEachRow'
, kafka_max_block_size = 2097152
;
drop table if exists message_json_parse;
create table message_json_parse
(
message String
)
engine = MergeTree
order by message
;
drop view if exists test_kafka_json_parse_view;
create materialized view test_kafka_json_parse_view
TO default.message_json_parse
(
`message` String
)
AS
SELECT message
FROM test_kafka_json_parse
;
Что этот пайплайн делает: читает свежие данные из Kafka из топика test_topic_json
. Коннект осуществляется благодаря таблице test_kafka_json_parse
, материализованное представление test_kafka_json_parse_view
является консьюмером, message_json_parse
- конечная таблица, в которую мат вьюха (консьюмер) пишет данные.
Реализовав все выше изложенное, получаем следующий результат: в конечной таблице видим оба сообщения, присутствующие в Kafka

Также видим следующе значение offset
для консьюмера my-group4
(продублирую текстом - всего сообщений 2, offset
тоже равен 2):

Получается, что консьюмер my-group4
вычитал полностью топик test_topic_json
, что абсолютно верно.
Теперь необходимо доказать атомарность. Для этого пересоздадим конечную таблицу message_json_parse
таким образом:
drop table if exists message_json_parse;
create table message_json_parse
(
message UInt64
)
engine = MergeTree
order by message
;
message UInt64
- ключевой момент (UInt64
- строго положительное число). Мы специально ломаем конечную таблицу, чтобы мат вьюха не могла записать в нее данные. Затем закидываем новое сообщение в топик test_topic_json
, пускай будет
{"message": "gg"} |
При всем при этом мы НЕ трогаем ни Kafka-таблицу, ни мат вьюху. И вот что получаем в таком случае:

Видим, что теперь для консьюмера my-group4
Offset
не равен End
. Сообщений в топике 3, но смещение у консьюмера равно 2. Очевидно, что мат вьюха не может "gg" записать в колонку с типом UInt64. Посмотрим на лог ошибок клика, который даст ответы на все вопросы:

Продублирую текстом самое важное и очевидное: Cannot parse string 'gg' as UInt64
. А о чем же суммарно с несмещенным offset
нам это говорит? - это говорит о том, что offset коммитится только при успешной отработке материализованного представления, что и гарантирует атомарность.
Из кейса выше очевидно, что ClickHouse прочитал сообщение "gg" из топика. Это было важно доказать в контексте потери данных. Мы понимаем, что операция чтения из Kafka была осуществлена, данные физически залетели в ClickHouse (в RAM). Затем происходят бесконечные попытки записать сообщение в таблицу. К новым сообщениям консьюмер, конечно же, не перейдет (хотя и есть настройки, конфигурирующие это - на практике не встречал и не советую).
Далее можно делать что угодно: положить ClickHouse, рестартнуть докер/машину, да хоть удалить ClickHouse и проделать все манипуляции заново, главное - не трогать Kafka, так как она бережно хранит в себе offset
для нашего консьюмера.
Ну а мы же поправим ошибку, чтобы убедиться, что после этого сообщение будет успешно прочитано:
drop table if exists message_json_parse;
create table message_json_parse
(
message String
)
engine = MergeTree
order by message
;
И закономерный результат:

Также видим закономерный результат и в Kafka (End
=3, Offset
=3):

Получается, что после решения проблемы чтение продолжилось, очевидно, с последнего offset
, и корректная работа была восстановлена БЕЗ потерь данных. Тоже самое будет и в любой другой ситуации: падение сервера, ошибки коллег, отклонение от контракта данных в Kafka и т.д.
P.S. Если есть желание, то можно посмотреть исходный код консьюмера.
4. Заключение
Далеко не факт, что вы столкнетесь с ситуацией "10000 INSERT Per Second". Но всегда помните следующее:
Автор большой фанат ClickHouse и хочет, чтобы ClickHouse не только не тормозил, но и не терял данные.
ClickHouse не любит, когда в него делают 1 000 000 вставок по одной записи - он любит, когда делают одну вставку на 1 000 000 записей.
Все способы буферизации (включая асинхронные вставки) чреваты потерей данных.
Kafka Engine - проверенная временем технология, не теряющая данные.
Если есть возможность формировать крупные пачки данных надежным способом, отличным от Kafka Engine - то это тоже решение проблемы.
Пользуйтесь на здоровье!
ondister
А что делать, если нужно читать из кафки, изменить данные и сохранить в clickhouse? Что еще можно предпринять кроме накопления батча, fetch/commit?
select_zvezdo4ka_from Автор
За изменение данных отвечает мат вьюха (консьюмер), в ней можно накручивать любую логику изменений, а не просто переливку данных. Но учтите, что так делать не желательно, первичный слой (raw, stage, history, как угодно называйте его) желательно должен соответствовать контракту данных (что в кафке лежит - в таком же виде и у вас в клике). Иначе можно оказаться в неприятной ситуации, например: потеря данных, если решили мат вьюхой фильтровать; потеря данных из-за изменения самих данных; данные в вашем клике будут отличаться от данных других потребителей кафки. Заливайте данные в клик, а внутри него уже преобразуйте, как вам нужно.