В предыдущей части серии мы (в 100500й раз) попытались рассказать про основные приемы и стадии подхода Google MapReduce, должен признаться, что первая часть была намерено "капитанской", чтобы дать знать о MapReduce целевой аудитории последующих статей. Мы не успели показать ни строчки того, как всё это мы собираемся реализовывать в Cache ObjectScript. И про это наша рассказ сегодня (и в последующие дни).
Напомним первоначальный посыл нашего мини-проекта: вы всё еще планируем реализовать MapReduce алгоритм используя те подручные средства, что есть в Cache ObjectScript. При создании интерфейсов, мы попытаемся придерживаться того API, что мы описали в предыдущей статье про оригинальную реализацию Google MapReduce, любые девиации будут озвучены соответствующе.
Начнем с реализации абстрактных интерфейсов Mapper и Reducer.
Class MR.Base.Mapper
{
Method Map(MapInput As MR.Base.Iterator, MapOutput As MR.Base.Emitter) [ Abstract ] { }
}
Class MR.Base.Reducer
{
Method Reduce(ReduceInput As MR.Base.Iterator, ReduceOutput As MR.Base.Emitter) [ Abstract ] { }
}
Изначально, как и в канонической реализации, мы сделали 2 отдельных интерфейса MapInput и ReduceInput. Но сразу стало очевидным, что они служат одной и той же цели, и предоставляют одни и те же методы – их цель пройтись по потоку данных до конца, т.ч. они оба являются итераторами. Потому, в итоге, редуцируем их в общий интерфейс MR.Base.Iterator:
Class MR.Base.Iterator
{
Method GetNext() As %String [Abstract ] { }
Method IsAtEnd() As %Boolean [Abstract ] { }
}
Использование глобалов в качестве каналов связи
Оригинальная реализация Google MapReduce использовала файловую систему Google GFS как транспорт между узлами и стадиями алгоритма. В Cache есть свой механизм распространения (когерентных) данных между узлами (если не пользоваться голым TCP/UDP) – это протокол ECP (Enterprise Caсhe Protocol). Обычно он используется серверами приложений для получения данных от удаленных серверов баз данных. Но ничего не останавливает нас от построения на базе таких peer-to-peer соединений ECP некоей виртуальной управляющей шины, куда мы будем складывать данные в виде пар <ключ, значение> или похожие данные. Эти данные будут будут пересылаться между акторами, участвующими в конвейерах алгоритма (т.е. emit, посылаемый объектом Mapper, будет писаться в шину ECP и читаться объектом Reducer). Если акторы будут работать в рамках одного узла, то они, например могут использовать быстрые глобалы, отображенные в CACHETEMP, или обычные глобалы, если реализуемый алгоритм многостадийный и нужна надежность и журналирование.
В любом случае, будь то локальные (для конфигурации на одном узле) глобалы, или глобалы удаленного узла, подключенного через ECP, глобалы являются удобным и хорошо зарекомендовавшим себя транспортом для передачи данных между узлами кластера Cache, в данном случае, между вовлеченными в MapReduce функциями и классами.
Посему, естественным решением, позволяющим упростить нашу систему будет использование в среде Cache для передачи данных между узлами кластера протокола ECP вместо файловых систем GFS или HDFS. Функциональные характеристики ECP позволят сделать и другие упрощения (но об этом несколько позже).
Эмиттеры и черная магия
Как мы уже рассказывали в предыдущей серии, с момента когда данные уходят от объекта Mapper, и до момента как они поступают на вход Reducer, в классической реализации на мастере проходит тяжелая операция перемешения и сортировки.
В окружении, использующем глобалы к качестве транспорта, в MUMPS/Cache ObjectScript среде, мы можем полностью избежать дополнительных расходов на такую сортировку, т.к. агрегация и сортировка будут сделаны нижележащим btree* хранилищем.
Имея такие требования к дизайну, создадим базовый интерфейс эмиттера:
Class MR.Base.Emitter Extends MR.Base.Iterator
{
/// emit $listbuild(key,value(s))
Method Emit(EmitList... As %String) [Abstract ] { }
}
Эмиттер должен быть похож на интерфейс входного итератора, показанного выше (потому мы и пронаследовались от MR.Base.Iterator), но в дополнение к интерфейсу прохода по данным, эмиттер должен уметь еще и посылать данные в своё промежуточное хранилище (т.е. добавляем функцию Emit).
Первоначально, наша функция Emit была очень похожа на классическую реализацию и принимала только 2 аргумента как пару <ключ, значение>, но потом мы натолкнулись на (редкую) необходимость передавать что-то более многомерное, длиннее чем пара значений (например, кортеж любой арности), потому, в настоящий момент, Emit стал функцией принимающей переменное число аргументов.
Заметим, что в большинстве случаев, на практике, сюда будет поступать только пара аргументов <ключ, значение> как мы и видели в классической реализации.
Это всё ещё абстрактный интерфейс, больше мяса будет добавлено очень скоро.
Если бы нам, при обработке, надо было сохранять порядок поступивших элементов, то мы бы использовали реализацию ниже:
/// Emitter which maintains the order of (key,value(s))
Class MR.Emitter.Ordered Extends (%RegisteredObject, MR.Base.Emitter)
{
/// global name serving as data channel
Property GlobalName As %String;
Method %OnNew(initval As %String) As %Status
{
$$$ThrowOnError($length(initval)>0)
set ..GlobalName = initval
quit $$$OK
}
Parameter AUTOCLEANUP = 1;
Method %OnClose() As %Status
{
if ..#AUTOCLEANUP {
if $data(@i%GlobalName) {
kill @i%GlobalName
}
}
Quit $$$OK
}
...
}
Заметим на полях, что в Cache глобалы – в общем-то, глобальны :), и не будут очищены автоматически по завершении процессов их создавших. В отличие, например, от PPG (process-private globals). Но иногда все же хочется, чтобы наши промежуточные каналы, созданные для взаимодействия между стадиями конвейера MapReduce удалялись по завершении подпрограммы их создавшей. Поэтому и был добавлен режим "автоочистки" (параметр класса #AUTOCLEANUP) при котором глобал, имя которого хранится в свойстве GlobalName, будет удален при закрытии объекта (в момент вызова %OnClose).
Обратите внимание, что мы форсируем один обязательный параметр в метода %New (в %OnNew генерируем $$$ThrowOnError если имя в Initval не определено). Конструктор класса ожидает получить название глобала с которым он будет работать в качестве транспорта данных.
Class MR.Emitter.Ordered Extends MR.Base.Emitter
{
/// ...
Method IsAtEnd() As %Boolean
{
quit ($data(@i%GlobalName)\10)=0
}
/// emit $listbuild(key,value)
Method Emit(EmitList... As %String)
{
#dim list As %String = ""
for i=1:1:$get(EmitList) {
set $li(list,i) = $get(EmitList(i))
}
#dim name As %String = ..GlobalName
set @name@($seq(@name)) = list
}
/// returns emitted $lb(key,value)
Method GetNext() As %String
{
#dim value As %String
#dim index As %String = $order(@i%GlobalName@(""), 1, value)
if index '= "" {
kill @i%GlobalName@(index)
quit value
} else {
kill @i%GlobalName
quit ""
}
}
Method Dump()
{
zwrite @i%GlobalName
}
}
Надеемся, вы еще помните, что наш Emitter является наследником итератора Iterator? Посему ему нужно реализовать пару функций итератора – IsAtEnd и GetNext.
IsAtEnd – простой: если наш служебный глобал не содержит данных (т.е. $data(..GlobalName) не возвращает 10 или 11, что означает что там в поддереве есть еще узлы с данными), то мы достигли конца потока данных;
- Emit создает узел с данными в конце текущего списка. Оформляя пару (или кортеж, при арности больше 2х) как элемент $(listbuild(...)) [listbuild].
Как известно, и как хорошо написал Саша Коблов, $SEQUENCE может быть использована почти во всех местах, где использовался $INCREMENT, обеспечивая при этом лучшие скорости при работе в многопроцессорном или многосерверном режиме (через ECP). В силу меньшего количества коллизий при обращении к одном узлу глобала. Потому в коде выше мы используем $sequence для выделения индекса следующего элемента упорядоченного списка.
- На другой стороне алгоритма, в получателе GetNext() вытаскивает элементы из коллекции посредством простого $ORDER(@i%GlobalName("")). Элемент, с полученным индексом будет удален из списка после обработки.
Обращаем внимание, что данный вариант удаления элемента из списка/глобала не очень совместим с параллельным режимом, и нужно было бы добавить блокировки или сменить структуру данных. Но т.к. на ближайшие серии у нас будет только один Reducer, на всё множество Mapper ов, то мы отложим решение данной проблемы на будущее, когда приступим к много-серверной реализации.
Заметим, что структура данных, реализованная MR.Emitter.Ordered по сути реализуют классическую коллекцию FIFO ("FirstIn – FirstOut"). Мы помещаем новый элемент в конец списка и вытаскиваем из головы списка.
Специальный случай: эмиттер с автоагрегацией
Если вы посмотрите на те данные, что мы посылаем в между стадиями конвейера в примере word-count (ок, не сейчас, а когда мы вам покажем такую реализацию) то вы быстро осознаете, что:
На самом деле нам не интересен порядок, в котором мы "эмиттим" пары <ключ, значение>. Более того, нижележащее хранилище btree* всегда держит список ключей отсортированным для быстрого поиска, избавляя нас от необходимости сортировки на мастере, как произошло бы в классической реализации;
- И в наших случаях, когда мы пишем пару <key,1> на стороне Mapper, мы предполагаем в Reducer их простую агрегацию в сумму единиц. Т.е. в случае Cache ObjectScript мы полагались бы на использование $INCREMENT.
Так зачем посылать такой большой трафик ненужных данных, если мы можем их агрегировать еще в момент посылки?
Именно так и работает MR.Emitter.Sorted, который является наследником MR.Emitter.Ordered (показанного выше):
/// Emitter which sorts by keys all emitted pairs or tuples (key, value(s))
Class MR.Emitter.Sorted Extends MR.Emitter.Ordered
{
Property AutoIncrement As %Boolean [ InitialExpression = 1 ];
/// emit $listbuild(key,value)
Method Emit(EmitList... As %String)
{
#dim name As %String = ..GlobalName
#dim key As %String
#dim value As %String
if $get(EmitList)=1 {
// special case - only key name given, no value
set key = $get(EmitList(1))
quit:key=""
if ..AutoIncrement {
#dim dummyX As %Integer = $increment(@name@(key)) ; $seq is non-deterministic
} else {
set @name@(key) = 1
}
} else {
set value = $get(EmitList(EmitList))
set EmitList = EmitList - 1
for i=1:1:$get(EmitList) {
#dim index As %String = $get(EmitList(i))
quit:index=""
set name = $name(@name@(index))
}
if ..AutoIncrement {
#dim dummyY As %Integer = $increment(@name,value)
} else {
set @name = value
}
}
}
/// ...
}
Для самого простого случая, выдачи пары <key,1> или, когда значение опущено, и имеет один ключ <key> мы реализовали локальную оптимизацию, когда в режиме автоинкремента (AutoIncrement = 1) мы при вызове сразу инкрементируем соответствующий счетчик для ключа. Если же не включен автоинкремент, то мы просто (пере)определяем узел ключа в 1, фиксируя факт передачи ключа.
Для более общего случая, с двумя элементами, пары ключ-значение <key,value> или даже с большим количеством элементов <key,key2,key3,…keyn,value> (кортеж любой арности) у нас опять же реализовано 2 режима работы:
при автоинкременте мы сразу суммируем значение соответствующего узла, адресуемого ключом(ключами) с переданным значением;
- и без автоинкремента – мы присваиваем соответствующему узлу, адресуемому данным списком ключей, переданное значение value.
Обращаем внимание, что кортеж мы передаем посредством массива, аккумулирующего переменное количество аргументов. Все элементы этого массива кроме последнего, пойдут как адреса подындексов. Последний элемент кортежа будет считаться значением.
Такое необычное расширение пары «ключ-значение» в кортежи любой мощности, по нашим сведениям, является нетипичным или может быть уникальным. Нам не надо работать со строгим key-value хранилищем или bigtable хранилищем, и мы с легкостью можем работать с многомерными ключами в передаваемых элементах ("потому что можем"), что может сильно облегчить некоторые реализации алгоритмов, требующих дополнительной размерности данных, что сильно улучшает читабельность кода и упрощает понимание. В теории...
Заметим, что мы не переопределили IsAtEnd и он пронаследовал реализацию из MR.Emitter.Ordered, таким образом он по-прежнему будет возвращать ненулевое значение по окончании данных в подузлах промежуточного хранилища.
Но GetNext нам надо переопределить, т.к. мы больше не пытаемся запомнить порядок посланных данных и формат его внутреннего хранилища поменялся:
Class MR.Emitter.Sorted Extends MR.Emitter.Ordered
{
/// ...
/// returns emitted $lb(key,value)
Method GetNext() As %String
{
#dim name As %String = ..GlobalName
#dim value As %String
#dim ref As %String = $query(@name,1,value)
if ref'="" {
zkill @ref
#dim i As %Integer
#dim refLen As %Integer = $qlength(ref)
#dim baseLen As %Integer = $qlength(name)
#dim listbuild = ""
for i=baseLen+1:1:refLen {
set $li(listbuild,i-baseLen)=$qs(ref,i)
}
set $li(listbuild,*+1)=value
quit listbuild
}
quit ""
}
}
На выходе из GetNext() мы ожидаем $LISTBUILD<> список, но внутри хранилища данные пар/кортежей разбросаны по узлам иерархического хранилища. Функция $QUERY позволяет обойти узлы с данными (значениями пар/кортежей) в массиве для последующей их перепаковки в $LISTBUILD формат, индексы из массива последовательно добавляются следующим элементом списка (посредством присваивания элементу через функцию $LIST. Само же значение узла хранилища (значение в паре «ключ-значение» или последний элемент кортежа) будет добавлено в конец сформированного списка через ту же функцию $LIST(listbuild,*+1). В данном случае *+1 как раз и обозначат номер элемента списка, следующий за текущим концом.
На этом неожиданном месте мы прервем наш рассказ про MapReduce в Cache. Во второй части данного повествования мы показали базовые интерфейсы инфраструктуры, которые будут использованы в дальнейшем при реализации конкретных примеров. Уже в следующей серии мы соберем это всё воедино и реализуем классический пример WordCount, но уже на ObjectScript. Не уходите далеко!
Комментарии (11)
AlexeyMaslov
04.10.2016 18:35+1Между прочим, если не задумываться о переносе в сетевую среду, то очередь на $system.Event.Signal() / WaitMsg() работает в разы быстрее очереди на $increment/$sequence. Жаль, что методы класса $system.Event не работают в сети…
Wayfarer15
04.10.2016 21:47InterSystems не использует MapReduce в Cache для HL7 QRDA Cat I? Если использует, хотелось бы услышать про это подробнее.
tsafin
05.10.2016 01:01Я не являюсь большим специалистом по InterSystems HealthShare, но с большой долей вероятности могу предположить, что там еще не используется ни MapReduce в описываемой мной конфигурации (который является моим собственным pet project), ни будущий sharding. Т.к. они не могут полагаться на невыпущенный еще продукт. И средства масштабирования должны быть из текущего арсенала (зеркалирование, ECP, и все такое)
В статье Марка Болинского про развертывание на Azure https://community.intersystems.com/post/intersystems-example-reference-architecture-microsoft-azure-resource-manager-arm можно увидеть пример архитектуры как TrakCare так и HealthShare в облаке. Очень показательно.Wayfarer15
05.10.2016 01:57Тогда более общий вопрос, поскольку InterSystems широко (как мне кажется) представлен в health секторе в штатах — Cache вообще используется для eCQM и QRDA и если да, то как именно?
tsafin
06.10.2016 00:20+1(Очень странно мы от MapReduce вырулили на [неизвестные мне] eCQM и QRDA), ну раз уж зашел разговор, и раз никто из московских специалистов по InterSystems HealthShare или TrakCaare в общем, и по HL7 в частности не ответил, то попытаюсь вывести дискусиию на более освещенное место.
Я прошелся по страницам с описанием eCQM и QRDA и насколько понял это для обмена информации метрокой о качестве (чего? услуг?). Вроде как звучит релевантно тому, что делал бы модуль HealthShare Information Exchange, но легкий гуглинг ничего похожего не принес. С другой стороны, это может быть более релевантно другому продукту InterSystems — TrakCare но и там ничего похожего вроде бы не видно ( logist ?).
Потому вопрос, Wayfarer15, объясните мне несведущему что это такое и почему это должно быть релевантно MapReduce?
morisson
06.10.2016 00:49Думаю, что вопрос не связан с MapReduce, и не про InterSystems Cache, а скорее про InterSystems HealthShare — продукт с ярко-выраженной здравоохранимой направленностью в InterSystems.
А вы можете повторить свой вопрос в Developer Community? Там отвечают частенько непосредственно продакт-менеджеры и девелоперы продукта, т.е. можно ответ получить из самых первых рук.Wayfarer15
06.10.2016 01:09Очень вероятно (про InterSystems HealthShare). Я не совсем хорошо, точнее совсем не хорошо, ориентируюсь в продуктах InterSystems. Причина первоначального вопроса как раз в том, что InterSystems в мед ИТ системах штатов присутствует, а reference implementation для QRDA выполнен с использованием MapReduce как основной технологии. Прямо сейчас посмотрел статистику, 78% госпиталей провалили quality report за какой-то прошлый период, соответственно у них нет времени изобретать велосипед, они, скорее всего, воспользуются reference implementation перенеся это под их системы.
PS. Вполне вероятно, что всё это не имеет прямого отношения к InterSystems, но как я уже упомянул, мне это не известно.
AlexeyMaslov
Спасибо автору за спокойный неторопливый стиль изложения оригинального и интересного (особенно начиная со второй части) материала. Хотелось бы только видеть законченные примеры не в отрыве от «идеологии», а как-то поскорее, пока она не забылась.
tsafin
Примеры — пожалуйста!
Самые нетерпеливые могут посмотреть код в репозитории GitHub/cache-map-reduce и собственно примеры будут из пакетов
MR.Sample.WordCount
иMR.Sample.AgeAverage
.