Привет, Хабр! Мы продолжаем цикл статей, посвященный Apache Flume. В предыдущей части мы поверхностно рассмотрели этот инструмент, разобрались с тем, как его настраивать и запускать. В этот раз статья будет посвящена ключевым компонентам Flume, с помощью которых не страшно манипулировать уже настоящими данными.


File Channel


В прошлой статье мы рассмотрели Memory Channel. Очевидно, что канал, использующий для хранения данных память, не является надежным. Перезапуск узла приведет к тому, что все данные, хранящиеся в канале, будут потеряны. Это не делает Memory Channel бесполезным, есть некоторые случаи, когда его использование очень даже оправдано в силу быстродействия. Однако для действительно надежной транспортной системы необходимо более устойчивое к неполадкам решение.

Таким решением является файловый канал — File Channel. Несложно догадаться, что этот канал хранит данные в файлах. При этом канал использует Random Access для работы с файлом, позволяя таким образом и добавлять и забирать события, сохраняя их последовательность. Для быстрой навигации канал использует систему меток (checkpoints), с помощью которых реализуется механизм WAL. Всё это, в общем-то, спрятано «под капотом» канала, а для его настройки используются следующие параметры (жирным шрифтом — обязательные параметры).
Параметр Описание По умолчанию
type
Реализация канала, должно быть указано file -
checkpointDir
Папка для хранения файлов с метками. Если не указана, канал будет использовать домашнюю папку Flume.
$HOME/...
useDualCheckpoints
Делать ли бекап папки с метками.
false
backupCheckpointDir
Папка для бекапов файлов с метками, нужно обязательно указывать, если useDualCheckpoints=true (разумеется, этот бекап лучше держать подальше от оригинала — например, на другом диске).
-
dataDirs
Список папок через запятую, в которых будут размещаться файлы с данными. Лучше указывать несколько папок на различных дисках для повышения производительности. Если папки не указаны, канал также будет использовать домашнюю папку Flume.
$HOME/...
capacity
Вместимость канала, указывается число событий.
1000000
transactionCapacity
Максимальное число событий в одной транзакции. Очень важный параметр, от которого может зависеть работоспособность всей транспортной системы. Подробнее об этом будет написано ниже.
10000
checkpointInterval
Интервал между созданием новых меток, в миллисекуднах. Метки играют важную роль при перезапуске, позволяя «перепрыгивать» участки файлов с данными при восстановлении состояния канала. В итоге канал не перечитывает файлы с данными целиком, что существенно ускоряет запуск при «забитом» канале.
30000
checkpointOnClose
Записывать ли метку при закрытии канала. Замыкающая метка позволит каналу восстановиться при перезапуске максимально быстро — но её создание займет некоторое время при закрытии канала (на самом деле, очень незначительное).
true
keep-alive
Таймаут (в секундах) для операции добавления в канал. Т.е., если канал забит, транзакция «даст ему шанс», выждав некоторое время. И если свободного места в канале так и не появилось, то транзакция откатится. 3
maxFileSize
Максимальный размер файла канала, в байтах. Значение этого параметра не определяет, сколько места может «откусить» ваш канал — оно задает размер одного файла с данными, а этих файлов канал может создать несколько. 2146435071 (2ГБ)
minimumRequiredSpace
Если на вашем диске меньше свободного места, чем указано в этом параметре, то канал не будет принимать новые события. В случае, если папки с данными расположены на нескольких дисках, Flume будет использовать 524288000 (500МБ)
Остальные настройки относятся к шифрованию данных в файлах канала и процессу восстановления (replay). Теперь пара слов о том, что нужно учитывать при работе с файловым каналом.

  • Убедитесь, что Flume имеет право записывать данные в папки.
    Или, если быть точнее, пользователь, от чьего имени запущен Flume, имеет права записи в папках для checkpoints и data.

  • SSD значительно ускоряет работу канала.
    На графике ниже показано время отправки пачки из 500 событий на узлы Flume, использующие файловые каналы. Один из узлов использует SSD для хранения данных канала, другой — SATA. Разница существенная.


    Если выполнить нехитрое деление, то получим, что узел Flume с файловым каналом на SSD может переваривать до 500/0.025 = 20000 событий в секунду (для справки — размер сообщений в данном примере около 1КБ, а канал использует для хранения только один диск).

  • Capacity канала очень чувствительна к изменениям.
    Если вы вдруг решили поменять вместимость вашего канала, то вас может ждать неприятный сюрприз — канал запустит replay для восстановления данных. Это значит, что вместо использования файлов checkpoints для дальнейшей навигации/работы с данными канал полностью пробежит по всем файлам с данными. Если в данных в канале много, процесс может занять приличное время.

  • Нештатная остановка канала может привести к потере данных.
    Это может произойти, если вы убили процесс Flume (или hard reset). А может и не произойти. На моей памяти у нас такое случалось всего один раз — файл с данными был «испорчен» и пришлось вручную удалить все файлы с данными канала (благо, каналы у нас не забивались и потерь удалось избежать). Таким образом, 100% надежности канал всё-таки не дает — всегда есть вероятность, что кто-то «дёрнет рубильник» и произойдет непоправимое. Что ж, если такое произошло и канал отказывается запускаться, ваши действия могут быть такими:

    1. Попробуйте удалить файлы меток (checkpoints) — в этом случае канал попытается восстановиться только по файлам с данными.

    2. Если предыдущий пункт не помог и канал пишет что-то в стиле «Unable to read data from channel, channel will be closed», значит файл с данными испорчен. Тут поможет только полная чистка всех папок с данными. Увы.

В качестве альтернативы File-Channel Flume предлагает еще несколько каналов — в частности, JDBC-channel, использующий в качестве буфера базу данных, и Kafka-channel. Разумеется, что для использования таких каналов нужно отдельно разворачивать базу данных и Kafka.

Avro Source и Avro Sink


Avro — это один из инструментов сериализации данных, благодаря которому источник и сток получили свои названия. Сетевое взаимодействие этих компонентов реализовано с помощью Netty. В сравнении с Netcat Source, рассмотренным в предыдущей статье, Avro Source обладает следующими преимуществами:

  • Может использовать заголовки в событиях (т.е. передавать вместе с данными вспомогательную информацию).

  • Может принимать информацию от нескольких клиентов одновременно. Netcat работает на обычном сокете и принимает входящие соединения последовательно, а значит, может обслуживать только одного клиента в момент времени.

Итак, рассмотрим настройки, которые нам предлагает Avro Source.
Параметр Описание По умолчанию
type
Реализация источника, должно быть указано avro. -
channels
Каналы, в которые источник будет отправлять события (через пробел). -
bind
Хост/IP, за которым закрепляем источник. -
port
Порт, на котором источник будет принимать подключения от клиентов. -
threads
Число потоков, обрабатывающих входящие события (I/O workers). При выборе значения следует ориентироваться на число потенциальных клиентов, которые будут слать события этому источнику. Необходимо выставлять как минимум 2 потока, иначе ваш источник может попросту «зависнуть», даже если клиент у него всего один. Если не уверены, сколько потоков необходимо — не указывайте этот параметр в конфигурации. не ограничено
compression-type
Сжатие данных, здесь вариантов немного — либо none, либо deflate. Указывать необходимо только в том случае, если клиент передает данные в сжатом виде. Сжатие поможет вам существенно сэкономить трафик, и чем больше событий за раз вы передаете — тем существеннее будет эта экономия.
none
Как и для любого другого источника, для Avro Source можно указать:

  1. selector.type — селектор каналов, о них я упоминал в предыдущей статье. Позволяют делить или дублировать события в несколько каналов по некоторым правилам. Детальнее селекторы будут рассмотрены ниже.

  2. interceptors — список перехватчиков, через пробел. Перехватчики срабатывают ДО того, как события попадут в канал. Их используют, чтобы каким-то образом модифицировать события (например, добавить заголовки или изменить содержимое события). О них также речь пойдет ниже.

Также для этого источника предусмотрена настройка фильтров Netty и параметры шифрования данных. Для отправки событий этому источнику можно использовать вот такой код.

Примитивный Java-client для Avro Source
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.event.SimpleEvent;

public class FlumeSender {
    
    public static void main(String[] args) throws EventDeliveryException {
        RpcClient avroClient = RpcClientFactory.getDefaultInstance("127.0.0.1", 50001);
        
        Map<String, String> headers = new HashMap<>();
        headers.put("type", "common");

        Event event = EventBuilder.withBody("Тело события".getBytes(), headers);
        
        avroClient.append(event);
        avroClient.close();
    }
}

Теперь рассмотрим конфигурацию Avro-стока.
Параметр Описание По умолчанию
type
Реализация стока, должно быть указано avro. -
channel
Канал, из которого сток будет вытягивать события. -
hostname
Хост/IP, на который сток будет отправлять события. -
port
Порт, на котором указанная машина (hostname) ожидает подключения клиентов. -
batch-size
очень важный параметр: размер «пачки» событий, отправляемых клиенту за один запрос. В то же время, это же значение используется при опустошении канала. Т.е., это еще и число событий, считываемых из канала за одну транзакцию.
100
connect-timeout
Таймаут соединения (handshake), в миллисекундах.
20000
request-timeout
Таймаут запроса (отправки пачки событий), в миллисекундах.
20000
reset-connection-interval
Интервал «смены хоста». Подразумевается, что за указанным hostname может скрываться несколько машин, обслуживаемых балансером. Этот параметр принудительно заставляет сток переключаться между машинами через указанный интервал времени. Удобство, по замыслу создателей стока, заключается в том, что если в зону ответственности балансера добавляется новая машина, отсутствует необходимость перезапускать узел Flume — сток сам сообразит, что появился еще один «пункт назначения». По умолчанию сток не осуществляет смены хостов.
-1
maxIoWorkers
Аналог threads для Avro Source.
2 * PROC_CORES
compression-type
То же самое, что и для Avro Source. Разница в том, что сток сжимает данные, а источник, напротив, распаковывает. Соответственно, если Avro Sink шлет события на Avro Source, тип сжатия на обоих должен быть одинаковый.
none
compression-level
Уровень сжатия, только если compression-type=deflate (0 — не сжимать, 9 — максимальное сжатие).
6
Теперь поговорим о том, что важно учитывать при настройке этих компонентов.

  • Аккуратно выбирайте Batch Size.

    Как я уже говорил, это очень важный параметр, непродуманный выбор которого может значительно подпортить вам жизнь. Прежде всего, batch-size обязательно должен быть меньше или равен вместимости транзакции канала (transactionCapacity). Это явно касается Avro Sink и неявно — Avro Source. Рассмотрим на примере:


    Здесь TC — это transactionCapacity, а BS — batch-size. Условие нормальной работы заключается в том, что: BS <= TC1 и BS <= TC2. То есть, необходимо учитывать не только вместимость транзакции канала, с которым работает сток, но вместимость транзакции канала (-ов), с которым работает принимающий Avro Source. В противном случае сток не сможет опустошать свой канал, а источник — добавлять события в свой. В таких случаях Flume начинает интенсивно лить в лог сообщения об ошибках.
    Случай из практики. В одном из стоков мы как-то поставили batch-size = 10000, в то время как на принимающем узле для канала была выставлена TC = 5000. И всё работало замечательно. Пока объём данных был небольшим, сток попросту не вытягивал из канала позволенные 10000 событий за раз — в канале не успевало накопиться столько событий. Но спустя некоторое время объем данных увеличился и у нас начались проблемы. Принимающий узел начал отклонять большие пачки данных. Ошибку вовремя заметили, изменили параметры и скопившиеся в канале данные озорным ручейком дотекли до места назначения.

  • Отправляйте события крупными пачками.
    Транзакция — операция довольно дорогостоящая по ресурсам. Меньше транзакций — больше производительность. Опять же, сжатие при передаче большого числа событий работает гораздо эффективнее. Соответственно, помимо batch-size придётся увеличить и transactionCapacity ваших каналов.

  • Переопределите зависимость netty для ваших узлов.
    Мы используем версию netty 3.10.5 Final, в то время как Flume подтягивает более старую netty 3.6.2 Final. Проблема старой версии заключается в небольшом баге, из-за которого Avro Sink / Avro Source не могут периодически подключиться друг к другу. Это приводит к тому, что в передаче данных периодически возникают простои на несколько минут (потом все приходит в норму). В случае, если данные должны поступать максимально быстро, такие «пробки» могут стать проблемой.


    В случае, если вы запускаете Flume средствами Java, переопределить зависимость можно средствами Maven. Если же вы настраиваете Flume средствами Cloudera или в виде сервиса, то зависимость Netty придётся менять вручную. Найти их можно в следующих папках:

    • Cloudera — /opt/cloudera/parcels/CDH-${VERSION}/lib/flume-ng/lib;
    • Service (stand-aloone) — $FLUME_HOME/lib.

File-Roll Sink


Итак, мы разобрались, как настроить транспортные узлы на основе Avro Source/Sink и файлового канала. Осталось теперь разобраться с компонентами, которые замыкают (т.е. выводят данные из зоны ответственности Flume) нашу транспортную сеть.


Первый замыкающий сток, который стоит рассмотреть, это File-Roll Sink. Я бы сказал, что это сток для ленивых. Он поддерживает минимум настроек и может делать только одну вещь — записывать события в файлы.
Параметр Описание По умолчанию
type
Реализация стока, должно быть указано file_roll. -
channel
Канал, из которого сток будет вытягивать события. -
directory
Папка, в которой будут храниться файлы. -
rollInterval
Интервал между созданием новых файлов (0 — писать всё в один файл), в секундах.
30
serializer
Сериализация событий. Можно указать: TEXT, HEADER_AND_TEXT, AVRO_EVENT или свой класс, реализующий интерфейс EventSerializer.Builder.
TEXT
batch-size
Аналогично Avro Sink, размер пачки событий, забираемых за транзакцию с канала.
100

Почему я считаю его стоком для ленивых? Потому что в нем абсолютно ничего нельзя настроить. Ни сжатия, ни наименоваия файлов (в качестве имени будет использован timestamp создания), ни группировки по подпапкам — ничего. Даже размер файла ограничить нельзя. Этот сток подходит, пожалуй, только для случаев, когда «нет времени объяснять — нам нужно срочно начать принимать данные!».
Примечание. Поскольку необходимость записывать данные в файлы всё-таки имеется, мы пришли к выводу, что целесообразнее реализовать свой файловый сток, чем использовать этот. Учитывая, что все исходники Flume открыты, сделать его оказалось несложно, мы уложились за день. На второй день поправили мелкие баги — и сток уже больше года исправно работает, раскладывая данные по папкам в аккуратные архивы. Этот сток я выложу на GitHub после третьей части цикла.

HDFS Sink


Этот сток уже посерьезней — он поддерживает уйму настроек. Немного удивительно, что File-Roll Sink не сделан аналогичным образом.
Параметр Описание По умолчанию
type
Реализация стока, должно быть указано hdfs. -
channel
Канал, из которого сток будет вытягивать события. -
hdfs.path
Папка, в которую будут записываться файлы. Убедитесь, что для этой папки выставлены нужные права доступа. Если вы настраиваете сток средствами Cloudera, то данные будут писаться от имени пользователя flume. -
hdfs.filePrefix
Префикс имени файла. Базовое имя файла, как и для File-Roll — timestamp его создания. Соответстенно, если вы укажете my-data, итоговое имя файла будет my-data1476318264182.
FlumeData
hdfs.fileSuffix
Постфикс имени файла. Добавляется в конец имени файла. Можно использовать, чтобы указать расширение, например, .gz. -
hdfs.inUsePrefix
Аналогично filePrefix, но для временного файла, в который еще ведется запись данных. -
hdfs.inUseSuffix
Аналогично fileSuffix, но для временного файла. По сути, временное расширение.
.tmp
hdfs.rollInterval
Период создания новых файлов, в секундах. Если файлы не нужно закрывать по такому критерию, ставим 0.
30
hdfs.rollSize
Триггер для закрытия файлов по объему, указывается в байтах. Также ставим 0, если этот критерий нам не подходит.
1024
hdfs.rollCount
Триггер для закрытия файлов по числу событий. Также можно поставить 0.
10
hdfs.idleTimeout
Триггер для закрытия файлов из-за неактивности, в секундах. То есть, если в файл некоторое время ничего не записывается — он закрывается. Этот триггер по умолчанию отключен.
0
hdfs.batchSize
То же самое, что и для других стоков. Хотя в документации к стоку написано, что это число событий, записываемых в файл, прежде чем они будут сброшены в HDFS. При выборе также ориентируемся на объем транзакции канала.
100
hdfs.fileType
Тип файла — SequenceFile (Hadoop-файл с парами ключ-значение, как правило, в качестве ключа используется timestamp из хидера «timestamp» или текущее время), DataStream (текстовые данные, по сути, построчная запись с указанной сериализацией, как в File-Roll Sink) или CompressedStream (аналог DataStream, но с сжатием).
SequenceFile
hdfs.writeFormat
Формат записи — Text или Writable. Только для SequenceFile. Отличие — в качестве значения будет писаться либо текст (TextWritable) или байты (BytesWritable).
5000
serializer
Настраивается для DataStream и CompressedStream, по аналогии с File-Roll Sink.
TEXT
hdfs.codeC
Этот параметр необходимо указывать, если вы используете тип файла CompressedStream. Предлагаются такие варианты сжатия: gzip, bzip2, lzo, lzop, snappy. -
hdfs.maxOpenFiles
Максимально допустимое число одновременно открытых файлов. Если этот порог будет превышен, то наиболее старый файл будет закрыт.
5000
hdfs.minBlockReplicas
Важный параметр. Минимальное число реплик на блок HDFS. Если не указан, берется из конфигурации Hadoop, указанной в classpath при запуске (т.е. настроек вашего кластера). Честно говоря, я не могу объяснить причину поведения Flume, связанного с этим параметром. Суть в том, что если значение этого параметра отличается от 1, то сток начнет закрывать файлы без оглядки на другие триггеры и в рекордные сроки наплодит уйму мелких файлов. -
hdfs.maxOpenFiles
Максимально допустимое число одновременно открытых файлов. Если этот порог будет превышен, то наиболее старый файл будет закрыт.
5000
hdfs.callTimeout
Таймаут обращения к HDFS (открыть/закрыть файл, сбросить данные), в миллисекундах.
10000
hdfs.closeTries
Число попыток закрыть файл (если с первого раза не получилось). 0 — пытаться до победного конца.
0
hdfs.retryInterval
Как часто пытаться закрыть файл в случае неудачи, в секундах.
180
hdfs.threadsPoolSize
Число потоков, осуществляющих IO операции с HDFS. Если у вас «солянка» из событий, которые расфасовываются по многим файлам, то лучше поставить это число побольше.
10
hdfs.rollTimerPoolSize
В отличии от предыдущего пула, этот пул потоков выполняет задачи по расписнию (закрывает файлы). Причем, он работает на основе двух параметров — rollInterval и retryInterval. Т.е. этот пул выполняет как плановое закрытие по триггеру, так и периодические повторные попытки закрыть файл. Одного потока должно быть достаточно.
1
hdfs.useLocalTimeStamp
HDFS сток предполагает использование элементов даты в назании формируемых файлов (например, hdfs.path = /logs/%Y-%m-%d позволит вам группировать файлы по дням). Использование даты предполагает, что она откуда-то должна быть получена. Этот параметр предлагает два варианта: использовать время на момент обработки события (true) или использовать время, указанное в событии — а именно, в заголовке «timestamp» (false). Если вы используете timestamp события, то убедитесь, что ваши собтия имеют этот заголовок. Иначе не будут записаны в HDFS.
false
hdfs.round
Округлять timestamp до некоторого значения.
false
hdfs.roundValue
Насколько округлять timestamp.
1
hdfs.roundUnit
В каких единицах округлять (second,minute или hour).
second
Вот такой огромный перечень настроек для HDFS-стока. Этот сток позволяет нарезать данные в файлы практически как угодно — особенно приятно то, что можно использовать элементы даты. Официальная документация по этому стоку находится на всё той же странице.

Возможно вы заметили интересную особенность конфигурации HDFS-стока — здесь нет параметра, указывающего адрес HDFS. Создатели стока предполагают, что данный сток должен быть использован на тех же машинах, что и HDFS.

Итак, что же необходимо учитывать при настройке этого стока.

  • Используйте крупные batch-size и transactionCapacity.
    В общем-то, здесь все аналогично с другими стоками — транзакция достаточно дорогая в плане ресурсов, поэтому лучше лить крупными порциями.

  • Не злоупотребляйте макросами в именовании файлов.
    Использование элементов даты в именах файлов/папок или плейсхолдеров для заголовков — это, безусловно, удобный инструмент. Но не очень быстрый. Мне кажется, подстановку даты создатели могли сделать оптимальнее — если вы заглянете в исходники, то удивитесь числу выполняемых операций для форматирования этих строк. Предположим, мы решили задать вот такую структуру папок:
    hdfs.path = /logs/%{dir}
    hdfs.filePrefix = %{src}/%Y-%m-%d/%Y-%m-%d-%H-%M-%S.%{host}.%{src}
    Здесь dir и src — значения заголовков событий с соотв. ключами. Результирующий файл будет иметь вид /logs/web/my-source/2016-04-15/2016-04-15-12-00-00.my-host.my-source.gz. На моем компьютере генерация этого имени для 1 млн. событий занимает почти 20 секунд! Т.е. для 10000 событий это займет примерно 200мс. Делаем вывод: если вы претендуете на скорость записи 10000 событий в секунду, будьте готовы отдать 20% времени на генерацию имени файла. Это ужасно. Вылечить это можно, взяв на себя ответственность за генерацию имени файла на стороне клиента. Да, для этого придется написать немного кода, но зато можно будет изменить настройки стока на вот такие:
    hdfs.path = /logs
    hdfs.filePrefix = %{file-name}
    Передавая сформированное имя файла в заголовке file-name вы сэкономите ресурсы и время. Формирование пути файла по таким заголовком занимает уже не 20 секунд, а 500-600 миллисекунд для 1 млн. событий. Т.е., почти в 40 раз быстрее.

  • Объединяйте события.
    Еще один маленький хак, позволяющий существенно повысить производительность стока. Если вы пишете события в файл построчно, то можно объединять их на стороне клиента. Например, ваш сервис генерирует логи, которые должны идти в один и тот же файл. Так почему бы не объединить несколько строк в одну, использовав в качестве разделителя \n? Сама по себе запись данных в HDFS или файловую систему занимает куда меньше времени, чем вся эта «цифровая бюрократия» вокруг данных.


    Объединяя события в соотношении хотя бы 5 к 1 вы уже получите существенный прирост производительности. Естественно, здесь нужно быть осторожным — если события на клиенте генерируются по одному, то наполнение буфера для объединения событий может занять некоторое время. Всё это время события будут храниться в памяти, ожидая формирования группы для объединения. А значит повышаются шансы потерять данные. Резюме:

    1. Для небольших объемов данных клиенту лучше отправлять события во Flume по одному — меньше шансов их потерять.

    2. Для больших объемов данных предпочтительно использовать объединение событий. Если события генерируются интенсивно, буфер для 5-10 событий будет набираться достаточно быстро. При этом вы существенно повысите производительность стоков.

  • Разверните стоки на нескольких машинах HDFS-кластера.
    При настройке Flume через Cloudera имеется возможность запустить на каждой ноде кластера отдельный узел Flume. И этой возможностью лучше воспользоваться — поскольку таким образом нагрузка распределяется между всеми машинами кластера. При этом, если вы используете общую конфигурацию (т.е. один и тот же файл конфигурации на всех машинах), убедитесь, что у вас не возникнет конфликтов имен файлов. Сделать это можно, задействовав перехватчик событий, добавляющий в заголовки название хоста. Соответвенно, вам останется только указать в шаблоне имени файла этот заголовок (см. ниже).
    Примечание. На самом деле при принятии такого решения стоит задуматься — ведь каждый сток будет писать однородные данные в свой файл. В результате вы можете получить кучу мелких файлов на HDFS. Решение должно быть взвешенным — если объем данных невелик, то можно ограничиться одним узлом Flume для записи в HDFS. Это так называемая консолидация данных — когда данные из множества источников в итоге попадают на один сток. Однако если данные «текут рекой», то одного узла может быть недостаточно. Подробнее о проектировании всей транспортной сети мы поговорим в следующей статье этого цикла.

Перехватчики событий (Flume Interceptors)


Я много раз упоминал эти таинственные перехватчики, пожалуй теперь самое время рассказать о том, что это такое. Перехватчики — это обработчики событий, которые работают на этапе между получением событий на источнике и отправкой их в канал. Перехватчики могут преобразовывать события, изменять их или фильтровать.

Flume предоставляет по умолчанию множество перехватчиков, позволяющих:

  • Добавлять статичные заголовки (константы, timestamp, hostname).
  • Генерировать случайный UUID в заголовках.
  • Извлекать значения из тела события (регулярными выражениями) и использовать их как заголовки.
  • Изменять содержимое событий (опять регулярными выражениями).
  • Фильтровать события на основе содержимого.

Пример конфигурации различных перехватчиков
# ============================ Avro-источник с перехватчиками ============================ #
# Обязательные параметры для Vvro-источника
my-agent.sources.avro-source.type = avro
my-agent.sources.avro-source.bind = 0.0.0.0
my-agent.sources.avro-source.port = 50001
my-agent.sources.avro-source.channels = my-agent-channel

# Добавляем к источнику перехватчики, указываем их названия (названия значения не имеют)
my-agent.sources.avro-source.interceptors = ts directory host replace group-replace filter extractor

# ------------------------------------------------------------------------------ #

# Первый перехватчик добавляет статичный заголовок ко всем событиям.
# Наименование заголовка будет "dir", а значение — "test-folder".
my-agent.sources.avro-source.interceptors.directory.type = static
my-agent.sources.avro-source.interceptors.directory.key = dir
my-agent.sources.avro-source.interceptors.directory.value = test-folder

# Если такой заголовок уже есть — сохранить имеющийся (по умолчанию — false)
my-agent.sources.avro-source.interceptors.directory.preserveExisting = true

# ------------------------------------------------------------------------------ #

# Второй перехватчик добавляет заголовок "timestamp" ко всем событиям с текущим значением времени, в миллисекундах
my-agent.sources.avro-source.interceptors.ts.type = timestamp
my-agent.sources.avro-source.interceptors.ts.preserveExisting = true

# ------------------------------------------------------------------------------ #

# Третий перехватчик добавляет заголовок с хостом/IP текущей машины
my-agent.sources.avro-source.interceptors.host.type = host
my-agent.sources.avro-source.interceptors.host.useIP = true

# Наименование заголовка (аналог directory.key)
my-agent.sources.avro-source.interceptors.host.hostHeader = host
my-agent.sources.avro-source.interceptors.host.preserveExisting = true

# ------------------------------------------------------------------------------ #

# Этот перехватчик заменяет все символы табуляции на ; в теле события
my-agent.sources.avro-source.interceptors.replace.type = search_replace
my-agent.sources.avro-source.interceptors.replace.searchPattern = \t
my-agent.sources.avro-source.interceptors.replace.replaceString = ;

# Тело передается как byte[], поэтому необходимо указать кодировку (по умолчанию — UTF-8)
my-agent.sources.avro-source.interceptors.replace.charset = UTF-8

# ------------------------------------------------------------------------------ #

# Более "умный" вариант замены
my-agent.sources.avro-source.interceptors.group-replace.type = search_replace

# Предположим, наша строка начинается с даты 2014-01-20 и нам нужно поменять ее формат на 20/01/2014
# при этом сохранив всё остальное. Мы "разбиваем" строку на 4 блока () и затем выполняем подстановку,
# используя индексы этих блоков в результирующей строке
my-agent.sources.avro-source.interceptors.group-replace.searchPattern = (\\d{4})-(\\d{2})-(\\d{2})(.*)
my-agent.sources.avro-source.interceptors.group-replace.replaceString = $3/$2/$1$4

# ------------------------------------------------------------------------------ #

# Перехватчик-фильтр, исключает события по регулярному выражению
my-agent.sources.avro-source.interceptors.filter.type = regex_filter
my-agent.sources.avro-source.interceptors.filter.regex = error$
# Если true — то фильтровать события, тело которых подходит под регулярное выражение,
# в противном случае — фильтровать то, что не подходит под регулярку
my-agent.sources.avro-source.interceptors.filter.excludeEvents = true

# ------------------------------------------------------------------------------ #

# Перехватчик, извлекающий данные из события и добавляющий их в заголовки
my-agent.sources.avro-source.interceptors.extractor.type = regex_extractor

# Например, мы передаем события вида: "2016-04-15;WARINING;КАКАЯ-ТО ИНФОРМАЦИЯ"
my-agent.sources.avro-source.interceptors.extractor.regex = (\\d{4}-\\d{2}-\\d{2});(.*);

# здесь важно — сериализаторы должны быть перечислены в том же порядке, 
# что и соотв. группы в регулярном выражении 
# (\\d{4}-\\d{2}-\\d{2}) -> $1 -> ts 
# (.*) -> $2 -> loglevel
my-agent.sources.avro-source.interceptors.extractor.serializers = ts loglevel

# Первую группу будем сериализовать специальным классом, который извлекая из даты TS
my-agent.sources.avro-source.interceptors.extractor.serializers.ts.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
my-agent.sources.avro-source.interceptors.extractor.serializers.ts.name = timestamp
my-agent.sources.avro-source.interceptors.extractor.serializers.ts.pattern = yyyy-MM-dd

# Вторую группу будем сериализовать as is
my-agent.sources.avro-source.interceptors.extractor.serializers.loglevel.name = level

Среди стандартных перехватчиков, к несчастью, не обнаружилось фильтра по заголовкам. Впрочем, при желании такой перехватчик можно написать самому. Теперь, чтобы полноценно сконфигурировать транспорт Flume, нам необходимо рассмотреть еще один тип компонентов Flume — селекторы.

Канальные селекторы (Flume Channel Selectors)


Селектор необходим каналу для того, чтобы понимать, в какой канал какие события отправлять. Всего существует 2 типа селекторов:

  1. replicating — селектор, благодаря которому источник дублирует события во все связанные каналы. Именно он используется Flume по умолчанию. При этом, этот селектор позволяет выделить «опциональные» каналы. В отличии от основных, источник будет игнорировать неудачные добавления событий в такие каналы.

  2. multiplexing — селектор, распределяющий события между каналами по некоторым правилам. Реализация стандартного multiplexing-селектора позволяет распределять события между каналами на основе значений заголовков.

Пример конфигурации multiplexing-селектора
# ============================ Avro-источник с селектором ============================ #
my-source.sources.avro-source.type = avro
my-source.sources.avro-source.port = 50002
my-source.sources.avro-source.bind = 127.0.0.1
my-source.sources.avro-source.channels = hdfs-channel file-roll-channel null-channel

# Объявляем селектор — multiplexing, будем сортировать события
# Предположим, что мы ранее помечали события как "важные" и "обычные" и хотим,
# чтобы важные события записывались в файловую систему и HDFS, а обычные — только в файлы
my-source.sources.avro-source.selector.type = multiplexing

# указываем название заголовка, по которому будем делить события
my-source.sources.avro-source.selector.header = type

# если type = important, то отправляем события и в HDFS, и в файловый сток
my-source.sources.avro-source.selector.mapping.important = hdfs-channel file-roll-channel

# если type = common, то только в файловый сток
my-source.sources.avro-source.selector.mapping.common = file-roll-channel

# если заголовок type не найден или значение какое-то другое, отправляем событие на фильтрацию
# (как правило, для фильтрации используем небольшой memchannel и null-sink)
my-source.sources.avro-source.selector.mapping.default = hdfs-null-channel


Селекторы обрабатывают события после перехватчиков. Это значит, что вы можете выполнить над событиями некоторые манипуляции перехватчиками (например, понаизвлекать различные заголовки) и использовать результаты этих манипуляций уже в селекторе.

Заключение


Статья неожиданно получилась большой, поэтому обещанный мониторинг узла я решил рассмотреть в следующей части этого цикла статей. В заключение хочу продемонстрировать одну из рабочих конфигураций Flume для HDFS. Она неплохо подходит для доставки и организации небольших объемов данных — примерно до 2000 событий в секунду на одну ноду. Этот узел требует наличия в событиях заголовков roll («15m» или «60m»), dir и srс — с помощью них получается двухуровневая иерархия папок.

Конфигурация Flume для HDFS
flume-hdfs.sources = hdfs-source
flume-hdfs.channels = hdfs-15m-channel hdfs-60m-channel hdfs-null-channel
flume-hdfs.sinks = hdfs-15m-sink hdfs-60m-sink

# =========== Avro-источник, с селектором и добавлением заголовка host ============ #
flume-hdfs.sources.hdfs-source.type = avro
flume-hdfs.sources.hdfs-source.port = 50002
flume-hdfs.sources.hdfs-source.bind = 0.0.0.0
flume-hdfs.sources.hdfs-source.interceptors = hostname
flume-hdfs.sources.hdfs-source.interceptors.hostname.type = host
flume-hdfs.sources.hdfs-source.interceptors.hostname.hostHeader = host
flume-hdfs.sources.hdfs-source.channels = hdfs-null-channel hdfs-15m-channel
flume-hdfs.sources.hdfs-source.selector.type = multiplexing
flume-hdfs.sources.hdfs-source.selector.header = roll
flume-hdfs.sources.hdfs-source.selector.mapping.15m = hdfs-15m-channel
flume-hdfs.sources.hdfs-source.selector.mapping.60m = hdfs-60m-channel
flume-hdfs.sources.hdfs-source.selector.mapping.default = hdfs-null-channel

# ============================ Файловый канал, 15 минут ============================ #
flume-hdfs.channels.hdfs-15m-channel.type = file
flume-hdfs.channels.hdfs-15m-channel.maxFileSize = 1073741824
flume-hdfs.channels.hdfs-15m-channel.capacity = 10000000
flume-hdfs.channels.hdfs-15m-channel.transactionCapacity = 10000
flume-hdfs.channels.hdfs-15m-channel.dataDirs = /flume/flume-hdfs/hdfs-60m-channel/data1,/flume/flume-hdfs/hdfs-60m-channel/data2
flume-hdfs.channels.hdfs-15m-channel.checkpointDir = /flume/flume-hdfs/hdfs-15m-channel/checkpoint

# ============================ Файловый канал, 60 минут ============================ #
flume-hdfs.channels.hdfs-60m-channel.type = file
flume-hdfs.channels.hdfs-60m-channel.maxFileSize = 1073741824
flume-hdfs.channels.hdfs-60m-channel.capacity = 10000000
flume-hdfs.channels.hdfs-60m-channel.transactionCapacity = 10000
flume-hdfs.channels.hdfs-60m-channel.dataDirs =/flume/flume-hdfs/hdfs-60m-channel/data1,/flume/flume-hdfs/hdfs-60m-channel/data2
flume-hdfs.channels.hdfs-60m-channel.checkpointDir = /flume/flume-hdfs/hdfs-60m-channel/checkpoint

# =========== Сток для файлов, заворачиваемых каждые 15 минут (5 мин. неактивности) =========== #
flume-hdfs.sinks.hdfs-15m-sink.type = hdfs
flume-hdfs.sinks.hdfs-15m-sink.channel = hdfs-15m-channel
flume-hdfs.sinks.hdfs-15m-sink.hdfs.filePrefix = %{src}/%Y-%m-%d/%Y-%m-%d-%H-%M-%S.%{src}.%{host}.log
flume-hdfs.sinks.hdfs-15m-sink.hdfs.path = /logs/%{dir}
flume-hdfs.sinks.hdfs-15m-sink.hdfs.fileSuffix = .gz
flume-hdfs.sinks.hdfs-15m-sink.hdfs.writeFormat = Text
flume-hdfs.sinks.hdfs-15m-sink.hdfs.codeC = gzip
flume-hdfs.sinks.hdfs-15m-sink.hdfs.fileType = CompressedStream
flume-hdfs.sinks.hdfs-15m-sink.hdfs.minBlockReplicas = 1
flume-hdfs.sinks.hdfs-15m-sink.hdfs.rollInterval = 0
flume-hdfs.sinks.hdfs-15m-sink.hdfs.rollSize = 0
flume-hdfs.sinks.hdfs-15m-sink.hdfs.rollCount = 0
flume-hdfs.sinks.hdfs-15m-sink.hdfs.idleTimeout = 300
flume-hdfs.sinks.hdfs-15m-sink.hdfs.round = true
flume-hdfs.sinks.hdfs-15m-sink.hdfs.roundValue = 15
flume-hdfs.sinks.hdfs-15m-sink.hdfs.roundUnit = minute
flume-hdfs.sinks.hdfs-15m-sink.hdfs.threadsPoolSize = 8
flume-hdfs.sinks.hdfs-15m-sink.hdfs.batchSize = 10000

# =========== Сток для файлов, заворачиваемых каждые 60 минут (20 мин. неактивности) =========== #
flume-hdfs.sinks.hdfs-60m-sink.type = hdfs
flume-hdfs.sinks.hdfs-60m-sink.channel = hdfs-60m-channel
flume-hdfs.sinks.hdfs-60m-sink.hdfs.filePrefix = %{src}/%Y-%m-%d/%Y-%m-%d-%H-%M-%S.%{src}.%{host}.log
flume-hdfs.sinks.hdfs-60m-sink.hdfs.path = /logs/%{dir}
flume-hdfs.sinks.hdfs-60m-sink.hdfs.fileSuffix = .gz
flume-hdfs.sinks.hdfs-60m-sink.hdfs.writeFormat = Text
flume-hdfs.sinks.hdfs-60m-sink.hdfs.codeC = gzip
flume-hdfs.sinks.hdfs-60m-sink.hdfs.fileType = CompressedStream
flume-hdfs.sinks.hdfs-60m-sink.hdfs.minBlockReplicas = 1
flume-hdfs.sinks.hdfs-60m-sink.hdfs.rollInterval = 0
flume-hdfs.sinks.hdfs-60m-sink.hdfs.rollSize = 0
flume-hdfs.sinks.hdfs-60m-sink.hdfs.rollCount = 0
flume-hdfs.sinks.hdfs-60m-sink.hdfs.idleTimeout = 1200
flume-hdfs.sinks.hdfs-60m-sink.hdfs.round = true
flume-hdfs.sinks.hdfs-60m-sink.hdfs.roundValue = 60
flume-hdfs.sinks.hdfs-60m-sink.hdfs.roundUnit = minute
flume-hdfs.sinks.hdfs-60m-sink.hdfs.threadsPoolSize = 8
flume-hdfs.sinks.hdfs-60m-sink.hdfs.batchSize = 10000

# ================ NULL-сток + небольшой канал для него =============== #
flume-hdfs.channels.hdfs-null-channel.type = memory
flume-hdfs.channels.hdfs-null-channel.capacity = 30000
flume-hdfs.channels.hdfs-null-channel.transactionCapacity = 10000
flume-hdfs.channels.hdfs-null-channel.byteCapacityBufferPercentage = 20

flume-hdfs.sinks.hdfs-null-sink.channel = hdfs-null-channel
flume-hdfs.sinks.hdfs-null-sink.type = null

В следующей, заключительной статье цикла, мы рассмотрим:

  • Процесс построения полноценного транспорта данных на основе Flume.
  • Примеры разработки собственных компонентов.
  • Обещанный мониторинг узлов, который не вошел в эту статью.

Комментарии (12)


  1. facha
    21.04.2016 16:59

    Большое спасибо за статью. Есть несколько не связанных между собой вопросов. Если можете, ответьте пожалуйста.
    — Когда мы выбираем Avro Source и Avro Sink, Аvro используется только для передачи? Или данные сохраняются в Avro?
    — Насколько flume конкурент новомодной kafka?
    — Interseptors можно писать только на java?


    1. Deneb
      21.04.2016 17:19

      По вопросам:

      • Да, только для передачи, итоговый формат зависит от стока (вы можете записывать события в формате Avro, выставив соответствующий serializer в настройках HDFS / File Roll стока). Каналы получают на вход десериализованные данные и хранят их уже как-то по своему.
      • Конкуренции как таковой нет — скорее, симбиоз. Flume используется для манипуляции данными (дублирование, перенаправление, модификация и т.п.). Но при этом Flume может использовать kafk'у в качестве надежного канала (он даже есть в стандартной поставке Flume).
      • Признаться, не пробовал писать на других языках :) Думаю, можно на совместимых с JVM — interceptor создается рефлексивно, нужен только конструктор по умолчанию и реализация интерфейсных методов.


  1. voidnugget
    22.04.2016 14:54

    Было бы неплохо объяснить что:


    • Flume является средством потоковой обработки логов, т.е. вместо низких задержек у него в приоритете высокая пропускная способность и возможность простого партицирования (consistency + partition tolerance + weak availability)
    • У Storm с точностью до наоборот — быстро обработали и отдали, в очередь больше положенного не кладём (availability + consistency + weak partitionioning)
    • У Kafka задача — обеспечить высокую доступность с возможностью партицирования, но со слабой консистентностью (availability + partition tolerance + weak consistency)
    • Сamel вообще не заморачивается с производительностью и доступностью — ему главное один интерфейс преобразовать в другой (consistency + whatever)

    Грубо говоря, хаброобыватели из-за недопонимания специфических архитектурных решений высокодоступных или высокопроизводительных проектов сравнивают разные виды кошачих: тигра, гепарда, и льва; спрашивают: кто из них лучше добычу ловит? Вот надо сравнивать по обстоятельствам и ареалу обитания.


    1. zyrik
      22.04.2016 21:05

      А Вы не могли бы пояснить на пальцах, что все эти термины значат и где следуюет применять какое решение? Вот у меня пока совсем нету ещё представления, где лучше использовать Kafka, а где Flume. Возьмем пример, логи с помощью syslog-ng можно слать как в Flume, так и в Kafka. В каком случае какое решение надо выбирать?


      1. voidnugget
        22.04.2016 22:08

        CAP теорема


        Что где использовать — зависит от объёма и скорости прироста логов, количества машин.


        kafka спроектирована таким образом что бы работать в кластере, но вы не сможете получать наиболее актуальные данные в текущий момент времени. kafka это просто очередь событий/сообщений, никаких преобразований внутри не происходит. С flume ситуация совсем другая: он обрабатывает и преобразовывает логи в удобоваримый вид для последующей записи в БД hdfs / hbase / scylladb etc. flume может быть как потребителем (sink) данных с kafka, и преобразовывать логи, которые хранятся в очередях, так и поставщиком данных (source) для kafka.


        По этому, это проекты с совершенно различными задачами.
        Если вам не хватает пропускной способности flume — вы перед ним ставите kafka в качестве буфера.
        Также очереди на kafka довольно просто реплицировать для обеспечения отказоустойчивости.


        1. Deneb
          22.04.2016 22:15

          Если вам не хватает пропускной способности flume — вы перед ним ставите kafka в качестве буфера.
          Не могли бы вы пояснить, каким образом использование Kafka между клиентом и Flume поможет решить проблему с пропускной способностью?


          1. voidnugget
            22.04.2016 22:33

            Если объяснять на пальцах правой ноги, то:


            Вот у вас есть винт с пропускной способностью в 3Гбит'a и большими задержками? и есть оперативка с пропускной способностью в 12Гбит и низкими задержками?, вот вы ж в оперативку пишите избыток того что пишется на винт, и потребление памяти будет расти в логарифмической прогрессии, в идеальных условиях.


            Вот те же яйца и с kafka и flume: у вас есть flume со средней пропускной способностью и большими задержками?, и есть kafka, с высокой пропускной способностью и низкими задержками?, почти как запись в /dev/null.


            kafka по своей природе любит кушать гигабайты и десятки гигабайт оператвы и слаживать всё в чистом виде на винт, прямо как на грампластинку. Потом это всё можно обработать 4-8 flume'ами, положить в другую kafk'у или сразу записать в БД.


            ? — под задержкой нужно понимать готовность устройства к обработке следующего запроса.


            1. Deneb
              22.04.2016 22:39

              Что мешает сразу раскидывать всё это добро по 4-8 узлам Flume? Мы же рассматриваем потоковую обработку данных, у нас нет никакого «потом». Если в цепочке хотя бы 1 звено не успевает, то в итоге «забиваются» все звенья перед ним — это вопрос времени. Другое дело, если вы рассматриваете пиковые нагрузки (т.е. Flume не справляется эпизодически) — но что тогда мешает просто сделать каналы потолще?


              1. voidnugget
                22.04.2016 22:46

                Под пропускной способностью имеется ввиду не пропускная способность сетевого интерфейса, а то сколько запросов Flume может обработать в секунду. Тут пишется о сценарии ~300-500Гб логов в час, с которого потом выжимается 120-250Гб.


              1. voidnugget
                22.04.2016 22:53

                Недавно вышла хорошая книжка по похожей теме


                Site Reliability Engineering
                Edited by Betsy Beyer, Chris Jones, Jennifer Petoff and Niall Richard Murphy


    1. Deneb
      22.04.2016 21:16

      Я думаю, чтобы определиться с решением, нужно сначала понять — а что дальше планируется делать с логами? Ведь отправить их в Flume/Kafka — это не конечная цель.


      1. voidnugget
        22.04.2016 22:39

        Обычно потом пишут в БД и отрисовуют всякими graylog'aми и kibana. После устаревания утрамбовывают этот компост в "холодное" хранилище на магнитных лентах "аля бобинах", в сервисах типа Amazon Glacier, и подёргивают 6-7 раз в год.


        В чистом виде логи писать в БД получается очень расточительно — БД растут как на дрожжах, да и не каждый может позволить себе завести кластер elasticsearch'ей для kibana. Ну, чуваков c elastic.co тоже можно понять, они то разрабатывали всё это не для того что бы в больших масштабах с пол-пинка заводилось — нужно ещё кучу граблей и костылей, а им с того копеечка за поддержку и наставление падаванов.