replyr
— сокращение от REmote PLYing of big data for R (удаленная обработка больших данных в R). Почему стоит попробовать
replyr
? Потому что он позволяет применять стандартные рабочие подходы к удаленным данным (базы данных или Spark).Можно работать так же, как и с локальным
data.frame
. replyr
предоставляет такие возможности:- Обобщение данных:
replyr_summary()
. - Объединение таблиц:
replyr_union_all()
. - Связывание таблиц по строкам:
replyr_bind_rows()
. - Использование функций разделения, объединения, комбинирования (
dplyr::do()
):replyr_split()
,replyr::gapply()
. - Аггрегирование/распределение:
replyr_moveValuesToRows()
/replyr_moveValuesToColumns()
. - Отслеживание промежуточных результатов.
- Контроллер объединений.
Скорее всего, вы всё это делаете с данными локально, поэтому такие возможности сделают работу со
Spark
и sparklyr
гораздо легче.replyr
— продукт коллективного опыта использования R в прикладных решениях для многих клиентов, сбора обратной связи и исправления недостатков.Примеры ниже.
Все сейчас быстро меняется, поэтому воспользуемся для примеров версиями пакетов в разработке.
base::date()
## [1] "Thu Jul 6 15:56:28 2017"
# devtools::install_github('rstudio/sparklyr')
# devtools::install_github('tidyverse/dplyr')
# devtools::install_github('tidyverse/dbplyr')
# install.packages("replyr")
suppressPackageStartupMessages(library("dplyr"))
packageVersion("dplyr")
## [1] '0.7.1.9000'
packageVersion("dbplyr")
## [1] '1.1.0.9000'
library("tidyr")
packageVersion("tidyr")
## [1] '0.6.3'
library("replyr")
packageVersion("replyr")
## [1] '0.4.2'
suppressPackageStartupMessages(library("sparklyr"))
packageVersion("sparklyr")
## [1] '0.5.6.9012'
# больше памяти, чем предполагается в https://github.com/rstudio/sparklyr/issues/783
config <- spark_config()
config[["sparklyr.shell.driver-memory"]] <- "8G"
sc <- sparklyr::spark_connect(version='2.1.0',
hadoop_version = '2.7',
master = "local",
config = config)
Summary
Стандартные
summary()
и glance()
, которые нельзя выполнить на Spark
.mtcars_spark <- copy_to(sc, mtcars)
# резюме обработки, а не данных
summary(mtcars_spark)
## Length Class Mode
## src 1 src_spark list
## ops 2 op_base_remote list
packageVersion("broom")
## [1] '0.4.2'
broom::glance(mtcars_spark)
## Error: glance doesn't know how to deal with data of class tbl_sparktbl_sqltbl_lazytbl
replyr_summary
работает.replyr_summary(mtcars_spark) %>%
select(-lexmin, -lexmax, -nunique, -index)
## column class nrows nna min max mean sd
## 1 mpg numeric 32 0 10.400 33.900 20.090625 6.0269481
## 2 cyl numeric 32 0 4.000 8.000 6.187500 1.7859216
## 3 disp numeric 32 0 71.100 472.000 230.721875 123.9386938
## 4 hp numeric 32 0 52.000 335.000 146.687500 68.5628685
## 5 drat numeric 32 0 2.760 4.930 3.596563 0.5346787
## 6 wt numeric 32 0 1.513 5.424 3.217250 0.9784574
## 7 qsec numeric 32 0 14.500 22.900 17.848750 1.7869432
## 8 vs numeric 32 0 0.000 1.000 0.437500 0.5040161
## 9 am numeric 32 0 0.000 1.000 0.406250 0.4989909
## 10 gear numeric 32 0 3.000 5.000 3.687500 0.7378041
## 11 carb numeric 32 0 1.000 8.000 2.812500 1.6152000
Аггрегирование/распределение
tidyr
работает в основном с локальными данными.mtcars2 <- mtcars %>%
mutate(car = row.names(mtcars)) %>%
copy_to(sc, ., 'mtcars2')
# ошибки
mtcars2 %>%
tidyr::gather('fact', 'value')
## Error in UseMethod("gather_"): no applicable method for 'gather_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"
mtcars2 %>%
replyr_moveValuesToRows(nameForNewKeyColumn= 'fact',
nameForNewValueColumn= 'value',
columnsToTakeFrom= colnames(mtcars),
nameForNewClassColumn= 'class') %>%
arrange(car, fact)
## # Source: lazy query [?? x 4]
## # Database: spark_connection
## # Ordered by: car, fact
## car fact value class
##
## 1 AMC Javelin am 0.00 numeric
## 2 AMC Javelin carb 2.00 numeric
## 3 AMC Javelin cyl 8.00 numeric
## 4 AMC Javelin disp 304.00 numeric
## 5 AMC Javelin drat 3.15 numeric
## 6 AMC Javelin gear 3.00 numeric
## 7 AMC Javelin hp 150.00 numeric
## 8 AMC Javelin mpg 15.20 numeric
## 9 AMC Javelin qsec 17.30 numeric
## 10 AMC Javelin vs 0.00 numeric
## # ... with 342 more rows
Связывание по строкам
dplyr bind_rows
, union
и union_all
сейчас неприменимы в Spark
. replyr::replyr_union_all()
и replyr::replyr_bind_rows()
— работоспособная альтернатива.bind_rows()
db1 <- copy_to(sc,
data.frame(x=1:2, y=c('a','b'),
stringsAsFactors=FALSE),
name='db1')
db2 <- copy_to(sc,
data.frame(y=c('c','d'), x=3:4,
stringsAsFactors=FALSE),
name='db2')
# Ошибки из-за попытки осуществить операцию над обработчиком, а не данными
bind_rows(list(db1, db2))
## Error in bind_rows_(x, .id): Argument 1 must be a data frame or a named atomic vector, not a tbl_spark/tbl_sql/tbl_lazy/tbl
union_all
# игнорирует названия столбцов и приводит все данные к строкам
union_all(db1, db2)
## # Source: lazy query [?? x 2]
## # Database: spark_connection
## x y
##
## 1 1 a
## 2 2 b
## 3 3 c
## 4 4 d
union
# игнорирует названия столбцов и приводит все данные к строкам
# скорее всего, также потеряет дублирующиеся строки
union(db1, db2)
## # Source: lazy query [?? x 2]
## # Database: spark_connection
## x y
##
## 1 4 d
## 2 1 a
## 3 3 c
## 4 2 b
replyr_bind_rows
replyr::replyr_bind_rows
может связывать вместе несколько data.frame
-ов.replyr_bind_rows(list(db1, db2))
## # Source: table [?? x 2]
## # Database: spark_connection
## x y
##
## 1 1 a
## 2 2 b
## 3 3 c
## 4 4 d
dplyr::do
В нашем примере просто возьмем по нескольку строк из каждой группы аггрегированного набора данных. Обратите внимание: поскольку мы не задаем порядок в явном виде с помощью arrange, нельзя всегда ожидать совпадения результатов в разных источниках данных (БД или
Spark
).dplyr::do
на локальных данных
Из
help('do', package='dplyr')
:by_cyl <- group_by(mtcars, cyl)
do(by_cyl, head(., 2))
## # A tibble: 6 x 11
## # Groups: cyl [3]
## mpg cyl disp hp drat wt qsec vs am gear carb
##
## 1 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1
## 2 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2
## 3 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4
## 4 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4
## 5 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2
## 6 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4
dplyr::do
на Spark
by_cyl <- group_by(mtcars_spark, cyl)
do(by_cyl, head(., 2))
## # A tibble: 3 x 2
## cyl V2
##
## 1 6
## 2 4
## 3 8
Получаем не совсем то, что можно использовать.
replyr
разделение/объединение
mtcars_spark %>%
replyr_split('cyl',
partitionMethod = 'extract') %>%
lapply(function(di) head(di, 2)) %>%
replyr_bind_rows()
## # Source: table [?? x 11]
## # Database: spark_connection
## mpg cyl disp hp drat wt qsec vs am gear carb
##
## 1 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4
## 2 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4
## 3 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1
## 4 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2
## 5 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2
## 6 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4
replyr gapply
mtcars_spark %>%
gapply('cyl',
partitionMethod = 'extract',
function(di) head(di, 2))
## # Source: table [?? x 11]
## # Database: spark_connection
## mpg cyl disp hp drat wt qsec vs am gear carb
##
## 1 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4
## 2 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4
## 3 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1
## 4 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2
## 5 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2
## 6 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4
replyr::replyr_apply_f_mapped
Что хотелось бы получить: данные с именами, соответствующими коду (т.е. изменить данные, а не код).
По некоторому размышлению этого можно достигнуть, если связать изменение данных с окружением функции, а не с данными. То есть данные изменены до тех пор, пока соответствующая контролирующая функция выполняется. В нашем случае эта функция —
replyr::replyr_apply_f_mapped()
, и работает она следующим образом.Предположим, что операция, которую мы хотим использовать — функция понижения порядка, полученная из некоторого неподконтрольного нам источника (например, пакета). Это может быть простая функция (как ниже), но предположим, мы хотим использовать ее без изменений (исключая даже совсем небольшие, как введение
wrapr::let()
).# внешняя функция с заданными в явном виде названиями столбцов
DecreaseRankColumnByOne <- function(d) {
d$RankColumn <- d$RankColumn - 1
d
}
Чтобы применить эту функцию к
d
(в котором не такие, как ожидается, названия столбцов!), мы используем replyr::replyr_apply_f_mapped()
для создания нового параметризированного адаптера:# наши данные
d <- data.frame(Sepal_Length = c(5.8,5.7),
Sepal_Width = c(4.0,4.4),
Species = 'setosa',
rank = c(1,2))
# обработчик для ввода параметров
DecreaseRankColumnByOneNamed <- function(d, ColName) {
replyr::replyr_apply_f_mapped(d,
f = DecreaseRankColumnByOne,
nmap = c(RankColumn = ColName),
restrictMapIn = FALSE,
restrictMapOut = FALSE)
}
# использование
dF <- DecreaseRankColumnByOneNamed(d, 'rank')
print(dF)
## Sepal_Length Sepal_Width Species rank
## 1 5.8 4.0 setosa 0
## 2 5.7 4.4 setosa 1
replyr::replyr_apply_f_mapped()
переименовывает столбцы так, как ожидается в DecreaseRankColumnByOne
(соответствие задано в nmap
), применяет DecreaseRankColumnByOne
и возвращает имена к исходным перед тем, как вернуть результат.Отслеживание промежуточных результатов
Многие задачи в
Sparklyr
связаны с созданием промежуточных или временных таблиц. Это можно делать с помощью dplyr::copy_to()
и dplyr::compute()
. Эти способы могут быть ресурсоёмкими.В
replyr
есть функции, позволяющие держать процесс под контролем: генераторы временных имен, не изменяющие собственно данные (они также используются внутри самого пакета).Сама по себе функция довольно проста:
print(replyr::makeTempNameGenerator)
## function (prefix, suffix = NULL)
## {
## force(prefix)
## if ((length(prefix) != 1) || (!is.character(prefix))) {
## stop("repyr::makeTempNameGenerator prefix must be a string")
## }
## if (is.null(suffix)) {
## alphabet <- c(letters, toupper(letters), as.character(0:9))
## suffix <- paste(base::sample(alphabet, size = 20, replace = TRUE),
## collapse = "")
## }
## count <- 0
## nameList <- list()
## function(..., peek = FALSE, dumpList = FALSE, remove = NULL) {
## if (length(list(...)) > 0) {
## stop("replyr::makeTempNameGenerator tempname generate unexpected argument")
## }
## if (peek) {
## return(names(nameList))
## }
## if (dumpList) {
## v <- names(nameList)
## nameList <<- list()
## return(v)
## }
## if (!is.null(remove)) {
## victims <- intersect(remove, names(nameList))
## nameList[victims] <<- NULL
## return(victims)
## }
## nm <- paste(prefix, suffix, sprintf("%010d", count),
## sep = "_")
## nameList[[nm]] <<- 1
## count <<- count + 1
## nm
## }
## }
##
##
Например, чтобы объединить несколько таблиц, хорошее решение для некоторых источников данных — вызывать
compute
после каждого объединения (иначе полученный SQL может стать длинным и трудным для понимания и в поддержке). Код выглядит примерно так:# создание данных для примера
names <- paste('table', 1:5, sep='_')
tables <- lapply(names,
function(ni) {
di <- data.frame(key= 1:3)
di[[paste('val',ni,sep='_')]] <- runif(nrow(di))
copy_to(sc, di, ni)
})
# собственный генератор временных имён
tmpNamGen <- replyr::makeTempNameGenerator('JOINTMP')
# объединение слева таблиц в последовательности
joined <- tables[[1]]
for(i in seq(2,length(tables))) {
ti <- tables[[i]]
if(i<length(tables)) {
joined <- compute(left_join(joined, ti, by='key'),
name= tmpNamGen())
} else {
# использование постоянного имени
joined <- compute(left_join(joined, ti, by='key'),
name= 'joinres')
}
}
# удаление временных значений
temps <- tmpNamGen(dumpList = TRUE)
print(temps)
## [1] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000000"
## [2] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000001"
## [3] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000002"
for(ti in temps) {
db_drop_table(sc, ti)
}
# результаты
print(joined)
## # Source: table [?? x 6]
## # Database: spark_connection
## key val_table_1 val_table_2 val_table_3 val_table_4 val_table_5
##
## 1 1 0.8045418 0.5006293 0.8656174 0.5248073 0.8611796
## 2 2 0.1593121 0.5802938 0.9722113 0.4532369 0.7429018
## 3 3 0.4853835 0.5313043 0.6224256 0.1843134 0.1125551
Аккуратное введение и управление временными данными может сохранить ресурсы (и время, и место) и значительно улучшить результаты. Нам кажется хорошей практикой задавать в явном виде генератор временных имён, передавать его во все преобразования
Sparklyr
, а затем очищать временные значения все вместе, когда результаты от них больше не зависят.Заключение
Если вы хотите тщательно контролировать обработку данных в
Spark
или БД с помощью R
, стоит рассмотреть replyr
в дополнение к dplyr
и sparklyr
.sparklyr::spark_disconnect(sc)
rm(list=ls())
gc()
## used (Mb) gc trigger (Mb) max used (Mb)
## Ncells 821292 43.9 1442291 77.1 1168576 62.5
## Vcells 1364897 10.5 2552219 19.5 1694265 13.0
Поделиться с друзьями