Привет, Хабр! В предыдущих статьях мы описали парадигму MapReduce, а также показали как на практике реализовать и выполнить MapReduce-приложение на стеке Hadoop. Пришла пора описать различные приёмы, которые позволяют эффективно использовать MapReduce для решения практических задач, а также показать некоторые особенности Hadoop, которые позволяют упростить разработку или существенно ускорить выполнение MapReduce-задачи на кластере.



Map only job


Как мы помним, MapReduce состоит из стадий Map, Shuffle и Reduce. Как правило, в практических задачах самой тяжёлой оказывается стадия Shuffle, так как на этой стадии происходит сортировка данных. На самом деле существует ряд задач, в которых можно обойтись только стадией Map. Вот примеры таких задач:

  • Фильтрация данных (например, «Наи?ти все записи с IP-адреса 123.123.123.123» в логах web-сервера);
  • Преобразование данных («Удалить колонку в csv-логах»);
  • Загрузка и выгрузка данных из внешнего источника («Вставить все записи из лога в базу данных»).

Такие задачи решаются при помощи Map-Only. При создании Map-Only задачи в Hadoop нужно указать нулевое количество reducer’ов:

Map Only Job

Пример конфигурации map-only задачи на hadoop:
Native interface Hadoop Streaming Interface
Указать нулевое количество редьюсеров при конфигурации job’a:

job.setNumReduceTasks(0);

Более развёрнутый пример по ссылке.
Не указываем редьюсер и указываем нулевое количество редьюсеров Пример:

hadoop jar hadoop-streaming.jar  -D mapred.reduce.tasks=0-input input_dir-output output_dir-mapper "python mapper.py"-file "mapper.py"

Map Only jobs на самом деле могут быть очень полезными. Например, в платформе Facetz.DCA для выявления характеристик пользователей по их поведению используется именно один большой map-only, каждый маппер которого принимает на вход пользователя и на выход отдаёт его характеристики.

Combine


Как я уже писал, обычно самая тяжёлая стадия при выполнении Map-Reduce задачи – это стадия shuffle. Происходит это потому, что промежуточные результаты (выход mapper’a) записываются на диск, сортируются и передаются по сети. Однако существуют задачи, в которых такое поведение кажется не очень разумным. Например, в той же задаче подсчёта слов в документах можно предварительно предагрегировать результаты выходов нескольких mapper’ов на одном узле map-reduce задачи, и передавать на reducer уже просуммированные значения по каждой машине.

Combine. Взято по ссылке

В hadoop для этого можно определить комбинирующую функцию, которая будет обрабатывать выход части mapper-ов. Комбинирующая функция очень похожа на reduce – она принимает на вход выход части mapper’ов и выдаёт агрегированный результат для этих mapper’ов, поэтому очень часто reducer используют и как combiner. Важное отличие от reduce – на комбинирующую функцию попадают не все значения, соответствующие одному ключу.

Более того, hadoop не гарантирует того, что комбинирующая функция вообще будет выполнена для выхода mapper’a. Поэтому комбинирующая функция не всегда применима, например, в случае поиска медианного значения по ключу. Тем не менее, в тех задачах, где комбинирующая функция применима, её использование позволяет добиться существенного прироста к скорости выполнения MapReduce-задачи.

Использование Combiner’a на hadoop:

Native Interface Hadoop streaming
При конфигурации job-a указать класс-Combiner. Как правило, он совпадает с Reducer:

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);






В параметрах командной строки указать команду -combiner. Как правило, эта команда совпадает с командой reducer’a. Пример:

hadoop jar hadoop-streaming.jar -input input_dir-output output_dir-mapper "python mapper.py"-reducer "python reducer.py"-combiner "python reducer.py"-file "mapper.py"-file "reducer.py"\

Цепочки MapReduce-задач


Бывают ситуации, когда для решения задачи одним MapReduce не обойтись. Например, рассмотрим немного видоизмененную задачу WordCount: имеется набор текстовых документов, необходимо посчитать, сколько слов встретилось от 1 до 1000 раз в наборе, сколько слов от 1001 до 2000, сколько от 2001 до 3000 и так далее.

Для решения нам потребуется 2 MapReduce job’а:

  1. Видоизменённый wordcount, который для каждого слова рассчитает, в какой из интервалов оно попало;
  2. MapReduce, подсчитывающий, сколько раз в выходе первого MapReduce встретился каждый из интервалов.

Решение на псевдокоде:
#map1
def map(doc):
	for word in doc:
		yield word, 1
#reduce1
def reduce(word, values):
	yield int(sum(values)/1000), 1

#map2
def map(doc):
	interval, cnt = doc.split()
	yield interval, cnt
#reduce2
def reduce(interval, values):
	yield interval*1000, sum(values)


Для того, чтобы выполнить последовательность MapReduce-задач на hadoop, достаточно просто в качестве входных данных для второй задачи указать папку, которая была указана в качестве output для первой и запустить их по очереди.

На практике цепочки MapReduce-задач могут представлять собой достаточно сложные последовательности, в которых MapReduce-задачи могут быть подключены как последовательно, так и параллельно друг другу. Для упрощения управления такими планами выполнения задач существуют отдельные инструменты типа oozie и luigi, которым будет посвящена отдельная статья данного цикла.

Пример цепочки MapReduce-задач.

Distributed cache


Важным механизмом в Hadoop является Distributed Cache. Distributed Cache позволяет добавлять файлы (например, текстовые файлы, архивы, jar-файлы) к окружению, в котором выполняется MapReduce-задача.

Можно добавлять файлы, хранящиеся на HDFS, локальные файлы (локальные для той машины, с которой выполняется запуск задачи). Я уже неявно показывал, как использовать Distributed Cache вместе с hadoop streaming: добавляя через опцию -file файлы mapper.py и reducer.py. На самом деле можно добавлять не только mapper.py и reducer.py, а вообще произвольные файлы, и потом пользоваться ими как будто они находятся в локальной папке.

Использование Distributed Cache:
Native API
//конфигурация Job’a
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), 
                             job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//пример использования в mapper-e:
public static class MapClass extends MapReduceBase  
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;
 
 public void configure(JobConf job) {
   // получаем кэшированные данные из архивов
   File f = new File("./map.zip/some/file/in/zip.txt");
 }
 
 public void map(K key, V value, 
                 OutputCollector<K, V> output, Reporter reporter) 
 throws IOException {
   // используем данные тут
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop Streaming
#перечисляем файлы, которые необходимо добавить в distributed cache в параметре –files. Параметр –files должен идти перед другими параметрами.

yarn  hadoop-streaming.jar	-files mapper.py,reducer.py,some_cached_data.txt	-input '/some/input/path' 	-output '/some/output/path' \ 
	-mapper 'python mapper.py' 	-reducer 'python reducer.py' 
пример использования:
import sys
#просто читаем файл из локальной папки
data = open('some_cached_data.txt').read()

for line in sys.stdin()
	#processing input
	#use data here

Reduce Join


Те, кто привык работать с реляционными базами, часто пользуются очень удобной операцией Join, позволяющей совместно обработать содержание некоторых таблиц, объединив их по некоторому ключу. При работе с большими данными такая задача тоже иногда возникает. Рассмотрим следующий пример:

Имеются логи двух web-серверов, каждый лог имеет следующий вид: <timestamp>\t<ip>\t<url>. Пример кусочка лога:

1446792139	178.78.82.1	/sphingosine/unhurrying.css
1446792139	126.31.163.222	/accentually.js
1446792139	154.164.149.83	/pyroacid/unkemptly.jpg
1446792139	202.27.13.181	/Chawia.js
1446792139	67.123.248.174	/morphographical/dismain.css
1446792139	226.74.123.135	/phanerite.php
1446792139	157.109.106.104	/bisonant.css

Необходимо посчитать для каждого IP-адреса на какой из 2-х серверов он чаще заходил. Результат должен быть представлен в виде: <ip>\t<first or second>. Пример части результата:

178.78.82.1	first
126.31.163.222	second
154.164.149.83	second
226.74.123.135	first

К сожалению, в отличие от реляционных баз данных, в общем случае объединение двух логов по ключу (в данном случае – по IP-адресу) представляет собой достаточно тяжёлую операцию и решается при помощи 3-х MapReduce и паттерна Reduce Join:

Общая схема ReduceJoin

ReduceJoin работает следующим образом:

  1. На каждый из входных логов запускается отдельный MapReduce (Map only), преобразующий входные данные к следующему виду:

    key -> (type, value)

    Где key – это ключ, по которому нужно объединять таблицы, Type – тип таблицы (first или second в нашем случае), а Value – это любые дополнительные данные, привязанные к ключу.

  2. Выходы обоих MapReduce подаются на вход 3-му MapReduce, который, собственно, и выполняет объединение. Этот MapReduce содержит пустой Mapper, который просто копирует входные данные. Дальше shuffle раскладывает данные по ключам и подаёт на вход редьюсеру в виде:

    key -> [(type, value)]

Важно, что в этот момент на редьюсер попадают записи из обоих логов и при этом по полю type можно идентифицировать, из какого из двух логов попало конкретное значение. Значит данных достаточно, чтобы решить исходную задачу. В нашем случае reducere просто должен посчитать для каждого ключа записей, с каким type встретилось больше и вывести этот type.

MapJoin


Паттерн ReduceJoin описывает общий случай объединения двух логов по ключу. Однако есть частный случай, при котором задачу можно существенно упростить и ускорить. Это случай, при котором один из логов имеет размер существенно меньшего размера, чем другой. Рассмотрим следующую задачу:

Имеются 2 лога. Первый лог содержит лог web-cервера (такой же как в предыдущей задаче), второй файл (размером в 100кб) содержит соответствие URL-> Тематика.  Пример 2-го файла:

/toyota.php 	auto
/football/spartak.html 	sport
/cars 	auto
/finances/money 	business

Для каждого IP-адреса необходимо рассчитать страницы какой категории с данного IP-адреса загружались чаще всего.

В этом случае нам тоже необходимо выполнить Join 2-х логов по URL. Однако в этом случае нам не обязательно запускать 3 MapReduce, так как второй лог полностью влезет в память. Для того, чтобы решить задачу при помощи 1-го MapReduce, мы можем загрузить второй лог в Distributed Cache, а при инициализации Mapper’a просто считать его в память, положив его в словарь -> topic.

Далее задача решается следующим образом:

Map:

# находим тематику каждой из страниц первого лога</em>
input_line -> [ip,  topic]

Reduce:

Ip -> [topics] -> [ip, most_popular_topic]

Reduce получает на вход ip и список всех тематик, просто вычисляет, какая из тематик встретилась чаще всего. Таким образом задача решена при помощи 1-го MapReduce, а собственно Join вообще происходит внутри map (поэтому если бы не нужна была дополнительная агрегация по ключу – можно было бы обойтись MapOnly job-ом):

Схема работы MapJoin

Резюме


В статье мы рассмотрели несколько паттернов и приемов решения задач при помощи MapReduce, показали, как объединять MapReduce-задачи в цепочки и join-ить логи по ключу.

В следующих статьях мы более подробно рассмотрим архитектуру Hadoop, а также инструменты, упрощающие работу с MapReduce и позволяющие обойти его недостатки.

Ссылки на другие статьи цикла


» Часть 1: Принципы работы с большими данными, парадигма MapReduce
» Часть 2: Hadoop

Комментарии (4)


  1. dec0
    09.11.2015 13:55

    Спасибо.

    Более того, hadoop не гарантирует того, что комбинирующая функция вообще будет выполнена для выхода mapper’a. Поэтому комбинирующая функция не всегда применима, например, в случае поиска медианного значения по ключу.

    Вот этот момент можно было бы поподробнее расписать.


    1. asash
      09.11.2015 14:09

      Для того чтобы найти медиану необходимо отсортировать все значения в порядке возрастания и выбрать элемент находящийся посередине. Для того чтобы ключи отсортировать — надо знать все значения, относящиеся к ключу. Поэтому это сделать можно только в редьюсере. Комбайнер агрегирует только часть значений, поэтому не применим в случае вычисления медианы.


  1. eaa
    09.11.2015 14:43

    Интересует вопрос профилирования MapReduce-задач — какими средствами это делается.


    1. asash
      09.11.2015 18:53

      Профилировать можно по-разному. Самый простой способ — локальная отладка и профилирование mapper'a и reducer'a на локальных данных.