Привет, Хабр! Доводилось ли вам тратить долгие бесплодные часы в попытке настроить коннекторы Kafka Connect, чтобы добиться адекватного потока данных? Мне, к сожалению, доводилось. Представляю вашему вниманию перевод статьи "How to Tune Kafka Connect Source Connectors to Optimize Throughput" автора Catalin Pop. Это прекрасное руководство от Confluent, где подробно и с примером описывается, как настроить Source коннекторы.
Kafka Connect – это инструмент интеграции данных с открытым исходным кодом, который упрощает процесс потоковой передачи данных между Apache Kafka® и другими системами. Kafka Connect имеет два типа коннекторов: Source коннекторы и Sink коннекторы. Source коннекторы позволяют считывать данные из различных источников и записывать их в топики Kafka. Sink коннекторы отправляют данные из топиков в другую конечную точку. В этой статье обсуждается, как настроить Source коннекторы, чтобы обеспечить максимальную пропускную способность ваших вычислительных ресурсов. Данная статья посвящена следующим темам:
Общий обзор того, что можно и что нельзя настроить
Настройка коннекторов
Какие конфигурационные параметры Producer можно изменить
Какие JMX метрики стоит мониторить
Пример пошаговой настройки
Что вы можете настроить? Общий обзор
Когда речь идет о настройке Source коннекторов, необходимо понимать, как работают коннекторы. Для начала давайте рассмотрим пример JDBC Source коннектора, который разбит на три секции:
У нас есть конечная точка/источник, из которого мы извлекаем записи (в нашем примере - база данных), а затем Kafka Connect, который содержит следующее:
Коннектор – возвращает некоторое число записей, полученных из источника.
Конвертер – преобразует записи в выбранный тип данных, например Avro.
Преобразования (трансформации) – применяет простые преобразования сообщений (Simple Message Transformations – SMT), которые были определены в конфигурациях коннектора.
Producer – отправляет записи в топик Kafka.
Наконец, следует Kafka, которая принимает записи продюсера.
Теперь, когда мы рассмотрели основы Kafka Connect, давайте обсудим, что нельзя настроить:
Конвертер – преобразование записи в Avro/Protobuf/JSON всегда занимает определенное количество времени.
Трансформации – хотя преобразование происходит очень быстро, для его применения всегда требуется определенное время. Если вы хотите избежать этой задержки, вы можете не использовать трансформации.
То, что можно настроить, включает в себя четыре компонента:
Коннектор – мы можем использовать конфигурации коннектора для дальнейшей настройки.
Producer – определенные конфигурационные параметры, такие как
batch.size
можно использовать для дальнейшего увеличения пропускной способности.Источник данных – не рассматривается, поскольку зависит от того, какой источник вы выберете.
Сама Kafka – не рассматривается в этой статье, однако, вы можете ознакомиться со следующим руководством, чтобы узнать больше.
Настройка коннекторов
Настройка коннектора зависит от того, какие конфигурационные параметры он имеет. Если коннектор не предоставляет никаких конфигураций для получения большего количества данных от источника, то коннектор не может быть настроен для достижения большей пропускной способности.
Например, Confluent JDBC Source коннектор имеет следующие конфигурации, предоставляемые коннектором:
batch.max.rows
– максимальное количество строк, которое можно включить в один пакет при запросе новых данных.poll.interval.ms
– интервал в миллисекундах для запроса новых данных в каждой таблице.
Мы можем заставить коннектор возвращать продюсеру больше записей, увеличив batch.max.rows
. Если бы коннектор не предоставлял эту конфигурацию и имел статическое значение 5 записей в качестве batch.max.rows
, то наибольшее количество записей, которое коннектор может получить по каждому запросу, составляло бы всего 5 записей. Коннектор может быть настроен только в соответствии с его открытыми параметрами конфигурации.
Какие конфигурационные параметры Producer можно изменить
Когда речь заходит о настройке Producer, необходимо рассмотреть несколько распространенных конфигураций:
batch.size
– указывает максимальный размер пакета в байтах (по умолчанию 16384).linger.ms
– указывает максимальную продолжительность заполнения пакета в миллисекундах (по умолчанию 0).buffer.memory
– общее количество байт памяти, которое продюсер может использовать для буферизации записей, ожидающих отправки на сервер (по умолчанию 33554432).compression.type
– указывает конечный тип сжатия для данного топика (по умолчанию "producer").
Конфигурация batch.size
должна соответствовать количеству записей, возвращаемых коннектором. Простой пример: если коннектор возвращает 500 записей, размер пакета следует задать с помощью следующего уравнения:
batch.size = number_of_records * record_size_average_in_bytes
Если, например, коннектор возвращает 500 записей каждый раз, когда он извлекает данные из базы данных, и каждая запись имеет размер 1,76KiB, тогда уравнение будет таким:
(500*1.76) * 1024 = 901,120
(* Умножается на 1024 для преобразования KiB в байты)
Конфигурация linger.ms
зависит от того, сколько записей коннектор возвращает продюсеру. При наличии 10 записей продюсеру может потребоваться подождать всего 5 мс, однако при возврате 100 000 записей время ожидания linger.ms
должно быть увеличено. Причина увеличения linger.ms
в том, что пакету потребуется больше времени для заполнения. Если для конфигурации linger.ms
установлено слишком низкое значение, это приведет к тому, что во многих запросах пакетам не будет достаточно времени для заполнения. Обратная ситуация может возникнуть для linger.ms
, если значение слишком велико, тогда продюсер ожидает без необходимости, и пакеты отправляются медленнее.
Конфигурация buffer.memory
представляет общее количество байт памяти, которое продюсер может использовать для буферизации записей, ожидающих отправки на сервер. Этот параметр должен примерно соответствовать общему объему памяти, который будет использовать продюсер, но он не является жестко привязанным, поскольку не вся память, используемая продюсером, используется для буферизации. Если, например, продюсер Kafka не может отправлять сообщения (пакеты) Kafka-брокеру (скажем, брокер не работает). Продюсер начнет накапливать пакеты сообщений в буферной памяти (по умолчанию 32 МБ). Когда буфер будет заполнен, он будет ждать max.block.ms
(по умолчанию 60 000 мс), чтобы буфер мог быть очищен. Если буфер не будет очищен, то продюсер выбросит исключение. Если значение buffer.memory
установлено слишком низко, буфер будет мгновенно заполняться и выбрасывать исключение. Хотя может произойти и обратное, когда слишком большое значение buffer.memory
может вызвать исключение OOM, если память в ОС будет исчерпана.
И наконец, последняя конфигурация – compression.type
, которая может быть включена для сжатия сообщений перед их отправкой. У каждого типа сжатия есть множество плюсов и минусов, поэтому вам придется провести личное исследование, чтобы определить, какой тип сжатия лучше всего подходит для вашего случая использования. Следующий KIP содержит дополнительную информацию о производительности каждого типа сжатия. Хотя сжатие отлично подходит для уменьшения размера сообщений, его использование увеличивает время доставки сообщений, так как сообщения должны быть сжаты.
Какие JMX метрики стоит мониторить
JMX-метрики следует разделить на три секции: метрики коннектора, метрики брокера и метрики продюсера (из Kafka Connect фреймворка).
Уровень |
Метрика |
Описание |
Почему это полезно? |
---|---|---|---|
Коннектор |
|
До применения трансформаций это среднее количество записей в секунду, производимых или запрашиваемых у источника задачей (task), принадлежащей указанному Source коннектору в рабочем процессе (worker). |
Сообщает нам о среднем количестве записей в секунду, производимых до трансформаций. |
Коннектор |
|
Среднее время в миллисекундах, затрачиваемое этой задачей на запрос пакета записей источника. |
Метрика, которая может сказать вам, сколько времени потребуется, чтобы запись была возвращена источником. |
Коннектор |
|
После применения трансформаций это среднее количество записей в секунду, полученных в результате трансформаций и записанных в Kafka, для задачи (task), принадлежащей именованному Source коннектору в рабочем процессе (worker) (исключая записи, отфильтрованные трансформациями). |
Полезно при применении трансформаций для определения их влияния на производительность. |
Брокер |
|
Скорость поступления байтов от клиентов. |
Полезно видеть пропускную способность по каждому топику, чтобы убедиться, что пропускная способность увеличилась. |
Продюсер |
|
Средний размер записи. |
Метрика, используемая для расчета |
Продюсер |
|
Среднее количество байт, отправляемых в партицию за один запрос. |
Проверьте, что |
Продюсер |
|
Среднее количество записей за запрос. |
Проверьте, сколько записей отправляется в каждом пакете продюсера. |
Продюсер |
|
Среднее количество записей, отправляемых в секунду в топик. |
Используется как индикатор того, что коннектор нуждается в дальнейшей настройке. |
Пример пошаговой настройки
В этом примере показано, как увеличить пропускную способность Confluent JDBC Source коннектора, подключающегося к базе данных MySQL (БД). Эта методология может быть применена к любому Source коннектору. Необходимо предпринять следующие шаги:
Определите, какие конфигурации коннектора/продюсера можно изменить, и настройте Grafana для отображения JMX метрик.
Определите исходную пропускную способность.
Определите, какие конфигурации следует изменить (продюсер vs. коннектор).
Измените конфигурацию продюсера .
Измените конфигурацию коннектора для повышения пропускной способности.
Шаг 1. Определите, какие конфигурации коннектора/продюсера можно изменить, и настройте Grafana для отображения JMX метрик.
Конфигурация JDBC Source коннектора:
batch.max.rows
– максимальное количество строк, которое можно включить в один пакетный запрос коннектора. Этот параметр может быть использован для ограничения объема данных, буферизованных коннектором (по умолчанию 100).poll.interval.ms
– интервал в миллисекундах для ожидания перед запросом новых данных для каждой таблицы (по умолчанию 5000).
Конфигурации продюсера:
batch.size
– указывает максимальный размер пакета в байтах (по умолчанию 16384).linger.ms
– указывает максимальную продолжительность заполнения пакета в миллисекундах (по умолчанию 0).buffer.memory
– общее количество байт памяти, которое продюсер может использовать для буферизации записей, ожидающих отправки на сервер (по умолчанию 33554432).compression.type
– указывает конечный тип сжатия для данного топика (по умолчанию не задан).
В этом примере используется kafka-docker-playground окружение, в котором скрипт mysql.sh
развертывает экземпляр MySQL, вставляет 10 миллионов записей в БД и запускает коннектор с Grafana, включив экспорт ENABLE_JMX_GRAFANA=true
.
Шаг 2 - Определите исходную пропускную способность
Во-первых, необходимо определить исходную пропускную способность при использовании стандартных конфигураций коннектора. Таким образом, вы сможете увидеть показатели JMX до и после, чтобы определить, как увеличить пропускную способность. Имейте в виду, что при настройке вам нужно выполнить настройку для одной задачи (task). После того как базовая пропускная способность определена для одной задачи, вы всегда можете увеличить task.max
позднее (если коннектор позволяет).
Ниже приведена конфигурация моего коннектора:
curl -X PUT \
-H "Content-Type: application/json" \
--data '{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max":"1",
"connection.url":"jdbc:mysql://mysql:3306/mydb?user=user&password=password&useSSL=false",
"mode":"bulk",
"producer.override.client.id": "mysql-base",
"query":"SELECT * FROM mydb.team WHERE mydb.team.id < 7900000 ",
"topic.prefix":"mysql-base"
}' \
http://localhost:8083/connectors/mysql-base/config | jq .
Базовые JMX-метрики Grafana:
Брокер
Коннектор
Продюсер
Шаг 3. Определите, какие конфигурации следует изменить (продюсер vs. коннектор).
Чтобы определить, какие конфигурации изменить, давайте суммируем наши метрики в простой таблице:
Метрика |
Значение |
---|---|
|
50.9 MB/s |
|
53.1K ops/s |
|
0.981 ms |
|
53.1K ops/s |
|
1.09 KiB |
|
15.1 KiB |
|
14.8 записей в среднем на запрос |
|
71.6K записей в среднем на запрос |
На основании вышеперечисленных метрик в первую очередь следует обратить внимание на метрики уровня продюсера. Основная причина, по которой мы решили сначала настроить продюсера, связана с JMX-метриками batch-size-avg
и records-per-request-avg
. Эти два значения должны насторожить, поскольку средний размер пакета составляет 15,1 KiB, а среднее количество записей на запрос -
14,8. Значение batch-size-avg
становится предельным, поскольку по умолчанию batch.size
равен 16384, и добавление еще одной записи невозможно, поскольку средний размер записи составляет 1,09 KiB. Если посмотреть на метрику records-per-request-avg
, в среднем отправляется 14,8 записей на запрос, однако коннектор возвращает 100 записей на каждый запрос из-за настройки batch.max.rows
. Это указывает на то, что коннектор возвращает на 100 записей больше, однако batch.size
ограничивается только ~15 записями.
Можно проверить количество записей, возвращаемых коннектором продюсеру, включив TRACE
уровень журналирования для AbstractWorkerSourceTask
:.
curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.AbstractWorkerSourceTask \
-d '{"level": "TRACE"}' \
| jq '.'
Затем выполняется поиск в логах следующей строки:
About to send <integer> records to Kafka
Основываясь на метриках продюсера, первым компонентом, который необходимо настроить, будет продюсер.
С учетом вышесказанного можно спросить: «Как определить, что коннектор не является узким местом?» Когда нужно определить, является ли коннектор узким местом, обычно происходит один или оба следующих сценария:
Пропускная способность не увеличивается, даже если конфигурации продюсера были изменены.
При изменении конфигураций продюсера
send-rate
остается неизменной или не увеличивается.
Когда происходит один или оба вышеописанных сценария, это верный признак того, что коннектор нуждается в модификации. JMX-метрика send-rate
говорит нам о том, что продюсер делает в данный момент. Если send-rate
не увеличивается или остается относительно неизменным, это свидетельствует о том, что продюсер ждет записей от коннектора. Это означает, что коннектор является узким местом, поскольку продюсер находится в режиме "ожидания".
Шаг 4. Изменение конфигураций продюсера
Чтобы настроить продюсера, необходимо увеличить batch.size
. Для расчета размера пакета можно использовать следующее уравнение:
batch.size = number_of_records * record_size_average_in_bytes
Поскольку наш текущий batch.size
заполнен 100 записями, поступающими из коннектора, в этом примере я увеличу batch.size
и batch.max.rows
для дальнейшего увеличения пропускной способности. Причиной настройки также и batch.max.rows
является скорее личное предпочтение избегать перерасчета batch.size
во второй раз. Я знаю, что хочу получить больше данных из источника, поэтому предпочитаю настраивать и то, и другое одновременно. Эта модификация также устранит роль коннектора как узкого места, поскольку теперь мы будем запрашивать больше сообщений. Для параметра batch.max.rows
будет установлено значение 500 записей, и мы можем использовать приведенное выше уравнение для определения batch.size
. Чтобы определить средний размер пакета, обратите внимание на то, что batch-size-avg
указывается в KiB. KiB необходимо преобразовать в байты путем умножения на 1024
:.
batch.max.rows * record-size-avg * 1024(из-за KiB)
500 * 1.09 * 1024 = 558080
Сейчас конфигурация нашего коннектора будет обновлена, и вы должны заметить, что параметры batch.max.rows
и batch.size
увеличились. Еще раз, причина основана на JMX метрике batch.size
, поскольку мы знаем, что наш пакет заполнен 100 записями:.
curl -X PUT \
-H "Content-Type: application/json" \
--data '{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max":"1",
"connection.url":"jdbc:mysql://mysql:3306/mydb?user=user&password=password&useSSL=false",
"mode":"bulk",
"batch.max.rows": 500,
"producer.override.client.id": "mysql-batch",
"producer.override.batch.size": 558080,
"query":"SELECT * FROM mydb.team WHERE mydb.team.id < 7900000 ",
"topic.prefix":"mysql-batch"
}' \
http://localhost:8083/connectors/mysql-batch/config | jq .
После запуска коннектора вы получите следующий сводный результат метрик JMX:
Метрика |
Значение |
---|---|
|
61.0 MB/s |
|
61.5K ops/s |
|
4.41 ms |
|
61.4K ops/s |
|
1.09 KiB |
|
95.3 KiB |
|
95.9 записей в среднем на запрос |
|
60.8K записей в среднем на запрос |
На основании приведенных выше JMX-метрик можно заметить, что records-per-request-avg
не достигает количества записей, установленного в batch.max.rows
. records-per-request-avg
-
это индикатор того, что продюсеру необходимо ждать немного дольше, чтобы заполнить пакет. Когда происходит подобный сценарий, linger.ms
-
это параметр, который необходимо увеличить. linger.ms
может варьироваться в зависимости от вашего окружения, однако для этого примера я выберу простое небольшое число 10. Ниже приведена новая конфигурация коннектора:
curl -X PUT \
-H "Content-Type: application/json" \
--data '{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max":"1",
"connection.url":"jdbc:mysql://mysql:3306/mydb?user=user&password=password&useSSL=false",
"mode":"bulk",
"batch.max.rows": 500,
"producer.override.client.id": "mysql-batch-linger",
"producer.override.batch.size": 558080,
"producer.override.linger.ms": 10,
"query":"SELECT * FROM mydb.team WHERE mydb.team.id < 7900000 ",
"topic.prefix":"mysql-batch-linger"
}' \
http://localhost:8083/connectors/mysql-batch-linger/config | jq .
Обновленная сводка метрик JMX:
Метрика |
Значение |
---|---|
|
61.0 MB/s |
|
60.8K ops/s |
|
4.53 ms |
|
60.6K ops/s |
|
1.09 KiB |
|
543 KiB |
|
534 записей в среднем на запрос |
|
58.0K записей в среднем на запрос |
На данный момент среднее количество записей на запрос удовлетворено, однако показатели BytesInPerSec
и record-send-rate
не увеличились. Эти две метрики указывают на то, что проблема заключается в том, что коннектор недостаточно быстро отправляет записи продюсеру. Продюсер может отправлять больше записей, однако он не получает их достаточно быстро.
Шаг 5. Изменение конфигурации коннектора для повышения пропускной способности
На Шаге 4 мы настроили продюсера и определили, что коннектор является текущим узким местом, поскольку пропускная способность не увеличилась, а record-send-rate осталась относительно стабильной. Есть только две конфигурации, которые могут помочь увеличить пропускную способность со стороны коннектора:
batch.max.rows
– максимальное количество строк, которое можно включить в один пакет при запросе новых данных. Этот параметр может быть использован для ограничения объема данных, буферизованных коннектором.poll.interval.ms
– интервал в миллисекундах для запроса новых данных в каждой таблице.
Поскольку значение batch.max.rows
уже было изменено и пропускная способность не увеличилась, остается poll.interval.ms
. Этот параметр определяет частоту извлечения данных из новой таблицы, текущее значение равно 500. Это означает, что коннектор извлекает данные из таблицы только каждые 5 секунд.
Ниже приведены новые конфигурации коннектора, где для poll.interval.ms
установлено значение 1:
curl -X PUT \
-H "Content-Type: application/json" \
--data '{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max":"1",
"connection.url":"jdbc:mysql://mysql:3306/mydb?user=user&password=password&useSSL=false",
"mode":"bulk",
"poll.interval.ms": 1,
"batch.max.rows": 500,
"producer.override.client.id": "mysql-batch-linger-poll-interval-1",
"producer.override.batch.size": 558080,
"producer.override.linger.ms": 10,
"query":"SELECT * FROM mydb.team WHERE mydb.team.id < 7900000 ",
"topic.prefix":"mysql-batch-linger-poll-interval-1"
}' \
http://localhost:8083/connectors/mysql-batch-linger-poll-interval-1/config | jq .
Обновленная сводка метрик JMX:
Метрика |
Значение |
---|---|
|
88.2 MB/s |
|
88.1K ops/s |
|
19.3 ms |
|
87.7K ops/s |
|
1.09 KiB |
|
543 KiB |
|
534 записей в среднем на запрос |
|
80.3K записей в среднем на запрос |
На данный момент наша пропускная способность увеличилась на 57 % с первоначальных 50,9 МБ/с до 88,2 МБ/с. Основным узким местом в этом сценарии было то, что коннектор слишком медленно извлекал записи из источника.
Заключение
В этом примере скорость выросла с 50,9 МБ/с до 88,2 МБ/с, что означает увеличение пропускной способности на 57 %. При этом данный сценарий тестирования представляет собой сценарий "идеального мира", в то время как в реальности производственные среды не столь однозначны:
Обычно в таблицы баз данных не вставляют 10 миллионов строк размером ровно 1,09 килобайта.
Тестирование проводилось на одном инстансе EC2 с локальным запуском всех контейнеров Docker практически без задержек.
Демонстрационная БД не была настроена для использования всего ее потенциала.
Хотя большинство сценариев не настолько изолированы, как представлено в этом учебном примере, методология устранения неполадок остается неизменной:
Определите, является ли коннектор или продюсер узким местом.
Настройте продюсера, чтобы гарантировать, что пакеты заполняются.
Настройте коннектор на более быструю отправку сообщений продюсеру.