Перед публикацией своего цикла статей по MapReduce в Cache, мне показалось важным озвучить данную прошлогоднюю точку зрения из статьи Адама Дрейка «Command-line tools can be 235x faster than your Hadoop cluster». К сожалению оригинальная статья Тома Хайдена, на которую он ссылается стала уже недоступна на сайте Тома, но её, по-прежнему, можно найти в архивах. Для полноты картины предлагаю ознакомиться и с ней тоже.
Введение
Посещая в очередной раз свои любимые сайты, я нашел крутую статью Тома Хайдена об использовании Amazon Elastic Map Reduce (EMR) и mrjob для вычисления статистики отношения выигрыш/проигрыш в наборе данных со статистикой по шахматным матчам, которую он скачал с сайта millionbase archive, и c которой он начал играться используя EMR. Так как объем данных был всего 1.75GB, описывающий 2 миллиона шахматных партий, то я скептически отнесся к использованию Hadoop для данной задачи, хотя были и понятны его намерения просто поиграться и изучить плотнее, на реальном примере, утилиту mrjob и инфраструктуру EMR.
Изначально формулировка задачи (найти строку результата в файле и подсчитать расклад) больше подходила для поточной обработки посредством команд shell. Я попытался применить такой подход с похожим объемом данных на моем лаптопе, и получил результат примерно через 12 секунд (т.е. эффективная скорость обработки данных около 270МБ/сек), тогда как обработка в Hadoop заняла 26 минут (т.е. эффективная скорость обработки данных 1.14МБ/сек).
При рассказе об обработке такого объема данных на кластере из 7 машин c1.medium Том сказал, что это заняло 26 минут в кластере и «возможно это время лучше чем если бы я делал это последовательно на моей машине, но все-таки медленнее чем если бы обрабатывал это посредством хитрого, многопоточного приложения локально.»
Это абсолютно верно, хотя заметим, что даже локальная последовательная обработка может легко побить эти 26 заявленных минут. Да, Том делал этот проект просто чтобы развлечься и получить удовольствие, но часто многие другие используют инструменты Big Data (tm) для обработки и анализа такого, «не очень большого», объема данных, с которым получить результаты можно применяя инструменты и технике и попроще и побыстрее.
Один, наиболее недооцененный подход для обработки данных — это использование старых добрых инструментов и конструкций shell. Преимущества такого подхода огромны, т.к. создавая конвейер данных из shell команд, вы все шаги обработки запускаете в параллель с минимальными накладными расходами. Это, как если бы у вас был свой локальный кластер Storm. Можно даже попытаться перевести концепции Spouts, Bolts и Sinks в простые команды и конвейеры в shell. Вы можете легко конструировать конвейер для обработки данных из простых команд, и работать они будут быстрее большинства инструментов из арсенала Big Data (tm).
Также давайте поговорим об отличии поточного и пакетного подходов в обработке данных. Том упомянул в начале статьи, что если он загружал больше 10000 результатов игр локально, то его [Python] программа очень быстро вылетала по памяти. Это происходило из-за того, что все данные об играх загружались сначала в память для последующего анализа. Тем не менее, при более плотном исследовании задачи, можно создать конвейер по поточной обработке данных, который фактически не будет потреблять памяти.
Результирующий конвейер, который мы создали, был более чем 236 раза быстрее аналогичной Hadoop реализации и почти не съедал память.
Исследование данных
Первым шагом упражнения будет выкачивание этих файлов PGN. Т.к. я не представлял, как выглядит этот формат, то я заглянул в Wikipedia.
[Event "F/S Return Match"]
[Site "Belgrade, Serbia Yugoslavia|JUG"]
[Date "1992.11.04"]
[Round "29"]
[White "Fischer, Robert J."]
[Black "Spassky, Boris V."]
[Result "1/2-1/2"]
(moves from the game follow...)
Нас интересуют только результаты игр, с 3 возможными вариантами. 1-0 будет означать что выиграли белые, 0-1 – выиграли черные и 1/2-1/2 означает ничью в партии. Там так же есть результат - , который означает, что игра продолжается или была прервана, но мы игнорируем этот случай в нашем упражнении.
Получение набора данных
Первым делом постараемся скачать данные об играх. Это оказалось не так легко, но после короткого поиска в сети я нашел Git репозиторий rozim в котором было множество различных источников данных о шахматных играх за различные промежутки времени. Я использовал код из этого репозитория для компиляции набора данных объемом в 3.46ГБ, что почти вдвое больше объема данных, используемых Томом в его тесте. Следующим шагом поместим все эти данные в наш конвейер.
Построение конвейера по обработке данных
Если вы будете следить за повествованием и замерять каждый шаг, то не забудьте очищать кеш страниц вашей операционной системы для получения более правильных времянок при замерах обработки данных.
Использование команд из shell удобно для обработки данных, т.к. вы можете получить параллелизм исполнения команд «задарма». Проверьте, например, как исполнится данная команда в терминале.
sleep 3 | echo "Привет, мир"
Интуитивно может показаться, что здесь мы сначала задерживаемся на 3 секунды на первой команде и потом выдаем на терминал «Привет, мир». Но, на самом деле, обе команды запускаются на исполнение одновременно. Это один из основных факторов, почему мы можем получить такое ускорение на одной машине для простых команд, не нагружающих IO.
Перед тем как начать конструировать конвейер по обработки данных, давайте определим верхний поток производительности посредством простого копирования данных в /dev/null.
В этом случае операция занимает 13 секунд, и для массива 3.46ГБ это означает 272МБ/сек. Это будет нашей верхней границей скорости чтения с диска.
Теперь мы можем начать построение конвейера, и на первом шаге будем использовать cat для генерации потока данных:
cat *.pgn
Так как нас интересуют только строки с результатами, то мы можем сканировать файлы и выбирать только строки содержащие 'Result' посредством утилиты grep:
cat *.pgn | grep "Result"
[Непонятно, почему он сразу не запускал это как `grep Result *.pgn`?]
Это даст нам только строки содержащие Result. Теперь если хотите, то можете использовать команды sort и uniq для того, чтобы получить список уникальных элементов и для подсчета их количества.
cat *.pgn | grep "Result" | sort | uniq -c
Это очень простой подход для анализа, и он предоставляет нам результат за 70 секунд. Мы, конечно же, можем это ускорить, но и сейчас заметим, что с учетом линейного масштабирования это же самое заняло бы на кластере Hadoop примерно 52 минуты на обработку.
Чтобы ускориться, мы можем убрать шаги sort | uniq из конвейера и заместить их вызовом AWK, который является замечательным инструментом для обработки данных событий.
cat *.pgn | grep "Result" | awk '{ split($0, a, "-"); res = substr(a[1], length(a[1]), 1); if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++;} END { print white+black+draw, white, black, draw }'
Здесь мы возьмем всю запись, разделим по знаку дефис, и возьмем символ непосредственно ему предшествующий слева, если там будет 0, то выиграли черные, 1 – белые, и 2 – ничья. Заметим, что здесь $0 это встроенная переменная, представляющая всю запись.
Это уменьшает время исполнения до примерно 65 секунд, и т.к. мы обрабатываем примерно в 2 раза больше данных, то это ускорение примерно в 47 раз.
Даже на текущем шаге мы имеем ускорение в 47 раз при локальном исполнении. Более того, размер используемой памяти почти нулевой, т.к. хранятся данные только текущей записи, и инкрементирование 3х переменных целого типа — это ничтожная с точки зрения потребления памяти операция. Тем не менее, htop показывает, что grep в данной последовательности является узким местом, и полностью использует процессорное время одного ядра.
Параллелизация узкого места
Проблема недоиспользования дополнительных ядер может быть решена при помощи замечательной команды xargs, которая может запустить grep параллельно. Т.к. xargs ожидает вход в определенном формате, то будем использовать find с аргументом -print0 , для того чтобы имя передаваемого в xargs файла заканчивалось нулем. Соответственно передадим -0, чтобы xargs на своей стороне ожидал завершенные нулем строки. Опция -n управляет количеством строк передаваемым в один вызов, и -P означает количество параллельных запускаемых команд. Важно заметить, что такой параллельный конвейер не гарантирует порядка доставки, но это не проблема, если вы рассчитывали на распределенную обработку изначально. Опция -F в grep означает, что мы ищем по простому совпадению строки, и не спрашиваем никаких замороченных регулярных выражений, что в теории может дать дополнительный выигрыш по скорости, что не было замечено в экспериментах
[это утверждение не совсем верно для нашего примера, gnu grep строит детерминированный конечный автомат, что в данном простом выражении будет эквивалентно простому совпадению строки. Единственное что выигрываем, это время компиляции регулярного выражения, чем в данном случае можно пренебречь].
find . -type f -name '*.pgn' -print0 | xargs -0 -n1 -P4 grep -F "Result" | gawk '{ split($0, a, "-"); res = substr(a[1], length(a[1]), 1); if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++;} END { print NR, white, black, draw }'
В результате получаем время 38 секунд, что дало «привар» в 40% только за счет параллелизации запуска grep в нашем конвейере. Теперь мы примерно в 77 раз быстрее реализации на Hadoop.
Хотя мы и улучшили производительность «драматически» за счет параллелизации шага с grep, но мы можем вообще избавиться от него, заставляя сам awk искать нужные записи и работать только с теми, что содержат строку «Result».
find . -type f -name '*.pgn' -print0 | xargs -0 -n1 -P4 awk '/Result/ { split($0, a, "-"); res = substr(a[1], length(a[1]), 1); if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++;} END { print white+black+draw, white, black, draw }'
Это может показаться правильным решением, но это выдает результаты о каждом файле в отдельности. Тогда как нам нужен общий, агрегированный результат. Корректная реализация агрегации концептуально очень похожа на MapReduce:
find . -type f -name '*.pgn' -print0 | xargs -0 -n4 -P4 awk '/Result/ { split($0, a, "-"); res = substr(a[1], length(a[1]), 1); if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++ } END { print white+black+draw, white, black, draw }' | awk '{games += $1; white += $2; black += $3; draw += $4; } END { print games, white, black, draw }'
Добавляя второй вызов awk, в итоге мы получаем желаемую агрегатную информацию об играх.
Это еще более увеличивает финальную скорость, сокращая время запуска до 18 секунд, что 174 раз быстрее Hadoop реализации.
Тем не менее, мы можем это еще немного ускорить, применяя mawk, который полностью заменяет gawk, при сохранении тех же опций запуска, но предоставляя лучшую производительность.
find . -type f -name '*.pgn' -print0 | xargs -0 -n4 -P4 mawk '/Result/ { split($0, a, "-"); res = substr(a[1], length(a[1]), 1); if (res == 1) white++; if (res == 0) black++; if (res == 2) draw++ } END { print white+black+draw, white, black, draw }' | mawk '{games += $1; white += $2; black += $3; draw += $4; } END { print games, white, black, draw }'
Таким образом, конвейер с применением перенаправления вывода между процессами find | xargs mawk | mawk позволил выполнить обработку за 12 секунд, что составляет около 270 МБ/сек, и что более чем в 235 раз быстрее реализации на Hadoop.
Заключение
Надеюсь, нам удалось аргументировать несколько наших идей относительно порочной практики по использованию инструментов Hadoop для обработки не очень большого набора данных, вместо обработки их же на одном компьютере с применением обычных shell команд и инструментов. Если у вас есть реально гигантский массив данных или действительно требуются распределенные вычисления, то, конечно же, может потребоваться использование [тяжелых] инструментов Hadoop, но чаще мы видим ситуацию, когда Hadoop используют вместо традиционных реляционных баз данных, или там, где другие решения будут и быстрее и дешевле, и легче в поддержке.
Комментарии (48)
vird
25.09.2015 15:45+7По поводу
[Непонятно, почему он сразу не запускал это как `grep Result *.pgn`?]
Трюк заключается в том, что в некоторых ситуациях cat file | grep работает эпично быстрее на некоторых типах условий, чем просто grep.
По ходу grep таки сначала полностью загружает файл в память и только потом обрабатывает (не для всех условий выборки). Есть другая гипотеза, что grep загружает файл неоптимальными chunk'ами.
Разница в некоторых случаях достигала grep — 3 часа cat | grep — 30 сек. Воспроизвести не смогу за давностью (2 года назад был опыт)tsafin
25.09.2015 15:58+1Хмм, интересно.
С учетом того что загрузка больше одной строки в grep не имеет смысл т.к. все регулярные выражения все равно однострочные, то это поведение не имеет большого смысла. Но объяснило бы результаты. Надо исходники посмотреть…
pehat
25.09.2015 16:38+6Прежде чем сочинять легенды про животворящий cat, попробуйте просто запустить grep два раза подряд на одном и том же файле.
vird
25.09.2015 16:44+3Кэш файловой системы ускорил второй запуск где-то в 4 раза. И какой с этого вывод?
Strepetarh
25.09.2015 16:57+4С учётом категорической недостаточности информации, никакой. Или любой. Например, что за последние два года вы прикупили ssd на смену hdd. Просто тормоза grep'а по сравнению с cat'ом — это магия, а существенное ускорение засчёт кэша — факт.
vird
25.09.2015 17:58+9Воспроизвел
fallocate -l 1G test
cat test | grep -E [0-9]
# «прогреваем кэш»
time -p cat test | grep -E [0-9]
real 0.95
user 0.61
sys 0.57
time -p grep -E [0-9] test
real 3.32
user 2.63
sys 0.69tsafin
25.09.2015 19:28+3Интересно, я бы сравнил логи `strace -e trace=open,read` в обоих случаях, перед тем как делать догадки.
lybin
27.09.2015 05:46grep and awk and sed are for processing files. Obviously processing is going to involve more processing power than not processing the file and only outputting its contents as-is.
ValdikSS
28.09.2015 13:04Воу-воу, у меня первое отрабатывает секунду, а второе отрабатывало 157 секунд (!!), загружая CPU на 100%, при этом процесс был running и я его не мог убить даже SIGKILL от рута. Такое поведение наблюдается каждый запуск.
strace -p $(pidof grep)
просто замораживается после Process … attached, аналогично и gdb.vird
28.09.2015 14:16157 секунд, но отработало. У знакомого был пример, когда на macos вообще не отработало за 20 минут.
ValdikSS
28.09.2015 14:39Ага, это, похоже, был баг в ядре! У меня lseek на 64 КБ вперед делался 2 минуты. После обновления ядра, вроде, нормально.
ValdikSS
28.09.2015 15:28+2tsafin
28.09.2015 21:02Отличный багрепорт, спасибо, Влад!
т.е. если, допустим, использовать dd из /dev/zero для создания заполненного фала, а не sparse файла, то будет честнее и ближе к жизни. Так получилось, что в данном случае наткнулись на странный баг в ext4. И пока не знаем почему grep с файлами медленнее. Если это действительно так.ValdikSS
28.09.2015 21:44+1У меня grep с файлами быстрее.
dd if=/dev/zero of=test33 bs=1M count=1024
time grep '1' test33 — меньше 1 секунды (0.929 в среднем)
time cat test33 | grep '1' — больше 1 секунды (1.046 в среднем)
При этом, во время cat слышно очень частую работу оперативки (дроссели шуршат). Да и в valgrind callgrind видно, что при первом варианте происходит около 750 тысяч вызовов функций, а при втором — 5 миллиардов (!!)
Extrapolator
25.09.2015 19:37+1копирование данных в /dev/null [...] занимает 13 секунд, и для массива 3.46ГБ это означает 272МБ/сек
выполнить обработку за 12 секунд, что составляет 270 МБ/сек
Магия!tsafin
25.09.2015 19:47+2Хороший вопрос! Я не менял цифр из оригинальной статьи, и ровно этот смысл там и был написан.
P.S.
Какая хорошая аудитория в Хабре, в оригинальной статье никто за год не нашел такие расхождения
P.P.S.
Его сайт отвечает через раз, похоже, Хабра-эффект в действии.
el777
25.09.2015 20:05Хадуп для того и нужен, что он продолжает работать тогда, когда ресурсов 1-2 машин уже недостаточно.
Ествественно, ценой потери эффективности, вы получаете возможность обработать весь массив данных.grossws
25.09.2015 20:19А также ценой повышения latency обработки заданий, т. к. запуск MR job'а — дело долгое. Это боль при многостадийной или итеративной обработке одного массива данных. В общем, то место, в которое «выстрелил» spark.
el777
29.09.2015 13:50+1Само собой, все небесплатно.
Выбор между вариантами: вообще не решить задачу или решить ее за большое время.
Komzpa
25.09.2015 23:30+3А если поменять xargs на parallel, то можно ещё и прозрачно загрузить подсчётами все ядра системы, и получить ещё больший буст.
Ivanhoe
26.09.2015 11:21+1И даже ядра на удаленных машинах по SSH. parallel — мега-штука.
tsafin
26.09.2015 13:39+1Спасибо Komzpa за ссылку — не слышал. Посмотрел, и штука дейтсвительно интересная. Если надо удобно параллелить/распределять скрипты между узлами/ядрами. Есть где поиспользовать уже в ближайшее время
Но в данном случае `parallel` думаю не дал бы никакого выигрыша т.к. автор сам распараллеливал на все имеющиегося у него ядра через опцию -P 4 у xargs.
cubuanic
26.09.2015 17:01Если там было 4 ядра, то по сути xargs -P 4 то же самое что и parallel, так что буста не будет.
andy1618
26.09.2015 14:18+1Это примерно как сранивать сортировку пузырьком и квиксорт на массиве из двух-трёх элементов :)
Shablonarium
26.09.2015 16:29BigData — это не про размер, а про операбельность. Допустим, возьмем такой же набор данных и следующую задачку. Игровой шахматный сервер и каждому игроку показываеть его место в ранке по количеству побед. Там где для исследователя 12 секунд еще допустимо, для игрока — это отказ в обслуживании по таймауту. Не говоря уже о том, что на самом деле исследователи также подвержены эмоциональной деградации из-за любой операции дольше миллисекунд.
webkumo
26.09.2015 20:47Вы уверены, что правильно выразили свою мысль? Пока больше на бред какой-то смахивает. Тем более, что инструменты BigData слабо приспособлены для выполнения операций не «дольше миллисекунд»… Точнее они просто не про скорость работы.
tsafin
26.09.2015 21:55Не очень хороший пример с пересчетом рейтинга. Как часто у нас в данном примере надо пересчитывать его: при каждом запросе, каждую секунду, каждую минуту, каждый час?
Или только по завершении партии?
(Ответ: Рейтинги надо хранить подсчитанными и пересчитывать их только при изменении данных, которые меняются не чаще чем Н секунд/минут (как функция средней продолжительности партии и среднего количества параллельно активных партий — ср.длина/ср.параллелизм)
Я бы очень не рекомендовал пытаться пересчитывать рейтинг по каждому запросу, обеспечивая «реал-тайм рейтинг». Зря только электроны гонять)
Shablonarium
26.09.2015 16:41Статья слишком однобокая в том, что не сказано в чем таких задержек на хадупе. И тем более 7 машин — это не тот размер кластера, который был бы подходящим по идеологии. И тем более, не сказано что вместо 7-ми машин можно взять 7 тысяч и изменить этот параметр можно за секунду. А вот с ноутбуком такой фокус проделать нельзя.
tsafin
26.09.2015 21:46+1А будет ли это эффективно с финансовой точки зрения? Нет, я серьезно. Можно конечно запустить 1000 виртуализированных Sandy Bridge ядер, которые при сравнении лов-в-лоб, конечно, работают в 5 раз медленнее физического Haswell ядра, но навалившись, гуртом и большой компанией дающих выигрыш в 100 раз (при условии бесконечно параллелизуемого алгоритма). Но какой ценой будет это достигнуто? Сколько денег за электричество вы заплатите для обслуживания такой виртуализированной фермы? Будет ли такое решение предоставлять результат быстрее накладных расходов на планировщик и задержек на возврат результата?
Ведь по большому счету вся история про облака и кластеры началась с основной идей — не платить миллионы$ за большой сервер а поставить 100 commodity server-ов и получить решение эффективное с точки зрения стоимости Ватта. Если в вашем Hadoop решении, для того чтобы его сделать в 10 раз, надо заплатить, скажем, в 100 раз больше на инфраструктуру, то может ну его на фиг? Может и по старинке иногда? Задумываясь об алгоритмах и задержках, и не всегда полагаясь на волшебство BigData?
dolphin278
26.09.2015 18:54А если при реальном увеличении объема данных, которые нужно обабатывать взять Joyent Manta, то и конвейеры имеющиеся можно будет использовать.
tsafin
26.09.2015 21:35Спасибо, Борь dolphin278 за ссылку на Манта. Не знал. Сразу не понял каким образом это можно применить к нашему случаю, но там есть хорошие, и я бы даже сказал классические примеры, и стало понятно. Это довольно типичный сервис grid вычислений, с распределенным планировщиком задач, но обернутый в модный JSON.
Очень сильно напомнил мне NetBatch который в Intel-е используют для тестирования в своих серверных фермах (не думаю, что он известен за пределами Intel). Там не было модного JSON, но все остальное как и здесь.
P.S.
Одна проблема с такими грид средами — накладные расхода на формирование контекста, постановку в очередь, разворачивание контекста. запуск, сбор результата, и возврат данных — ООЧЕНЬ большие. Для таких элементарных заданий как описана здесь, когда сам прогон несколько минут, такие накладные расходы на планировщик и инфраструктуру чрезмерны.dolphin278
26.09.2015 21:57+1Так тут не в JSON'е дело — ты можешь выполнять операции на кластере используя привычные shell-утилиты и логику работы, т.е. получается тот же способ решения задач, который ты используешь на локальной машине с консольными утилитами может быть перенесен на работу в мантовском гриде.
Что, конечно, имеет смысл делать в том случае, когда данных становится реально много, т.е. исходный тезис статьи.tsafin
26.09.2015 22:10Да, согласен, это подходит если уже есть распараллеленый алгоритм на утилитах (что у нас было, когда добавляли стадию reduce дополнительным awk-ом) и при дальнейшей необходимости расширять счетные ресурсы вширь, то можно это запустить на Joyen Manta, ранее озвученном GNU parallel или каком другом классическом grid планировщике.
4p4
05.10.2015 23:41Возможно применение ag (silver search) ещё ускорило бы. Это скорее ack чем grep, но для данной задачи бы подошёл идеально.
boyer-moor + fmap() + pcre jit + pthreads.
webkumo
Хм… а у Адама на ноуте, я так понимаю, стоит SSD? Просто обычный HDD никак не может выдать 270 МБ/сек… Ну и применение BigData инструментов для таких маленьких данных… это да, просто смешно.
tsafin
И (отвечая на не заданный здесь вопрос) я полностью согласен с Девидом Кантором в его утверждении, что BigData начинаются там, где уже невозможно весь набор данных поместить в память сервера. Для текущих конфигураций 2х-сокетных серверов это где-то в районе 3ТБ twitter.com/thekanter/status/559034352474914816
Все что меньше по размеру — «не очень Big Data»
tsafin
Кстати, если кто не знает Дэвида Кантера — он лучший специалист вне интел по архитектуре процессоров x86. www.realworldtech.com
Даже, работая в Интеле, я предпочитал по его статьям изучать архитектуру Nehalem, даже и имея доступ к HAS/MAS.
gaelpa
На таком объеме данных, это может быть и HDD, просто на N-ный прогон ядро уже держало все файлы в RAM кэше.
easyman
Да тут копирование в /dev/null заняло 13 сек.
А обработка 12 :)
Так что, да, оперативки у него достаточно :)