Примечание переводчика: На Хабре и в нашем блоге о корпоративном IaaS мы много пишем об облачных технологиях, и рассматриваем интересные инфраструктурные проекты различных компаний. В прошлый раз мы рассказывали вам о компании-разработчике технологий для онлайн-видеотрансляций BAM, а сегодня представляем вашему вниманию адаптированный перевод заметки о том, как техническая команда стримингового сервиса Spotify занималась масштабированием Apache Storm.



Мы в Spotify создали несколько конвейеров задач для работы с аналитикой в реальном времени, которые применяются в таргетированной рекламе, сервисе рекомендации музыки и визуализации данных. Каждый из этих конвейеров, работающих в реальном времени, использует Apache Storm, связанный с различными системами, например, Kafka, Cassandra, Zookeeper, а также другими истоками и стоками. При создании приложений, количество активных пользователей которых превышает 50 миллионов по всему миру, нужно постоянно помнить о масштабируемости, чтобы обеспечить высокую доступность и хорошую производительность системы.

Что такое масштабируемость


Масштабируемость – это способность программного обеспечения поддерживать производительность на должном уровне в условиях увеличивающейся рабочей нагрузки с помощью простого добавления ресурсов. Но чтобы получить такую способность, нужно сделать несколько больше, чем просто увеличить ресурсы и настроить производительность. При разработке программного обеспечения нужно принимать во внимание такие моменты как качество, легкость обслуживания и эксплуатационные показатели. Когда мы создаем приложение, мы выдвигаем следующие условия:

Важные условия масштабируемости

  • Программное обеспечение должно быть высококачественным и иметь надежную архитектуру;
  • Программное обеспечение должно быть спроектировано таким образом, чтобы его можно было легко выпустить, легко сопровождать и легко отлаживать;
  • Чтобы совладать с дополнительной нагрузкой, производительность программного обеспечения нужно повышать путем линейного добавления ресурсов.

Масштабируемость в Storm


Что же включают в себя конвейеры при масштабировании в Storm? Я подробно рассмотрю это на примере нашего конвейера персонализации, работающего в реальном времени, и мы разберем различные аспекты масштабируемости.



В нашем конвейере персонализации имеется кластер Kafka с топиками для различных видов событий, например, для окончания воспроизведения песни или показа рекламы. Наша топология Storm подписывается на различные пользовательские события, снабжая их метаданными (например, жанром песни), которые считываются из Cassandra, затем группирует их [события] по пользователям и вычисляет пользовательские атрибуты с помощью комбинаций алгоритмов агрегирования и деривации. После эти атрибуты пользователя записываются в Cassandra, которая, в свою очередь, вызывается отдельными backend-сервисами, персонализирующими пользовательский опыт.

Проектирование и качество

Расширяя функциональность нашего конвейера персонализации, мы заметили, что с точки зрения настройки производительности и отладки потока событий, топологии начинают выглядеть очень сложно. Однако тесты показывали достаточно большую эффективность нашего кода, поэтому мы оперативно провели рефакторинг, дабы упростить топологии.

Архитектура топологии


Пройдя через весь цикл превращения одной сложной топологии в несколько маленьких и простых, мы извлекли для себя несколько уроков:

  • Создавайте маленькие логически понятные топологии для разных потоков работ;
  • Продвигайте многократное использование кода с помощью общих библиотек, а не общих топологий;
  • Убедитесь, что методы легко тестируются;
  • Распараллеливайте медленные операции ввода/вывода, используйте пакетную обработку.

Качество


Мы разработали наши конвейеры на Java, а для проверки бизнес-логики в разных вычислительных «болтах» (обработчиках кортежей) использовали JUnit. Мы провели непрерывные тесты в процессе симуляции работы кластера, используя зависимость backtype.storm.testing.

Удобство обслуживания

Мы сделали несколько модификаций, чтобы повысить удобство обслуживания, то есть, чтобы можно было с легкостью развертывать ПО на новых хостах нашего кластера и следить за их «здоровьем».

Конфигурация


Мы вынесли настройки параллелизма «болтов», параметры концевых точек стока и истока и параметры производительности топологии в файл config, тем самым получив возможность изменять настройки, не меняя при этом кода. Теперь стало просто делать небольшие инкрементные изменения и смотреть, как это сказывается на работе системы.

Визуализация метрик


У нас есть возможность просматривать метрики топологии в виде графиков, которые отражают «здоровье» системы и помогают выявлять проблемы. Мы убедились, что следим за самыми высокоуровневыми метриками (на картинке ниже), обобщающими состояние системы, иначе можно легко запутаться в большом количестве графиков, когда становится непонятно, какая из характеристик важна.



Развертывание топологии




Все вычисления в нашем конвейере персонализации независимы, а в нашей стратегии развертывания мы используем дублирование обработки событий, чтобы в процессе перехода на новую версию не потерять ни одного сообщения. Такой подход работает не во всех случаях, например, он не подойдет для транзакционных вычислений. На диаграмме выше этапы развертывания расставлены в хронологическом порядке слева направо, а t1…t8 представляют разные временные отрезки.

Наша стратегия развертывания требует, чтобы кластер Storm обладал достаточной производительностью для одновременного управления двумя работающими топологиями персонализации. В момент времени t1 кластер работает с версией топологии v1. Когда версия v2 готова, то мы запускаем её в кластере. В момент времени t4 кластер работает с двумя версиями одновременно. Каждая топология использует уникальный идентификатор groipId в Kafka, чтобы гарантировать доставку всех сообщений обеим версиям. В этот момент все сообщения обрабатываются дважды, но это не страшно, так как вычисления идемпотентны. В момент времени t5 мы отключаем v1, и она перестает получать сообщения от Kafka. После мы следим за изменением метрик v2 и смотрим, все ли в порядке. Если все хорошо, то мы полностью «убиваем» версию v1, и в момент времени t8 кластер функционирует только на версии v2. Однако если в момент времени t7 мы замечаем на графиках какую-либо проблему, то мы возвращаем v1 в работу. Здесь мы «убиваем» версию v2, таким образом, возвращаясь в начальную точку. Имея возможность безопасно вернуться в исходное состояние, мы можем часто откатывать небольшие изменения и минимизировать риски.

Мониторинг и предупреждения об ошибках


Мы следим за показателями кластера, топологии, истоков, стоков, а система предупреждает нас о нарушениях только в высокоуровневых метриках, иначе можно быстро устать от излишне большого количества предупреждений.

Производительность

Со временем мы изучаем проблемные места, ошибки системы и настраиваем некоторые элементы, чтобы достигнуть желаемой производительности. Для получения достойной производительности требуется достойное аппаратное обеспечение.

Аппаратное обеспечение


В самом начале мы запустили наши топологии в общем кластере Storm, но очень скоро столкнулись с ресурсным голоданием из-за их загруженности. В связи с этим мы запустили независимый кластер Storm, что было довольно просто. Наш нынешний кластер обрабатывает более 3 миллиардов событий в день и имеет 6 хостов с 24 ядрами (2 потока на ядро) и 32 Гбайтами памяти. Даже с таким небольшим кластером мы сможем работать еще долго, так как даже близко не подошли к порогу использования ресурсов, несмотря на две одновременно работающие в процессе развертки версии топологии персонализации. В будущем мы хотим запустить Storm на YARN на нашем кластере Hadoop, чтобы рациональнее использовать ресурсы и добиться больших возможностей масштабирования.

Пропускная способность и латентность


Чтобы получить желаемую пропускную способность и латентность, нам нужно настроить параметры истока и стока. Мы также настроили кэширование, параллельную обработку и параллельный доступ. Все это подробно описано ниже.

Настройка истока и стока


Настройка Kafka

  • Мы настроили rebalancing.max.tries, чтобы минимизировать ошибки балансировки нагрузки в Kafka;
  • Мы используем различные групповые id для каждого KafkaSpout в каждой версии, чтобы обеспечить избыточность обработки сообщений во время переходов на новые версии топологий.

Настройка Cassandra

  • Мы используем разные таблицы для разных TTL. Также выставили gc_grace_period = 0, чтобы отключить чтение с восстановлением для записей с TTL, так как они нам не нужны;
  • Мы использовали стратегию уплотнения таблиц, связанных датой, для новых данных;
  • Мы контролируем число открытых соединений между топологией Storm и Cassandra;
  • Мы настроили Snitch, чтобы обеспечить надлежащую маршрутизацию запросов.

Проблемы с параллелизмом


OutputCollector в Storm не является потокобезопасным, то есть он не может безопасно использоваться одновременно несколькими потоками, такими как callback-функции, использующие Future-объекты для асинхронной обработки. Чтобы обезопасить запросы на создание/подтверждение обработки кортежей Storm и их передачу обработчику, мы используем java.util.concurrent.ConcurrentLinkedQueue.

Настройка параллелизма

При настройке параллелизма наших топологий мы черпали вдохновение из презентации Storm с конференции Strata 2014. Мы следовали вот такому принципу, который отлично сработал:

  • 1 worker на узел топологии;
  • 1 executor на ядро для обработки задач ЦП;
  • 1-10 executor’ов на ядро для обработки задач ввода/вывода;
  • Посмотрите, какие задания можно распараллелить, и распределите ресурсы между заданиями, как можно тщательнее распараллеливая самые медленные из них (быстрые распараллеливайте в меньшей степени).

Кэширование для обработчиков («болтов»)


Чтобы обеспечить вычисление атрибутов пользователей обработчиками, нам пришлось выбирать между внешним и внутренним кешированием памяти. Мы предпочли внутреннее кеширование, поскольку внешнее кеширование требует проведения сетевых операций ввода/вывода, что ведет к повышению латентности и добавляет еще одну точку отказа. Однако внутренний кэш не персистентный и имеет отграниченный объем памяти. Вопрос персистентности нас мало беспокоил, но над ограничениями памяти пришлось поработать. Мы остановили свой выбор на кэше с определенным временем хранения, реализованном с помощью Guava. Чтобы контролировать его размер, мы можем установить количество хранимых элементов и время их хранения. Это дало нам возможность оптимально использовать память в нашем кластере.

Комплексный подход к масштабированию помог нам добавить новые возможности для нашей постоянно увеличивающейся активной пользовательской базы, одновременно сохранив высокую доступность конвейеров Storm.

Комментарии (3)


  1. vvnick
    11.08.2015 23:04
    -1

    В описании Сторма есть яркий маркетинг про: «процессинг более миллиона записей в секунду на одной ноде».
    А сколько в итоге ваша система перемалывает в секунду на ноде? (а также сколько памяти/цпу на ноду)
    И какое среднее время обработки одной записи?
    Как я понимаю при 50 миллионах активных пользователей и среднем времени трека 3-4 минуты (или нет?) будет около 200-300 тысяч событий в секунду, тоесть теоретически даже одна сферическая нода в вакууме должна справляться? ;)


    1. namespace
      12.08.2015 10:34
      +3

      Решил затеять чятик с переводом? Похвально!


      1. vvnick
        12.08.2015 15:50

        Не заметил, надо больше спать…
        и то что itgrad не имеет отношения к разработке спотифай меня почему-то тоже не смутило :)