Для того чтобы мы могли считать статистику, наш сайт в своей работе генерирует огромное количество событий. Например, при отправке сообщения другому пользователю, при заходе пользователя на сайт, при смене местоположения и т.д. События представляют из себя строку в формате JSON или GPB (Google Protocol Buffers) и содержат время отправки, идентификатор пользователя, тип события, а также поля, относящиеся непосредственно к самому событию (например, координаты пользователя).

Каждую секунду генерируются сотни тысяч событий, и нам нужны инструменты, чтобы их собирать и анализировать эффективно и с минимальной задержкой. Мы рассматривали несколько существующих решений для этой задачи и до недавнего времени использовали демон под названием Scribe от Facebook. Он в целом нас устраивал и позволял делать все, что нам нужно. Однако в какой-то момент Facebook забросил свою разработку, и при некоторых условиях Scribe начал у нас падать (например, при перегрузке upstream-серверов). Самостоятельно устранить причину падений демона у нас не получилось, поэтому мы начали искать альтернативу.

Наши требования к системе доставки событий были следующие:

  • наличие локального (прокси) демона;
  • сохранение событий на диск в случае недоступности принимающего сервера;
  • возможность маршрутизации событий по категориям;
  • шардирование потоков данных по хешу (от user_id или другого) и round-robin;
  • запись событий в файлы на принимающей стороне (scribe-like);
  • нормальная работа в условиях высокой latency сети (доставка событий между ДЦ);
  • масштабируемость приема и отсылки до миллиона событий в секунду;
  • легкость эксплуатации, адекватное потребление ресурсов.

Мы рассматривали следующие варианты:

  • Apache Flume: нестабилен, теряет события при падении, если не использовать Spooling Directory Source, который имеет очень неудобный API;
  • FluentD: слишком низкая производительность, в остальном очень хорош;
  • Apache Kafka: нет локального агента (см. issues.apache.org/jira/browse/KAFKA-1955).

К сожалению, ни один из этих вариантов не решает все наши проблемы, поэтому мы решили написать свою систему и назвали ее Live Streaming Daemon (LSD).

Что умел Scribe?


Чтобы понять, что делает LSD и зачем он нужен, давайте сначала подробнее рассмотрим фичи, которые мы использовали в scribe.

Наличие локального демона


Scribe работает по архитектуре «клиент-сервер», где клиентами называются машины, которые генерируют события, а серверами — машины, которые их получают. Чтобы экономить ресурсы и уметь буферизовать на диск события в случае проблем с доставкой, Scribe предлагать запускать инстансы клиента на каждой машине, на которой генерируются события. Приложение, генерирующее события, соединяется с локальным клиентом через unix или tcp socket и посылает в него события через протокол Apache Thrift. Предполагается, что локальный прокси будет всегда доступен и будет отвечать за небольшое время.

В целом в большинстве случаев все так и работает, однако иногда локальный инстанс Scribe может начать отвечать дольше обычного или вообще аварийно завершиться. Поэтому у нас был механизм, который сохраняет событие в локальные файлы вместо Scribe, если он недоступен. Отдельным cron-скриптом мы потом отправляли эти события в Scribe, когда он поднимался.

Возможность маршрутизации событий по категориям


Категорией события называется, по сути, имя директории, в которую будет записано то или иное событие на принимающем сервере. Разные типы событий имеет смысл класть в различные категории, поскольку обработчик для них может отличаться. В Scribe предусмотрена возможность посылать разные категории на разные сервера и задается она маской имени категории, например debug_*.

В нашей конфигурации все события по умолчанию доставляются в европейский ДЦ. Если нужно доставить событие в пределах ДЦ, мы посылаем событие, которое имеет префикс local_, или же, если мы хотим доставить событие в определенный ДЦ, мы добавляем префикс с именем этого ДЦ. В конфигурации демона прописаны разные маршруты для этих категорий, и они доставляются туда, куда нужно. При доставке в удаленные ДЦ могут использоваться промежуточные узлы для буферизации событий.

Шардирование потоков данных


Иногда бывает удобно доставлять данные, которые относятся к конкретному пользователю, на один и тот же сервер. В некоторых случаях это позволяет значительно улучшить производительность обработки за счет кеширования данных пользователя на небольшое время.

Как правило, данные распределяются просто по алгоритму round-robin, то есть каждый следующий кусок данных посылается на следующий сервер из списка, и так по кругу. У Scribe есть недостаток при работе в обоих режимах: демон «запоминает» сервер, на который нужно доставить конкретное событие, и при недоступности одного из принимающих серверов события будут копиться на диске и никуда не доставляться, даже если остальные сервера доступны и способны принять и обработать весь поток событий.

Запись событий в файлы на принимающей стороне


На принимающей стороне (т.е. на стороне сервера) все события пишутся в файлы вида <имя_категории>/<имя_категории>-<дата>_<счетчик>, а также создается симлинк вида <имя_категории>/<имя_категории>_current на последний файл в категории. Файлы ротируются на основании прошедшего времени (например, 60 секунд) или объема (например, 10 Мб) в зависимости от того, что случится раньше.

Если категория называется, к примеру, error_log, то иерархия файлов и директорий будет следующая:

/var/scribe/error_log/
|-- error_log-2016-09-13_004742
|-- error_log-2016-09-13_004743
|-- error_log-2016-09-13_004744
`-- error_log_current -> error_log-2016-09-13_004744

Запись осуществляется всегда в последний файл. В предыдущие файлы сервер не пишет, их можно свободно читать и удалять после того, как файл полностью обработан.

Нормальная работа в условиях высокой latency сети


Клиент Scribe отправляет данные небольшими пачками и ждет подтверждения с удаленной стороны перед тем, как отправить новую пачку. Это очень плохо работает, например, в случае пересылки событий через Атлантический океан, где задержка передачи данных составляет примерно 125 мс. Если максимальный размер пачки, к примеру, составляет 0,1 Мб, то за одну секунду таким способом можно передать лишь 0,1 Мб / 0,125 с = 0,8 Мб/с. Это ограничение можно обойти, если не ждать подтверждения для каждой пачки, а отправлять события в потоковом режиме.

Что предлагает LSD?


В целом основных претензий к Scribe у нас было всего две:

  1. Нестабильность и потеря данных при падении демона.
  2. При падении принимающего сервера трафик не перераспределяется между оставшимися серверами автоматически, требуется ручное вмешательство.

LSD решает эти две проблемы и удовлетворяет нашим требованиям по доставке событий, о которых мы говорили в начале.

Защита от потери данных при падении демона


Не бывает софта без ошибок, поэтому вместо того, чтобы постараться сделать LSD «неубиваемым» и всегда отвечающим за адекватное время, было решено пойти другим путем: клиенты будут всегда писать события в файлы, а LSD-клиент будет эти файлы читать и доставлять на нужные машины. Этот способ удобен еще и тем, что не требуется драйверов Thrift, Protocol Buffers и т.д., события можно отправлять хоть из shell-скрипта.

Чтобы отправить событие, нужно записать строку с этим событием в конец файла вида <category>/<filename>.log, где <category> — имя категории события. В качестве <filename> может служить любая монотонно возрастающая строка, основанная на текущей дате и времени. Такой формат выбран не случайно и позволяет пересылать на другие сервера события, доставленные с помощью LSD или Scribe. В качестве <filename> мы рекомендуем использовать дату и время в формате YYYYMMDDHHII (например, 201609131714). При выборе такого формата файлы создаются максимум раз в минуту и их имена монотонно возрастают.

Если нужно отправлять события размером больше 4 Кб ( stackoverflow.com/questions/1154446/is-file-append-atomic-in-unix ) из нескольких процессов, то нужно брать файловую блокировку перед записью события в файл, чтобы строки не перемешивались. Можно добавлять суффикс _big к имени файла и писать большие события в отдельный файл, чтобы не брать блокировку для маленьких событий.

Также поддерживается plain-формат вида <category>.log, и, в таком случае, создание поддиректории не требуется. Такой формат удобно использовать при отправке событий из shell-скриптов и для сбора логов.

Автоматическое перераспределение потока событий


Если падает один из серверов, на которые доставляются события, то они автоматически перераспределяются по оставшимся серверам. Если же один из серверов работает медленнее остальных, то в этом случае ему просто достанется такой поток событий, который он в состоянии принять.

Это также означает, что однократная доставка не гарантируется, поскольку недоступность сервера определяется на основании таймаута. Возможна ситуация, когда события могут успешно доставляться на сервер, но подтверждения об этом приходить не будут, или же будут приходить с большим опозданием. В таком случае LSD-клиент заново пошлет пачку событий, подтверждение для которой не пришло за таймаут (по умолчанию 30 секунд).

Доставка событий в режиме реального времени


Поскольку мы выбрали имя Live Streaming Daemon, нужно соответствовать :). Когда хватает пропускной способности сети и производительности сервера на принимающей стороне, доставка событий осуществляется в режиме реального времени — никаких искусственных задержек при доставке не вносится. Это удобно, если вы доставляете логи или создаете много промежуточных узлов для пересылки событий. С другой стороны, доставка в режиме реального времени требует большего количества ресурсов, чем если бы события накапливались и отправлялись раз в несколько секунд (с такими настройками мы использовали Scribe). Поэтому потребление CPU у LSD в среднем несколько выше, чем у Scribe, однако разница не очень значительна.

Производительность


К сожалению, мы не смогли измерить производительность Scribe на нашем потоке событий для внутренней системы аналитики UDS, поскольку scribe-клиенты падали под нагрузкой (про UDS не так давно рассказывал Александр Крашенников).

Один LSD-сервер легко справляется с потоком событий в 2 гигабита/с (400k событий/с), поступающим c тысяч серверов. Соответственно, чтобы принять поток в 1 миллион событий в секунду, нужно всего 3 сервера, при этом каждый из серверов должен быть оснащен двумя гигабитными сетевыми картами.

Open-source


Исходные коды LSD находятся на GitHub (для установки наберите команду go get github.com/badoo/lsd). Демон работает под Linux и macOS, но для промышленного использования рекомендуется использовать Linux.

Помимо LSD, у нас есть большое количество других проектов, выложенных в open source, посмотреть и изучить которые вы можете в нашем техблоге.

Юрий Насретдинов, старший разработчик, Badoo
Поделиться с друзьями
-->

Комментарии (33)


  1. RPG18
    19.09.2016 13:51

    Удивительно, что не тестировали RabbitMQ.


    1. Scalar
      19.09.2016 14:36
      +6

      Мне кажется, что RabbitMQ не удовлетворяет и половине изложенных требований, и вообще является системой более высокого уровня, чем просто доставка событий.


      1. RPG18
        19.09.2016 16:21

        А Apache Kafka удовлетворяла этим требованиям? А на выходе у них получилась не система высокого уровня?


        1. youROCK
          19.09.2016 16:22

          Kafka в целом удовлетворяет требованиям, однако не имеет локального агента и соответственно не может буферизовать события на диск в случае недоступности кластера. Об этом сказано в самом начале.


          1. Optik
            19.09.2016 17:28

            Почему не было варианта реализовать кастомный продьюсер для кафки с требуемым функционалом?


            1. youROCK
              19.09.2016 18:22

              Да в принципе наверное можно было бы так сделать. На самом деле, код клиента составляет 90% от всего демона, поэтому, чтобы не переписывать консьюмеры для scribe (которые пишут в файлы в описанном в статье формате), можно было уже и сервер написать заодно. Что мы и сделали.
              Задача стриминга из файлов (особенно в реальном времени) не такая уж простая из-за того, что сложно реализовать rotate файлов, поэтому её почти никто и не реализует по-человечески. А если писать не в файлы, то могут теряться события при проблемах с демоном. Так что нельзя сказать, что у нас был большой выбор, к сожалению.


              1. Doggy
                19.09.2016 21:20
                +1

                Можете рассказать подробнее какие сложности возникают при реализации ротейта по человечески?


                1. youROCK
                  20.09.2016 10:40
                  +1

                  Основная проблема заключается в том, чтобы не терять события во время ротейта (переименования и удаления) файлов.
                  Как реализовать ротацию? Допустим, что кто-то пишет в файл cat.log. В каждый момент времени может быть несколько процессов, которые держат файл открытым, поэтому может вообще не быть «окна» по времени, когда файл можно было бы удалить, чтобы появился новый. Ок, вместо этого переименуем файл в cat.log.old и продолжим стримить этот файл. Скорее всего в скором времени опять появится файл cat.log, потому в приложении записано именно это имя.

                  Теперь у нас 2 файла: cat.log и cat.log.old. Мы должны стримить оба, поскольку мы не знаем, пишет ли кто-нибудь в старый файл или нет. Переименование в .old-файл происходит по достижении определенного размера, и по умолчанию это по-моему 10 Мб. Допустим, что приложения все еще держат открытым файл cat.log.old, и cat.log уже превысил 10 Мб. Если мы еще раз переименуем cat.log в cat.log.old, то это не вызовет никаких ошибок и старый cat.log.old просто заменится на новый. В итоге мы удалим файл, в который кто-то еще мог писать и события потеряются.

                  Чтобы этого не происходило, мы должны откладывать ротацию до тех пор, пока .old-файл не перестал использоваться и когда его солержимое полностью доставлено на сервера-приемники. Определить использование можно с использованием утилиты fuser или lsof, но часто вызывать их на каждый файл может быть весьма затратно. Можно также воспользоваться трюком с flock() и заставлять писателей (!) брать LOCK_SH на файл, а необходимым условием для ротации файла тогда будет служить полученный LOCK_EX. К сожалению, эта схема не слишком удобна, особенно если вы хотите дать возможность посылать события откуда угодно, вплоть до shell-скриптов.

                  Вместо этого, чтобы определить, используется ли файл, LSD периодически (по умолчаниюраз в минуту) сканирует procfs и делает stat() на каждый файловый дескриптор, открытый в системе и сверяет номера inode'ов со списком inode'ов файлов, которые мы хотим заротировать. Это позволяет определить использование всех файлов сразу с небольшим оверхедом. Под капотом fuser и lsof точно также сканируют procfs, поэтому этот способ на самом деле не является чем-то особенным.

                  Теперь, когда с ротацией разобрались, осталась одна маленькая деталь: мы не хотим делать open()-read()-close() на каждое событие, полученное через inotify, поэтому мы держим открытыми файловые дескрипторы у всех файлов вида category.log. Рано или поздно открытый файловый дескриптор начинает смотреть на файл category.log.old и потом вообще на удаленный файл (после второго переименования). Организовать систему внутренних оповещений об изменении имени файла, оказывается, не так просто, как может показаться на первый взгляд, поскольку каждый файл нужно открыть только один раз и не забывать закрывать файловые дескрипторы для удаленных файлов.


                  1. mkevac
                    20.09.2016 11:01

                    Все ведь сильно упрощается, если в файл пишет ровно один писатель. И если после переименования в cat.log.old появился cat.log, то в cat.log.old уже никто не пишет.


                    1. youROCK
                      20.09.2016 11:10
                      +1

                      Да, Марко, но дело в том, что мы делали LSD для замены Scribe, а в scribe мы писали из разных процессов php-fpm :).


                  1. Doggy
                    20.09.2016 11:14

                    Спасибо за развернутый ответ.


                  1. amarao
                    20.09.2016 12:30

                    Мне кажется, основная проблема находится как раз в районе идеи «приложение пишет лог-файлы». Если приложение пишет лог-файлы, это значит, что у вас нет никаких гарантий стабильности системы. Потому что приложение может решить быстро писать логи или писать сообщение о том, что заканчивается место на диске для хранения логов со скоростью быстрее производительности дисковой подсистемы и т.д.

                    Правильное ршение: перевод всех приложений на syslog или journalctl. Если приложение плохо переводится, то его нужно научить писаться в stdout/stderr, а дальше его сам systemctl сложит в syslog правильным образом.


                  1. ITweb
                    20.09.2016 16:49

                    А почему не использовать уникальное имя для old файлов?


                    1. youROCK
                      20.09.2016 19:07

                      А как это поможет :)? Все равно нужно удалять файлы, когда они полностью доставлены и в них никто не собирается писать.


      1. amarao
        20.09.2016 12:27

        В чём разница между «событием» и «сообщением»?


  1. Alantr
    19.09.2016 14:07

    Как быть в случае бездисковых серверов? Писать на nfs накладно.


    1. mkevac
      19.09.2016 14:17
      +1

      Писать на memory диск?..


    1. youROCK
      19.09.2016 14:36
      +1

      В целом, никто не мешает писать в центральный LSD-сервер «напрямую» (через JSON-протокол или GPB-протокол). Но лучше все же писать куда-нибудь на диск, чтобы события могли там лежать какое-то время и не потеряться. Иначе может получиться, что сервер «лежит» и событий накопилось уже много и они не влезают в память и приходится их «дропать».


  1. vaniaPooh
    19.09.2016 14:29
    +1

    Я так понял, что там только интеграционные тесты на PHP. А модули покрывали тестами? Рекламирую для этих целей библиотечку матчеров от коллеги: https://github.com/aandryashin/matchers


    1. youROCK
      19.09.2016 14:33

      Нет, модули тестами не покрыты, есть только «самопальные» функциональные тесты на PHP :)


  1. vaniaPooh
    19.09.2016 14:47

    Второй вопрос — не смотрели ли вы на: https://github.com/nsqio/nsq прежде чем писать своё?


    1. youROCK
      19.09.2016 15:01
      +1

      Смотрели, это тоже больше похоже на сервер очередей, нежели на транспорт для событий и последующей батч-обработки.


  1. kozzztik
    19.09.2016 17:19

    ElasticSearch + Logstash\FileBeat? Мы по крайней мере использовали их. Протобуфы из коробки не переваривает, но JSON родной формат.


  1. dm9
    19.09.2016 17:21
    +5

    Для меня Live Streaming — это что-то из области видеотрансляций. Уже обрадовался — думал, найду тут замену какому-нибудь Wowza Streaming Engine :-)


    1. Alexufo
      19.09.2016 17:56
      +1

      а на nginx вроде есть замена?


      1. dm9
        19.09.2016 21:15
        +1

        Спасибо за комментарий. Погуглил — нашел https://github.com/arut/nginx-rtmp-module/wiki/Directives. Вроде, все, что мне надо, есть. Попробую перетащить сайт с Wowza. Реально, спасибо. А то как настроил стриминг в 2014, так и не смотрел, не появилось ли чего нового.

        Автор топика, прости за оффтоп :-)


        1. imgen
          19.09.2016 22:14

          Я бы копал дальше :) В сторону более современного hsl
          https://docs.peer5.com/guides/setting-up-hls-live-streaming-server-using-nginx/


          1. dm9
            19.09.2016 22:39

            У HLS большая задержка. Приятно иметь 4 секунды на rtmp и hls для совместимости. Или я его просто не умею готовить?


            1. le1ic
              20.09.2016 16:10

              4 секунды на rtmp? Это что-то до фига, он позволяет порядка сотни миллисекунд задержку иметь.


              1. dm9
                20.09.2016 16:21

                То, что я намерил, — это настройка Wowza Streaming Engine по умолчанию. При отправке сигнала из Москвы на сервер в Голландии и затем обратно. Плюс какое-то время на перекодирование/пережатие при отправке.

                В принципе, я слышал, что можно быстрее, но всё же при работе через дикий интернет иметь небольшой буфер хорошо, как мне кажется. 100 мс — это интересный предел, не знал, что этот протокол так может.


            1. imgen
              21.09.2016 00:14

              hls работает без flash player, который в скором времени выпилят даже из хрома


          1. dm9
            19.09.2016 22:43

            Собственно, мы про одно и то же. И Ваша ссылка и ссылка выше ведут на настройку того же модуля.


  1. 1Michael1
    20.09.2016 16:10

    а как на счет zeromq для транспорта?