Привет, Хабр! Меня зовут Евгений Ненахов, я работаю в центре Big Data МТС Digital. В этой статье я расскажу о том, как мы создали универсальный инструмент потоковой обработки данных и построили с его помощью мощную систему стриминга. Если вам интересна обработка данных – welcome!

Последние несколько лет я занимаюсь проектами Big Data, поэтому расскажу о методологии потоковой обработки данных и её применении на практике. Статья получилась подробной и масштабной и для простоты восприятия я разделил её на две части. В первой половине мы обсудим основные компоненты методологии, а во второй части поговорим о том, как их применять.

Что это за методология такая?

Преимущество подобной методологии потоковой обработки данных в том, что она универсальна и позволяет решить широкий круг задач. А ещё её можно реализовать практически на любом стеке технологий. Я верю в то, что после прочтения статьи вы сможете создать подобный инструмент сами, используя эту концепцию. И сэкономите тем самым массу времени и сил.

В тексте под словами стриминг, стриминг процессинг или streaming я подразумеваю потоковую обработку данных.

Все мы прекрасно знаем что «время — деньги», а применительно к IT эта фраза означает вот что: чем быстрее мы обработаем данные – тем больше выгоды из этих данных мы сможем получить. Лучший способ — обрабатывать данные в режиме реального времени, то есть применять так называемый стриминг процессинг.

Стриминг процессинг дает возможность обрабатывать и получать выгоду из скоропортящихся данных, актуальных в небольшой промежуток времени. На основе этих данных мы можем делать персонализированную рекламу, предлагать скидки, кредиты, да почти что угодно. Телеком, например, такие данные может использовать для предложений новых тарифных планов и более сложных кейсов.

В МТС много данных, которые можно и нужно обрабатывать в режиме реального времени. В основном это данные, которые поступают с коммутационного оборудования базовых станций, различных устройств, сайтов, приложений. Все эти данные можно разделить на несколько доменов: геолокация, звонки, нахождение абонента в роуминге, типичный кликстрим (большой поток по URL) и прочая информация.

Данные нужно обрабатывать, а для этого требуется мощная и при этом универсальная концепция. Требования к такой системе предъявлять сложно, поэтому мы пошли от наших желаний. Нам хотелось производительности, причем достаточной высокой, от 5 млн до 10 млн событий в секунду. Не хотелось терять события – поэтому понадобился надежный механизм отказоустойчивости. Хотелось масштабироваться без боли как вертикально, так и горизонтально. 

Нам хотелось иметь гибкий функционал фильтрации, трансформации данных из одного формата в другой, научиться дедублицировать данные на потоке. Важно было сделать zero code- или хотя бы low code-настройки обработки данных, подключения новых источников. Желания закладываться в полный жизненный цикл разработки ПО при малейших изменениях и тратить на это время у нас не было. Стриминг — вещь капризная, за ним нужно следить, причем следить пристально. Поэтому мы захотели организовать детализированный мониторинг.

Какие варианты?

С «хотелками» разобрались, а что есть на рынке из стека технологий, которые позволят исполнить хотя бы часть наших желаний? В процессе поиска список сократился до четырех технологий: Spark Streaming, Flink, NiFi и Kafka. Для каждой мы провели отдельно RND, взяли совокупность «железок» и свой специфический набор данных. 

Мы учитывали несколько критериев:

  • развитость комьюнити технологии;

  • как часто выходят релизы;

  • куда движется технология, как развивается;

  • опыт и компетенции нашей команды (или «уровень душнильства).

  • возможность реализовать наши пожелания

В итоге выбор пал на Spark Streaming. В качестве основного языка программирования мы выбрали Scala — считаю, что это мощный язык, который со своим функциональным стилем идеально подходит для решения задач потоковой обработки данных. В качестве шины данных взяли Kafka — инструмент, который отлично себя зарекомендовал для решения задач Big Data.

Стек выбрали, а что с ним делать?

Первая мысль, которая приходит в голову — подобрать обработчик, написать монолит, взять щепотку Scala, запихнуть это всё в мясорубку Spark, запустить на кластере и пустить гонять данные из Kafka в Kafka. А можно сделать наоборот: декомпозировать задачу, на каждый поток сделать свою кастомную обработку. Но у любого из этих решений есть минусы, которые нас не устраивали. 

Любое изменение — это долгий процесс. Это полный жизненный цикл разработки ПО: мы ставим задачу в аналитику, начинаем писать код, тестировать на различных стендах, ставить план релизов, релизить. Долго, занимает несколько дней или даже недель. А к нам может прийти клиент и сказать: хочу посмотреть гипотезу на потоковых данных за пару часов. 

Такие системы сложно документировать, сложно тестировать. А значит, что появляется высокая вероятность появления критичных багов на продакшне. Возникает эффект лавины, становится сложно в принципе писать код, потому что мы тратим время не на развитие нашего проекта или на внедрение новых фич, а занимаемся поиском этих неуловимых багов. И мы не чиним баги, а просто делаем их меньше. 

Сложнее становится релизить, появляется куча различных версий, конфигураций, которые тяжело поддерживать. 

В таких условиях легко выгореть, уехать куда-нибудь в деревню, выращивать овощи и жить замечательной жизнью. Мол, ну вас со всеми вашими Scala и Spark. Заманчивая идея, но не сегодня, дружок.

Поэтому решение, приведенное выше, не подходит, хочется что-то более универсальное, какую-то более гибкую концепцию. Выход – сделать так называемый конвейер данных, в нашей терминологии — Pipeline. Мы берём данные из потока, ставим их на конвейерную ленту, и эти данные путешествуют от обработчика к обработчику последовательно, при этом изменяя свое текущее состояние. Когда данные доходят до терминального (то есть последнего) обработчика, он смотрит на конечное состояние этих данных и принимает решение, что же с ними делать.

Какие у нас обработчики?

В нашей реализации Pipeline всего пять обработчиков, два из них – обязательные, это Source и Action. Если у нас есть источник и направление передачи данных – значит,  поток у нас тоже есть. Остальные обработчики, в принципе, могут не принимать участия в процессе. 

Но мы же помним, что у нас есть «хотелки»? А значит, у нас есть и другие обработчики. Тут стоит обратить внимание на обработчик фильтрации. Мы очень хотели иметь гибкую систему фильтрации и задавать в рамках этого пайплайна не один фильтр, а несколько. Чтобы создать такую гибкую методологию достаточно было сделать небольшой DSL и реализовать два логических операнда. Это логические операторы AND и OR, а также скобки для установки приоритета. Уже этих операндов достаточно, чтобы создавать большие выражения, которые позволят гибко фильтровать данные. Потребителю достаточно указать только это выражение, чтобы фильтровать свои данные. 

Но нам и этого функционала пайплайна было недостаточно. Хотелось сделать что-то типа конструктора Lego, чтобы у нас был своего рода кубик (наш пайплайн) и из таких кубиков мы могли бы собрать что-то большое. Хотелось таким способом расширить  наши возможности. 

Мы поняли, что нужен конструктор, чтобы все пайплайны могли собираться в один мега-пайплайн в цепочке, и запускаться в рамках единственного Spark Streaming. А ещё мы хотели, чтобы это запускалось параллельно, чтобы несколько потоков работало. Вот тогда наши возможности станут поистине безграничными.

Наша реализация выглядит так: обычный класс пайплайна, который содержит в основном только два метода: start и stop. Причем start принимает основную, главную движущую силу нашего решения, — стриминг контекст. 

StreamingContext — это сердце, наше все. Он создается один раз на уровень абстракции выше, в так называемом пайплайн-менеджере, который следит за всем жизненным циклом коллекции пайплайнов. Чтобы запустить или перезапустить какой-нибудь пайплайн, достаточно обновить эту коллекцию и перезапустить стриминг контекст. 

Запуск пайплайна — в принципе вещь тяжёлая. Это перезапуск стриминг контекста, поэтому прежде чем запускать какой-то пайплайн, нужно сначала проверить все конфигурации. А так как терминальный обработчик у нас Action, лучше проверить именно его настройки, прежде чем запускать конкретный пайплайн. Ну и, конечно же, хотелось бы реализовать метод Stop, чтобы всё-таки останавливать пайплайн, когда нам хочется. 

Теперь давайте поподробнее поговорим о каждом обработчике в пайплайне. 

Source — он должен принимать данные из источника, создавать на основе этих данных стриминг и десериализовывать эти данные в нашу внутреннюю модель. Дальше эта внутренняя модель, Message, будет путешествовать от обработчика к обработчику, изменяя своё состояние. Вводится своего рода контракт.

Основной источник данных для нас — это Kafka, туда отправляются в потоковом режиме все данные которые у нас есть. Мы реализовали несколько типов Source Kafka, основные – Thrift и Json. 70-80% данных хранятся именно в формате Thrift.

Можно ли сделать другой источник данных, не Kafka? Конечно, можно использовать HDFS Avro, натравить этот Source на HDFS-директорию и смотреть, как появляются новые файлики, какие изменения вносятся в какой-нибудь файл. Чтобы это сделать, достаточно реализовать trait Source, в котором есть основные методы.

Основные методы – это stream, message, и onRDDEnd. Stream принимает StreamingContext, причём он у нас у нас будет путешествовать транзитивно через каждый пайплайн и каждый обработчик данных. На основе StreamingContext создается InputDStream, то есть дискретный поток данных. После этого применяется метод message, который в зависимости от типа десериализует наши данные во внутреннюю модель.

Еще один интересный метод – onRDDEnd — он позволяет нам реализовывать некоторые вещи, связанные с доступностью данных на источниках. Например, для Kafka мы можем вручную коммитить оффсеты. Это очень удобно: если вдруг в нашем пайплайне произойдет инцидент, какой-то из обработчиков будет себя плохо вести, выдавать исключения, то мы не потеряем данные. Мы перезагрузим пайплайн и начнем считывать данные с того момента, на котором последний раз закоммитились. 

Переходим к следующему обработчику — это фильтры, и мы сделали их много. Причем сначала мы клепали более-менее общие фильтры. Это видно по названиям: есть Equals, Contains, нахождение по паттерну. Но все эти фильтры были недостаточно производительны. У нас огромный поток, и самая его мякотка идет как раз-таки на фильтрацию. CPU загружен «в потолок» именно на этих фильтрах, тем более, что у нас может быть множество фильтров в одном DSL. 

Соответственно, мы стали делать более производительные и специальные фильтры, учитывающие специфику данных. Например, стоит обратить внимание на Trie contains list. Он содержит реализацию алгоритма Ахо-Корасика для поиска подстроки в строке. 

В нашем случае он используется для поиска доменов определенного уровня в потоке URL-ов. Такая штука нам позволила увеличить производительность по сравнению с обычным Contains-ом на 30% и снизить нагрузку на CPU.

Фильтр у нас имеет частично определенную функцию, которая содержит по умолчанию сразу два метода — isDefinedAt и Apply. isDefinedAt у нас проверяет, участвует ли этот фильтр в обработке данных, участвует ли он в пайплайне, в конкретном DSL. Если участвует, то применяют метод Apply, в котором реализована фильтрация к конкретному сообщению. 

Следуем дальше. Когда данные отфильтрованы – они поступают уже в трансформацию, и тут все просто. Так как основной формат данных — Thrift, то потребители хотят видеть глазами эти данные, а Thrift у нас — это sequence byte, глазами его не очень получается смотреть. Соответственно, мы сделали трансформацию данных в CSV, в Json, есть даже трансформация Json to Json. 

В эту трансформацию мы просто убираем лишние поля и переделываем их так, как хочется потребителю. Например, если на вход у нас пришло сто полей – мы выдаем только пять, нужных клиенту. И уменьшаем тем самым размер самого потока. Все это, конечно же, настраивается потребителем на самообслуживании. Нужно только взять trait и реализовать, реализация похожа на фильтрацию. Методы аналогичные, isDefineAt и Apply с тем же назначением. 

Потом наступает очередь следующего обработчика, дедубликации. Есть вопрос: а как в принципе делать дедубликацию данных на потоке? Когда у нас есть конечный набор данных — там все понятно: есть сообщения и записи, на основе которых мы можем понять, где какие данные являются дублем. На потоке у нас бесконечное количество данных, соответственно, нам нужно выбрать какой-то интервал времени, в рамках которого необходимо хранить состояние нашего объекта. И только после него уже сравнивать состояние конкретно других объектов для поиска дублей. 

Где хранить эти объекты и какое время хранить? Время мы отдали на откуп потребителям, а вот место… Мы хотели иметь самодостаточный инструмент и не плодить большое количество технологий или же использовать для этого какую-то внешнюю хранилку.

Мы подумали: есть же Spark Streaming, у него есть executors, они запускаются на различных нодах, которые умеют между собой общаться, есть механизм бродкастов. Может быть есть что-то, что позволит именно из спарк стриминга сделать так, чтобы можно было сохранить состояние? У нас есть dstream, дискретный поток данных, а у него есть так называемый метод mapWithState. Причем в исходниках спарка он помечен как experimental. Мы именно экспериментировали с этим решением и нам этот эксперимент очень понравился. Достаточно просто передать туда функцию по правилам определения дедубликации и всё, спарк стриминг сам у себя будет хранить определенное состояние. Нам даже любезно предоставили метод timeout, время жизни состояния объекта, то есть сколько времени хранить этот объект.

Это решение казалось очень удачным, но в какой-то момент мы поняли, что и у него есть недостатки. Если мы вдруг перезагрузим наше приложение или инструмент, то получится ситуация, когда мы уже потеряем отфильтрованные и трансформированные данные. Это нас не устраивало. При этом мы не могли никак понять, правильно ли у нас работает дедубликация. 

Выход нашли такой: просто разделить на два пайплайна конкретное решение. То есть взять, вычитать данные, отфильтровать, трансформировать, отправить эти данные с помощью Action в промежуточный топик Kafka для дедубликации. И вторым пайплайном этим же приложением вычитать данные и просто дедублицировать, в конечном итоге отправив их в конкретный топик.

Реализация дедубликации — это метод Apply и DStream с MapWithState, туда передаем функцию высшего порядка, которая дедублицирует данные и сохраняет все по механизму бродкастов. Таймаут указан для timetoleave нашего конкретного состояния. 

Переходим к терминальному обработчику. В нем нам достаточно было реализовать три выхода: это Kafka, HDFS и HBASE. В Kafka мы можем лить данные в любой кластер в любом направлении, также мы можем данные записывать как файлики на HDFS в различных форматах. Причем мы умеем ролировать файлики как по времени, так и по размеру. А еще мы можем записывать данные в HBASE по ключу, для этого всего у нас есть конфигурации, и пользователи этими конфигурациями пользуются.

При этом в рамках пайплайна у нас может быть несколько Action, то есть Source у нас минимум один в пайплайне, а экшнов может быть несколько. Это сделано для того, чтобы трансформированные и уже отфильтрованные данные мы могли параллельно записывать в несколько топиков Kafka или в несколько директорий HDFS. 

Я уже упоминал о том, что Action должен принимать решения о том, что же делать с данными, когда данные к нему уже пришли и когда Action уже видит его конечное состояние. Здесь есть интересный момент. Допустим, у нас есть некоторый простой DSL и несколько фильтров, которые соединены логическим операндом AND. И у нас есть сообщения, которые вычитываются из источника в поток. 

У нас есть выражения, у нас есть фильтры, и каждый фильтр принимает участие в обработке сообщения. То есть сообщение путешествует абсолютно по всем фильтрам, которые указаны у нас в DSL. Каждый фильтр устанавливает отметку в нашем сообщении, точнее даже битик в битовой маске этого сообщения о том, что конкретный фильтр успешно прошел фильтрацию для этого сообщения.

Соответственно, пройдя каждый фильтр, каждое сообщение накопит в себе конкретное состояние и, когда оно придет в Action, у него посмотрят так называемый триггер. Триггер сравнит состояние битовой маски конкретного сообщения с условием DSL фильтрации и поймет, что же с этими данными делать. Если условие по битовым маскам совпадают, то данные передаются в Action, и Action уже знает, куда эти данные дальше отправлять. Если нет – то данные будут просто отфильтрованы. 

Action реализовывается просто. Это тот же PartialFunction, значит у него есть isDefinaAt и Apply, но при этом есть еще пара интересных методов, например, OnPartitionEnd и OnRDDEnd. Для чего нам нужен OnPartitionEnd? Например, для реализации HDFS Action. По записи данных в HDFS мы в этом методе ролируем файлик по времени и по его размеру. А OnRDDEnd в случае успешной обработки RDD может отправлять какие-то дополнительные метрики помимо технических метрик, например, метрики данных для Data Quality. 

На этом заканчиваем первую часть. Если у вас возникли вопросы или вы хотите поделиться опытом – добро пожаловать в комментарии!

Продолжение – в ближайшее время.

Комментарии (4)


  1. kxl
    26.08.2022 01:23
    +1

    Интересно.... Если нужно было хранить состояние, то почему не подошёл Flink?


    1. newnew94 Автор
      26.08.2022 12:13

      Эту методологию на текущий момент можно реализовать и с помощью Flink. Такое решение тоже имеет право на жизнь. В нашем случае мы провели RnD на сравнение Flink vs Spark, то есть реализовали несколько кейсов, которые учитывают специфику наших данных, объёмы данных, особенности обработки. Запустили решение на одном и том же железе, на одних и тех же объёмах. Выяснили, что для решения одних и тех же задач, чтобы не создавать большие задержки в обработки данных Flink требуется больше ресурсов (CPU, RAM), чем Spark. То есть Spark обходился нам дешевле по ресурсам. Также важным фактором на тот момент был опыт и компетенции команды, в команде было больше опыта со Spark и мы могли более качественно проектировать решение. Было ещё несколько критериев, но эти моменты для нас были важнее. При этом считаю, что Flink отличный фреймворк, так как он имеет свои преимущества, на его основе можно сделать мощную систему стриминга, используя эту методологию.


  1. HADGEHOGs
    26.08.2022 18:20

    Отличный образчик кусков мыслеблудия от ИТ- менеджера.


    1. newnew94 Автор
      26.08.2022 18:38

      Эта статья состоит из двух частей. Первая часть содержит информацию о том, каким образом можно организовать подход к потоковой обработки данных, а вторая часть реализация этого подхода. В этой части статьи объясняется концепция, которая может быть реализована на многих популярных стеках технологий. На основе этой концепции реализована и работает система стриминга, что показывает, что эта концепция имеет право на жизнь. Подробнее о результатах и о нюансах реализации, со скриншотами метрик, характеристик железа и кусками кода будет во второй части, которая выйдет позднее.