Kafka — это популярный способ потоковой загрузки данных в ClickHouse. ClickHouse имеет встроенный коннектор для этой цели — движок Kafka. Наши друзья из Cloudfare первоначально создали этот движок для ClickHouse. С тех пор движок Kafka был значительно переработан и теперь поддерживается разработчиками Altinity. Однако не всегда очевидно, как использовать его наиболее эффективным образом. Мы попытались восполнить этот пробел, проведя вебинар по Kafka, который прошел успешно. В этой статье мы собрали типичные вопросы, которые мы получаем по поводу использования движка Kafka. Надеемся, что наши рекомендации помогут избежать распространенных проблем.

Какую версию ClickHouse лучше использовать для движка Kafka?

В 2019 году было проделано много работы для того, чтобы сделать движок Kafka стабильным. Он был окончательно стабилизирован в версии 19.16.14 Altinity Stable, поэтому эта или любая более поздняя версия 19.16 ClickHouse подходит. Более старые версии могут иметь проблемы с согласованностью данных при потреблении данных из Kafka (потери и дубликаты).

Версии 20.x должны также корректно работать с Kafka, но убедитесь, что эти версии сертифицированы как Altinity Stable.

Как я могу использовать таблицу движка Kafka? Могу ли я напрямую выбирать данные из таблицы Kafka?

Движок Kafka предназначен для однократного получения данных. Это означает, что как только данные запрашиваются из таблицы Kafka, они считаются уже полученными из очереди. Поэтому вы никогда не должны выбирать данные из таблицы движка Kafka напрямую, а вместо этого использовать материализованное представление. Материализованное представление запускается, как только данные становятся доступными в таблице движка Kafka. Оно автоматически перемещает данные из таблицы Kafka в таблицу MergeTree или Distributed engine. Таким образом, вам нужны как минимум 3 таблицы:

  • Исходная таблица движка Kafka

  • Destination table — место назначения (семейство MergeTree или Distributed)

  • Материализованное представление для перемещения данных

Вот типичный пример:

-- Consumer
CREATE TABLE test.kafka (key UInt64, value UInt64)
    ENGINE = Kafka
    SETTINGS kafka_broker_list = 'kafka1:19092',
             kafka_topic_list = 'my_topic',
             kafka_group_name = 'my_conumber_group_name',
             kafka_format = 'JSONEachRow',
             kafka_max_block_size = 1048576;
-- Destination table
CREATE TABLE test.view (key UInt64, value UInt64)
    ENGINE = MergeTree()
    ORDER BY key;
-- Materialized View to move the data from a Kafka topic to a ClickHouse table
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.kafka;

Иногда необходимо применять различные преобразования к данным, поступающим из Kafka, например, для хранения необработанных данных и агрегатов. В этом случае можно иметь несколько материализованных представлений, привязанных к одной таблице движка Kafka, или каскадные материализованные представления. К сожалению, использование некоторые или каскадные материализованные представления менее надежно, поскольку операции вставки (inserts) не являются атомарными. В случае сбоя вы получите непоследовательное состояние. Атомарные операции материализованных представлений запланированы на 2020 год.

Как я могу узнать координаты потребленного сообщения?

Таблица движка Kafka содержит следующие виртуальные столбцы:

  • _topic String

  • _key String

  • _offset UInt64

  • _partition UInt64

  • _timestamp Nullable(DateTime)

Виртуальные колонки не следует создавать в таблице движка Kafka, поскольку они доступны автоматически.

Как изменить настройки таблицы движка Kafka?

Чтобы изменить настройки, необходимо удалить и заново создать таблицу Kafka. ALTER TABLE MODIFY SETTINGS для таблиц движка Kafka запланирован на 2020 год.

Как я могу использовать таблицу движка Kafka в кластере?

Наилучшей практикой является создание таблицы Kafka engine на каждом сервере ClickHouse, чтобы каждый сервер использовал некоторые разделы и отправлял строки в локальную таблицу ReplicatedMergeTree. Обратите внимание, что все таблицы движка Kafka используют одно и то же имя группы потребителей, чтобы параллельно была одна и та же тема.

Если количество потребителей (серверов ClickHouse с таблицами движков Kafka) больше, чем количество разделов в теме, некоторые потребители ничего не будут делать. В теме должно быть достаточно разделов Kafka, чтобы тема могла параллельно использоваться несколькими серверами ClickHouse.

Другая возможность заключается в пересылке данных из таблицы движка Kafka в таблицу Distributed (распределенная). Однако это требует более тщательной настройки. В частности, таблица Distributed должна иметь некоторый ключ шардирования (рандомный хэш). Это необходимо для того, чтобы дедупликация (устранение повторов) ReplicatedMergeTree работала правильно. Distributed таблицы будут повторять вставки одного и того же блока, и они могут быть дедуплицированы ClickHouse.

Как настроить таблицу движка Kafka для достижения максимальной производительности?

Производительность одной таблицы зависит от размера строки, используемого формата, количества строк в сообщении и т.д. Одна таблица Kafka обычно может обрабатывать 60K-300K простых сообщений в секунду.

Чтобы достичь наилучшей производительности для одной таблицы, параметр 'kafka_max_block_size' должен быть увеличен до значений 512K-1M. Значение по умолчанию составляет 64K, что слишком мало. Мы собираемся решить эту проблему в следующих версиях.

Дальнейшие улучшения возможны, если одна тема потребляется несколькими серверами (“реплики” — точные копии) или несколькими таблицами движка Kafka на одном сервере.

В текущей реализации всегда следует использовать 'kafka_num_consumers = 1', так как увеличение не дает никаких улучшений — в настоящее время оно блокируется в одном потоке.

Вместо этого можно создать несколько таблиц движка Kafka, соединенных с соответствующими материализованными представлениями, которые будут перемещать данные в одну и ту же таблицу target (назначения). Таким образом, каждая таблица движка Kafka будет работать в отдельном потоке.

Надлежащая реализация многопоточности — это еще один пункт плана действий на 2020 год.

Какие параметры конфигурации можно настроить?

Следующие настройки определяют то, как движок Kafka потребляет сообщения:

kafka_max_block_size (по умолчанию 65536) — порог для фиксации блока в ClickHouse в количестве строк — настраивается на уровне таблицы

kafka_skip_broken_messages — количество ошибок, которые можно допускать при парсинге сообщений — настраивается на уровне таблицы

stream_flush_interval_ms (по умолчанию 7500) — порог фиксации блока в ClickHouse в миллисекундах - настраивается на уровне профиля пользователя; потенциально может влиять и на другие потоковые таблицы

kafka_max_wait_ms — таймаут ожидания при подтверждении сообщения - настраивается на уровне профиля пользователя

Существует также ряд опций, специфичных для библиотеки Kafka. Их можно добавить в отдельный раздел config.xml или лучше использовать в отдельном файле в config.d/. Список опций можно посмотреть здесь.

В целом, модель настроек сейчас странновата и нуждается в доработке.

Поддерживает ли движок Kafka аутентификацию на кластере Kafka? Наш кластер Kafka настроен как SASL_PLAINTEXT. Как мне указать имя пользователя и пароль для этого кластера?

Движок Kafka поддерживает аутентификацию. Это настройка на уровне сервера, которая должна быть внесена в config.xml. Вот пример конфигурации:

<kafka>
    <security_protocol>sasl_plaintext</security_protocol>
    <sasl_mechanism>PLAIN</sasl_mechanism>
    <sasl_username>test</sasl_username>
    <sasl_password>test</sasl_password>
    <debug>all</debug>
    <auto_offset_reset>latest</auto_offset_reset>
    <compression_type>snappy</compression_type>
</kafka>

Ознакомьтесь

Примечание для пользователей Docker: исходящие SSL-соединения не работали в docker image (образ) ClickHouse из-за отсутствия сертификатов. Проблема исправлена в версиях 19.16.19.85, 20.1.12, 20.3.9 и всех последующих.

Как я могу контролировать хранение данных в Kafka?

Пользователи обычно не удаляют данные из брокеров Kafka после их потребления. Это регулируется на стороне брокера (пример см. в этой статье). Вместо этого, как только сообщение потреблено, корректируется текущая позиция темы для данной группы потребителей. Это делает возможным потребление одних и тех же данных несколькими различными группами потребителей независимо друг от друга (т.е. можно передавать одни и те же данные в ClickHouse, в Hadoop и т.д.). Также существует возможность возвращать в начало или перематывать вперед позицию группы потребителей.

Как возвращать к началу или воспроизводить сообщения? Как перемотать вперед или пропускать сообщения?

Выполните следующую процедуру:

Шаг 1: Отсоедините таблицы Kafka в ClickHouse на всех узлах кластера.

Шаг 2: Запустите инструмент 'kafka-consumer-groups':

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute

Существует несколько вариантов сброса; подробности см. в документации kafka-consumer-groups:

  • shift-by <positive_or_negative_integer>

  • to-current

  • to-latest

  • to-offset <offset_integer>

  • to-datetime <datetime_string>

  • by-duration <duration_string>

Шаг 3: Повторное присоединение таблиц kafka

Смотрите также параметры конфигурации:

<kafka>
    <auto_offset_reset>smallest</auto_offset_reset>
</kafka>

Как работать с неправильно сформированными сообщениями?

В целом, хорошо сформированные данные весьма желательны в ClickHouse. В зависимости от формата данных ClickHouse может лучше или хуже переносить проблемы в потоке.

ClickHouse имеет тенденцию использовать более быстрые, но менее надежные опции для разбора входных данных. Существует довольно много опций для изменения поведения парсера в тупиковых ситуациях, но тонкая настройка поведения сложна и не всегда возможна. Например, существует настройка 'input_format_skip_unknown_fields', применимая к формату JSONEachRow. Если она активирована на уровне профиля, ClickHouse будет разбирать JSON в сообщениях, содержащих больше полей, чем указано в определении таблицы.

Еще одна настройка — 'kafka_skip_broken_messages'. Она работает не для всех форматов и иногда может давать неожиданные результаты. Например, если одно сообщение содержит несколько строк, и одна из них неправильно сформирована, то будет пропущена именно эта строка, а не все строки в самом сообщении (как следует из названия параметра).

В случае с JSON / TSV и подобными текстовыми форматами есть возможность разобрать все поля как строки и привести их к соответствующим типам на уровне материализованного представления.

Здесь можно многое улучшить. В частности, ClickHouse мог бы помещать неправильно сформированные сообщения в отдельный поток со специальными виртуальными колонками, такими как _error и, возможно, _raw_message. Затем пользователь мог бы подключить материализованное представление, чтобы отфильтровать их или хранить отдельно.

Система с большим количеством таблиц движка Kafka генерирует таймауты. Что происходит?

Каждая таблица Kafka использует поток из пула потоков "background_schedule". Если таблиц Kafka слишком много, есть смысл увеличить значение параметра background_schedule_pool_size. Также важно следить за работой BackgroundSchedulePoolTask.

Кроме того, библиотека Kafka (librdkafka) создает по одному потоку на брокер,а а также сервисные потоки, поэтому общее количество потоков, используемых ClickHouse, может быть слишком большим, что приводит к большому количеству переключений контекста.

Для того чтобы уменьшить количество таблиц движка Kafka, потребляющих различные темы, можно использовать одну таблицу движка Kafka и позволить нескольким материализованным представлениям фильтровать данные по виртуальному столбцу '_topic'.

В данных отсутствует определенная информация при ее получении из таблицы Kafka

Этого не должно произойти, если используется рекомендованная версия ClickHouse. Это может произойти в более старых версиях, где было несколько ошибок, приводящих к отсутствию данных. Если вы столкнулись с этим в рекомендуемой версии или выше, пожалуйста, свяжитесь с нами и сообщите об ошибке.

При потреблении данных из таблицы Kafka возникают их дубликаты.

Дубликаты теоретически возможны, поскольку в данном случае обеспечивается at-least-once contract (контракт “хотя бы один раз”). Этого не должно произойти при нормальных обстоятельствах, если используется рекомендованная версия ClickHouse. Однако это может произойти в редких тупиковых ситуациях. Например, если данные были вставлены в ClickHouse, а сразу после этого соединение с брокером Kafka было потеряно, то ClickHouse не смог зафиксировать новое смещение.

Обратите внимание, что дубликаты были досадной проблемой в старых версиях, поэтому использование рекомендованной версии ClickHouse является обязательным.

Когда что-то не работает, как я могу устранить неполадки?

В этот момент наиболее информативными источниками являются журнал трассировки ClickHouse и журнал отладки (debug) для библиотеки Kafka (librdkafka), который можно активировать в config.xml следующим образом:

<kafka>
   <debug>all</debug>
</kafka>

Журналы библиотеки librdkafka записываются в stderr.log.

Какие есть альтернативы движку Kafka?

Для пользователей Java вполне естественно создавать группы потребителей и передавать темы Kafka в ClickHouse с помощью JDBC-драйвера ClickHouse. Это дает максимальный контроль.

Вот некоторые другие инструменты, которые могут быть использованы для интеграции Kafka и ClickHouse:

Заключение и план действий

ClickHouse Kafka engine — это отличная функциональная часть, которая позволяет легко интегрировать ClickHouse с Apache Kafka. Его с успехом используют многие пользователи ClickHouse. Однако в мире нет ничего идеального, и есть много возможностей для улучшений. Именно отзывы пользователей помогают нам совершенствовать ClickHouse и движок Kafka. У нас есть четкий план, как сделать движок Kafka еще лучше, надежнее и проще в использовании. Мы рады поделиться с вами этим планом и приветствуем любые предложения:

  • Удобство в использовании:

  • Многопоточный потребитель

  • Настройка парсера для конкретного формата

  • Поддержка заголовков Kafka в качестве виртуальных колонок

  • Изменение настроек таблицы

  • Улучшенная обработка ошибок

  • Согласованная конфигурация (сервер, профиль, уровень таблиц)

  • Поддержка семантики "только однажды" (EOS) после выхода версии 1.5 библиотеки rdkafka от Confluent.

  • Инструменты интроспекции для упрощения мониторинга и устранения неполадок:

  • Таблица system.kafka для мониторинга потребителей

  • Метрики, связанные с движком Kafka, в system.metrics

  • Перенаправление журналов Kafka в журналы ClickHouse (вместо stderr).

Существует также список проблем, которые можно найти по метке comp-kafka в репозитории ClickHouse GitHub.

ClickHouse — это весело, а ClickHouse с Kafka — весело вдвойне! Оставайтесь с нами!


Материал подготовлен в рамках курса «Нагрузочное тестирование». Если вам интересно узнать больше о курсе, формате обучения и программе, познакомиться с преподавателем — приглашаем на день открытых дверей онлайн. Регистрация здесь.

Комментарии (0)