Введение

При масштабной работе с Apache Kafka вы рано или поздно столкнетесь с проблемой доступного дискового пространства, темпами роста тем или вопросами использования диска. Это особенно актуально при работе с внешними источниками - вне вашего прямого контроля и настройки жестких ограничений по размеру в вашей конфигурации хранения.

Инструмент kafka-log-dirs позволяет определить текущий размер и расположение всех доступных разделов в вашем кластере Kafka. Из-за своей природы этот инструмент представляет собой лишь снимок текущего состояния без какой-либо истории, и пользоваться им неудобно

Хотите знать, насколько неудобно? Вот фрагмент вывода:

Мы хотим превратить приведенный выше вывод JSON во что-то более понятное и читаемое, например:

Итак, давайте разберемся, как можно использовать Apache NiFi, Apache Kudu и Tableau для получения настраиваемой визуализации и практических сведений о вашем кластере Kafka.

Архитектура

На высоком уровне мы собираемся использовать Apache NiFi для сбора, преобразования и приема данных, предоставляемых инструментом kafka-log-dirs. Мы будем использовать Apache Kudu для хранения данных, Apache Impala для их анализа и Tableau для визуализации.

Используемые версии

В данной статье используются указанные компоненты следующих версий:

  • Kafka 2.0

  • Nifi 1.9.0

  • Kudu 1.8.0

  • Impala 3.1.0

  • Tableau Desktop 2020.1.7

Создание таблицы Kudu

Прежде чем начать с какого-либо потока NiFi, нам сначала нужно убедиться, что таблица, в которую мы будем записывать данные из инструмента kafka-log-dirs, доступна в Kudu и Impala.

Мы создадим таблицу посредством Impala, выполнив следующую команду: 

CREATE TABLE default.monitoring_kafka (
  broker INT,
  log_dir STRING,
  partition_name STRING,
  processed_at_utc TIMESTAMP,
  size DECIMAL(18,6),
  offset_lag INT,
  is_future BOOLEAN,
  PRIMARY KEY (broker, log_dir, partition_name, processed_at_utc)
)
PARTITION BY HASH(broker, log_dir, partition_name) PARTITIONS 4,
RANGE(processed_at_utc)
(
    PARTITION '2020-01-01' <= values < '2020-07-01',
    PARTITION '2020-07-01' <= values < '2021-01-01',
    PARTITION '2021-01-01' <= values < '2021-07-01',
    PARTITION '2021-07-01' <= values < '2022-01-01',
    PARTITION '2022-01-01' <= values < '2022-07-01',
    PARTITION '2022-07-01' <= values < '2023-01-01'
)
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '3',
'kudu.master_addresses' = 'staging-kudu1:7051');

Мы будем использовать хэш-секционирование для брокера столбцов, log_dir и partition_name в сочетании с секционированием диапазона по шкале времени. В зависимости от количества ожидаемых вносимых данных и ожидаемого объема запросов при анализе данных ваша стратегия секционирования может различаться по количеству разделов и диапазонам на раздел.

NiFi

Наш поток NiFi разбит на три части:

  • Ввод: получение данных о текущих размерах разделов с помощью инструмента kafka-log-dirs.

  • Преобразование: цель состоит в том, чтобы очистить и преобразовать входные данные в формат на основе записей, который позволяет легко выполнить операцию вставки в Kudu. Это делается в три простых шага:
    - Удаление шума bash
    - Сглаживание JSON (актуальные данные, которые нас интересуют)
    - Добавление метки времени now()

  • Вывод: вставка строк в Kudu.

Для простоты мы не будем здесь останавливаться на обработке отказов. Пути отказов либо автоматически завершаются, либо заканчиваются воронкой.

Выполнение Kafka-Log-Dirs

Процессор NiFi ExecuteProcess позволяет выполнять команды на уровне ОС, как если бы вы использовали интерфейс командной строки. Свойство Command указывает фактическую команду, которую вы выполняете. Любые аргументы входят в свойство Command Arguments. Оба свойства поддерживают реестр переменных, который мы собираемся использовать.

Мы хотели бы получить размер раздела для всех существующих тем Kafka, поэтому нам нужно выполнить в ExecuteProcessor следующее:

kafka-log-dirs --describe --bootstrap-server <list-of-your-kafka-brokers:port>

Эта команда будет разделена на оба упомянутых свойства следующим образом:

Команда: kafka-log-dirs

Ее аргументы:

--describe --bootstrap-server ${kafka_broker}

Если вы хотите ограничить вывод определенным списком тем, просто добавьте в свойство Command Arguments аргумент --topic-list, за которым следует список ваших тем, разделенных запятыми:

--topic-list <myTopic1>,<myTopic2>

Убедитесь, что этот процессор выполняется только на основном узле. В противном случае каждый узел выполнит инструмент kafka-log-dirs и перешлет результат соседнему процессору, что приведет к дублированию. А пока мы будем запускать процессор каждые 15 минут. Вы можете это планировать в соответствии со своими потребностями.

Вот результат работы нашего процессора:

Устранение шума в выводе Bash

Инструмент kafka-log-dirs выводит не только размеры разделов в массиве JSON, но также некоторую информацию журнала, как показано выше. Но поскольку нас интересует только JSON и фактическая информация, которую он дает, мы можем воспользоваться тем фактом, что шум и фактический результат JSON четко разделены в разных строках. Таким образом, мы можем использовать процессор RouteText для удаления шума.

Если вы следуете приведенной ниже конфигурации, этот процессор отбросит все строки, которые не начинаются с "{". Все совпавшие строки перенаправляются по пути valid_json.

Примечание: это всего лишь один подход, могут быть способы получше. 

Вот результат работы нашего процессора:

Подготовка JSON

Вот где происходит волшебство. Теперь, когда у нас есть вложенный вывод JSON, нам нужно упаковать его в "плоский" формат. Процессор JoltTransformJSON - инструмент, который мы выбрали для данного сценария использования. Кроме того, для удобства использования в Tableau мы преобразуем размер раздела из байтов в ГБ. Просто потому, что 237,98 ГБ легче читать и интерпретировать, чем 237898518923 байта.

Вот полное преобразование Jolt, которое мы применим к входящему JSON: 

[
  { // convert size from bytes to GB
    "operation": "modify-overwrite-beta",
    "spec": {
      "brokers": {
        "*": {
          "logDirs": {
            "*": {
              "partitions": {
                "*": {
                  "size": "=divideAndRound(6,@(1,size),1073741824)"
                }
              }
            }
          }
        }
      }
    }
  },
  { // extract all values in independent arrays
    // e.g. "log_dir" : [ "/data/01", "/data/02"],
    // "partition_name" : [ "CustomerEvent-0", "CustomerTransaction-1"],
    // "size" : [ 0.009356, 0.042702 ],
    "operation": "shift",
    "spec": {
      "brokers": {
        "*": {
          "logDirs": {
            "*": {
              "partitions": {
                "*": {
                  "partition": "partition_name[]",
                  "size": "size[]",
                  "offsetLag": "offset_lag[]",
                  "isFuture": "is_future[]",
                  "@(4,broker)": "broker[]",
                  "@(2,logDir)": "log_dir[]"
                }
              }
            }
          }
        }
      }
    }
  },
  { // transpose/pivot/merge/younameit the individual arrays into separate arrays, 1 array per partition
    "operation": "shift",
    "spec": {
      "broker": {
        "*": {
          "*": {
            "$": "[&2].broker"
          }
        }
      },
      "log_dir": {
        "*": {
          "*": {
            "$": "[&2].log_dir"
          }
        }
      },
      "partition_name": {
        "*": {
          "*": {
            "$": "[&2].partition_name"
          }
        }
      },
      "size": {
        "*": {
          "*": {
            "$": "[&2].size"
          }
        }
      },
      "offset_lag": {
        "*": {
          "*": {
            "$": "[&2].offset_lag"
          }
        }
      },
      "is_future": {
        "*": {
          "*": {
            "$": "[&2].is_future"
          }
        }
      }
    }
  }
]

Вот результат работы нашего процессора:

Добавление метки времени

К сожалению, инструмент kafka-log-dirs не предоставляет никакой информации о том, к какому времени и к какой дате относятся выходные данные. Если вы посмотрите на JSON без какого-либо дополнительного контекста, то не узнаете, к какой дате размер раздела относится: к сегодняшней, к прошлой неделе или даже к прошлому году. Поэтому мы просто добавим к потоковому файлу во время его обработки метку времени. Для этого есть несколько способов, но пока для выполнения данного требования мы будем использовать процессор QueryRecord и язык выражений Nifi.

Следующее выражение NiFi возвращает текущую метку времени в формате микросекунд (поскольку тип данных столбца Kudu - unixtime_micros)

${now():toNumber():multiply(1000)}

Это выражение Nifi можно напрямую использовать в операторе SQL для процессора:

SELECT
    broker
    ,${now():toNumber():multiply(1000)} processed_at_utc
    ,log_dir
    ,partition_name
    ,size
    ,offset_lag
    ,is_future
FROM FLOWFILE

Добавьте новое свойство, назовите его result и добавьте оператор SQL. Свойство result является названием отношения, в которое после применения представленного оператора SQL отправляются все FlowFiles.

Record Reader: JsonTreeReader (поскольку входящие данные, предоставляемые преобразованием Jolt, имеют формат JSON)

Record Writer: AvroRecordSetWriter (мы стараемся использовать Avro везде, где это имеет смысл и применимо. JsonRecordSetWriter теоретически также будет работать в этом сценарии)

Вот результат работы нашего процессора:

Вставка в Kudu

Теперь, когда у нас есть окончательные данные на основе записей, мы можем напрямую загрузить их в нашу таблицу Kudu с помощью процессора PutKudu. Опять же, из-за наличия нескольких сред мы не указываем фиксированные имена серверов в свойстве Kudu Masters. Вместо этого мы используем переменную NiFi $ {kudu_masters}, которая хранит строку всех мастеров Kudu в зависимости от того, в какой среде настраивается поток (та же ситуация, что и в разделе Execute Kafka-Log-Dirs).

Имя таблицы Table Name: impala::default.monitoring_kafka (обратите внимание на префикс "impala ::", потому что таблица Kudu была создана с помощью Impala)

Операция вставки: UPSERT (Мы привыкли к гибкости и мощности команды upsert, которую во многих случаях используем как операцию вставки по умолчанию).

После запуска всех процессоров Nifi мы можем проверить вывод нашего потока Nifi, запросив таблицу Kudu через Impala (и Hue):

Таблица

 Теперь пришло время реально использовать инструмент kafka-log-dirs, проанализировав предоставленные данные. С помощью Tableau мы хотели бы получить ответы на два вопроса:

  1. Какие разделы Kafka самые большие в кластере, и как они распределены по узлам и дискам?

  2. Как со временем меняется размер раздела?

Чтобы ответить на каждый из этих вопросов, нужна разная визуализация. Ответ на первый можно красиво представить в виде тепловой карты, где каждый брокер Kafka показан в столбцах, а каждый диск (log_dir) отображается в строках. Размер раздела будет представлен цветом и размером блока. Второй вопрос - это, по сути, временной ряд в виде линейного графика с размером раздела по оси Y и временем по оси X.

Подключение к Impala

Прежде чем мы сможем что-либо визуализировать, нам нужно подключиться к нашей таблице. Удобно, что Tableau можно напрямую подключать к Cloudera (Impala и Hive2). Просто выберите Cloudera Hadoop, введите сервер, порт и выберите Impala. Введите соответствующие параметры безопасности в соответствии со своими настройками (Kerberos, LDAP, без аутентификации).

На следующем экране выберите правильную схему (по умолчанию) и перетащите таблицу monitoring_kafka. Убедитесь, что типы данных правильно интерпретируются Tableau, и при необходимости скорректируйте их. Здесь вы также можете переименовать столбцы для более удобного использования в визуализации. В нашем примере мы переименуем Processed At Utc в Timestamp UTC.

Теперь перейдите в раздел рабочего листа.

Прежде чем, наконец, начать, нужно  исправить одну небольшую проблему. Поскольку брокер столбца имеет тип INTEGER, Tableau интерпретирует его как метрику, хотя брокер для нас является dimension. Мы не хотим суммировать идентификаторы брокеров, а хотим разделить данные по идентификаторам брокера.

Просто щелкните правой кнопкой мыши Broker и преобразуйте его в dimension:

Временной ряд размеров разделов Kafka

Теперь перетащите метку времени Timestamp UTC в столбцы Columns, перетащите размер Size в строки Rows (выберите MAX в качестве меры) и перетащите имя раздела Partition Name на цвет Color. Кроме того, вы можете добавить фильтр на основе MAX (размер). В приведенном ниже примере этот фильтр установлен как минимум на 0,5, то есть все разделы, размер которых меньше 0,5 ГБ, будут отфильтрованы. Это означает, что все небольшие темы, такие как темы внутреннего смещения Kafka, из графика исключаются.

Вот последний график временных рядов, который показывает эволюцию наших разделов Kafka с течением времени. Если этот график определяет тему, которая постоянно растет с течением времени, вы можете адаптировать для этой темы стратегию хранения, например, снизить время хранения (retention.ms) или применить хранение фиксированного размера (retention.bytes).

Тепловая карта разделов Kafka

Тепловая карта покажет вам, равномерно ли распределены разделы между вашими брокерами и дисками, или вы столкнулись с некоторым "перекосом" в данных. Сначала перетащите Broker в столбцы Columns, Log Dir в Rows, , Partition Size в Color AND Label, Size в Size (выберите MAX в качестве меры) и Timestamp UTC в Pages (применение отметки времени Timestamp UTC на страницах позволит вам перейти на любой момент времени. Если вы хотите получить текущее состояние тепловой карты, просто переместите ползунок [под легендой названия раздела Partition Name] вправо или выберите самую последнюю временную метку из раскрывающегося списка.)

Какие выводы можно из этого извлечь? Тепловая карта показывает фактическое местоположение каждого раздела в кластере. Приведенный выше пример демонстрирует, что разделы хорошо распределены между брокерами, но диски используются неравномерно. /data/06 используется только в брокерах 194,195 и 196 и совсем не используется в брокере 197. С другой стороны, на /data/07 у каждого брокера есть два раздела среднего размера (показаны зеленым и оранжевый). Таким образом, одним из результатов этого визуального анализа может быть перемещение одного раздела из /data/07 в /data/06 на каждом брокере. Это можно сделать с помощью инструмента kafka-reassign-partitions.

Вживую наблюдайте за перемещением разделов!

Замечательно, что когда вы фактически переназначаете разделы Kafka после включения мониторинга, это позволяет наблюдать, как инструмент kafka-reassign-partitions перемещает разделы с одного диска на другой. Вы можете сократить планирование в процессоре NiFi Execute Kafka-Log-Dirs до 1 минуты или даже 30 секунд вместо 15 минут, что даст вам более тонкую детализацию из-за более высокой частоты приема данных. Далее вы можете наблюдать повторение нашего последнего перемещения раздела.

Перемещение трех разделов на пустой диск, по одному разделу за раз:

Здесь вы можете легко увидеть, что Kafka сохраняет исходный раздел, пока зеркалирует раздел на другой диск. Только после завершения зеркалирования старый раздел сразу уничтожается.

Итоги

Мониторинг вашего кластера Kafka - важная часть его масштабной работы с ним, и ключевую роль играет действенный анализ. Чем раньше вы сможете обнаружить потенциальные недостатки, тем больше облегчите жизнь операторам и администраторам. С помощью NiFi, Kudu и Tableau можно легко управлять мониторингом Kafka. Хотя Tableau и Kudu можно заменить другими технологиями и инструментами, NiFi снова оказался универсальным средством, "швейцарским армейским ножом" для произвольной интеграции данных. В стеке нашей платформы Cloudera Data Platform мы предоставляем готовый инструмент для комплексного мониторинга всех вспектов работы кластера Кафка - Streams Messaging Manager.