На Хабре и прочих интернетах чуть не каждый день постят пустые статьи о бигдата, создавая у спецов стойкое ощущение, что кроме маркетинга за стеком бигдаты ничего нет. На самом деле там достаточно интересных технологий под капотом Hadoop и тут я хочу слегка разбавить маркетинг, взглядом технического спеца с опытом Oracle.
В первую очередь стоит понимать, что один из столпов бигдаты Hadoop, это не только батч процессинг и map-reduce, как многие пытаются изобразить. Это запросто может быть обработка и с противоположного спектра задач: чтение потока мелких сообщений, например от IoT (spark на Hadoop, читает Kafka stream), на ходу агрегируя и выявляя отклонения. Это могут быть тучи мелких запросов отчетных систем с JOINs к parquet файлам через Impala. Все это все та же экосистема Hadoop. Там очень много всего, различной степени зрелости и требующего различного подхода. Причем поверьте, никто толком не знает наиболее выигрышный вариант. Где-то задачи отлично ложатся на классические map-reduce и достаточные примитивные parquet на hdfs, где-то Spark с обычными SQL JOINs чуть не на текстовых файликах. Крупнейший дистрибутив Cloudera сейчас активно продвигает нечто напоминающее базу данных Kudu, которая может как в java стайл map-reduce использоваться, так и под SQL в Impala.
Там много всего и почти все это между собой комбинируется, при этом в зависимости от комбинации подходы обработки могут быть очень и очень разными. В принципе это отдаленно напоминает метания Oracle в конце 90х, начала нулевых, когда они в субд начали пихать всякие xml таблицы, java stored procedures и прочие странные вещи.
По началу попав в проект с Hadoop/map-reduce, было совершенно не понятно, за счет чего с таким подходом можно соревноваться с полноценными СУБД, ведь всё общение map-reduce проходит через писанину вна hdfs. Сначала маппер читает все подряд, потом пишет на hdfs свой аутпут для редюсеров, потом редюсеры читают, снова пишут. С оракловыми представлениями о прекрасном казалось это точно не взлетит. Но позже появилось понимание за счет чего.
Примерно как Oracle делает с виду много лишней работы, пытаясь за счет «лишней» писанины на диски увеличить параллельное выполнение. Oracle, например, пишет в «лишний» UNDO, что позволяет ему развести конкурирующих читателей. Пишет блокировки в блоки данных, что позволяет не держать огромные списки блокировок в памяти, прозрачно обменивать инфой о блокировках между нодами RAC кластера. Примерно в этом же ключе стоит смотреть на с виду лишнюю писанину map-reduce. Все это «лишнее» в итоге позволяет выполнять гораздо больше задач в параллель, на фоне Oracle.
При этом я был сразу удивлен размером parquet файлов, таблички в Oracle без индексов на 80-100 гб, легко превращаются в 20-30 гб parquet. В результате map-reduce читает с дисков в разы меньше, загружая сразу тучу ядер кластера, в то время как Oracle и прочесть должен больше и все вычисления возлагает на единственный пользовательский процесс. Это камень в огород PL/SQL машины, хотя и у SQL движка оракла тоже тьма нюансов на тему параллельного чтения.
Что бы было понятно, как же выглядит реализация логики в Hadoop могу проиллюстрировать одну из «пятничных задачек» с оракловой ветки sql.ru. В принципе все началось с баталии на тему map-reduce vs Spark.
Задачка такая:
В Oracle решение было предложено таким:
В SparkSQL задачка решается без третьего пункта так
Как видим решение на spark-sql почти от оракла и не отличается. А вот map-reduce решение:
Mapper
Reducer
Запускаем
Итог: бигдата стек огромен, там множество подсистем работающих в своей парадигме, map-reduce лишь один из болтиков, не факт, что теперь и всем нужный, в свете расцвета моды на Spark. Батч процессинг не единственная парадигма реализуемая в бигдата, а Spark уже позволяет писать логику в том числе и декларативным SQL языком. Map-reduce же тоже вполне себе красиво кладется на некоторые задачи, при этом с легкостью решает задачи, которые еще недавно были по силам лишь серьезным оракловым серверам. Если приглядеться к коду то видно, что на Spark-sql так и не реализован пункт 3 задания, зато map-reduce код, хоть и не декларативный, вышел достаточно элегантным и запросто расширяемым. Пукт 3 задания запросто добавляется в map-reduce код за несколько минут девелопером средней паршивости, тогда как нагромождение аналитических функций в оракловом решении потребует серьезной подготовки девелопера.
В первую очередь стоит понимать, что один из столпов бигдаты Hadoop, это не только батч процессинг и map-reduce, как многие пытаются изобразить. Это запросто может быть обработка и с противоположного спектра задач: чтение потока мелких сообщений, например от IoT (spark на Hadoop, читает Kafka stream), на ходу агрегируя и выявляя отклонения. Это могут быть тучи мелких запросов отчетных систем с JOINs к parquet файлам через Impala. Все это все та же экосистема Hadoop. Там очень много всего, различной степени зрелости и требующего различного подхода. Причем поверьте, никто толком не знает наиболее выигрышный вариант. Где-то задачи отлично ложатся на классические map-reduce и достаточные примитивные parquet на hdfs, где-то Spark с обычными SQL JOINs чуть не на текстовых файликах. Крупнейший дистрибутив Cloudera сейчас активно продвигает нечто напоминающее базу данных Kudu, которая может как в java стайл map-reduce использоваться, так и под SQL в Impala.
Там много всего и почти все это между собой комбинируется, при этом в зависимости от комбинации подходы обработки могут быть очень и очень разными. В принципе это отдаленно напоминает метания Oracle в конце 90х, начала нулевых, когда они в субд начали пихать всякие xml таблицы, java stored procedures и прочие странные вещи.
По началу попав в проект с Hadoop/map-reduce, было совершенно не понятно, за счет чего с таким подходом можно соревноваться с полноценными СУБД, ведь всё общение map-reduce проходит через писанину вна hdfs. Сначала маппер читает все подряд, потом пишет на hdfs свой аутпут для редюсеров, потом редюсеры читают, снова пишут. С оракловыми представлениями о прекрасном казалось это точно не взлетит. Но позже появилось понимание за счет чего.
Примерно как Oracle делает с виду много лишней работы, пытаясь за счет «лишней» писанины на диски увеличить параллельное выполнение. Oracle, например, пишет в «лишний» UNDO, что позволяет ему развести конкурирующих читателей. Пишет блокировки в блоки данных, что позволяет не держать огромные списки блокировок в памяти, прозрачно обменивать инфой о блокировках между нодами RAC кластера. Примерно в этом же ключе стоит смотреть на с виду лишнюю писанину map-reduce. Все это «лишнее» в итоге позволяет выполнять гораздо больше задач в параллель, на фоне Oracle.
При этом я был сразу удивлен размером parquet файлов, таблички в Oracle без индексов на 80-100 гб, легко превращаются в 20-30 гб parquet. В результате map-reduce читает с дисков в разы меньше, загружая сразу тучу ядер кластера, в то время как Oracle и прочесть должен больше и все вычисления возлагает на единственный пользовательский процесс. Это камень в огород PL/SQL машины, хотя и у SQL движка оракла тоже тьма нюансов на тему параллельного чтения.
Что бы было понятно, как же выглядит реализация логики в Hadoop могу проиллюстрировать одну из «пятничных задачек» с оракловой ветки sql.ru. В принципе все началось с баталии на тему map-reduce vs Spark.
Задачка такая:
Написать запрос, который для каждого adjusted value находит
1) предыдущее original (ordered by id)
2) original со сдвигом, который задается переменной (предыдущее считается со сдвигом 0)
3) число предыдущих original так чтоб их сумма не превышала заданный лимит
var shift number
var limit number
exec :shift := 2;
PL/SQL procedure successfully completed.
exec :limit := 2000;
PL/SQL procedure successfully completed.
В Oracle решение было предложено таким:
SQL> select t.id,
2 t.type,
3 t.value,
4 decode(type, 'adjusted', max(decode(type, 'original', value)) over(partition by grp)) prev_o_value,
5 decode(type, 'adjusted', max(decode(type, 'original', shift_n)) over(partition by grp)) prev_shift_n_o_value,
6 decode(type, 'adjusted', count(decode(type, 'original', id))
7 over(order by orig_val_running_sum range between 2000 preceding and 1 preceding)) count_o
8 from (select t.*,
9 sum(decode(type, 'original', value)) over(order by id) - decode(type, 'original', value, 0) orig_val_running_sum,
10 decode(type, 'original', lag(value, 2) over(order by decode(type, 'original', 0, 1), id)) shift_n,
11 sum(decode(type, 'original', 1)) over(order by id) grp
12 from t) t
13 order by id;
ID TYPE VALUE PREV_O_VALUE PREV_SHIFT_N_O_VALUE COUNT_O
---------- ------------------------------ ---------- ------------ -------------------- ----------
10 original 100
20 original 200
30 adjusted 300 200 2
40 original 400
50 adjusted 500 400 100 3
60 original 600
70 original 700
80 adjusted 800 700 400 5
90 adjusted 900 700 400 5
100 original 1000
110 adjusted 1100 1000 600 2
120 original 1200
130 adjusted 1300 1200 700 1
140 original 1400
150 adjusted 1500 1400 1000 1
15 rows selected.
В SparkSQL задачка решается без третьего пункта так
select t.id,
t.type,
t.value,
case when type = 'adjusted' then max(case when type = 'original' then value end) over (partition by position, grp) end prev_o_value,
case when type = 'adjusted' then max(case when type = 'original' then shift_n end) over (partition by position, grp) end prev_shift_n_o_value
from (select t.*,
case when type = 'original'
then lag(value, 2) over (partition by position order by case when type = 'original' then 0 else 1 end, id)
end shift_n,
sum(case when type = 'original' then 1 end) over (partition by position order by id) grp
from t) t
order by id
+---+--------+-----+------------+--------------------+
|id |type |value|prev_o_value|prev_shift_n_o_value|
+---+--------+-----+------------+--------------------+
|10 |original|100 |null |null |
|20 |original|200 |null |null |
|30 |adjusted|300 |200 |null |
|40 |original|400 |null |null |
|50 |adjusted|500 |400 |100 |
|60 |original|600 |null |null |
|70 |original|700 |null |null |
|80 |adjusted|800 |700 |400 |
|90 |adjusted|900 |700 |400 |
|100|original|1000 |null |null |
|110|adjusted|1100 |1000 |600 |
|120|original|1200 |null |null |
|130|adjusted|1300 |1200 |700 |
|140|original|1400 |null |null |
|150|adjusted|1500 |1400 |1000 |
+---+--------+-----+------------+--------------------+
Как видим решение на spark-sql почти от оракла и не отличается. А вот map-reduce решение:
Mapper
public class ParquetMapper extends Mapper<LongWritable, GenericRecord, Text, AvroValue<GenericRecord>> {
private final Text outputKey = new Text();
private final AvroValue<GenericRecord> outputValue = new AvroValue<GenericRecord>();
@Override
protected void map(LongWritable key, GenericRecord value, Context context) throws IOException, InterruptedException {
outputKey.set(String.valueOf(value.get("position")));
outputValue.datum(value);
context.write(outputKey, outputValue);
}
}
Reducer
public class ParquetReducer extends Reducer<Text, AvroValue<GenericRecord>, Void, Text> {
private static final byte shift = 2 ;
private TreeMap<Integer, AbstractMap.SimpleEntry<String, Integer>> rows = new TreeMap<Integer,AbstractMap.SimpleEntry<String, Integer>>();
List<Integer> queue = new LinkedList<Integer>();
private String adj = "";
private int lastValue = -1;
@Override
protected void reduce(Text key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException {
for (AvroValue<GenericRecord> value : values) {
rows.put((Integer) value.datum().get("id"),
new AbstractMap.SimpleEntry(value.datum().get("type"), value.datum().get("value"))) ;
}
for(Map.Entry<Integer, AbstractMap.SimpleEntry<String, Integer>> entry : rows.entrySet()) {
AbstractMap.SimpleEntry<String, Integer> rowData = entry.getValue();
if (rowData.getKey().equals("original")) {
lastValue = rowData.getValue() ;
queue.add(lastValue) ;
adj = "" ;
} else {
adj = " " + String.valueOf(lastValue);
if (queue.size()- shift >0) {
adj = adj + " " + queue.get(queue.size()-shift).toString() ;
}
}
Text output = new Text(entry.getKey()+" "+rowData.getKey() + " " + rowData.getValue() + adj);
context.write(null, output );
}
}
}
Запускаем
[yarn@sandbox map-reduce]$ hadoop fs -cat /out/part-r-00000
10 original 100
20 original 200
30 adjusted 300 200
40 original 400
50 adjusted 500 400 200
60 original 600
70 original 700
80 adjusted 800 700 600
90 adjusted 900 700 600
100 original 1000
110 adjusted 1100 1000 700
120 original 1200
130 adjusted 1300 1200 1000
140 original 1400
150 adjusted 1500 1400 1200
Итог: бигдата стек огромен, там множество подсистем работающих в своей парадигме, map-reduce лишь один из болтиков, не факт, что теперь и всем нужный, в свете расцвета моды на Spark. Батч процессинг не единственная парадигма реализуемая в бигдата, а Spark уже позволяет писать логику в том числе и декларативным SQL языком. Map-reduce же тоже вполне себе красиво кладется на некоторые задачи, при этом с легкостью решает задачи, которые еще недавно были по силам лишь серьезным оракловым серверам. Если приглядеться к коду то видно, что на Spark-sql так и не реализован пункт 3 задания, зато map-reduce код, хоть и не декларативный, вышел достаточно элегантным и запросто расширяемым. Пукт 3 задания запросто добавляется в map-reduce код за несколько минут девелопером средней паршивости, тогда как нагромождение аналитических функций в оракловом решении потребует серьезной подготовки девелопера.
Комментарии (4)
asd111
05.09.2017 07:48Посмотрите Yandex Clickhouse. Это табличная БД для аналитики, со своими ограничениями но одна из самых быстрых на данный момент. Правда нужен процессор с SSE 4.2 и линукс.
Yo1 Автор
05.09.2017 10:09да, видел пару тестов кто-то в блогах публиковал, но на наших широтах врятли получит распространение субд из России. у нас ставку на kudu сделали.
fediq
Выглядит так, будто вы посмотрели на Hadoop v0.20 в 2009ом, после этого впали в кому с кошмарами из маркетингового мусора, а теперь проснулись и озираетесь по сторонам "О-о-о, айфоны!.. О-о-о, спарк!.. О-о-о, паркет...".
Yo1 Автор
выглядит, что еще один пых-пых воятель не понял о чем речь, но комент обязан был оставить…