Привет, Хабр!
Меня зовут Иван Хозяинов, а работаю в ITSumma, где изучаю и применяю технологии, связанные с большими данными, машинным обучением и аналитикой. В этой статье хочу рассказать о системе хранения и обработки данных и инструментах, которые встречаются на пути от сырых исходников до представления, удобного для последующего анализа.
Поговорим, как связаны серверы в дата-центре и распределенные приложения для обработки данных и почему пришлось написать свой коннектор для Spark и Greenplum.
Какую задачу решаем
Обычно задача платформы обработки потоковых данных состоит в том, чтобы собрать, сохранить и провести аналитику данных, используя их в дальнейшем для бизнес-аналитики, машинного обучения и т.п. Причём:
Источников данных несколько (5-10), и они разнородные, например: журналы, логи, данные датчиков, телеметрия. Данные из этих источников через интернет попадают на приемник данных, через шину данных передаются в обработчик, записываются и попадают в хранилище. В этой статье уделим особое внимание как раз путям наполнения СХД.
Каждый источник генерирует порядка 500-1500 RPS. Чаще всего это структурированные пакеты данных в avro, json или бинарном формате. Но формат у разных датчиков отличается, соответственно, нужно уметь парсить входящие пакеты, чтобы уже потом записывать в СХД и обрабатывать.
Система хранения данных подключается к системам аналитики (BI) или ML-моделям для обучения на исторических данных.
Требования к железу всегда индивидуальны, но чаще всего начать можно с базовой конфигурации:
RAM: 32G на каждый сервер;
CPU: 8 ядер на каждый сервер;
HDD+SSD: 16 TБайт суммарно;
Сеть: внутренняя сеть с пропускной способностью не менее 1 Гбит/с.
А для прототип подойдёт и что-нибудь попроще: например, dedicated-серверы, где можно разворачивать виртуальные машины. Прежде чем масштабировать систему сбора, хранения и обработки данных, лучше её проверить в небольшой песочнице. Мы чаще всего работаем с виртуализацией KVM, но бывает OpenStack в связке с Ceph. А когда всё отработано — можно переходить и к работе на кластерах.
Архитектура платформы
Сейчас мы используем примерно такую схему:
На схеме можно условно выделить два основных блока: ПО, которое обрабатывает данные и складывает их в распределенную систему хранения данных, и системы обеспечения CI/CD.
Путь данных от источников до хранилища и дальнейших аналитических запросов, имеющих бизнес-ценность, включает следующие этапы:
Данные из источников попадают в шину данных. В качестве шины используется Apache Kafka.
Собранные Kafka данные из различных источников попадают в Spark и там обрабатываются.
Обработанные данные записываются в Greenplum.
После этого с данными могут работать аналитики и ML-системы.
Дальше я постараюсь раскрыть, почему выбран тот или иной продукт и какие рассматривали альтернативы. Например, для работы с данными сейчас используем Greenplum (продукт Pivotal), но ещё пробовали использовать Hadoop с HDFS и поверх этого Hive. Однако Greenplum показался быстрее, проще и у него открытый исходный код. Единственная проблема с Pivotal — как и Confluent, они не распространяют и не поддерживают свои продукты в РФ. Поэтому чтобы связать Spark с Greenplum, понадобится использовать что-нибудь опенсорсное (какие есть варианты — обсудим ниже).
Каждый узел платформы обработки данных мониторится. Для этого используем связку Prometheus + Grafana, а также свою систему мониторинга серверных параметров, по которым администраторы реагируют на инциденты раньше, чем наступили бы серьезные последствия.
За обеспечение непрерывной доставки отвечает следующий конвейер:
коммит попадает в GitLab;
Jenkins запускает сборщик продукта;
запуск собранного задания производится в Apache Spark;
статический анализатор SonarQube проверяет ошибки;
с помощью JMeter производится нагрузочное тестирование собранного задания.
Версии компонентов
К сожалению, нельзя просто взять последние версии всех компонентов — они могут не заработать вместе, и прощай ночные сборки. Поэтому привожу наш набор совместимости:
Apache Spark — 2.4.6
Java (OpenJDK) — 1.8.0
Apache Kafka — 2.4.1
Pivotal Greenplum — 6.9.0
PostgreSQL — 12.2
Apache JMeter — 5.2.1
Apache Ignite — 2.8.1
Jenkins — 2.235.5-lts
GitLab — 13.6.3
Sonarqube — 8.4.1
Grafana — 6.7
Prometheus — 2.17.0
Нагрузочное тестирование
Для того чтобы оценить, насколько построенная схема жизнеспособна, проводим нагрузочное тестирование, которое включает в себя следующие шаги.
Генерация правдоподобных данных —чтобы наполнить хранилище данными и проверить дальнейшие этапы, нужно создать профиль нагрузки, похожий на реальный. Это могут быть фейковые журналы с сервера, сгенерированные данные с датчиков и т.д.
Проверка на разных профилях нагрузки — чтобы узнать поведение при пиковой, повышенной и равномерной нагрузке и выявить компоненты, требующие масштабирования.
Анализ узких мест и их масштабирование — например, может понадобиться масштабировать БД, добавить узлы в Greenplum или ресурсы для обработчика данных.
Оценка пропускной способности — чтобы выяснить, как будет работать данная система целиком, насколько она пригодна к реальной нагрузке. Поэтому убеждаемся, что нет никаких аномалий: CPU не скачет, память не утекает и в целом решение готово к продакшену.
Также нагрузочное тестирование может быть частью CI/CD-процесса, тогда мы будем уверены в работоспособности системы после каждого деплоя, а не только после штучной проверки по списку.
Нагрузочное тестирование Яндекс.Танком
Первым вариантом, как проводить нагрузочное тестирование, была идея тестировать платформу обработки данных как веб-сайт — с помощью Яндекс.Танка: сделать микросервис, который принимает данные и посылает их в Kafka, и его тестировать. Для этого нужно предварительно подготовить нагрузку («патроны» для танка), чтобы трассировать данные. Это занимает некоторое время, а также оперативную память и место на диске.
В целом удобно — результаты и графики, как проходит нагрузка, можно посмотреть в облачном интерфейсе. Минус в том, что этот способ подходит только для HTTP-подобных протоколов и нагрузить можно только веб-ендпойнты, которые смотрят наружу.
Нагрузочное тестирование с Apache JMeter
Другой продукт, который можно использовать для нагрузочного тестирования, — это Apache JMeter. Он больше подходит для тестирования именно распределенных систем, потому что входит в экосистему Apache Software Foundation: для него есть плагины для прямой нагрузки на Kafka; можно нагружать не только веб-интерфейс, но и сразу шину данных; можно пробовать напрямую тестировать БД и есть поддержка сообщества.
Преимущества JMeter в итоге оказались решающими — на нём и остановились. Но перед этим свой велосипед тоже попробовали написать.
Нагрузочное тестирование самописным генератором
Если мы хотим обеспечить профилирование всего конвейера обработки данных от источников до СХД с учётом транспорта и обработки, то нужно иметь возможность трассировать и определить ёмкость каждого участка пути.
Один из подходов, который используется в Zipkin и Jaeger, состоит в генерации огромного количества пакетов данных с уникальными идентификаторами, создании с их помощью нагрузки и отслеживании всех этапов за счёт trace_id. При этом на каждом участке можно сделать обработчик и измерить, за сколько данные проходят участок пути от источника до обработчика, от обработчика до БД и т.д.
Общая схема понятна и позволяет подробно оценить возможности системы обработки данных, но чаще всего это избыточный метод, который требует дополнительных усилий и ресурсов. Обычно лучше использовать JMeter.
Брокер сообщений: Apache Kafka vs RabbitMQ
Выбрали Apache Kafka благодаря широким возможностям именно в контексте работы с большими данными и нагрузками, такими как хранение журнала, масштабируемость и достоверность. Более подробно о сравнении RabbitMQ и Apache Kafka можно прочитать в отдельной статье.
Apache Spark vs Flink
Для того чтобы выбрать один из этих фреймворков, решили попробовать оба. Подробное сравнение проводили 3 года назад, и тогда бросилось в глаза, что документации, примеров больше у Apache Spark, да и в целом распространенность у него больше. Но проведенное нагрузочное тестирование простых примеров, написанных на обоих фреймворках, показало лучшие результаты у Apache Flink. Например, Игорь Кураленок из "Яндекс.Облако" на недавнем митапе упоминал, что для потоковых данных рекомендуется использовать Apache Flink. Поэтому, судя по всему, этот фреймворк тоже зрелый и его можно использовать в разработке.
Но мы сделали выбор в пользу Apache Spark.
Требования к СХД
Три самых важных параметра, которые нужно оценить и учесть, составляя требования к СХД:
время восстановления после сбоя (RTO);
пропускная способность на запись (для потоковых данных из разнородных источников);
пропускная способность на чтение (для аналитики и машинного обучения).
Сложность работы СХД в случае обработки больших объемов потоковых данных в том, что нужно одновременно и быстро писать, и читать. Аналитики обычно хотят работать со свежими данными, желательно, с теми, которые только что пришли, — чтобы сразу что-то смотреть и делать свои аналитические выводы. Нагрузка на СХД при этом возрастает, и нужно следить, к каким репликам должны подключаться аналитики, и вообще, должны ли они подключаться к репликам или лучше работать напрямую.
Единственное требование с точки зрения железа — держать серверы в одном дата-центре.
К сожалению, точные параметры назвать трудно, они зависят от каждого конкретного случая и отличаются для разных объемов данных и выполняемых задач. Иногда может быть достаточно 4-х выделенных серверов, иногда нужно 12. В каких-то случаях нужны большие диски, а в каких-то лучше шардирование и диски по 1-2 ТБайт. Бывает, что общий объем хранилища измеряется петабайтами, но для нас более привычны объемы в несколько десятков терабайт.
Холодные хранилища данных
Пробовали работать с разными холодными хранилищами, в том числе, с Apache Hadoop и Cassandra.
Наш опыт работы с Cassandra небольшой, использовать её напрямую в системе аналитики сложнее. Это более медленная система и лучше подходит для долговременных хранилищ. Мы же говорим о быстрой обработке больших объемов данных — как структурированных, так и неструктурированных. Они попадают в озеро данных на Hadoop и хранятся там в чистом виде, пока не понадобятся, а потом уже передаются в тот же Greenplum и там обрабатываются.
В работе с Apache Hadoop использовали такую систему:
Apache Ambari для развертывания и управления. Иногда разворачиваем Ansible-скриптами — так больше степеней свободы, но зачастую через интерфейс управлять хранилищем проще, а у Ambari он удобный и хорошо всё разворачивается.
Дублирование namenode (активное и пассивное) — чтобы обеспечить high availability.
Репликация datanode с фактором репликации по умолчанию 3, но если ресурсов не хватает, то реплика-фактор может быть и 2.
Поверх HDFS Hive для аналитики и SQL-запросов и HBase как альтернатива Cassandra.
Pivotal Greenplum
Как уже писал ранее, в контексте работы с аналитикой остановились на Greenplum. High availability обеспечиваем за счет:
дублирования мастера — аналог namenode в Hadoop;
зеркалирования сегментов — аналог datanode в Hadoop, который представляет собой просто сервер в PostgreSQL;
RAID на железном уровне — это единственная рекомендация по железу, во всём остальном говорим про софт.
Spark+Greenplum = ?
Один из ключевых моментов статьи: каким образом можно обработать потоковые данные и записать их в систему хранения данных, которая сделана на Greenplum? Как уже говорил, несмотря на то, что Greenplum — продукт с открытым исходным кодом, вся обвязка и поддержка требует лицензии и контакта с западным рынком. Сейчас в России продукт не поддерживается.
Чтобы связать Spark и Greenplum, есть несколько вариантов:
Можно скачать коннектор Spark- Greenplum. Но в нём нет исходного кода (просто документация) и периодически нет доступа. На письма тоже отвечают не всегда, и непонятно, с чем это связано.
Есть opensource-библиотека, которая быстрее JDBC в несколько раз — в readme написано, что в 100 раз, если записывать напрямую. Но у неё нет интерфейса, который поддерживался бы самим Spark.
Мы сделали и совсем недавно выложили в opensource на GitHub свой коннектор Spark и Greenplum.
Наш Spark Greenplum connector распространяется под лицензией MIT, поддерживает DataSource API v2 Spark (только для Spark v2). Сейчас он ещё в альфа-версии и работает только на чтение, в дальнейшем доработаем и на запись, но пока, как и в предыдущем варианте, можно записывать при помощи JDBC-запросов через копирование. Над кодом работали несколько человек, но последнюю сборку и доработку интерфейса осуществлял @hovercraft-habr, и он сможет подробнее ответить на вопросы про технические тонкости.
Если статья вас заинтересовала, то, возможно, вы захотите попробовать этот коннектор. Мы с коллегами с удовольствием ответим на вопросы и учтём комментарии!
Yo1
интересно, но нифига не понятно.
в связке с greenplum, в чем спарк запускался? из текста складывается впечатление, что спарк на одной единственной ноде запутили в локальном режиме.
когда сравнивали с хадуп, что за енжин был у hive? hive on spark пробовали?
IvanKhozyainov Автор
На доступном железе для экспериментов пробовали стендалон. Спарк запускался на одной ноде, также пробовался на двух и трех нодах (тоже стендалон). Есть идея использовать yarn, до этого пробовался mesos (без связки с гринпламом).
Очень много разнородной информации, которая занимает время, а еще сам стек требует гораздо больше железных ресурсов, которые не всегда доступны :)
Если у вас был опыт и рекомендация, где все-таки лучше запускать spark в контексте удобства масштабирования/управления и возможности поставить рядом какую-нибудь базу, то буду рад ответу.
Hive запускался с TEZ (с mr не помню уже, какие-то проблемы возникали). Также пробовали impala — работает побыстрее, но лицензионные ограничения и периодически что-то отваливалось при нагрузочном тестировании. Hive on Spark даже не рассматривался, основные данные лежали в HDFS.
Yo1
на сколько я знаю на cвоем железе всего 2 варианта (не считая локал) пускать спарк — хадуп кластер или k8s кластер. k8s как менеджер для spark — совсем новая тема, в проде для совсем смелых и там большой минус — нет «data locality». на хадупе — spark с yarn кажется чаще встречается. без хадупа, поднять лишь yarn думаю не выйдет. yarn наверняка нужен и zookeeper, спарку точно нужен общий сторидж (hdfs). имхо разумней весь хадуп для спарка поднять, тем более что там же у вас и kafka будет.
Hive on Spark именно с HDFS и работает, но думаю медленее Impala на простых запросах, зато стабильно. Impala, да, с тяжелыми запросами и тучей пользователей и у нас плохо дружит.
мне вот интересно как была скорость у Impala/HDFS на фоне Greenplum, обгоняла, если хватало ресурсов выполнить запрос?
IvanKhozyainov Автор
Спасибо за хороший и развернутый ответ!
Да, похоже от хадупа не уйти никуда и надо ставить всю инфраструктуру, слишком много в этой экосистеме завязок — hdfs, yarn, hive опять же (если потребуется), mapreduce разве что не надо :)
Несмотря на то, что k8s сейчас почти везде и спарк научился с ним работать 3 года назад — тоже есть большие опасения для использования в проде, плюс про «data locality» хороший поинт
Ну hive on spark с hdfs я так понимаю работает через spark же? Просто я к тому, что это дополнительное звено и не уверен, что так быстрее — хотя про эту схему как-то не подумал
Честно говоря конкретных замеров и сравнений impala/hdfs именно с greenplum не было, но «по ощущениям» вроде было сопоставимо, хотя как пишет сама cloudera (еще бы им не верить :)) — должно быть быстрее blog.cloudera.com/apache-impala-leads-traditional-analytic-database
А вот в блоге tinkoff например пишут, что greenplum пошустрее работает, чем impala — habr.com/ru/company/tinkoff/blog/310620
По возможности, конечно, будет интересно проверить еще и самостоятельно, согласен
Но сейчас как сама impala, так и эксперименты с ней уже утеряны
EvgenyVilkov
Удивительно конечно что «тест Тинькофф» которому уже 5 лет продолжает быть референсным :)
А ведь о чем «тест» то был? Специалисты GreenPlum проверяли in-memory движки и на всякий случай решили запустить запросы на Impala (при этом не будучи специалистами в этой области). Ни конфигурации, ни версий, ни настроек стенда Cloudera и Impala, ни планов запроса, ни профилей запроса не предоставлено.
Сама методика тестирования тоже вызывает много вопросов — на системах массивных параллельных вычислений запускают один запрос (!) и сравнивают его время работы. Ээээ, что простите? Вы в реальной жизни где такую ситуацию увидите? В реальной жизни на MPP системе высококонкурентная нагрузка и поэтому тестировать надо ни одни запрос, а 20, 30, 40, 60 и тд одновременно работающих и сравнивать не только общее и среднее время работы но и разброс max, min (чтобы увидеть ситуацию когда пришел первый и сожрал все ресурсы, а остальные стоят в сторонке). А в этом конкретном тесте проверили как система может выделить ресурсы одной сессии.
В реальной жизни на большинстве задач Impala быстрее GP и вот почему:
-Impala читает только те данные, что удовлетворяют условиям запроса тк происходит фильтрация на уровне сканирования (отсеиваются блоки, а начиная с версии 3.4 отсеиваются даже страницы, не удовлетворяющие выборке). Аналог storage index (zone maps) в Exadata (Netezza)
-Умеет строить динамические и bloom фильтры, чтобы как раз сканировать то что надо, а не поднимать все
-Принцип работы обработки — непрерывный pipeline данных от storage слоя до выдачи результата
-В версии 3.4 появился полноценный intra node параллелизм, который распараллеливает не только по кластеру выполнение, но и внутри одного узла, что приводит к кратному (!) увеличению производительности.
GP иногда будет быстрее только если ключ сегментирования совпадает с условиями соединения и\или условия запроса совпадут с ключом секционирования. В реальной жизни, условно, никто не будет сегментировать банковскую проводку по номеру счета для скорости работы соединения с таблицей счетов тк это вызовет адский перекос данных. Во всех остальных ситуациях GP уйдет в full scan. Именно поэтому рекомендация по железу от вендоров — много маленьких и быстрых дисков на сегмент ноду.
Научить GP фильтрации данных на уровне чтения Pivotal обещает только к концу 2021 года и только в своей версии (в open source и прочих сборках поделках этого не будет скорее всего).
PS
Impala у вас плохо дружит с тучей пользователей потому что вы не настраивали admission control скорее всего. Я имею возможность работать с кластером где в сутки исполняется миллион SQL запросов на кластере Impala с одновременно работающими 50-60 сессиями
Yo1
ну не знаю, десяток узлов с импалой 3.2, Enable Impala Admission Control галочка стоит, mem_limit стоит и для Impala Daemon Coordinators и для Impala Daemon Executors (30 и 80Gb сотетсвенно), scratch_dirs настроены.
типичная ошибка
Memory limit exceeded: Failed to allocate row batch
EXCHANGE_NODE (id=5) could not allocate 32.00 KB without exceeding limit.
Error occurred on backend host010.domain.net:22000
Memory left in process limit: 9.51 GB
Query(3d4641a8c316a52d:f17a2b7100000000): Reservation=51.78 GB ReservationLimit=64.00 GB OtherMemory=22.00 MB Total=51.80 GB Peak=51.80 GB
Fragment 3d4641a8c316a52d:f17a2b7100000006: Reservation=320.00 MB OtherMemory=2.89 MB Total=322.89 MB Peak=322.89 MB
если в запросе ставить явно SET MEM_LIMIT тупо вываливается по достижении лимита. примерно те же проблемы на координаторах — он тянет весь результирующий датасет к себе в память, т.е. памяти на координаторе надо больше, чем клиент попытается утащить.
EvgenyVilkov
Конфигурация узлов по CPU и RAM какая?
Impala вычитывает данные в память и потом пайпом проводит все операции. Если вам не хватает памяти чтобы поднять блоки, нужные для обработки, запрос падает с таким вот сообщением. В скратч область может уходить только hash и сортировка.
В вашем случае правильное решение — понять сколько памяти нужно запросу и поднять mem_limit до нужного значения.
В любом случае — это проблема одного конкретного запроса, а не одновременного кол-ва запросов. Бест практис проектирования кластера под импалу с высококонкурентной нагрузкой — много памяти.
GP рабоатет немного под другому. Каждый воркер постгреса начинает вычитывать данные и сразу сваливать их в кэш, который уходит в файловый кэш операционки, который в свою очередь занимает оперативку. те оперативнка гринпламом жрется под файловый кэш оси :) именно поэтому поднятие памяти, выделяемого воркеру (по умолчанию 50мб), не дает никакого эффекта обычно.
Yo1
хабр дурковатая платформа, одним комментарием в день лимитирует.
Impala Daemon десять штук, Impala Daemon Executors mem_limit 80Gb, не понятно почему вы на этот запрос грешите, вроде очевидно Admission Control толком не работает: лимит 64, выбрано 51.78, в логе он откровенно пишет: Memory left in process limit: 9.51 GB
значит проблема не в лимите запроса, а том что Executor не может выделить запросу 32.00 KB. по мне так очевидно, что параллельные запросы тоже потребляют память на узле и все скопом не уместились, а Admission Control похоже не угадывает сколько запросу на этом узле следовало бы зарезервировать памяти, увеличивает резервацию по ходу выполнения.
EvgenyVilkov
Admission работает, с ним проблем нет.
Как выделяется память:
-определяется параметр mem_limit установленный на уровне ресурсного пула либо сессии. пусть будет 10Гб для простото расчета
-дальше эти 10Гб выделяются а каждом узле на котором есть демон импалы. Демон разумеется должен быть на всех узлах где есть HDFS. Иначе это говноконфигурация. Предположим у вас 10 узлов те суммарно запросу будет доступно 100Гб
-Далее, согласно дефолтовым параметрам, 80% от этого объема выделяется сканерам, читающим данные из HDFS в память (остальные 20% резервируются на другие операции, но они могут уходить в скратч, если что). те формально вы можете поднять сканерами не больше 80Гб.
-теперь оцените размер датасета, который вы поднимаете запросом. Бьюсь об заклад он выше.
Как побороть — поднять память на уровне сесси через set mem_limit либо на уровне очереди. Поднимите память на запрос и он отработает. Либо поменяйте настройки пула. В третьей ветки импалы есть параметр верхнего выделения запросу, выше которого он не может резервировать память пула, но если он не исчерпывается то память доступна конкурентным сессиям.
Цель admission контрола не угадывать память, а разруливаьт конкурентную работу, распределяя ресурсы.
Другие методы — поработать с планом запросы чтобы он не сканировал много. тут надо смотреть профиль и план. Методы есть (например задать параметр повышенного ожидания построения фильтра, которые потом будет поднимать с паркета только нужные данные; те Impala сперва построит фильтр, а потом наложит на данные вместо того чтобы сканировать все и потом фильтровать).
Так кстати и не написали конфигурацию дата узла.
Yo1
можете пояснить, что значит «Memory left in process limit: 9.51 GB» в моем логе?
мне не понятно зачем трогать лимит, если он и близко не превышен? Impala Daemon Executors mem_limit итак уже поднят до 80Gb, причем таких узлов 10. 800G — этого хватает одному такому запросу, если никто крупный параллельно не исполняется. значит в целом на кластере хватает и памяти и лимитов.
EvgenyVilkov
Почему вы путаете лимит, выделяемый Impala на одном узле и лимит выделяемый одному запросу на одном узле?
Вангую что лимит одного запроса на узле 10Гб. Собственно его превышение и фиксируется.
Чтобы запрос отработал нужно либо на уровне сессии поднять лимит например set mem_lim=20g Либо сразу на уровне пула (https://docs.cloudera.com/documentation/enterprise/latest/topics/impala_admission.html#admission_memory раздел Clamp MEM_LIMIT Query Option)
Yo1
если бы я путал, то рестарт запроса не помогал бы. лимит тут не причем, на свободном кластере запрос проходит. когда запрос упирается в лимит демона, на демоне вообще ничего не стартует. при срабатывании лимита демона ошибку кидает координатор, что-то типа такого:
Memory limit exceeded: Query a34a9afbf1a71fb1:85f3fbd100000000 could not start because the backend Impala daemon is over its memory limit
в моем же случае демон запрос пропустил и начал исполнять, в первую очередь потому что запрос проходит по всем лимитам. и по общему на запрос (в моем случае его не было) и по лимиту на демоне. демон начал исполнять, но заранее не зарезервировал достаточно памяти, потому свалился до достижении лимита демона.