Привет, Хабр! В предыдущих статьях мы описали парадигму MapReduce, а также показали как на практике реализовать и выполнить MapReduce-приложение на стеке Hadoop. Пришла пора описать различные приёмы, которые позволяют эффективно использовать MapReduce для решения практических задач, а также показать некоторые особенности Hadoop, которые позволяют упростить разработку или существенно ускорить выполнение MapReduce-задачи на кластере.
Как мы помним, MapReduce состоит из стадий Map, Shuffle и Reduce. Как правило, в практических задачах самой тяжёлой оказывается стадия Shuffle, так как на этой стадии происходит сортировка данных. На самом деле существует ряд задач, в которых можно обойтись только стадией Map. Вот примеры таких задач:
Такие задачи решаются при помощи Map-Only. При создании Map-Only задачи в Hadoop нужно указать нулевое количество reducer’ов:
Map Only Job
Пример конфигурации map-only задачи на hadoop:
Map Only jobs на самом деле могут быть очень полезными. Например, в платформе Facetz.DCA для выявления характеристик пользователей по их поведению используется именно один большой map-only, каждый маппер которого принимает на вход пользователя и на выход отдаёт его характеристики.
Как я уже писал, обычно самая тяжёлая стадия при выполнении Map-Reduce задачи – это стадия shuffle. Происходит это потому, что промежуточные результаты (выход mapper’a) записываются на диск, сортируются и передаются по сети. Однако существуют задачи, в которых такое поведение кажется не очень разумным. Например, в той же задаче подсчёта слов в документах можно предварительно предагрегировать результаты выходов нескольких mapper’ов на одном узле map-reduce задачи, и передавать на reducer уже просуммированные значения по каждой машине.
Combine. Взято по ссылке
В hadoop для этого можно определить комбинирующую функцию, которая будет обрабатывать выход части mapper-ов. Комбинирующая функция очень похожа на reduce – она принимает на вход выход части mapper’ов и выдаёт агрегированный результат для этих mapper’ов, поэтому очень часто reducer используют и как combiner. Важное отличие от reduce – на комбинирующую функцию попадают не все значения, соответствующие одному ключу.
Более того, hadoop не гарантирует того, что комбинирующая функция вообще будет выполнена для выхода mapper’a. Поэтому комбинирующая функция не всегда применима, например, в случае поиска медианного значения по ключу. Тем не менее, в тех задачах, где комбинирующая функция применима, её использование позволяет добиться существенного прироста к скорости выполнения MapReduce-задачи.
Использование Combiner’a на hadoop:
Бывают ситуации, когда для решения задачи одним MapReduce не обойтись. Например, рассмотрим немного видоизмененную задачу WordCount: имеется набор текстовых документов, необходимо посчитать, сколько слов встретилось от 1 до 1000 раз в наборе, сколько слов от 1001 до 2000, сколько от 2001 до 3000 и так далее.
Для решения нам потребуется 2 MapReduce job’а:
Решение на псевдокоде:
Для того, чтобы выполнить последовательность MapReduce-задач на hadoop, достаточно просто в качестве входных данных для второй задачи указать папку, которая была указана в качестве output для первой и запустить их по очереди.
На практике цепочки MapReduce-задач могут представлять собой достаточно сложные последовательности, в которых MapReduce-задачи могут быть подключены как последовательно, так и параллельно друг другу. Для упрощения управления такими планами выполнения задач существуют отдельные инструменты типа oozie и luigi, которым будет посвящена отдельная статья данного цикла.
Пример цепочки MapReduce-задач.
Важным механизмом в Hadoop является Distributed Cache. Distributed Cache позволяет добавлять файлы (например, текстовые файлы, архивы, jar-файлы) к окружению, в котором выполняется MapReduce-задача.
Можно добавлять файлы, хранящиеся на HDFS, локальные файлы (локальные для той машины, с которой выполняется запуск задачи). Я уже неявно показывал, как использовать Distributed Cache вместе с hadoop streaming: добавляя через опцию -file файлы mapper.py и reducer.py. На самом деле можно добавлять не только mapper.py и reducer.py, а вообще произвольные файлы, и потом пользоваться ими как будто они находятся в локальной папке.
Использование Distributed Cache:
Те, кто привык работать с реляционными базами, часто пользуются очень удобной операцией Join, позволяющей совместно обработать содержание некоторых таблиц, объединив их по некоторому ключу. При работе с большими данными такая задача тоже иногда возникает. Рассмотрим следующий пример:
Имеются логи двух web-серверов, каждый лог имеет следующий вид:
Необходимо посчитать для каждого IP-адреса на какой из 2-х серверов он чаще заходил. Результат должен быть представлен в виде:
К сожалению, в отличие от реляционных баз данных, в общем случае объединение двух логов по ключу (в данном случае – по IP-адресу) представляет собой достаточно тяжёлую операцию и решается при помощи 3-х MapReduce и паттерна Reduce Join:
Общая схема ReduceJoin
ReduceJoin работает следующим образом:
Важно, что в этот момент на редьюсер попадают записи из обоих логов и при этом по полю type можно идентифицировать, из какого из двух логов попало конкретное значение. Значит данных достаточно, чтобы решить исходную задачу. В нашем случае reducere просто должен посчитать для каждого ключа записей, с каким type встретилось больше и вывести этот type.
Паттерн ReduceJoin описывает общий случай объединения двух логов по ключу. Однако есть частный случай, при котором задачу можно существенно упростить и ускорить. Это случай, при котором один из логов имеет размер существенно меньшего размера, чем другой. Рассмотрим следующую задачу:
Имеются 2 лога. Первый лог содержит лог web-cервера (такой же как в предыдущей задаче), второй файл (размером в 100кб) содержит соответствие URL-> Тематика. Пример 2-го файла:
Для каждого IP-адреса необходимо рассчитать страницы какой категории с данного IP-адреса загружались чаще всего.
В этом случае нам тоже необходимо выполнить Join 2-х логов по URL. Однако в этом случае нам не обязательно запускать 3 MapReduce, так как второй лог полностью влезет в память. Для того, чтобы решить задачу при помощи 1-го MapReduce, мы можем загрузить второй лог в Distributed Cache, а при инициализации Mapper’a просто считать его в память, положив его в словарь -> topic.
Далее задача решается следующим образом:
Map:
Reduce:
Reduce получает на вход ip и список всех тематик, просто вычисляет, какая из тематик встретилась чаще всего. Таким образом задача решена при помощи 1-го MapReduce, а собственно Join вообще происходит внутри map (поэтому если бы не нужна была дополнительная агрегация по ключу – можно было бы обойтись MapOnly job-ом):
Схема работы MapJoin
В статье мы рассмотрели несколько паттернов и приемов решения задач при помощи MapReduce, показали, как объединять MapReduce-задачи в цепочки и join-ить логи по ключу.
В следующих статьях мы более подробно рассмотрим архитектуру Hadoop, а также инструменты, упрощающие работу с MapReduce и позволяющие обойти его недостатки.
» Часть 1: Принципы работы с большими данными, парадигма MapReduce
» Часть 2: Hadoop
Map only job
Как мы помним, MapReduce состоит из стадий Map, Shuffle и Reduce. Как правило, в практических задачах самой тяжёлой оказывается стадия Shuffle, так как на этой стадии происходит сортировка данных. На самом деле существует ряд задач, в которых можно обойтись только стадией Map. Вот примеры таких задач:
- Фильтрация данных (например, «Наи?ти все записи с IP-адреса 123.123.123.123» в логах web-сервера);
- Преобразование данных («Удалить колонку в csv-логах»);
- Загрузка и выгрузка данных из внешнего источника («Вставить все записи из лога в базу данных»).
Такие задачи решаются при помощи Map-Only. При создании Map-Only задачи в Hadoop нужно указать нулевое количество reducer’ов:
Map Only Job
Пример конфигурации map-only задачи на hadoop:
Native interface | Hadoop Streaming Interface |
Указать нулевое количество редьюсеров при конфигурации job’a:
Более развёрнутый пример по ссылке. |
Не указываем редьюсер и указываем нулевое количество редьюсеров Пример:
|
Map Only jobs на самом деле могут быть очень полезными. Например, в платформе Facetz.DCA для выявления характеристик пользователей по их поведению используется именно один большой map-only, каждый маппер которого принимает на вход пользователя и на выход отдаёт его характеристики.
Combine
Как я уже писал, обычно самая тяжёлая стадия при выполнении Map-Reduce задачи – это стадия shuffle. Происходит это потому, что промежуточные результаты (выход mapper’a) записываются на диск, сортируются и передаются по сети. Однако существуют задачи, в которых такое поведение кажется не очень разумным. Например, в той же задаче подсчёта слов в документах можно предварительно предагрегировать результаты выходов нескольких mapper’ов на одном узле map-reduce задачи, и передавать на reducer уже просуммированные значения по каждой машине.
Combine. Взято по ссылке
В hadoop для этого можно определить комбинирующую функцию, которая будет обрабатывать выход части mapper-ов. Комбинирующая функция очень похожа на reduce – она принимает на вход выход части mapper’ов и выдаёт агрегированный результат для этих mapper’ов, поэтому очень часто reducer используют и как combiner. Важное отличие от reduce – на комбинирующую функцию попадают не все значения, соответствующие одному ключу.
Более того, hadoop не гарантирует того, что комбинирующая функция вообще будет выполнена для выхода mapper’a. Поэтому комбинирующая функция не всегда применима, например, в случае поиска медианного значения по ключу. Тем не менее, в тех задачах, где комбинирующая функция применима, её использование позволяет добиться существенного прироста к скорости выполнения MapReduce-задачи.
Использование Combiner’a на hadoop:
Native Interface | Hadoop streaming |
При конфигурации job-a указать класс-Combiner. Как правило, он совпадает с Reducer:
|
В параметрах командной строки указать команду -combiner. Как правило, эта команда совпадает с командой reducer’a. Пример:
|
Цепочки MapReduce-задач
Бывают ситуации, когда для решения задачи одним MapReduce не обойтись. Например, рассмотрим немного видоизмененную задачу WordCount: имеется набор текстовых документов, необходимо посчитать, сколько слов встретилось от 1 до 1000 раз в наборе, сколько слов от 1001 до 2000, сколько от 2001 до 3000 и так далее.
Для решения нам потребуется 2 MapReduce job’а:
- Видоизменённый wordcount, который для каждого слова рассчитает, в какой из интервалов оно попало;
- MapReduce, подсчитывающий, сколько раз в выходе первого MapReduce встретился каждый из интервалов.
Решение на псевдокоде:
|
|
|
|
Для того, чтобы выполнить последовательность MapReduce-задач на hadoop, достаточно просто в качестве входных данных для второй задачи указать папку, которая была указана в качестве output для первой и запустить их по очереди.
На практике цепочки MapReduce-задач могут представлять собой достаточно сложные последовательности, в которых MapReduce-задачи могут быть подключены как последовательно, так и параллельно друг другу. Для упрощения управления такими планами выполнения задач существуют отдельные инструменты типа oozie и luigi, которым будет посвящена отдельная статья данного цикла.
Пример цепочки MapReduce-задач.
Distributed cache
Важным механизмом в Hadoop является Distributed Cache. Distributed Cache позволяет добавлять файлы (например, текстовые файлы, архивы, jar-файлы) к окружению, в котором выполняется MapReduce-задача.
Можно добавлять файлы, хранящиеся на HDFS, локальные файлы (локальные для той машины, с которой выполняется запуск задачи). Я уже неявно показывал, как использовать Distributed Cache вместе с hadoop streaming: добавляя через опцию -file файлы mapper.py и reducer.py. На самом деле можно добавлять не только mapper.py и reducer.py, а вообще произвольные файлы, и потом пользоваться ими как будто они находятся в локальной папке.
Использование Distributed Cache:
Native API |
|
Hadoop Streaming |
|
Reduce Join
Те, кто привык работать с реляционными базами, часто пользуются очень удобной операцией Join, позволяющей совместно обработать содержание некоторых таблиц, объединив их по некоторому ключу. При работе с большими данными такая задача тоже иногда возникает. Рассмотрим следующий пример:
Имеются логи двух web-серверов, каждый лог имеет следующий вид:
<timestamp>\t<ip>\t<url>
. Пример кусочка лога: 1446792139 178.78.82.1 /sphingosine/unhurrying.css
1446792139 126.31.163.222 /accentually.js
1446792139 154.164.149.83 /pyroacid/unkemptly.jpg
1446792139 202.27.13.181 /Chawia.js
1446792139 67.123.248.174 /morphographical/dismain.css
1446792139 226.74.123.135 /phanerite.php
1446792139 157.109.106.104 /bisonant.css
Необходимо посчитать для каждого IP-адреса на какой из 2-х серверов он чаще заходил. Результат должен быть представлен в виде:
<ip>\t<first or second>
. Пример части результата:178.78.82.1 first
126.31.163.222 second
154.164.149.83 second
226.74.123.135 first
К сожалению, в отличие от реляционных баз данных, в общем случае объединение двух логов по ключу (в данном случае – по IP-адресу) представляет собой достаточно тяжёлую операцию и решается при помощи 3-х MapReduce и паттерна Reduce Join:
Общая схема ReduceJoin
ReduceJoin работает следующим образом:
- На каждый из входных логов запускается отдельный MapReduce (Map only), преобразующий входные данные к следующему виду:
key -> (type, value)
Где key – это ключ, по которому нужно объединять таблицы, Type – тип таблицы (first или second в нашем случае), а Value – это любые дополнительные данные, привязанные к ключу.
- Выходы обоих MapReduce подаются на вход 3-му MapReduce, который, собственно, и выполняет объединение. Этот MapReduce содержит пустой Mapper, который просто копирует входные данные. Дальше shuffle раскладывает данные по ключам и подаёт на вход редьюсеру в виде:
key -> [(type, value)]
Важно, что в этот момент на редьюсер попадают записи из обоих логов и при этом по полю type можно идентифицировать, из какого из двух логов попало конкретное значение. Значит данных достаточно, чтобы решить исходную задачу. В нашем случае reducere просто должен посчитать для каждого ключа записей, с каким type встретилось больше и вывести этот type.
MapJoin
Паттерн ReduceJoin описывает общий случай объединения двух логов по ключу. Однако есть частный случай, при котором задачу можно существенно упростить и ускорить. Это случай, при котором один из логов имеет размер существенно меньшего размера, чем другой. Рассмотрим следующую задачу:
Имеются 2 лога. Первый лог содержит лог web-cервера (такой же как в предыдущей задаче), второй файл (размером в 100кб) содержит соответствие URL-> Тематика. Пример 2-го файла:
/toyota.php auto
/football/spartak.html sport
/cars auto
/finances/money business
Для каждого IP-адреса необходимо рассчитать страницы какой категории с данного IP-адреса загружались чаще всего.
В этом случае нам тоже необходимо выполнить Join 2-х логов по URL. Однако в этом случае нам не обязательно запускать 3 MapReduce, так как второй лог полностью влезет в память. Для того, чтобы решить задачу при помощи 1-го MapReduce, мы можем загрузить второй лог в Distributed Cache, а при инициализации Mapper’a просто считать его в память, положив его в словарь -> topic.
Далее задача решается следующим образом:
Map:
# находим тематику каждой из страниц первого лога</em>
input_line -> [ip, topic]
Reduce:
Ip -> [topics] -> [ip, most_popular_topic]
Reduce получает на вход ip и список всех тематик, просто вычисляет, какая из тематик встретилась чаще всего. Таким образом задача решена при помощи 1-го MapReduce, а собственно Join вообще происходит внутри map (поэтому если бы не нужна была дополнительная агрегация по ключу – можно было бы обойтись MapOnly job-ом):
Схема работы MapJoin
Резюме
В статье мы рассмотрели несколько паттернов и приемов решения задач при помощи MapReduce, показали, как объединять MapReduce-задачи в цепочки и join-ить логи по ключу.
В следующих статьях мы более подробно рассмотрим архитектуру Hadoop, а также инструменты, упрощающие работу с MapReduce и позволяющие обойти его недостатки.
Ссылки на другие статьи цикла
» Часть 1: Принципы работы с большими данными, парадигма MapReduce
» Часть 2: Hadoop
dec0
Спасибо.
Более того, hadoop не гарантирует того, что комбинирующая функция вообще будет выполнена для выхода mapper’a. Поэтому комбинирующая функция не всегда применима, например, в случае поиска медианного значения по ключу.
Вот этот момент можно было бы поподробнее расписать.
asash
Для того чтобы найти медиану необходимо отсортировать все значения в порядке возрастания и выбрать элемент находящийся посередине. Для того чтобы ключи отсортировать — надо знать все значения, относящиеся к ключу. Поэтому это сделать можно только в редьюсере. Комбайнер агрегирует только часть значений, поэтому не применим в случае вычисления медианы.