В этой статье ведущий администратор баз данных ITSumma Алексей Пономаревский расскажет о том, как мы интегрировали популярный фреймворк для распределенной обработки данных Apache Spark с мощной массивно-параллельной базой данных Greenplum.
Текст будет полезен для разработчиков, решающих схожие задачи по интеграции распределенных фреймворков обработки с реляционными БД, которые используют параллельные вычисления.
Коротко о чем статья:
Зачем нужно было создавать коннектор для взаимодействия между Spark и Greenplum.
Как проходила разработка коннектора: архитектура и этапы.
Дальнейшее развитие и оптимизация решения.
Best Practices в разработке подобных решений.
Статья сделана из доклада Алексея на нашем вебинаре, посмотреть его можно тут.
Необходимость создания коннектора
Apache Spark из коробки предоставляет ряд коннекторов для взаимодействия с различными источниками данных, включая реляционные БД, через интерфейс JDBC. Такое обобщенное решение ограничивает производительность и масштабируемость, так как обмен данными происходит через один ведущий узел кластера Spark, не задействуя параллельные вычисления на всех узлах.
Поэтому, чтобы обеспечить эффективное распределенное взаимодействие между Spark и Greenplum, требуется специализированный коннектор, который сможет в полной мере использовать возможности массово-параллельных вычислений обеих систем. Разработка такого коннектора была вызвана потребностями в высокопроизводительной передаче данных между Spark и Greenplum, и нашим желанием создать решение с открытым исходным кодом и полным контролем над внутренней реализацией.
Архитектура и этапы разработки
Эффективный параллельный обмен данными между распределенными системами, такими как Spark и Greenplum, является достаточно сложной инженерной задачей. Помимо координации и синхронизации работы множества узлов в кластерах, необходимо обеспечить поддержку транзакционности, присущей Greenplum как ACID-совместимой базе данных. Это требует применения механизмов буферизации данных и двухфазной фиксации транзакций.
Первоначально мы рассчитывали использовать некоторые встроенные средства и библиотеки Spark, но выяснилось, что для создания полноценного коннектора этого будет недостаточно. Поэтому был принят двухэтапный план разработки:
1. Создание собственного инструментария и базовой функциональности параллельного обмена данными.
2. Дальнейшее развитие и оптимизация решения, в том числе улучшение пропускной способности и снижение задержек.
На первом этапе были разработаны следующие ключевые компоненты:
Классы для синхронизации и координации взаимодействия узлов Spark (master и slave) на основе механизма удаленного вызова процедур RMI.
Средства обмена данными между узлами поверх того же RMI.
Компоненты для эффективной буферизации передаваемых данных, использующие подход "zero-copy". Была создана специальная реализация коллекций и событийной модели в Java.
Классы для преобразования формата данных между представлением в Spark, в сетевом протоколе и в Greenplum.
В результате был получен базовый работоспособный коннектор, обеспечивающий параллельный обмен данными с возможностью масштабирования как в кластере Spark, так и на стороне Greenplum. При этом пропускная способность на один сегмент Greenplum увеличилась с изначальных 2-5 МБ/с до 10 МБ/с и более.
Дальнейшее развитие и оптимизация
Получив функциональный коннектор на базе собственного инструментария, мы стали анализировать производительность и искать возможностей для ее повышения. Применили принцип оценки "идеальности" алгоритма по доле времени, которое он проводит в режиме ожидания завершения ввода-вывода.
Анализ показал, что узким местом, препятствующим росту скорости передачи данных, является сериализация и десериализация данных между внутренними форматами Spark, Greenplum и сетевым протоколом. Эти операции являются затратными по времени для CPU, не позволяя в полной мере задействовать пропускную способность сети и дисковой подсистемы.
Для снижения этих накладных расходов мы выработали два подхода:
1. Увеличение количества CPU-ядер, выделяемых на каждый executor Spark, либо общего числа задействованных executor'ов. Это позволило повысить скорость сериализации и десериализации в 1.5-2 раза.
2. Исключение промежуточных этапов преобразования данных путем передачи их в родном бинарном формате сериализации Java вместо текстового представления. Это потребует внесения изменений на стороне Greenplum, в частности, поддержки Protocol Buffers и использования расширения PXF.
По нашим оценкам, эти оптимизации позволят дополнительно повысить пропускную способность коннектора в несколько раз, достигнув величин порядка 50 МБ/с на сегмент Greenplum. Дальнейший рост может быть нецелесообразен, так как он начнет оказывать негативное влияние на работу других пользователей и чрезмерно увеличивать нагрузку базы данных.
Факторы, которые помогли успешно разработать решение
Разработка эффективных механизмов обмена данными между гетерогенными распределенными системами — это нетривиальная и сложная инженерная задача. Вот что поможет с ней эффективно справиться:
Глубокое понимание архитектуры и особенностей взаимодействующих систем, как на уровне концепций, так и протоколов, форматов данных и практик разработки.
Создание максимально универсального инструментария, не привязанного жестко к конкретным используемым средам и фреймворкам. Такой подход позволяет в дальнейшем использовать наработки для других сочетаний систем.
Применение правильных критериев и метрик, которые выявят узкие места производительности на всех этапах обработки и передачи данных.
Стремление обеспечить максимально возможного параллелизма, балансировки нагрузки и минимизации копирования данных между компонентами.
Дальнейшее развитие open-source коннектора между Spark и Greenplum видится в повышении его базовой производительности и масштабируемости, и в реализации дополнительных возможностей, специфичных для этих систем.
Ссылка на репозиторий с примерами использования коннектора: https://github.com/itsumma/spark-greenplum-connectorhttps://github.com/itsumma/spark-greenplum-connector
Комментарии (4)
iboltaev
13.06.2024 22:04+1Я может чего не понимаю, но в jdbc-коннекторе есть возможность параллельного чтения данных с разбиением на партиции по одной колонке, с параллельной записью тоже проблем нет, тем более gp должен поддерживать copy, можно ж просто сделать dataframe.foreachPartition и в ней через copy залить. Едтнственное, где я вижу необходимость в своем коннекторе - если spark-ноды и gp-ноды нахожятся на одних машинах, и надо по возможности read local обеспечить. В любом случае, исходники на гитхаб было бы плюсом, а так в статье одна вода "какие мы молодцы бла бла бла и как мы круто сделали бла бла бла"
sd1ver
13.06.2024 22:04Поддерживаю, совершенно непонятный выпад в сторону параллелизма spark. Еще в первые вижу, чтобы driver и executor называли master и slave.
dolfinus
Так и не выложили коннектор в Maven :(
ITSumma Автор
К сожалению, не выложили. Нам мешает ряд нетехнических моментов, повлиять на которые мы попросту не в состоянии. Но мы надеемся и верим, что всё образуется и наш коннектор ещё окажется в Maven.