Привет коллеги. Да, не прошло и три года с первой статьи, но проектная пучина отпустила только сейчас. Хочу с вами поделиться своими соображениями и проблемами касательно Spark streaming в связке с Kafka. Возможно среди вас есть люди с успешным опытом, поэтому буду рад пообщаться в комментариях.

Итак, в нашем проекте есть потребность принимать решения в режиме реального времени. Мы успешно используем Spark, для пакетной обработки данных, и поэтому для реалтайма решили использовать его же. Это даёт нам единую технологическую платформу и единую кодовую базу.

Workflow выглядит так: Все события приходят в очередь (Apache Kafka), а затем вычитываются и обрабатываются потребителями на базе Spark streaming. Потребители должны решить две задачи:

  • Роутинг данных (перенаправлять потоки данных в различные хранилища)
  • Принятие решений в режиме реального времени.

Данные которые приходят в Kafka, в конечном итоге должны попасть в HDFS в виде “сырых” лог файлов, конвертированных в паркет, и в HBase, в виде атрибутов пользовательских профилей. В своё время, для похожего роутинга мы довольно успешно использовали Apache Flume, но в этот раз решили поручить это дело Spark streaming. Spark из коробки умеет работать и с HDFS и с HBase, кроме того, разработчики гарантируют “exactly once” семантику. А вот теперь давайте немного подробнее разберемся с семантикой доставки данных (Message Delivery Semantics).
Их существует три вида:

  • At most once — Сообщение может быть потеряно, но никогда не доставлено более одного раза.
  • At least once — Сообщение никогда НЕ может быть потеряно, но может быть доставлено более одного раза.
  • Exactly once — Это то, то что люди хотят. Сообщение может быть доставлено только один раз, и не может быть потеряно.

И вот здесь кроется самое большое недопонимание. Когда разработчики Spark говорят об exactly once семантике, они имеют ввиду только Spark. Т.е если данные попали в процесс спарка, то они обязательно будут единожды доставлены до всех пользовательских функций участвующих в обработке, в том числе расположенных на других хостах.

Но как вы понимаете полный workflow не состоит из одного лишь спарка. В нашем процессе участвуют три стороны, и семантику стоит рассматривать для всей связки.

В итоге мы имеем две проблемы — доставка данных из Kafka в Spark, и доставка данных из Spark в хранилище (HDFS, HBase).

Из Kafka в Spark


Теоретически* проблема доставки данных из Kafka в Spark решена, причем двумя способами.

Способ первый, старый (Receiver-based Approach)


В спарке реализован драйвер, который использует Kafka consumer API для трекинга вычитанных данных (offsets). Эти самые офсеты по классике жанра хранятся в Zookeeper. И всё бы ничего, но существует ненулевая вероятность доставки сообщения более одного раза, в моменты сбоев, а это At least once.

Способ второй, новый (Direct Approach (No Receivers))


Разработчики реализовали новый спарковский драйвер, который сам занимается трекингом офсетов. Информацию о вычитанных данных он хранит в HDFS, в так называемых чекпойнтах (checkpoints). Этот подход гарантирует exactly once семантику, и именно его мы используем.

Проблема #номер раз


Spark иногда портит checkpoints, причем настолько, что не может потом с ними работать, и переходит в состояние тяжелого наркотического опьянения. Он перестает вычитывать данные, но при этом продолжает висеть в памяти и говорить всем, что с ним всё в порядке. В чем причина этой проблемы пока совершенно не понятно. Соответственно убиваем процесс, удаляем чекпойнты, запускаемся и вычитываем все сначала, или с конца. И это тоже уже не exactly once )) В силу исторических причин мы используем версию 1.6.0 на Cloudera. Возможно стоит обновиться, и всё пройдет.

Проблема #номер два


Kafka — бьет редко но метко. Бывают такие ситуации, что падает какой либо брокер. Понять, почему произошло падение просто невозможно, из за абсолютно неинформативных логов. Падение какого либо брокера — это не страшно, Кафка рассчитана на это. Но вот если проморгать, и вовремя не перезапустить брокер, то весь кластер оказывается неработоспособен. Это конечно происходит не за один час, но тем не менее.

Из Spark во внешнее хранилище


Здесь дела обстоят не так хорошо. Заботится о гарантиях доставки данных из спарка во внешние хранилища должен сам разработчик, что привносит не слабый оверхед в разработку и архитектуру. Если на данном уровне нужна exactly once семантика, то придется не хило заморочиться. К слову сказать мы так и не решили проблему в этой части системы, и довольствуемся At most once семантикой.

Итоги:


По моим ощущениям использовать Spark streaming можно, но только в том случае если ваши данные не имеют особой финансовой ценности и вы не боитесь их потерять. Идеальный случай — это когда данные гарантированно попадают в хранилище с помощью какой то другой, более надежной подсистемы, а Spark streaming используется в виде грубого инструмента, помогающего сформировать какие то грубые выводы или не точную статистику, но в режиме реального времени, с последующим уточнением в режиме пакетной обработки.
Поделиться с друзьями
-->

Комментарии (9)


  1. Triffids
    16.06.2017 08:20

    ну все таки в spark 2.х они выкатили совершенно другие api под стриминг и по хорошему надо именно на них надо тестировать. то что было в 1.6 у них не позиционируется как стриминг.


    1. 2ANikulin
      16.06.2017 08:20

      Я согласен что надо на новый спарк переходить. Но не соглашусь что 1.6 — не позицианировался как стриминг


    1. Envy
      16.06.2017 15:58

      Еще не выкатили, это альфа


  1. madmis
    16.06.2017 08:28

    У нас в конторе как message broker используют AWS Kinesis.
    Сначала на php пилили под него консьюмера (паблишер на Scala написан), получилось черти что. Потом ребята на Scala перепилили.
    Я вот только до сих пор понять не могу, зачем для обычных сообщений (json), где немного данных, был выбран Kinesis с которым заморочек немерянно (или я просто его совсем не понял).

    Может кто-то статью про Kinesis напишет?
    О том для чего его стоит использовать, для чего не стоит. Его плюсы и минусы. Ну и личный опыт.
    Было бы очень интересно почитать.


    1. 2ANikulin
      16.06.2017 08:30

      Попробуйте сами разобраться с Kinesis на досуге. Потыкайте его палочкой, а потом статейку напишете. Получите сразу +100500 опыта и почета ))


  1. kokorins
    16.06.2017 14:01

    А flink не рассматривали?


    1. 2ANikulin
      16.06.2017 14:05

      Нет, к таким эксперементам, пока мы не готовы )


  1. Huntlet
    20.06.2017 14:59

    Есть ещё такой способ — вычитывать данные с помощью KafkaUtils.createRDD, а офсеты хранить самостоятельно. Если есть возможность сохранить данные и офсеты одной транзакцией, то получается Exactly once.


    1. 2ANikulin
      20.06.2017 16:04

      Да, это выход.
      Но это уже будет не стримминг