Автор: Иван Калининский, участник профессионального сообщества Сбера SberProfi DWH/BigData.
Профессиональное сообщество SberProfi DWH/BigData отвечает за развитие компетенций в таких направлениях, как экосистема Hadoop, Teradata, Oracle DB, GreenPlum, а также BI инструментах Qlik, SAP BO, Tableau и др.
В этой статье вы узнаете о том, какими способами мы пытались обновлять таблицы в Hadoop, содержащие сотни терабайт данных.
И если в начале нашего пути процесс обновления длился несколько часов (до десяти-двенадцати часов), то теперь ему требуется всего тридцать-сорок минут, а использование вычислительных ресурсов уменьшено вдвое!
При этом была создана библиотека расширения Spark, которая предоставляет DataSource для преобразования данных в файлах в формат этого DataSource, изменения данных командой MERGE через DataFrame API или SQL, а в будущем ещё и UPDATE, DELETE и некоторые операции DDL.
Файлы при этом можно будет читать любым привычным способом, ведь они не модифицированы, а метаданные не обязательны для их чтения.
Вы увидите код этой библиотеки на языке Scala, который сможете использовать, а может быть даже доработать и поделиться своими успехами.
Я постараюсь пояснить, почему был сделан тот или иной выбор, но могу умолчать о чём-то, что кажется очевидным, или, наоборот, о чём я не имею представления. Вы сможете задать вопросы, а я постараюсь ответить на них.
Это первая статья из нескольких, и в ней будет рассказано только о немногих реализованных классах (они нужны для распределения данных определённым способом), поэтому наберитесь терпения, я расскажу всё по частям. Впрочем, пора перейти к повествованию.
Для начала договоримся о терминах и определениях. Если это вам пока неинтересно или уже необязательно, переходите к следующему пункту, а если встретите незнакомый или сомнительный термин, вернитесь сюда. Возможно, здесь вы найдёте разъяснение.
Термины и определения
Таблица: В нашем случае это копия таблицы из какой-либо RDBMS, сохраненная в одной директории HDFS("hdfs://nameservice/path/to/table") в файловом формате. Используемый нами в большинстве случаев файловый формат – Parquet, допустимы и другие форматы, например ORC, Avro, и даже CSV. Определение таблицы сохраняется в Hive с типом EXTERNAL. Мы не используем MANAGED таблицы;
Секция: Часть таблицы, выделенная по некоторому правилу. Данные хранятся в отдельных папках в директории таблицы: "/path/to/table/part=value1", "/path/to/table/part=value2". Правило секционирования может включать несколько полей, тогда секции в HDFS будут выглядеть так: "/path/to/table/part1=value_p1/part2=value_p2". Файлы с данными при этом находятся только в листовых папках. Информация о каждой секции должна присутствовать в HiveMetastore;
RDD: Resilient Distributed Dataset, устойчивый распределённый набор данных, используется в Apache Spark для обработки данных. Он называется распределённым потому, что на кластере состоит из множества частей, которые мы будем называть партициями RDD;
Партиция RDD: Часть RDD, я называю её партицией, а не секцией, чтобы отделить от секции Hive. В отношении данных - это итератор, который может пройти по всем записям, которые в нём содержатся. Как правило, будет сделан один и только один проход, после этого итератор будет считаться истощённым (exausted);
Датафрейм: Набор данных, можно сказать, что это RDD и план трансформаций RDD, обеспечиваемый оптимизатором Catalyst;
Партишенер: Класс, наследник org.apache.spark.Partitioner. Нужен для описания правила, по которому из RDD с определённым набором партиций получается RDD с другим набором партиций. Каждая запись исходного RDD должна быть по описанному правилу перемещена в определенную партицию другого RDD;
Ключи: Первичные ключи таблицы, как правило, это одно поле (суррогатный первичный ключ), но полей может быть несколько. Как правило, подразумеваются конкретные значения, ведь сами поля в любом случае есть в каждой записи.
Кардинальность: Количество уникальных значений определенного поля. Также может употребляться в значении общего количества записей или не пустых значений, но здесь мы будем говорить только об уникальных значениях.
Селективность: Избирательность, мера, показывающая, насколько хорошо будет работать фильтр на основе предиката некоторого выражения. Тесно связана с кардинальностью, ведь если включить существующее значение поля с высокой кардинальностью в фильтр, то будет получено мало записей, возможно, всего одна. Значит, селективность этого фильтра высокая. В то же время, поле с низкой кардинальностью, например поле с типом BOOLEAN, у которого только два возможных значения (null не учитываем), будет иметь низкую селективность, и, если входит в фильтр, в общем случае, выберет половину набора данных.
CDC: Change data capture, программное решение для получения изменений данных из RDBMS;
Shuffle: Перемешивание, перетасовывание, - операция, которая сопоставляет каждой записи из каждой партиции исходного RDD партицию конечного RDD;
Бакет: Bucket, "Ведро", некая объединенная группа значений. В нашем случае представляет собой записи, находящиеся в одном файле или в одной партиции RDD.
Кейс класс: Вид класса в Scala, класс-образец.
Трейт: Вид класса в Scala, описывающий поведение, которое может быть «подмешано» в другой класс. Похож на JavaInterface.
Суть проблемы
Apache Spark очень много всего умеет. Работает с разными файловыми форматами, с базами данных как RDBMS так и NoSQL, с потоковыми источниками данных. Но, к сожалению, иногда имеющиеся средства не подходят для решения казалось бы простой задачи. Например, есть таблица, хранящаяся в файловом формате в HDFS, а её определение сохранено в Hive. Конечно же, это копия какой-то таблицы из базы данных, а значит, она будет периодически обновляться: часть записей могут быть удалены, часть обновлены и ещё некоторое количество будет добавлено. Однако в файле формата Parquet нельзя ни обновить, ни удалить, ни даже вставить запись. Можно только создать новый файл, а старый удалить. Между тем, в таблице таких файлов может быть сотни тысяч.
Если это небольшая таблица, несложно переписать её полностью, а вот со структурой, размер которой несколько терабайт или десятков терабайт так работать нельзя.
Большие таблицы, с которыми мы работаем, разделены на секции, чтобы пользователи могли обращаться к отдельным частям. Достаточно указать spark.sql.sources.partitionOverwriteMode=dynamic или dataframe.write.option("partitionOverwriteMode", "dynamic").save(path), и, начиная с версии 2.3, Spark перепишет только те секции, которые будут обновляться. Однако в то время мы были привязаны к версии 2.2, поэтому пришлось аккуратно сохранять данные в отдельную директорию, и только после успешной записи удалять папки с устаревшими секциями и переименовывать обновлённые и совершенно новые секции, чтобы они оказались в нужной директории.
Кроме того, даже если бы мы работали с более новой версией Spark, понадобилось бы сделать эффективный join основного набора данных и пришедшей дельты. Spark умеет делать partition pruning только при явно указанных фильтрах при чтении данных (до версии 2.3.5 точно так). В случае join, на том этапе, когда сравниваются значения полей, Spark уже не помнит, что это "особенные поля", названия директорий. В одной партиции могут находиться несколько значений полей секционирования, поэтому даже если в условии join указать эти поля, то это ни к чему хорошему не приведёт.
Проще говоря, Spark при самом наивном подходе прочитает всю таблицу, сделает полный shuffle, затем то же самое сделает с дельтой, объединит эти два набора данных и запишет всю таблицу заново.
Это будет невообразимо долгий процесс, который может сломаться в середине или ближе к завершению, а значит, так делать нельзя. Взгляните на схему ниже, чтобы понять, что кое-где есть проблемы:
Представим себе, что мы имеем таблицу с секциями A...Z, причём пришедшая дельта затрагивает только секции A и Z. Схема выше описывает, как пойдёт процесс, если выполнить такой код (обновление таблицы наивным способом):
dfTable.as("t")
.where('part="A" || 'part="Z")
.join(dfDelta.as("d"), "t.part = d.part and t.id = d.id", "full_outer")
.select(columns: _*) //предположим, что проекция подготовлена заранее и полностью описывает те поля, которые нам нужно получить
.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.partitionBy("part")
.format("parquet")
.option("compression", "gzip")
.option("path", "/path/to/table/")
.saveAsTable("hive_db.hive_table")
Рассмотрим рисунок 1 подробнее (далее мы говорим исключительно о метаинформации; о потере данных речь не идёт, ведь если данные теряются, то никакого смысла в оптимизации нет и нужно полностью пересмотреть подход):
красные пересекающиеся стрелки обозначают энтропийный процесс, при котором теряется первоначально содержащаяся в таблице метаинформация;
параллельные синие стрелки обозначают процесс с сохранением метаинформации;
желтыми стрелками обозначен противоречивый процесс.
На этапе подготовки RDD основную роль играет размер исходных файлов. Значения секций Hive в полученной структуре будут перемешаны, так же, как и отдельные сегменты файлов.
Чтение данных полностью повторяет структуру уже перемешанного предыдущего состояния, информация не теряется.
Spark понимает, что нужно соединить данные, и, если их размер достаточно большой, то будет выбран SortMergeJoin. Чтобы выполнить этот вид соединения, Spark должен сделать так, чтобы одинаковые ключи из основного набора данных и из дельты расположились совместно (в партициях RDD с одинаковыми индексами) и были отсортированы в пределах партиций. Записи можно только перераспределить и отсортировать заново, затратив на это время и ресурсы. Производится Shuffle: записи каждого набора перемешиваются по значению хешей. Перемешивание устраняет любую сортировку, которая сохранилась при чтении, теперь порядок записей никак не зависит от первоначального.
SortMergeJoin делает своё дело, полученный набор данных сохраняет тот же порядок записей, что был после Shuffle и локальной сортировки.
Последний процесс записывает данные обратно в таблицу, удаляя предыдущие версии секций A и Z навсегда! Это противоречивый процесс, поэтому он изображён в виде двойных жёлтых стрелок. Дело в том, что непосредственно перед сохранением записи будут отсортированы по полю part локально (в пределах партиций RDD). Возможно, что сортировка будет получена из предыдущего состояния (SortMergeJoin), ведь часть условия соединения "t.part = d.part" заставляет сортировать каждую партицию по этому полю, но если в SparkUI в DAG последним виден синий прямоугольник с надписью Sort, записи всё же будут пересортированы.
Такое упорядочивание записей необходимо для последовательного помещения записей в конечный файл, чтобы не переключаться между файлами из разных секций для каждой строки. Поскольку значения секций перемешаны в пределах партиций RDD, то конечные файлы будут меньше, чем ожидает разработчик.
Как правило, из одной партиции RDD будет получено столько файлов, сколько есть изменяющихся секций. Если их только две, это может не быть проблемой, а если их тысяча или пять тысяч?
Процесс сохранения возвращает первоначальное состояние, но технические характеристики этого состояния могут быть намного хуже. Теперь очевидно, что нужна оптимизация.
И тут нужно вспомнить старую мудрость: «Нельзя заставить машину работать быстрее. Можно только сделать так, чтобы машина выполняла меньше работы».
Можно возразить, что затем и нужно горизонтальное масштабирование. Ведь очень просто добавить контейнеры в параметры запуска приложения Spark, подключить к кластеру новые ноды, поставить рядом несколько кластеров. Но это тушение пожара деньгами! Рано или поздно объем данных увеличится, и приложениям, запрашивающим огромные вычислительные ресурсы, будет тесно на любом кластере. Видя проблему, нужно попытаться решить её профессионально, не терять нужную информацию, а наоборот, сохранять и использовать её.
Наш путь решения выявленной проблемы
Так как полное обновление таблицы оказалось неприемлемым, мы разработали приложение, которое обновляет только имеющиеся секции. Способ работы с конкретными изменяющимися секциями получил развитие, и через некоторое время мы достигли такого результата (пошагово):
Изменения, полученные извне, сохраняются с точно таким же секционированием, как и сама таблица. Записи уже предобработаны CDC, и поставляются в таком виде: записи на удаление и на вставку в одном экземпляре, записи на обновление в двух вариантах: предыдущая версия и новая версия.
Таким образом, мы работаем с набором данных, в котором есть записи для удаления и записи для добавления. Можно сказать, что каждая секция дельты состоит из двух датафреймов, назовём их dataframeInsert и dataframeDelete. При этом решается проблема миграции записей из одной секции в другую - нет нужды проверять, обновлены ли поля, по которым делается секционирование, или нет, и как-то обрабатывать эти изменения;Каждая изменяемая секция основной таблицы (назовём её dataframeMain) проверяется: в ней должны быть ключи на удаление и не должно быть ключей на вставку;
Каждая секция основной таблицы соединяется с секцией данных, которые должны быть удалены, методом антисоединения: dataframeMain.join(dataframeDelete, primaryKeys, "left_anti");
К каждой секции основной таблицы добавляется секция данных, которые должны быть вставлены, методомdataframeMain.union(dataframeInsert);
Каждая секция основной таблицы проверяется: в ней должны быть ключи вставленных записей и не должно быть ключей на удаление;
Рассчитывается контрольная сумма данных для всех изменённых секций. Это просто функция functions.hashпримененная к каждому полю и просуммированная по всей секции;
Рассчитанные контрольные суммы сравниваются с контрольными суммами части полей исходной таблицы. У исходной таблицы нет предрасчитанных контрольных сумм, поэтому их приходится вычислять каждый раз.
К сожалению, и этот подход оказался наивным, что видно из следующей таблицы, которая содержит практические показатели работы приложения:
Таблица 1
Размер таблицы, Тб |
Количество секций |
Обновляется секций |
Время работы шага 2 |
Время работы шагов 3 и 4 |
Время работы шага 5 |
Суммарное время работы |
30 |
2000 |
500-600 |
2 часа |
8-10 часов |
2 часа |
12-14 часов |
7 |
2500 |
1000-1800 |
3 часа и более |
4 часа |
3 часа |
10 часов |
Требовалось найти лучший способ применения дельты к основному набору данных, и он появился. Основная идея – сделать так, чтобы при обращении к таблице, особенно, если нужно переписать часть файлов, было можно работать только с нужными файлами, которых, как правило, меньшинство, и получить результат значительно быстрее.
Это достигается тремя уровнями разделения файлов:
Опциональные:
1. Секционирование Hive: создание директорий, в названиях которых содержатся значения некоторого поля, а в них, в свою очередь, содержатся файлы с данными.
2. Неявное секционирование: при этом партиции Hive не создаются, но в каждый файл должны попадать записи с единственным значением поля неявного секционирования. Значения неявных секций будут сохраняться в именах файлов, но получать их, читая все файлы, непрактично, поэтому информация будет сохраняться в отдельной структуре, называемой «карта данных».
При этом важно избегать коллизий, поэтому стандартный способ HashPartitioning, используемый в Spark для shuffleможет оказаться неприемлемым.
Обязательный:
1. Разделение по непересекающимся интервалам значений единственного поля. Это поле в дальнейшем будет называться "поле упорядочивания".
Рассмотрим это в схематичном виде:
На рисунке 2 показан самый полный вариант использования DataSource. Обеспечивает секции, которые видны обычным, не знающим о внутреннем устройстве этого источника данных, инструментам: select * from hive_db.hive_table; sparkSession.table("hive_db.hive_table"); sparkSession.read.format("any_format").load("hdfs://nameservice/path/to/table").
Пользователи, пользующиеся такими инструментами, будут видеть эту таблицу как совершенно обычную таблицу с секциями, возможно, многоуровневыми (по нескольким полям, не обязательно по одному).
В то же время в каждой папке есть секции, которые внешним средствам не видны. Да, они содержатся в имени файла, но только для предупреждения, что файл не так прост, как может показаться. Чтение метаинформации в любом случае производится из карты данных.
Если посмотреть на рисунок 3, то видно, что для пользователей с обычными, не знающими этого DataSource, средствами это точно такая же таблица, как обычная таблица с секционированием Hive или таблица на рисунке 1. По сравнению с первым вариантом разницы совсем немного, отсутствие неявных секций не блокирует работу источника данных.
На рисунке 4 показан вариант таблицы, в которой нет явных секций (да, так тоже можно), поэтому пользователи ванильных инструментов будут видеть её как обычную таблицу, не разделённую на части. При этом процесс обновления получает преимущества от выделения файлов по значениям полей неявного секционирования.
Если пользователи не требуют секционирование, а данные, содержащиеся в таблице, дают возможность фрагментирования, то этим вариантом стоит воспользоваться.
На рисунке 5 изображен вариант, который содержит требующую обновления таблицу, но секционирование не требуется и не найден подходящий способ сделать его неявным. В таком случае, можно отказаться от создания секций и просто выбрать поле упорядочивания. Таблица при этом видна как обычная, "плоская" структура.
Я назвал этот способ "организованная таблица", потому что в сравнении с предыдущим способом, данные явно упорядочены, организованы весьма строгим образом. Идея формата организованной таблицы имеет немало общего с форматами Apache Iceberg и Apache Hudi, впрочем, при разработке я был вдохновлён Apache Iceberg, а о том, что в Hudi тоже используется схожая идея (интервалы значений) узнал значительно позже, когда основная часть уже была реализована. Кроме того, я пытался не совершить ошибок, которые сделаны в формате CarbonData, мой опыт использования его скорее негативный. И то, что Delta lake тоже содержит схожий подход: zOrder, стало очевидно позже.
Вначале я представил проверку концепции (далее PoC - от англ. «Proof of concept») - скрипты, которые выполнял в spark-shell. Это заняло примерно неделю. За это время я исследовал, как сохранить таблицу так, чтобы конечные файлы содержали непересекающиеся интервалы значений одного конкретного поля, первичного ключа, или поля составного первичного ключа с наибольшей селективностью. Но использование именно первичного ключа не обязательно, ведь можно взять любое другое поле, если оно обеспечивает высокую селективность (но не обязательно уникальное) и не содержит большого количества одинаковых значений, неважно null это, или какое-то конкретное значение. Очень хорошо, если это поле сможет разместить записи с высокой и редкой частотой изменения в разные группы файлов.
Также я смог выделить поле с низкой селективностью и сохранил файлы с разными значениями этого поля в отдельных группах. Это было поле неявного секционирования, которое помогло в выделении изменяющихся файлов. Я проработал сохранение нужной информации о полученных файлах, назвав дополнительную структуру картой данных (data map). И использовал карту данных для анализа дельты данных и выбора нужных файлов.
Затем я применял к этим файлам операции из шагов 3 и 4 (помните тот список над таблицей 1 выше?), записывал их и заменял старые файлы новыми.
Все операции занимали около пятнадцати минут, и я видел, что новые файлы суммарно составляют около 200 Гб (размер таблицы около 27 Тб), то есть очень небольшую часть таблицы.
Всё это было сделано стандартными средствами Spark и hadoop.FileSystem. Многие операции не были оптимальными, например, чтобы минимизировать коллизии хеширования (разделение по модулю хеша - стандартный способ партиционирования RDD), я увеличивал количество партиций RDD в восемь раз от требуемого.
Следующие два месяца ушли на разработку прототипа, который, как и PoC, содержал одну дополнительную деталь, дельта-файлы. В них должны были уходить новые записи, если они относились к файлу, в котором ни одна запись не удалялась (или не обновлялась, помните, я говорил, что обновления обрабатываются нами как удаление плюс вставка). В дальнейшем от дельта-файлов пришлось отказаться, но, возможно, кто-то предложит алгоритм, в котором они будут уместны.
Прототип работал достаточно долго, примерно год, и показал свою эффективность. Время основного обновления существенно сократилось:
Таблица 2
Размер таблицы, Тб |
Количество секций |
Обновляется секций |
Обновляется файлов |
Время применения дельты к основному набору данных |
Суммарное время работы |
30 |
2000 |
500-600 |
6000-10000 |
1,5-2 часа |
5,5-6 часов |
Впрочем, ни одна другая операция не изменилась, поэтому мы пришли к выводу о необходимости отключения логических проверок. Но это было вынужденное решение, ведь проверки позволяют нам уверенно говорить, что мы работаем профессионально и не поставляем некорректные данные пользователям.
Стало ясно, что требуется оптимизация, которая позволит вывести работу с данными на новый уровень, и эта оптимизация привела к решению о создании отдельной библиотеки, основанной на Spark.
Для полноценной библиотеки, которую могли бы применять пользователи, разумно взять за основу существующие внешний и внутренний API Spark и реализовать идеи так эффективно, как он позволит.
Я писал о создании DataSource, но он будет максимально полезен в сочетании с другими структурами: партишенер, FileIndex, FileWriter, различные логические и физические планы, средства работы с метаданными и т. д.
Все эти средства направлены на то, чтобы избавить нас от энтропийных процессов, а если возникает нештатная ситуация, то помочь взять энтропию под контроль.
Ниже, на рисунке 6 показано, как должно выглядеть обновление таблицы. Нет переходов с нарастанием энтропии, метаинформация бережно сохраняется от состояния к состоянию и каждый шаг с пользой может эти метаданные применить:
Компоненты библиотеки: Partitioner
Начать описание созданной библиотеки будет логично с партишенера. Дело в том, что партишенер представляет способ организации данных как для хранения, так и для работы с ними. Именно в результате его работы будут получены такие данные, которые можно записать, прочитать и эффективно обработать. Без партишенера процесс не может ни начаться, ни продолжиться, так что именно он идёт самым первым пунктом.
Общепринятый вариант для Hive и Spark – это секционирование по хешу одного или нескольких полей. Этот вариант хорош и удобен, потому что есть возможность обеспечить одинаковые партиции для одинаковых данных, при этом не требуется передавать состояние между инстансами партишенера. Формула хеширования обеспечивает одинаковое конечное распределение для любого набора данных.
Вот как это работает:
1. Создаётся экземпляр кейс-класса логического плана выполнения org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression.
2. Переданные в него выражения spark используются для создания экземпляра кейс-класса физического плана выполнения org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
3. Кейс-класс org.apache.spark.sql.execution.exchange.ShuffleExchangeExec обеспечивает создание для RDD анонимного класса org.apache.spark.Partitioner. Сам класс Partitioner определён как абстрактный и содержит два нереализованных метода:
abstract class Partitioner extends Serializable { def numPartitions: Int def getPartition(key: Any): Int } |
4. По выбранным для партиционирования полям рассчитывается неотрицательный модуль хеша по числу партиций (выражение из класса HashPartitioning):
Pmod(new Murmur3Hash(expressions), Literal(numPartitions)) |
Теперь несложно обозначить основные характеристики этого алгоритма.
Как видно, для физического расчета будет использован org.apache.spark.unsafe.hash.Murmur3_x86_32.
Полученное неотрицательное число (Pmod(hash)) всегда одно и тоже для одних и тех же аргументов. Таким образом, состояние заложено в формуле хеширования и не должно передаваться между инстансами Partitioner.
Причина выбора именно такой реализации очевидна: тот же самый алгоритм применяется для формирования бакетов Hive. Поэтому таблица Hive, которую сохранили с указанием полей bucketBy(“DISTRIBUTED BY” в HiveQL), по распределению эквивалентна репартиционированному датасету.
Нужно отметить, что все расчёты производятся для сериализованных экземпляров класса InternalRow с использованием unsafe memory, и выполняются очень эффективно. Поэтому замена этого процесса на операции с RDD[Row] и попытка создания простого партишенера, который получал бы значение для хеша из конкретных полей, приведёт к большому объёму сериализации-десериализации, а это вызовет крайне серьёзное замедление.
С моей точки зрения, применение HashPartitioning имеет существенные недостатки:
1) Нет возможности разделить значения на часто обновляемые и редко обновляемые. Значения Murmur3Hash ничего не говорят об исходных данных, задача хеша в этом случае - распределить значения для равномерного использования всех полученных элементов набора данных, будь это файл или итератор по записям.
Наша же задача - сделать так, чтобы как можно меньше файлов были перезаписаны, а практика показывает, что переписываемые значения склонны к кластеризации, то есть, с большей вероятностью изменения придут по нескольким записям, идущим достаточно близко одна к другой.
При хешировании кластер значений должен быть разбит применяемой хеш-функцией и если он будет разделён на отдельные записи, то хеш-функция хорошо справилась со своей задачей. Это равномерное распределение очень полезно, когда нужно выполнить, например, Join всех данных, но при обновлении реальных данных часто бывает, что обновления составляют тысячные доли от всего объема таблицы. Поэтому разумно кластеризовать данные по определённому признаку, а не «разбрасывать» через хеш.
Да, может помочь LSH – Locality Sensitive Hash – хэш с учётом близости значений, но его также надо реализовывать для произвольного типа данных, а образец в виде RangePartitioner уже выполняет требуемое разделение, причём совершенно точно, и делает в перспективе возможной работу с интервалами значений и временными рядами! Кроме того, LSHпартишенер не обязательно поможет в динамическом увеличении количества бакетов, ведь функцию придётся модернизировать для нового числа секций.
Говоря более общим языком, фрагментирование набора данных по хешам предназначено для глобализации использования элементов данных, фрагментирование по интервалам значений предназначено для локализации использования элементов.
2) Ранее было сказано, что хеш всегда один и тот же для одинаковых аргументов, но верно и то, что хеш будет одним и тем же для разных аргументов.
Дело не в арифметическом модуле, который сильно ограничивает количество вариантов на выходе, а в том, что множество хешей в общем случае слабее множества исходных значений.
Поэтому я и обращаю внимание на то, что коллизии не просто могут появиться, а утверждаю, что они обязательно появятся в ходе работы процесса. И это не вполне оптимально, ведь в одну и ту же партицию RDD могут попасть значения, принадлежащие разным секциям Hive. Spark может корректно обработать и записать такой файл, но, как правило, это займёт больше времени, потребуются дополнительные проверки и сортировки.
Кроме того, неявное секционирование затруднено при этом способе, потому что стандартный FileWriter не сможет разделить данные, относящиеся к неявным секциям. Требуется сначала записать их как явные, а затем переименовать каждый записанный файл. Но это не главная причина, самое серьёзное последствие коллизий во время Shuffle наступает, если файл или итератор по данным включает в себя несколько (хотя бы два) непоследовательных интервала значений поля упорядочивания. «Непоследовательных» означает, что между этими интервалами в секции или таблице, если секционирование отсутствует, точно есть другие значения.
Разделить непоследовательные диапазоны не представляется возможным без дополнительных затрат, выбранная схема хранения метаданных не поддерживает несколько интервалов в одном файле. Такая коллизия приведёт к созданию "компонента", содержащего несколько файлов.
Компонент в конечном итоге содержит один интервал значений поля упорядочивания, не пересекающийся с прочими, но этот интервал может быть слишком большим и приведёт к длительному выполнению одного Spark task.
3) Если хочется совместить физическое секционирование Hive и бакетирование, то в каждой партиции может появиться файл, принадлежащий определённому бакету.
Получится, что бакет разделён на несколько файлов, находящихся в разных секциях. При этом, если бакеты сортируются по некоторым полям, то сортировка будет утрачена, нет реализованного способа прочитать разные файлы так, чтобы итератор от первой записи первого файла до последней записи последнего файла был отсортирован.
Таким образом, приложенные усилия по организации данных окажутся напрасными, данные придётся сортировать снова и снова, и делать это для всего набора данных, потому что Spark из коробки не умеет находить партиции RDD, по которым нужно сделать сортировку.
Данные для нового DataSource должны быть разделены по диапазонам значений поля упорядочивания (кластеризованы по этому полю), и алгоритм будет значительно отличаться от описанного способа. Зато алгоритм будет похож на тот, что используется в dataframe.repartitionByRange().
Опишем для начала алгоритм repartitionByRange():
1. Для итераций по RDD будет использован экземпляр класса MutablePair, первый элемент которого будет содержать всю сериализованную запись. Это делается для того, чтобы в каждой секции RDD мы работали бы только с одним объектом, а не создавали большое количество UnsafeRow
2. Создается implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes). Это значение нужно для того, чтобы сравнивать и сортировать значения переданных выражений, оно тоже применяется к InternalRow;
3. Созданный экземпляр RangePartitioner получает RDD, количество партиций, направление сортировки (по умолчанию-возрастающее, аргумент типа Boolean) и желаемое количество сэмплов в одной секции RDD;
4. Вычисляются верхние границы для каждой будущей секции RDD:
o Немного пересчитав желаемое количество сэмплов, класс приступает к сэмплированию RDD. Делается это так:
- желаемое количество первых записей помещается в массив фиксированного размера; если записи на этом кончились, то массив урезается до реального размера и возвращается;
- если же записи ещё есть, то цикл проходит по записям и записывает каждую следующую вместо одной случайной записи в массив. Одновременно подсчитывается общее количество. Полученный результат передаётся на драйвер;
o Теперь, с известным размером партиций и полученными массивами сэмплов, класс проводит оценку, нет ли больших секций, из которых получено недостаточно записей;
o Если такие секции встретились, то они дополнительно сэмплируются с использованием очень интересного класса PartitionPruningRDD (описание этого класса выходит за рамки этой статьи). Сэмплирование осуществляется стандартным методом RDD.sample. Полученный результат передаётся на драйвер;
o Полученные InternalRow, а их количество сейчас заметно больше, чем будет в итоге, чтобы обеспечить приблизительно равный размер секций статистически, сортируются с использованием ordering, и производится отбор верхних границ, исходя из накопительной суммы весов.
5. Определяется метод бинарного поиска нужного интервала (меньше или равен определённой верхней границе, больше предшествующей верхней границы), который должен ускорить обработку больших наборов данных;
6. Во время работы класса, полученная запись сравнивается с границами методом линейного поиска, или, если будущих секций больше, чем 128, то бинарным поиском находится, к какой секции она относится;
7. Класс должен быть сериализуемым, чтобы его можно было передавать на экзекуторы, поэтому в него добавлены методы writeObject и readObject.
Я хочу уточнить, почему меня не устроила реализация RangePartitioner:
1) Он не имеет разделяемого состояния.
Псевдослучайные функции для выбора сэмпла RDD, даже сидированные одним и тем же значением, не могут обеспечить полностью стабильную выборку, границы партиций RDD могут быть определены по-разному даже для одного и того же набора данных. Если же наборы данных разные, то одни и те же значения поля упорядочивания из каждого набора могут находится в разных секциях RDD, ведь RangePartitioner знает о распределении только одного набора данных. Поле rangeBounds приватное и не может быть передано в другой RangePartitioner, а если бы оно и было открыто, RangePartitionerне может использовать готовую информацию о распределении;
2) Сэмплирование осуществляется в общем случае, два раза, если находятся слишком большие партиции;
3) Материализация предварительного результата на драйвере выглядит преждевременной, ведь часть bigdata – это тоже bigdata, не исключено, что памяти на драйвере не хватит, если мы пытаемся разделить таблицу размером в десятки или сотни терабайт. Лучше произвести отбор верхних границ в RDD, и получить на драйвере только необходимые данные;
4) Наша задача может требовать фиксированное разделение по секциям Hive, и динамическое формирование партиций RDD, "вложенное" в это строгое разделение, а в RangePartitioner существует только вариант с сортировкой по всем полям. При сохранении этот порядок может измениться, поскольку из одной партиции наверняка будут получены несколько файлов;
5) Я решил использовать как основной аргумент не желаемое количество партиций RDD, а желаемое количество записей в файле – это в конечном итоге должно привести к лучшему распределению.
Кроме того, фиксированные секции станут ключами карты (Map[InternalRow, Array[InternalRow]) и каждый элемент карты будет содержать массив с верхними границами конкретной партиции, а это позволит уменьшить порядок бинарного поиска.
6) Получить весь InternalRow нужно, если мы собираемся выполнить dataframe.repartitionAndSortWithinPartitions. Это очень полезный метод, позволяющий внести локальную сортировку в shuffle. Но мы собираемся сортировать по полям, в общем случае отличным от поля упорядочивания, а значит можно сразу же выделить необходимые для партиционирования колонки и сократить объем получаемых данных потенциально в десятки или сотни раз, и заодно сделать партишенер подходящим для секционирования совершенно других наборов данных, в которых есть эквивалентные поля. Именно этот пункт поможет с реализацией передачи состояния (см. первый пункт этого списка) и с локальной сортировкой, объединённой с shuffle!
Поскольку текст уже весьма большой, сама реализация будет в следующей части.