imageSpark – проект Apache, предназначенный для кластерных вычислений, представляет собой быструю и универсальную среду для обработки данных, в том числе и для машинного обучения. Spark также имеет API и для R(пакет SparkR), который входит в сам дистрибутив Spark. Но, помимо работы с данным API, имеется еще два альтернативных способа работы со Spark в R. Итого, мы имеем три различных способа взаимодействия с кластером Spark. В данном посте приводиться обзор основных возможностей каждого из способов, а также, используя один из вариантов, построим простейшую модель машинного обучения на небольшом объеме текстовых файлов (3,5 ГБ, 14 млн. строк) на кластере Spark развернутого в Azure HDInsight.

Обзор средств взаимодействия со Spark


Помимо официального пакета SparkR, возможности в машинном обучении которого слабы (в версии 1.6.2 всего одна модель, в версии 2.0.0 их четыре), имеется еще два варианта доступа к Spark.

Первый вариант — это использование продукта от Microsoft — Microsoft R Server for Hadoop, в который недавно была интегрирована поддержка Spark. Используя данный продукт, можно производить вычисления по одним и тем же функциям R, в контексте локальных вычислений, Hadoop (map-reduce) или Spark. Помимо локальной установки R и доступа к кластеру Spark, облачная служба Microsoft Azure HDInsight позволяет развернуть готовые кластеры, и, кроме обычного кластера Spark, имеется возможность развернуть кластер R Server on Spark. Данный сервис представляет собой кластер Spark с предустановленным R server for Hadoop на дополнительном, пограничном узле, что позволяет сразу производить вычисления, как локально на данном сервере, так и переключаться на контекст Spark или Hadoop. Использование данного продукта достаточно хорошо описано в официальной документации к HDInsight на сайте Microsoft.

Второй вариант -это использование нового пакета sparklyr, который пока находится в стадии разработки. Этот продукт разрабатывается под эгидой RStudio — компании, под крылом которой выпущены одни из самых полезных и необходимых пакетов –knitr, ggplot2, tidyr, lubridate, dplyr и другие, поэтому этот пакет может стать еще одним лидером. Пока данный пакет слабо документирован, так как еще официально не выпущен.

На основе документации и экспериментов с каждым из этих способов работы со Spark, подготовил следующую таблицу (Табл. 1) с обобщенными функциональными возможностями каждого из способов (также добавил SparkR 2.0.0, в котором возможностей стало чуть больше).

image
Таблица 1. Обзор возможностей разных способов взаимодействия с Spark

Как видно из таблицы, нет ни одного средства в полной мере реализующие необходимые потребности «из коробки», но пакет sparklyr выгодно отличается от SparkR и R Server. Основные его достоинства – чтение csv, json, parquet файлов из hdfs. Полностью совместимый с dplyr синтаксис манипулирования данными – включающий в себя операции фильтрации, выбора колонок, агрегирующие функции, возможности выполнять слияние данных, модификацию имен колонок и многое другое. В отличии от SparkR или R server for Hadoop, где некоторые из этих задач либо не выполняются, либо выполняются очень неудобно (в R server for Hadoop слияния данных для объектов нет вовсе, оно поддерживается только для встроенного типа данных xdf). Еще одним достоинством пакета является возможность написания функций для запуска методов Java непосредственно из R кода.

Пример

  count_lines <- function(sc, file) {
  spark_context(sc) %>% 
    invoke("textFile", file, 1L) %>% 
    invoke("count")
}
count_lines(sc, "/text.csv")

Благодаря этому, можно реализовать отсутствующую функциональность пакета, используя существующие методы java в Spark или реализовав их самостоятельно.

И, разумеется, количество моделей машинного обучения значительно больше, чем у SparkR (даже в версии 2.0) и R server for Hadoop. Поэтому остановим свой выбор на данном пакете, как наиболее перспективном и удобном в использовании. Кластер Spark был развернут, при использовании Azure HDInsight облачной службы предлагающей развертывание 5 типов кластеров (HBase, Storm, Hadoop, Spark, R Server on Spark), в разных конфигурациях при минимальных усилиях.

Используемые ресурсы


  • Кластер HDInsight Apache Spark 1.6 на Linux (развертывание кластера подробно описано в документации Microsoft Azure)
  • R 3.3.2 инсталлированный на головной узел
  • RStudio preview редакции (доп. возможности для sparklyr), инсталлированный также на головной узел
  • Putty клиент, для установления сессии с головным узлом кластера и туннелирования порта RStudio на порт локального хоста (настройка RStudio и его туннелирования описана в документации Microsoft Azure)

Настройка среды


Вначале разворачиваем кластер Spark — я выбрал конфигурацию с 2 головными узлами D12v2 и 4 рабочими узлами D12v2. (D12v2: 4 ядра/28 ГБ ОЗУ, 200 ГБ диск, данная конфигурация не совсем оптимальна, но для демонстрации синтаксиса sparklyr подходит). Описание разворачивания разных типов кластером и работы с ними описано в документации на HDInsight. После успешного разворачивания кластера, используя подключение по SSH к рабочему узлу, устанавливаем туда R и RStudio, с необходимыми зависимостями. RStudio желательно использовать preview редакции, так как в ней появились дополнительные возможности для пакета sparklyr – дополнительное окно, в котором отображаются исходные датафреймы в Spark, и возможность просмотреть их свойства или их самих. После установки R, R Studio, переустанавливаем соединение, используя туннелирование на localhost:8787.

Итак, теперь в браузере по адресу localhost:8787 мы подключаемся к RStudio и продолжаем работать.

Подготовка данных


Весь код данной задачи приведен в конце данного поста.

Для данной тестовой задачи, будем использовать csv файлы NYC Taxi датасета, расположенные по адресу NYC Taxi Trips. Данные представляют собой информации о поездках на такси и их оплате. Для целей ознакомления, ограничимся одним месяцем. Построение модели на том же полном наборе данных, но используя R Server for Hadoop (в контексте Hadoop), описано в следующей статье: Exploring NYC Taxi Data with Microsoft R Server and HDInsight. Но там чтение файлов, вся предобработка — фильтрация данных, слияние таблиц было выполнено в Hive, и в R Server лишь строили модель, здесь же все, сделано на обычном R используя sparklyr.

Переместив оба файла в hdfs кластера Spark, и используя функцию sparklyr, читаем данные файлы.

Манипуляция данными


Файлы по поездкам и тарифам связаны по ключу — столбцам "medallion", "hack_licence" и "pickup_datetime", поэтому выполним присоединение слева к датафрейму data, датафрейма fare. После объединения данных и манипуляций, сохраняем датафрейм в формате parquet. Прежде чем строить модель, посмотрим на данные, для этого создадим выборку из 2000 случайных наблюдений и передадим их в R, используя collect. На данной малой выборке, построили стандартную диаграмму ggplot2 (зависимость чаевых от платы за проезд, с указанием размера точки — расстоянием маршрута и цветом точки количеством пассажиров, и разбитой на панель-сетку по типам оплаты и оператору такси) (рис. 1).

image
Рисунок 1 Диаграмма изображающая основные зависимости

На ней видно, что присутствует зависимость (линейная, как «стандарт» % от счета) размера чаевых от стоимости проезда, большая часть платежей осуществлена с использованием кредитной карты (панель CRD) и наличных (панель CSH), и что при оплате наличными чаевые всегда отсутствуют (вероятно, это объясняется тем, что при оплате наличными чаевые уже входят в стоимость оплаты, а при оплате картой нет). Поэтому в выборке для обучения оставляем только те поездки, которые оплачивались кредитной картой. Объединенный датафрейм, используя удобный синтаксис dplyr, и пайпинг magrittr, передаем дальше по цепочке: последующий отбор строк (исключая выбросы и нелогичные значения) и колонок (оставляя только необходимые для построения модели), передаем финальный датасет в функцию линейной регрессии. Для тренировки модели используем 70% всех данных, для теста оставшиеся 30%. Для данной задачи используем простую линейную регрессию. Зависимость, которую мы хотим обнаружить, это размер чаевых от параметров поездки. Данная модель на этих данных достаточно вырождена и не вполне корректна (имеется большое число чаевых равных 0), но она проста, покажет интерпретируемые коэффициенты модели и позволит продемонстрировать основные возможности sparklyr. В модели будем использовать следующие предикторы: vendor_id – идентификатор оператора такси, passenger_count – число пассажиров, trip_time_in_secs – время поездки, trip_distance — расстояние поездки, payment_type – тип платежа, fare_amount – цена поездки, surcharge – сбор. В результате обучения, модель имеет следующий вид:

Call: ml_linear_regression(., response = "tip_amount", features = c("vendor_id", "passenger_count", "trip_time_in_secs", "trip_distance", "fare_amount", "surcharge"))

Deviance Residuals: (approximate):
      Min        1Q    Median        3Q       Max 
-27.55253  -0.33134   0.09786   0.34497  31.35546 

Coefficients:
                     Estimate  Std. Error  t value Pr(>|t|)    
(Intercept)        3.2743e-01  1.4119e-03 231.9043  < 2e-16 ***
vendor_id_VTS     -1.0557e-01  1.1408e-03 -92.5423  < 2e-16 ***
passenger_count   -1.0542e-03  4.1838e-04  -2.5197  0.01175 *  
trip_time_in_secs  1.3197e-04  2.0299e-06  65.0140  < 2e-16 ***
trip_distance      1.0787e-01  4.7152e-04 228.7767  < 2e-16 ***
fare_amount        1.3266e-01  1.9204e-04 690.7842  < 2e-16 ***
surcharge          1.4067e-01  1.4705e-03  95.6605  < 2e-16 ***
---
Signif. codes:  0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1

R-Squared: 0.6456
Root Mean Squared Error: 1.249

Используя данную модель, предсказываем значения на тестовой выборке.

Выводы


В данной статье приведены основные функциональные возможности трех способов взаимодействия с Spark в R и приведен пример, реализующий чтение файлов, их предобработку, манипуляции с ними и построение простейшей модели машинного обучения, используя пакет sparklyr.

Исходный код

devtools::install_github("rstudio/sparklyr")
library(sparklyr)
library(dplyr)
spark_disconnect_all()
sc <- spark_connect(master = "yarn-client")
data_tbl<-spark_read_csv(sc, "data", "taxi/data")
fare_tbl<-spark_read_csv(sc, "fare", "taxi/fare")
fare_tbl <- rename(fare_tbl, 
                   medallionF = medallion, 
                   hack_licenseF = hack_license, 
                   pickup_datetimeF=pickup_datetime)

taxi.join<-data_tbl %>% left_join(fare_tbl, by = c("medallion"="medallionF", 
                                                   "hack_license"="hack_licenseF", 
                                                   "pickup_datetime"="pickup_datetimeF", 
                                                   ))
taxi.filtered <- taxi.join %>%
  filter(passenger_count > 0 , passenger_count < 8 ,
           trip_distance > 0 , trip_distance <= 100 ,
           trip_time_in_secs > 10 , trip_time_in_secs <= 7200 ,
           tip_amount >= 0 , tip_amount <= 40 ,
           fare_amount > 0 , fare_amount <= 200, payment_type=="CRD" ) %>%
  select(vendor_id,passenger_count,trip_time_in_secs,trip_distance,
        fare_amount,surcharge,tip_amount)%>%
  sdf_partition(training = 0.7, test = 0.3, seed = 1234)

spark_write_parquet(taxi.filtered$training, "taxi/parquetTrain")
spark_write_parquet(taxi.filtered$test, "taxi/parquetTest")  
  
for_plot<-sample_n(taxi.filtered$training,1000)%>%collect()
ggplot(data=for_plot, aes(x=fare_amount, y=tip_amount, color=passenger_count, size=trip_distance))+
  geom_point()+facet_grid(vendor_id~payment_type)
       
model.lm <- taxi.filtered$training %>%
  ml_linear_regression(response = "tip_amount", features = c("vendor_id",
                                                               "passenger_count",
                                                               "trip_time_in_secs",
                                                               "trip_distance",
                                                               "fare_amount",
                                                               "surcharge"))
print(model.lm)
summary(model.lm)

predicted <- predict(model.lm, newdata = taxi.filtered$test)
actual <- (taxi.filtered$test %>%
  select(tip_amount) %>%
  collect())$tip_amount

data <- data.frame(predicted = predicted,actual    = actual)

Поделиться с друзьями
-->

Комментарии (12)


  1. nickolas_php
    25.08.2016 18:52

    А что Вы можете сказать по поводу удобства построения моделей на Spark c помощью языка Scala, Java или Python? Ведь, сюдя по документации, все перечисленные модели (Linear regression, logistic regression, Survival regression, Decision trees, Random forests, Gradient-Boosted Trees, Principal component analysis, Multilayer perceptron, Latent Dirichlet allocation, One-vs-Rest classifier) там реализованы.
    Ссылки на документацию:
    http://spark.apache.org/docs/latest/ml-classification-regression.html
    http://spark.apache.org/docs/latest/mllib-ensembles.html
    http://spark.apache.org/docs/latest/mllib-dimensionality-reduction.html


    1. atikhonov
      25.08.2016 19:20

      Модели МО в Spark реализованы на Scala, а на Java и Python, впрочем как и на R (во всех трех вариантах), написаны функции-обертки вызывающие эти функции, что касается удобства, то у каждого свои приоритеты.


      1. nickolas_php
        25.08.2016 19:51

        Как Вы думаете есть ли смысл ожидать, что большинство кода с этого пакета послужит основой для существенного расширения списка доступных моделей в SparkR? Ведь работы над SparkR ведутся давно и вроде как поддержка этого проекта такой компанией как RStudio являлось бы хорошим стимулом к развитию. Вместо этого мы видим отдельный продукт, который повторяет цели уже существующего Open source решения.


        1. atikhonov
          25.08.2016 20:05

          Сомневаюсь, что SparkR перейдет на dplyr синтаксис и прочее. Скорее всего так и останется, SparkR будет развиваться Apache по остаточному принципу (даже во второй редакции реализовали 3 доп. модели, когда уже в самом Spark их больше десятка), а энтузиасты из RStudio писать свои функции к Spark.


          1. nickolas_php
            25.08.2016 20:31

            Кстати, вроде как в SparkR 2.0.0 они реализовали «синтаксис манипулирования данными» аналогичный к dplyr о чём они пишут в документации: https://spark.apache.org/docs/latest/sparkr.html (...similar to R data frames, dplyr) И возможно уже SparkR и не является настолько неудобным? Если Я правильно понял, то как раз реализация интерфейса, аналогичного с dplyr, и наличия разработок в пакете sparklyr, как раз и позволят относительно легко и быстро реализовать необходимые модели МО и недостающего функционала в SparkR. Очень хотелось бы иметь решение в виде одной целестной экосистемы. Как Я понял мы наблюдаем борьбу двух стандартов: Экосистема вокруг RStudio и экосистема распределённых вычислений на базе Hadoop.


            1. atikhonov
              25.08.2016 21:27

              Этот синтаксис и был в 1.6.1, но он не настолько удобен, в sparklyr в одной конвейерной (ленивой операции) можно сразу, выбрать строки, отфильтровать колонки, изменить имена и тип данных колонок, сделать необходимые преобразования, трансформировать переменные и так далее, и все это в единой парадигме dplyr. В sparkR же каждая операция отдельная функция, которая выполняется своим способом. Какой-то сильно борьбы нет, да, в дистрибутиве Spark есть директория R, ну и что, мы просто игнорируем ее, и вместо sc<-sparkR.init(master = «yarn-client») будет sc < — spark_connect(master = «yarn-client»). Многие пакеты в R, и так делают общие вещи, каждый выбирает то, что удобнее.


  1. puffer
    25.08.2016 22:19

    1. Возможно ли для алгоритмов Spark'a автоматически подбирать гиперпараметры моделей, через caret например?
    2. Есть ли сохранение моделей в файл, что бы например в продакшене (где уже используется Scala) загрузить и использовать уже обученную модель?


    1. atikhonov
      25.08.2016 22:24

      1. в R реализациях нет, только написав свои циклы, или же вызывая непосредственно методы Java
      2. да, экспериментально


  1. jzha
    25.08.2016 22:59

    Спасибо за публикацию!

    У меня есть пара вопрос по данным.
    Уточните, пожалуйста, что означает величина vendor_id?
    Какое число строк в базе taxi.filtered?
    Полученные результаты подгонки модели, видимо, лишний раз подтверждают, что в данных с большим числом наблюдений за действиями (или мнениями) людей каждый «чих» является статистически значимым с точки зрения фишеровского подхода. Это я о p-value значениях slope-коэффицентов модели.


    1. atikhonov
      25.08.2016 23:11

      Vendor_id — это оператор такси, в желтом такси NY их два — Creative Mobile Technologies и VeriFone Inc.
      строк 14 млн.


  1. jzha
    25.08.2016 23:19

    Ясно, спасибо.
    Из сообщения в посте подумал, что 14 млн. строк — это все данные за 1 месяц.


    1. atikhonov
      25.08.2016 23:28

      На самом деле, да, 14 млн. строк это все данные за 1 месяц, но у меня были модели и на 100% выборки и с типом платежа, а здесь около 8 млн.