Введение
В какой то момент перед нашей командой встала непростая задача: объем данных для аналитики вырос до 300 миллионов строк в день. Прежние решения перестали справляться с такой нагрузкой, отчеты строились слишком медленно, а масштабировать существующую систему было дорого и сложно. Нужно было срочно находить новое решение для хранилища данных (DWH), способное глотать миллионы строк ежедневно и отдавать результат аналитических запросов практически мгновенно.
После оценки различных вариантов (классические СУБД, облачные DWH и др.) мы остановились на ClickHouse. Эта колоночная база данных открытого кода изначально создавалась для работы с большими объемами потока событий. ClickHouse славится впечатляющей скоростью агрегаций и фильтрации на терабайтах данных и отлично подходит для аналитики при больших нагрузках. В этой статье расскажем, как мы выбрали и внедрили ClickHouse в нашем проекте, построив систему сбора и анализа данных с нагрузкой сотни миллионов строк в сутки.
Поговорим об архитектуре (как данные летят из Kafka в ClickHouse), о двух подходах загрузки данных (пакетная и стриминговая), о том, какие табличные движки ClickHouse мы использовали и зачем, как нам помогли материализованные представления, об оркестрации процессов через Airflow и dbt. Отдельно разберем типичные ошибки, с которыми столкнулись в процессе, и поделимся улучшениями, которые планируем учесть при следующей реализации подобного решения.
Перед прочтением статьи советую ознакомиться с общей концепцией построения платформы BI-аналитики
Почему мы выбрали ClickHouse при 300 млн строк в день
Рост объема данных до сотен миллионов строк заставил нас пересмотреть подход к хранению и обработке. Традиционные реляционные базы (PostgreSQL, MySQL) работали отлично на меньших данных, но при таких масштабах начали сбоить: требовали дорогостоящих машин или шардирования, сложнее справлялись со сложными аналитическими запросами. Мы рассмотрели варианты вроде вертикального масштабирования классической СУБД, использования распределенных систем (например, Hadoop + Hive) или перехода на облачные решения (BigQuery, Amazon Redshift). Однако хотелось решение, которое можно развернуть on-premise, с открытым исходным кодом, при этом способное обрабатывать real-time поток данных.
ClickHouse оказался идеальным кандидатом по нескольким причинам:
Производительность на чтение: это колоночная СУБД, оптимизированная под аналитические запросы. Агрегации, фильтрация и сортировка по столбцам на миллиардах строк выполняются на порядки быстрее, чем в строко-ориентированных базах.
Горизонтальное масштабирование: ClickHouse поддерживает кластеризацию и шардирование «из коробки», что позволяет расти по мере увеличения данных.
Сжатие данных: данные эффективно компрессируются, снижая требования к хранению (что критично, когда каждый день прилетает 300 млн новых строк).
Встроенная интеграция с Kafka: важный фактор — ClickHouse умеет читать сообщения прямо из Kafka-топиков с помощью специального табличного движка, упрощая построение стримингового конвейера.
Материализованные представления: позволяют в режиме реального времени обрабатывать входящий поток и раскладывать данные по целевым таблицам, минимизируя необходимость в отдельном промежуточном слое ETL.
Сообщество и инструменты: вокруг ClickHouse уже сформировалось сообщество, появились инструменты вроде коннекторов, драйверов для Airflow, адаптер для dbt и интеграции с BI, что тоже сыграло роль.
Таким образом, выбор пал на ClickHouse как на ядро нового аналитического хранилища.
Общая архитектура: от Kafka до ClickHouse
Наш конвейер данных выстроен по типичному для event streaming паттерну. События (например, пользовательские действия, логи сервисов и т.д.) сначала попадают в Kafka, где временно накапливаются в виде сообщений. Далее эти данные оперативно перетекают в ClickHouse, выступающий в роли хранилища для последующего анализа.
Общая схема выглядит так:
Источники данных (веб-сервисы, приложения) публикуют события в Kafka в режиме реального времени.
Kafka-топики буферизируют поток событий. Kafka обеспечивает надежную доставку сообщений и может держать историю несколько дней, разгружая источники от прямой записи в базу.
ClickHouse считывает данные из Kafka и складывает их в колоночные таблицы для постоянного хранения.
Оркестрация: в процессе участвуют Airflow и dbt, которые помогают автоматизировать загрузку и дальнейшую обработку данных (например, формирование витрин, агрегатов).
Важно отметить, что для поглощения стриминговых данных в ClickHouse мы используем встроенную интеграцию: создается специальная Kafka-таблица (с движком Kafka), которая «подписывается» на нужный топик Kafka. Через материализованное представление поток из Kafka непрерывно записывается в основную таблицу ClickHouse.
Пакетная и потоковая загрузка данных
При проектировании системы мы реализовали два подхода загрузки данных: пакетный (batch) и потоковый (streaming). Оба подхода оказались востребованными в нашей архитектуре.
Потоковая (Streaming) загрузка: основной поток событий поступает непрерывно. Как только данные появляются в Kafka, они почти мгновенно записываются в ClickHouse через связку «Kafka‑таблица + материализованное представление». Это обеспечивает минимальную задержку: от появления события до его попадания в хранилище проходит обычно несколько секунд. Потоковый подход позволил нам строить почти real-time аналитику — например, видеть текущую активность пользователей на сервисе или оперативно считать метрики.
Пакетная (Batch) загрузка: хотя основной поток обрабатывается в режиме реального времени, у нас также есть процессы, работающие пакетно. Например:
Ежедневная загрузка справочных данных или результатов длительных вычислений.
Пересчет некоторых агрегатов за прошедший период.
Загрузка исторических данных (при бэкапах или миграциях).
Пакетная обработка организована через планировщик (Airflow): по расписанию запускаются задания, которые подтягивают необходимые данные (из файлового хранилища, внешних API или баз), а затем либо напрямую вставляют их в ClickHouse, либо обновляют существующие таблицы. Также batch-процессы используются для обслуживания хранилища: например, отложенная очистка старых разделов таблиц, перестроение материализованных представлений, оптимизация хранения (команда OPTIMIZE
в ClickHouse).
Таким образом, streaming-конвейер покрывает случаи, требующие минимальных задержек, а batch-процессы дополняют его там, где допустима периодическая обработка больших объемов.
Табличные движки в ClickHouse: какие и почему
В ClickHouse существует целое семейство табличных движков, и правильный выбор влияет и на производительность, и на удобство обработки данных. Мы остановились на следующих вариантах для разных задач:
-
MergeTree — основной столбцовый движок, лежащий в основе большинства наших таблиц. Он поддерживает сортировку и секционирование данных, что критично для быстрого чтения на больших объемах. Конкретно, мы используем его разновидности:
ReplacingMergeTree для таблиц, где возможны обновления/поздние данные. Этот движок позволяет «заменять» дубликаты по определенному ключу, сохраняя только последнюю версию записи.
SummingMergeTree для некоторых агрегирующих витрин, где нам достаточно суммировать числовые значения по ключу (например, предварительно агрегированные суточные метрики).
В иных случаях можно было бы задействовать CollapsingMergeTree, VersionedCollapsingMergeTree, AggregatingMergeTree — в зависимости от специфики данных, но в нашем проекте пока хватило возможностей Replacing и Summing.
Kafka Engine — специальный движок для временной таблицы, связанной с Kafka‑топиком. Такая таблица не хранит данные постоянно, а служит буфером‑представлением сообщений из Kafka внутри ClickHouse. Мы создали по Kafka‑таблице на каждый топик, откуда подтягиваем события.
Buffer — в некоторых сценариях использовали Buffer‑движок, который аккумулирует небольшие вставки и периодически сбрасывает их пачками в основную MergeTree‑таблицу. Это полезно при нагрузках с большим количеством мелких инсёртов: Buffer снижает накладные расходы, собирая множество маленьких записей в одну большую партицию перед записью.
Секционирование и сортировка: отдельно стоит упомянуть, как мы настроили PARTITION BY
и ORDER BY
в таблицах MergeTree. Для наших временных рядов (события с меткой времени) мы секционируем таблицы по дате (например, PARTITION BY toDate(event_time)
), а в сортировку включаем идентификаторы и временные метки (что-то вроде ORDER BY (user_id, event_time)
). Это обеспечивает:
Локализацию данных по дням для удобного управления хранением (можно легко отрезать старые разделы).
Быстрый выбор временных диапазонов данных конкретного пользователя или другого ключа благодаря сортировке по первичному ключу.
Правильно выбранные движки и схема таблиц позволили достичь высокой скорости загрузки и чтения. Но ключевую роль в реалтайм-конвейере сыграли материализованные представления.
Материализованные представления для потоковой загрузки
Материализованное представление (Materialized View) в ClickHouse — это мощный механизм, который мы использовали для автоматической обработки входящих данных. Идея проста: на лету преобразовывать поток событий и складывать их в целевую таблицу.
В нашем случае материализованное представление читает данные из Kafka-таблицы и записывает их в таблицу на движке MergeTree. Это происходит автоматически при поступлении новых сообщений. Ниже приведен упрощенный пример создания такой связки:
CREATE TABLE events (
user_id UInt32,
event_type String,
event_time DateTime,
processed_date Date DEFAULT today()
) ENGINE = MergeTree
PARTITION BY toDate(event_time)
ORDER BY (user_id, event_time);
CREATE TABLE kafka_events (
user_id UInt32,
event_type String,
event_time DateTime,
_timestamp UInt64 -- служебное поле Kafka (время сообщения)
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
kafka_topic_list = 'events_topic',
kafka_group_name = 'events_consumer',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 1;
CREATE MATERIALIZED VIEW consume_events TO events AS
SELECT
user_id,
event_type,
event_time,
today() AS processed_date
FROM kafka_events;
В этом примере мы создаем три объекта:
Таблица
events
на MergeTree для постоянного хранения событий.Таблица
kafka_events
на движке Kafka, которая "слушает" топикevents_topic
. Как только в топике появляются новые сообщения, при обращении к этой таблице они читаются.Материализованное представление
consume_events
, привязанное кkafka_events
и складывающее поступающие данные в таблицуevents
.
Такой подход позволяет полностью избавиться от внешних скриптов-потребителей Kafka: ClickHouse сам занимается выгрузкой из очереди и сохранением. Важно отметить, что материализованное представление читает из kafka_events
данными партиями (размер партий настраивается параметрами, например kafka_max_block_size
и др.). То есть события пишутся в ClickHouse тоже пачками, что эффективно с точки зрения производительности.
Нам также пришлось внимательно настроить некоторые детали интеграции:
Параллелизм потребления: мы увеличили количество разделов (partition) Kafka-топика и задали несколько потребителей (
kafka_num_consumers
), чтобы обеспечить параллельную загрузку с нужной скоростью.Обработка ошибок парсинга: если вдруг в Kafka придет сообщение неправильного формата, материализованное представление может "споткнуться". Мы реализовали отложенный контроль таких случаев, заведя отдельный "мёртвый" топик (Dead Letter Queue), куда ClickHouse отправляет проблемные сообщения, а мы их потом анализируем и исправляем.
Настройки retention в Kafka: ClickHouse фиксирует offset (позицию) прочтения Kafka. Если наш потребитель отстанет, Kafka какое-то время держит историю. Мы выставили достаточно большой retention (несколько дней) для подстраховки, чтобы при остановке загрузки данные не потерялись.
Оркестрация процессов: Airflow и dbt
Наша инфраструктура данных не ограничивается только прокачкой сырых событий из Kafka в ClickHouse. Поверх этого работают процедуры преобразования данных, поддержания актуальности витрин, контроль качества данных и т.д. Для автоматизации таких процедур мы используем связку Apache Airflow и dbt (Data Build Tool).
Apache Airflow выступает главным оркестратором. Мы настроили несколько DAG (Directed Acyclic Graph, сценариев из задач):
Один DAG отвечает за ежедневные задачи: бэкапы, снятие метрик, очистку старых данных. Например, каждый ночной запуск удаляет партиции старше N дней из ClickHouse, вызывая SQL-команду
ALTER TABLE ... DROP PARTITION
.Другой DAG следит за тем, что стриминговое потребление идет нормально: проверяет отставание потока (лаг) Kafka относительно ClickHouse. Если лаг начинает расти, DAG отправляет алерты и при необходимости перезапускает потребляющий контур.
Также Airflow инициирует batch-загрузки, о которых говорилось выше. Например, по расписанию запускается задача: вытянуть свежий справочник из внешней системы и обновить соответствующую таблицу в ClickHouse.
dbt мы применяем для управления схемой данных и трансформациями внутри ClickHouse. Хотя dbt больше известен в связке со Snowflake/BigQuery, уже есть адаптеры для ClickHouse. С помощью dbt мы:
Ведем версионируемые SQL-модели витрин и представлений. Проще говоря, описываем в репозитории, какие таблицы и с какими полями должны быть в DWH.
Определяем трансформации: например, создаем представления или агрегированные таблицы для аналитики. dbt-модель может содержать SELECT-запрос, а при деплое dbt превращает его в материальную таблицу в ClickHouse.
Управляем зависимостями: dbt сам определяет порядок, в котором нужно создавать объекты. Можно одной командой применить все изменения (
dbt run
), и инструмент последовательно создаст/обновит таблицы и представления в правильном порядке.Пишем тесты на данные: dbt позволяет задать проверки (уникальность ключей, отсутствие NULL в критичных колонках и т.д.). Наш Airflow DAG раз в день запускает
dbt test
, чтобы убедиться, что в данных нет аномалий.
Связка Airflow + dbt дала нам гибкость: инфраструктурный код (DAG) отделен от бизнес-логики трансформаций (SQL-модели в dbt). Это упростило командную работу: data-инженеры пишут DAG для загрузок, а аналитики и инженеры по данным описывают, какие витрины нужны, в виде dbt-моделей.
Подводные камни и ошибки на пути
Как и любой сложный проект, внедрение ClickHouse и стриминга принесло нам несколько уроков. Поделимся основными ошибками и граблями, на которые мы наступили:
Неправильный первичный ключ в MergeTree: сначала мы не до конца продумали
ORDER BY
. Из-за этого при загрузке образовывалось слишком много мелких кусков данных, и чтение было не оптимальным. Пришлось переразбить ключ, включив в него идентификатор пользователя и дату — так части объединяются эффективнее.Мелкие партиции и нагрузка на слияния: на старте мы секционировали таблицу по дате события, но при тестировании на высоком потоке обнаружилось, что внутри дня формируется очень много мелких частей (parts), которые потом асинхронно сливаются. Это создавало заметную нагрузку на диск. Решение: увеличить размер батчей вставки из Kafka (настроить
kafka_max_block_size
) и при необходимости использовать Buffer-движок для сглаживания мелких вставок.Отсутствие контроля задержек: сначала мы упустили мониторинг отставания потока. В какой-то момент из-за всплеска нагрузки ClickHouse не успевал потреблять Kafka в реальном времени, и лаг рос. Мы оперативно добавили в Airflow задачу, которая регулярно проверяет разницу между последним offset в Kafka и позицией, до которой дочитал ClickHouse, и шлет алерт при превышении порога. Так мы избежали потери данных из-за потенциального превышения retention в Kafka.
Изменение схемы данных: когда понадобилось добавить новые поля в события, мы поняли, что изменение схемы в системе с материализованным представлением — нетривиальная задача. Нужно менять структуру и Kafka-таблицы, и основной таблицы, и представления. Пару раз мы ловили рассинхрон (когда, скажем, столбец добавили в одну таблицу, но забыли в другой, и данные перестали корректно передаваться). После этого ввели практику вносить изменения через миграции dbt и тщательно тестировать их сначала на тестовом контуре.
Переоценка возможностей одного узла: сперва мы запускали ClickHouse на одном мощном сервере. Но при росте до сотен миллионов строк в сутки и одновременном выполнении сложных запросов стало ясно, что нужен кластер. В какой-то момент запросы начали работать медленнее из-за конкуренции за ресурсы (CPU, IO). Мы вовремя перешли на кластер из нескольких узлов ClickHouse, распределив данные по шардам, что повысило общую пропускную способность. В идеале стоило закладывать кластерную архитектуру с самого начала, учитывая прогноз роста данных.
Что бы мы сделали иначе (улучшения)
Оглядываясь назад, мы видим несколько вещей, которые можно было бы улучшить при повторной реализации подобного проекта:
Раннее планирование кластерности: как упомянуто выше, стоит изначально проектировать DWH на ClickHouse как кластерный, если ожидаются такие объемы. Перенос данных и перестройкав кластер «на лету» — непростая задача. Лучше сразу развернуть 2–3 узла и настроить Distributed-таблицы, чтобы масштабирование было прозрачным.
Оптимизация хранения и TTL: заранее продумать политику хранения данных. В нашем случае мы впоследствии добавили TTL для старых записей (автоудаление данных старше определенного срока). Если бы сделали это сразу, не пришлось бы чистить «вручную» и беспокоиться о переполнении дисков.
Гибкая схема топиков и версионирование событий: в процессе мы поняли, что полезно иметь версионирование схемы событий. Например, при существенном изменении формата сообщений — публиковать их в новый Kafka-топик (или раздел), чтобы старые и новые события обрабатывались параллельно без конфликтов. В следующий раз заложим эту практику, чтобы упростить миграции.
Мониторинг и алертинг с первого дня: мы добавили важные метрики и алерты не сразу, а лишь после первых проблем. Правильнее было бы с самого начала внедрить наблюдаемость: метрики скорости вставки, размеры лагов Kafka, время выполнения ключевых запросов, использование CPU/Memory на ClickHouse. Интеграция ClickHouse с Prometheus + Grafana могла быть сделана заранее, чтобы видеть узкие места до того, как они ударят в продакшене.
Тестирование на близких к реальным объемах: наш изначальный прототип обрабатывал порядок 10 млн строк в день, и многие проблемы (мелкие партиции, падение скорости при высокой конкурентной нагрузке) просто не проявились. Повторно делая проект, мы бы заложили нагрузочное тестирование под целевые 300 млн/день заранее, чтобы выявить и устранить узкие места (например, подобрать оптимальные настройки
max_partitions_per_insert_block
илиkafka_num_consumers
для Kafka Engine) до выхода в продакшн.
Заключение
Проект по внедрению ClickHouse в качестве высоконагруженного DWH оказался для нас серьезным вызовом, но принес ощутимые результаты. Мы получили систему, способную в реальном времени переваривать поток огромных объемов данных и мгновенно отвечать на сложные аналитические запросы. Интеграция с Kafka через материализованные представления показала себя надежным и удобным решением: минимум внешнего кода, максимум скорости. Оркестрация процессов с помощью Airflow и dbt дала гибкость в управлении данными и их преобразованиями, а использование Yandex DataLens упростило доступ к результатам для бизнеса.
Конечно, не обошлось без ошибок и уроков — мы поделились ими, чтобы коллегам было легче строить подобные системы. Главный вывод, который мы для себя сделали: планируйте на рост и закладывайте резерв производительности с самого начала. Поток данных имеет свойство расти, и лучше, когда ваша архитектура готова "съесть" и миллиард строк в день, если потребуется, не меняя кардинально свою структуру.
Надеемся, наш опыт будет полезен тем, кто собирается строить аналитику на больших данных с помощью ClickHouse. Проектируйте смело, тестируйте тщательно и не бойтесь осваивать новые инструменты — они себя оправдают, когда вы увидите на дашбордах результаты в режиме «здесь и сейчас». Удачи!