После вводной статьи про Spark Shell мне хотелось бы рассмотреть некоторые приемы оптимизации запросов в Spark и Hive, и сравнить их с теми, что обычно применяются в классических СУБД типа Oracle или MS SQL.

Базовая модель данных


Рассмотрим разные техники на базе известной всем модели библиотеки. И так, у нас есть книги, они разложены по полкам в шкафах. Шкафы пронумерованы, если библиотека большая — то к шкафу еще прилагается и номер комнаты. Книги разложены в определенном порядке, это «физический» порядок, его можно поменять — но это сложно и долго. Даже если у нас «всего лишь» такая библиотека, как у российских императоров:


А тем более если такая, как в Тринити колледже в Дублине:


Кроме этого, у нас есть каталожные шкафы, где размещены карточки книг. Карточки разложены в логическом порядке, и этих порядков может быть больше одного. Например, алфавитный порядок по названию книги, алфавитный порядок по фамилии автора (авторов и названий бывает несколько, например русское и английское названия), возможно еще какой-то тематический порядок (в жизни бывает редко, так как в оффлайн библиотеке поддерживать даже два порядка уже сложно).



Кроме всего прочего, нужно понимать, что размеры каталога тоже немаленькие:


Итак, мы хотим в библиотеке что-то найти. Допустим, мы хотим найти книгу по названию. Или две книги, чтобы было не так тривиально. Что мы делали бы в обычной реальной библиотеке, если нам хочется сделать это быстро?

Метод самый медленный: идем вдоль полок всех шкафов, и просматриваем названия книг. В случае библиотеки виртуальной такой способ называется fullscan. То есть, полный просмотр всего, что имеется. Годится, а возможно и будет самым быстрым, если у вас книг скажем одна полка, или один шкаф, и они все в пределах видимости.

Метод получше: шкафы подписаны (тематические разделы, или алфавит), и мы просматриваем только те из них, которые нас интересуют. Вполне типично также для книжного магазина средних размеров. При этом очевидно, что надписи на шкафах — это обычно жанр, или буква алфавита, и алфавитный порядок расстановки книг всегда один — либо по названиям, либо по авторам. Одна книга не может стоять в двух шкафах, на букву А по автору, и ну букву Я по названию.

Метод еще получше: открываем каталог по названиям книг, на нужную букву. Просматриваем карточки в поисках интересующей. Просмотреть нужно примерно один ящик с карточками.

Метод еще получше, если нам нужно две книги и более: Просим библиотекаря выполнить поиск по одному названию за нас. Сами ищем по другому.

Как видим, есть несколько способов поиска книг, с разными свойствами. Какие-то из них, как нам подсказывает опыт, будут быстрее других. Но выбор зависит от задачи, не всегда один способ самый быстрый.

В библиотеке виртуальной всему этому будут соответствовать, скорее всего, некие схемы, таблицы и колонки данных в них. Ну, если наша виртуальная библиотека построена на базе реляционной СУБД, что вполне вероятно. Ну и возможно файлы с самими книгами.

Как все происходит в библиотеке виртуальной?

Основные методы для типичных СУБД


Если взять типовые классические СУБД, особенно широко распространенные, то по степени распространенности средства оптимизации запросов можно расположить примерно так:

  • Индексы
  • Партиционирование (секционирование)
  • Шардирование. Примерно тоже самое, что и предыдущее, плюс физическое разнесение по разным дискам или узлам сети

Индексы


Если описать классический индекс простыми словами, то это будет просто таблица. Обычная таблица, которая на физическом уровне хранится как B-дерево, и отличается от той таблицы, для которой индекс построен, в основном двумя вещами — набором колонок и порядком сортировки строк. Ну и собственно самое важное — что эта таблица обновляется автоматически, когда в основной таблице добавляются, удаляются или изменяются данные. То есть, индекс перестраивается автоматически и транзакционно.

На примере библиотеки индексы — это каталожные карточки. Каталог названий книг — это один индекс, каталог авторов — второй. При поиске по каталогу мы перебираем карточки, которых мало, и они компактные, находим нужную, получаем ссылку на номер шкафа и полки. При добавлении книги в библиотеку библиотекарь заполняет новые карточки, и помещает их в каталог (в каждый из них). То есть, индекс, ускоряя поиск книги, одновременно замедляет ее добавление в библиотеку. Операции обновления и удаления в оффлайн библиотеке редки, но в целом должно быть понятно, что наличие каталогов их тоже замедляет — нужно удалить карточки на списанную книгу из всех каталожных шкафов.

Порядок в индексе может совпадать с физическим порядком строк в таблице (расстановка книг на полках и в шкафах), а может и нет. В первом случае индекс называется кластерным, и может быть только один. Некластерных индексов может быть сколько угодно. Т.е., если книги на полках расставлены по авторам, то карточки в авторском каталоге будут повторять этот порядок, а карточки по названиям — нет.

Индексов бывает много разных типов, поэтому про B-дерево — это упрощение. Например, индекс может быть реверсивным, в случае Оракла, и описывается он таким образом: реверсивные индексы буквально инвертируют последовательность байтов значения ключа, с тем чтобы уменьшить конкуренцию за блоки на последовательности сгенерированных первичных ключей. То есть, если у вас есть ключ, который растет более-менее последовательно и предсказуемо, новые записи зачастую добавляются в один и тот же блок данных на физическом уровне, и за этот блок возникает соревнование из желающих. Чтобы обновление происходило быстрее, Оракл предлагает индекс реверсивный, где значение ключа инвертируется, то есть его байты переставляются задом наперед. Таким образом «похожие» значения 1234 и 1235 (попадающие в один блок) превращаются в 4321 и 5321, перестают быть похожими, и раскидываются алгоритмом в разные блоки. В итоге обновление происходит быстрее, зато мы имеем сильное падение производительности, когда хотим достать ключи последовательно в порядке возрастания — они же не могут быть прочитаны из одного или соседних блоков, мы ведь их разбросали.

Еще один чуть менее традиционный тип индекса — это битовые карты (bitmaps). Как их описывает википедия: «обычно считается что Bitmap индексы хорошо работают на колонках с низкой кардинальностью, имеющих небольшое число различных значений». То есть, колонка содержит например пол — и имеет всего два значения, М и Ж. И может быть представлена всего одним битом.

Говоря про индексы, неплохо бы помнить о такой штуке, как селективность индекса. Например, индекс может сократить число выбираемых записей вдвое (если это скажем индекс по полю «Пол»), а может — до одной записи. Применительно к нашей библиотеке, написанные на шкафах каталога буквы алфавита — это индекс с селективностью «до шкафа», то есть, пользуясь им, мы сокращаем число просматриваемых карточек примерно в 33 раза, по числу букв алфавита. Почему примерно? Ну очевидно потому, что в наших данных (список названий книг или список авторов) имеется еще и так называемый перекос (data skew), то есть неравномерное распределение. Книг с названием на букву А скорее всего будет сильно больше, чем книг с названием на Ы.

Вообще, об индексах можно говорить долго и много (главное тут — вовремя остановиться :), скажем, если вы работаете с геоданными, вам наверняка пригодятся так называемые spatial (или пространственные) индексы, основанные на дереве квадрантов или другой похожей структуре. Википедия знает целых 15 видов пространственных индексов, так что на этом я про индексы вообще точно закругляюсь.

Партиционирование (секционирование)


В случае библиотеки партиционирование — это раскладывание каталогов по шкафам (с этикетками). То есть, если на шкафу написано: «тут книги от А до Г», и мы ищем книгу на Д, то мы в этот шкаф уже можем не заглядывать. По сути, написанные на шкафах буквы алфавита — это некая надстройка над уровнем каталога, мета-каталог или мета-индекс.

Для обычной СУБД партиционирование в сущности делает тоже самое — позволяет быстро отбросить (или оставить) только небольшую часть записей, обычно в одной партиции. Как правило, имеет место некоторое правило (выражение), которое по значению колонок таблицы вычисляет целевую партицию.

На физическом уровне хранения партиция может быть привязана к файловой группе (в терминах MS SQL), или чему-то подобному. То есть, фактически — либо к файлу, либо через него — к дисковому накопителю. Ну то есть, можно выбрать, например, SSD, если какие-то данные нужны часто и быстро, и HDD — если редко, но их много. Правда, чаще это применяется на уровне таблиц.

Стоит упомянуть такую изощренную технику, как partition switching. Применительно к MS SQL это способ перекинуть партицию целиком из одной таблицы в другую практически мгновенно. То есть, вы в течение дня допустим заливаете данные за сегодня в сравнительно небольшую временную таблицу, а зачем одним движением руки брюки правращаются… превращаются брюки… в элегантные шорты. То есть, одним alter table партиция перемещается в другую, постоянную и большую таблицу. Только не спрашивайте меня, что при этом будет с транзакциями, уровнем их изоляции и т.п. Честно скажу — не помню.

Если вспомнить про нашу библиотеку, наиболее похожей аналогией наверное будет переклеивание этикеток на шкафах. То есть, библиотекари достаточно долго манипулируют карточками в пустом неподписанном шкафу, а потом бац — на шкаф клеится этикетка, и он становится частью каталога.

Шардирование


Если мы имеем несколько процессоров (библиотекарей), то мы уже имеем шардирование. Соответственно, если мы ищем две или больше книг на разные буквы алфавита, мы можем поручить это двум и более библиотекарям, которые проведут поиск в разных шкафах параллельно. При этом двум библиотекарям в одном шкафу искать одновременно неудобно. Тут у нас полная аналогия с ядрами и шпинделями — если у нас обычный HDD, то читать с него два файла на разных цилиндрах параллельно скорее всего не будет быстрее, чем читать их последовательно, потому что каждая перестановка головки — это сильное замедление. То есть, если у нас два процессора — лучше иметь два дисковых шпинделя.

Основные методы для Hadoop/Hive/Spark


Индексы


Индексы появились довольно поздно, в версии 0.7 (битовые — в 0.8), особой популярностью никогда не пользовались, а в версии 3.0 предлагается их выбросить. Вот скажем задача в JIRA: Drop Support For Indexes From Apache Hive.

Почему так? Сложно сказать достоверно, скорее всего причин больше чем одна. Например, такие форматы как Parquet, позволяют эффективно выбирать подмножество колонок таблицы, сильно экономя на этом чтение с диска. Де факто, это и есть полный аналог индекса, вместо всей таблицы с ненужными вам колонками вы читаете только нужные, и ими манипулируете. Во-вторых, на мой взгляд индексы все равно плохо сочетаются с Data Locality, то есть, вместо одного датасета мы имеем два — данные и индекс, и они совсем не обязательно размещаются на одних узлах кластера.

И еще одна, наверное не последняя причина — индексы ведь нужно обновлять при добавлении данных. А в Hive одним из методов добавления является сохранение данных в папку HDFS, а затем просто добавление партиции. Причем это — один из способов сделать добавление атомарным. По сути, Hive — это в некотором смысле лишь надстройка над хранением, сама обработка производится (полностью или частично) стронними программами на базе MapReduce API, или скажем на Spark. Соответственно, если вы хотите, чтобы индекс был актуален, вы должны в той программе, что сохраняет данные, сами его и обновить. Автоматическое обновление индекса в такой ситуации представить себе сложно, и его в итоге нет.

Партиционирование


Указание колонок партиционирования — это самоя базовая возможность Hive. На физическом уровне такая колонка является просто набором папок в HDFS. То есть, если X является колонкой партиционирования первого уровня (возможно вложенное партиционирование), то все строки, имеющие значение колонки X=2, хранятся в файлах, лежащих внутри папки

что-то там/<schema>/<table>/X=2/...тут всякие файлы parquet, например

Здесь что-то там/schema/table это корневая папка для хранения данных таблицы. Соответственно, операция вида select distinct X from table сводится к тому, что Hive перебирает все папки внутри базовой папки таблицы, не читая вообще никакие данные с дисков (обращается только к HDFS Namenode). Если нам нужны другие колонки — то Hive будет читать только файлы внутри этой (этих) папок.

Помните partition switching? В общем-то эта техника в Hive является чуть ли не самой базовой и фундаментальной. Вы запускаете приложение, которое автономно работает с данными, и строит на выходе папку с файлами где-то в hdfs. Там лежат данные для таблицы, например в Parquet (или в любом другом возможном формате). А дальше вы просто сообщаете Hive, что появилась новая партиция. То есть, снова одно действие в виде alter table — и очередной select уже увидит новые данные, скажем за целый день. Причем на уровне Hive Metastore эта операция вполне может быть (или является) атомарной, так как Metastore это обычная реляционная СУБД, например PostgreSQL или Oracle.

Шардирование


Если вспомнить, что HDFS это распределенная файловая система, и у нас имеет место репликация каждого блока файла на несколько узлов кластера, оказывается, что мы уже практически имеем некоторый вариант шардирования. То есть, даже если в файле всего один блок, и коээфициент репликации (по умолчанию) равен 3, наш блок записан в трех экземплярах, и мы вполне можем его читать на трех узлах сразу, тремя ядрами. И обработать в три потока. То есть, мы практически даром получили параллелизм при чтении.

Бакетирование (кластеризация)


Бакетирование или кластеризация — это дальнейшее разделение таблицы или партиции Hive на бакеты (buckets) на основе хеш функции по колонке, чтобы добавить к данным дополнительную структуру, и ускорить выполнение запросов.

Кластеризованная таблица создается командой типа такой:

CREATE TABLE ... CLUSTERED BY (column_name1, column_name2, …) SORTED BY (column_name [ASC|DESC], …)] INTO num_buckets BUCKETS;

То есть, данные в таблице или партиции дополнительно раскладываются на num_buckets кластеров, путем вычисления хеш функции от колонок кластеризации.

Следует отметить одну важную вещь — в данном случае мы просто сообщили Hive, что таблица будет кластеризована и сортирована. При том реальная раскладка данных на кластеры и сортировка выполняются приложением при записи данных (однократно), и позволяют далее ускорить (многократное) выполнение запросов к тем же данным, используя имеющуюся информацию о партициях, кластерах и сортировке.

Фильтр Блума


Базовое описание того, что есть фильтр Блума, можно прочесть скажем
тут.

Чем собственно хорош фильтр Блума? Во-первых, объединение и пересечение двух фильтров — это простая операция над их битовыми массивами. Таким образом, построение фильтра Блума эффективно параллелится, в частности в Spark это выражается операцией treeAggregate. То есть, мы разбиваем наш исходный массив на части, и на каждой строим параллельно свой фильтр, а потом объединяем их, причем объединяем не в одном месте (на spark-драйвере, который может стать узким местом), а тоже параллельно. Почитать про treeAggregate подробнее можно скажем вот тут.

Сама фильтрация тоже параллелится весьма эффективно, все что нужно — это передать с машины на машину саму битовую карту фильтра, которая является достаточно компактной (в наших типовых случаях, когда данных терабайты, обычный объем фильтра порядка мегабайт).

Фильтр Блума не является чем-то специфическим для Spark, для Hive вполне существуют реализации, позволяющие применять его в виде UDF (User Defined Function). Например, вот такая.

Колоночно ориентированные форматы хранения


В зависимости от назначения конкретной таблицы может быть выгодно хранить данные на физическом уровне либо построчно, когда сохраняются сначала все колонки первой строки, затем все колонки второй, и так далее, либо поколоночно, когда сначала сохраняется первая колонка всех строк, затем вторая (также для всех), и т.д.

В мире классических СУБД вы, как правило, не управляете способом хранения. Хотя гибридные СУБД существуют, и варианты возможны, но в целом СУБД все-таки можно разделить на строчно и колоночно ориентированные. Скажем, просто Sybase — это строчно ориентированное хранение, а Sybase IQ — колоночно. И в большинстве случаев хранение построчное.

В мире же Hadoop/Hive/Spark выбор способа хранения это по сути выбор формата файла. То есть, выбираете вы скажем CSV — у вас построчный способ хранения, выбираете Parquet — уже колоночно ориентированный.

Колоночно-ориентированные форматы, наиболее известным из которых является все тот же Parquet, позволяют экономить на том, что вы выбираете не все данные, которые хранятся в таблице, а только нужные. В зависимости от числа нужных колонок их их общего числа экономия может быть весьма и весьма велика.

Кроме того, если данные хранятся по колонкам, то возможны некоторые более эффективные алгоритмы сжатия, например, если у вас числовая колонка, и значения монотонно изменяются, то можно хранить разницу между строками, которая меньше по модулю, и занимает меньше места в байтах. Хороший пример подобного подхода — когда при хранении полилинии вместо координат вы храните дельту от предыдущей точки (и вы при этом знаете, что эта линия непрерывна, т.е. скачков координат не бывает). Просто кординаты это скажем 8 десятичных знаков, а дельта при этом — всего один или два знака. Такая техника применяется в формате TopoJSON, и он весьма компактен по сравнению с похожим, но более простым GeoJSON.

Broadcast


Техника под названием Broadcast (букв. телевещание) в обычных СУБД существует как правило в виде так называемых «хинтов», т.е. указаний в SQL-запросе, или советов оптимизатору, как лучше запрос выполнить. Обычно она применяется к JOIN, когда одна из участвующих таблиц большая, а вторая достаточно маленькая, или же скажем статическая справочная.

Состоит техника в том, что копия маленькой таблицы рассылается(вещание) всем исполнителям запроса. А далее большая таблица делится на части, и каждый из исполнителей строит свою часть JOIN.

В случае Hive и Spark прелесть этого метода состоит в том, что мы можем переслать на каждый из узлов не только таблицу, но вообще говоря любую структуру данных. Ну т.е. если мы пишем на Java/Scala — то любую коллекцию, Map, битовую карту, или что угодно еще. А дальше исполнители могут это применить по своему усмотрению (ну, как пожелает левая пятка программиста). Например, самый очевидный вариант: если вы строите JOIN из двух Dataset (это ближайший аналог как раз таблицы в SQL), то вы можете применить broadcast к одному из них, либо в API, либо опять же в виде хинта в Spark SQL. Или вы можете прочитать маленький Dataset в память, построить на его базе любую оптимальную для вашей задачи структуру (фильтр Блума — наверное первый очевидный выбор), а дальше разослать эту структуру исполнителям. Которые применят ее не только для построения JOIN, но для чего угодно (фильтрации, или другой произвольной обработки).

Ну или вы можете разослать узлам кластера построенную модель для машинного обучения.

Понятно, что все эти способы оптимизации — несколько более «ручные», чем хинты в Oracle. И потребуют от вас как некоторых познаний в API, так и фантазии. Но в тоже время (и по той же причине) они намного более гибкие, и потенциально — более эффективные.

Реальный случай из жизни


И напоследок — небольшой практический пример.

Пусть у нас есть таблице в Hive, которая достаточно велика (ну скажем, порядка терабайта и более), и при этом не имеет партиционирования. И у нас есть набор ключей, которые мы бы хотели в ней найти. Как мы можем это сделать имеющимися подручными средствами?

Воспользуемся тем, что ключ (это не обязательно должен быть первичный ключ, и в общем случае в Hive и Spark такое понятие вобще не часто применяется) — это как правило небольшое подмножество колонок таблицы. Скажем, у нас достаточно типична ситуация когда у таблицы суррогатный числовой ключ из одной колонки, и 500 колонок данных. Читаем колонки первичного ключа, получаем спарковский датасет. Добавляем к нему колонку с именем файла:

val ds= spark.read.table(tableName).select(pkColumns).
   withColumn("file_name", input_file_name())

Теперь нужно найти только те файлы, где лежат нужные нам ключи. Для этого применим фильтр блума, обучив его на датасете с ключами. Имеющаяся у нас реализация (как и большинство известных мне) обучает фильтр на одном параметре (обычно строке или наборе байтов), поэтому если ключ составной или не строковый — его колонки pkColumns нужно будет преобразовать к одной строке.

Затем фильтруем наш датасет фильтром блума. Разумеется, это операция не дешевая, но зато фильтр блума хорошо распараллеливается, поэтому при наличии ресурсов все проходит быстро.

На выходе мы получаем датасет, состоящий из колонок ключа, и имени файла. Выбрасываем ключ, применяем distinct, и получаем список только тех файлов, где есть нужные нам ключи. И наконец, читаем уже из этих файлов (а не из таблицы, т.е. выполняем spark.read.parquet(files)) все нужные нам колонки.

Тут мы неявно предполагаем, что искомые ключи более-менее предсказуемо распределены по терабайту (если это в реальности первичный ключ — то запись с ним вообще должна быть одна), а именно, если у нас скажем 10 ключей, то прочитать, например, придется 10 файлов, а всего файлов скажем 1000. Иначе мы не получим никакого выигрыша по сравнению с простым чтением всего терабайта. На практике получается сокращение по времени обработки скажем раз в 10, что вполне достойно, с учетом того, что кода мы написали от силы строк 20.