Давайте поговорим об использовании и преимуществах параллельных вычислений в R.

Причина, по которой стоит об этом задуматься: заставляя компьютер больше работать (выполнять много расчетов одновременно), мы меньше времени ждем результатов наших экспериментов и можем выполнить еще. Это особенно важно для анализа данных (R как платформа обычно используется именно для этой цели), поскольку часто требуется повторить вариации одного и того же подхода, чтобы что-то узнать, вывести значения параметров, оценить стабильность модели.

Обычно, для того, чтобы заставить компьютер больше работать, сначала нужно потрудиться самому аналитику, программисту или создателю библиотеки, чтобы организовать вычисления в виде, удобном для параллелизации. В лучшем случае кто-то уже сделал это за вас:
  • Хорошие параллельные библиотеки, например, многопоточные BLAS/LAPACK, включены в Revolution R Open (RRO, сейчас Microsoft R Open) (смотреть здесь).
  • Специализированные параллельные расширения, предоставляющие свои собственные высокопроизводительные реализации важных процедур, например, методы rx от RevoScaleR или методы h2o от h2o.ai.
  • Фреймворки абстрактной параллелизации, например, Thrust/Rth.
  • Использование прикладных библиотек R, связанных с параллелизацией (в частности, gbm, boot и vtreat). (Некоторые из этих библиотек не используют параллельные операции, пока не задано окружение для параллельного выполнения.)

В дополнение к задаче, подготовленной для параллелизации, нужно оборудование, которое будет ее поддерживать. Например:

  • Ваш собственный компьютер. Обычно даже ноутбуки имеют четыре и более ядер. Потенциальное преимущество в работе алгоритма в четыре раза быстрее — огромно.
  • Графические процессоры (GPU). Многие мащины обладают одной или несколькими мощными видеокартами. Для некоторых вычислительных задач эти процессоры в 10-100 раз быстрее, чем центральный процессор (CPU), обычно использующийся для вычислений (подробности).
  • Компьютерные кластеры (например, Amazon ec2, Hadoop-сервера и др.).

Очевидно, параллельные вычисления в R — обширная и узкоспециализированная тема. Может показаться, что невозможно быстро научиться этой магии — как сделать ваши вычисления более быстрыми.

В этой статье мы покажем, как можно ускорить ваши вычисления, воспользовавшись базовыми возможностями языка R.

Для начала, должна быть задача, поддающаяся параллелизации. Наиболее очевидные задачи такого рода содержат повторяющиеся действия (интуитивно понятный термин — «естественно параллельные»):

  • Подгонка параметров модели путем повторного применения моделей (как это делается в пакете caret).
  • Применение преобразования к большому количеству разных переменных (как это делается в пакете vtreat).
  • Оценка качества модели через кросс-валидацию, бутстрэп или другие техники выборки с повторениями.

Будем предполагать, что у нас уже есть задача с большим количеством простых повторений. Обратите внимание: эта концепция не всегда легко достижима, но такой шаг необходим, чтобы можно было начать процесс.

Вот задача, которую мы будем использовать в качестве примера: применение модели с предсказанием для небольшого набора данных. Загрузим набор данных и некоторые определения в рабочую область:

d <- iris # пусть "d" ссылается на один из встроенных в R наборов данных
vars <- c('Sepal.Length','Sepal.Width','Petal.Length')
yName <- 'Species'
yLevels <- sort(unique(as.character(d[[yName]])))
print(yLevels)

## [1] "setosa"     "versicolor" "virginica"

(Будем использовать конвенцию, что любая строка, начинающаяся с "##" — вывод результата предшествующей команды R.)

Мы столкнулись с небольшой проблемой моделирования: переменная, которую мы пытаемся прогнозировать, имеет три уровня. Техника моделирования, которую мы собирались использовать (glm(family='binomial')) не умеет предсказывать "полиномиальные результаты" (хотя есть библиотеки, предназначенные для этого). Мы решили подойти к решению этой задачи с помощью стратегии "один-против-остальных" и построить набор классификаторов: каждый будет отделять одну целевую переменную от остальных. Эта задача — очевидный кандидат на параллелизацию. Давайте для читаемости обернем в функцию построение одной выходной модели:

fitOneTargetModel <- function(yName,yLevel,vars,data) {
  formula <- paste('(',yName,'=="',yLevel,'") ~ ',
                   paste(vars,collapse=' + '),sep='')
  glm(as.formula(formula),family=binomial,data=data)
}

Тогда обычный «серийный» способ построить все модели будет выглядеть так:

for(yLevel in yLevels) {
  print("*****")
  print(yLevel)
  print(fitOneTargetModel(yName,yLevel,vars,d))
}

Или можно обернуть нашу процедуру в функцию с одной переменной (этот паттерн называется карринг) и применить элегантную нотацию R — lapply():

worker <- function(yLevel) {
  fitOneTargetModel(yName,yLevel,vars,d)
}
models <- lapply(yLevels,worker)
names(models) <- yLevels
print(models)

Преимущество нотации lapply() состоит в том, что она подчеркивает независимость каждого вычисления, именно тот вид изоляции, который нужен для параллелизации наших вычислений. Подумайте о цикле for в том смысле, что он слишком точно определяет вычисление, задавая ненужный порядок или последовательность операций.

Реорганизация вычисления функционально подготовила нас к применению параллельной библиотеки и осуществлению вычисления параллельно. Сначала развернем параллельный кластер:

# Запуск параллельного кластера
parallelCluster <- parallel::makeCluster(parallel::detectCores())
print(parallelCluster)

## socket cluster with 4 nodes on host ‘localhost’

Обратите внимание, мы создали «кластер сокетов». Кластер сокетов — это удивительно гибкий «параллельно распределенный» кластер в первом приближении. Кластер сокетов — грубое приближение в том смысле, что работает он сравнительно медленно (работа будет распределяться «неточно»), но он очень гибок в реализации: много ядер на одной машине, много ядер на нескольких машинах в одной сети, поверх других систем, например кластера MPI (message passing interface — протокол передачи сообщений).

На этом месте предполагаем, что код ниже будет работать (здесь — подробности по tryCatch).

tryCatch(
  models <- parallel::parLapply(parallelCluster,
                                yLevels,worker),
  error = function(e) print(e)
)

## <simpleError in checkForRemoteErrors(val):
##   3 nodes produced errors; first error: 
##      could not find function "fitOneTargetModel">

Вместо результатов получили ошибку "could not find function "fitOneTargetModel">."

Проблема: в кластере сокетов аргументы parallel::parLapply копируются в каждый узел обработки по сокету связи. Однако, целостность текущего окружения (в нашем случае так называемого «глобального окружения») не копируется (возвращаются только значения). Поэтому наша функция worker() при переносе на параллельные узлы должна иметь другое замыкание (поскольку она не может указывать на наше окружение выполнения), и оказывается, что новое замыкание больше не содержит ссылок на необходимые значения yName, vars, d и fitOneTargetModel. Это печально, но имеет смысл. R использует все окружения для реализации концепции замыканий, и R не может знать, какие значения в данном окружении фактически потребуются данной функции.

Итак, мы знаем, что не так. Как это исправить? Мы исправим это использованием окружения, отличного от глобального, чтобы перенести туда нужные нам значения. Самый легкий способ это сделать — использовать свое собственное замыкание. Чтобы этого добиться, обернем весь процесс в функцию (и будем запускать ее в контролируемом окружении). Код ниже работает:

# построение функции одной переменной, которую мы будем параллелизировать
mkWorker <- function(yName,vars,d) {
  # убедимся, что все три нужные переменные  
  # доступны в этой среде
  force(yName)
  force(vars)
  force(d)
  # определим каждую функцию, которая нужна нашей 
  # функции worker в этом окружении
  fitOneTargetModel <- function(yName,yLevel,vars,data) {
    formula <- paste('(',yName,'=="',yLevel,'") ~ ',
                     paste(vars,collapse=' + '),sep='')
    glm(as.formula(formula),family=binomial,data=data)
  }
  # Наконец: определить и вернуть нашу функцию worker.
  # "Замыкание" функции worker 
  # (где она ищет несвязанные переменные) - 
  # окружение активации/выполнения mkWorker,
  # а не глобальное окружение, как обычно.
  # Параллельная библиотека переместит 
  # это окружение (чего она не делает
  # с глобальным окружением).
  worker <- function(yLevel) {
    fitOneTargetModel(yName,yLevel,vars,d)
  }
  return(worker)
}

models <- parallel::parLapply(parallelCluster,yLevels,
                              mkWorker(yName,vars,d))
names(models) <- yLevels
print(models)

Код выше работает, потому что мы переместили нужные нам значения в новое окружение выполнения и определили функцию, которую собрались использовать, непосредственно в этом окружении. Очевидно, переопределять каждую функцию, когда она нам нужна — громоздко и затратно (хотя мы могли передать ее в обертку, как это делалось с другими значениями). Более гибкий паттерн таков: использовать вспомогательную функцию "bindToEnv" для выполнения некоторой части работы. С bindToEnv код выглядит так.

source('bindToEnv.R') # Загрузить из: http://winvector.github.io/Parallel/bindToEnv.R
# построение функции одной переменной, которую мы будем параллелизировать
mkWorker <- function() {
  bindToEnv(objNames=c('yName','vars','d','fitOneTargetModel'))
  function(yLevel) {
    fitOneTargetModel(yName,yLevel,vars,d)
  }
}

models <- parallel::parLapply(parallelCluster,yLevels,
                              mkWorker())
names(models) <- yLevels
print(models)

Паттерн выше лаконичен и хорошо работает. Несколько оговорок, которые стоит держать в уме:

  • Помните, каждый параллельный worker — удаленное окружение. Убедитесь, что нужные библиотеки определены на каждой удаленной машине.
  • Неосновные библиотеки, загруженные в исходное окружение, необязательно загружены в удаленные. Имеет смысл использовать нотацию с пакетами, например, stats::glm() при вызове функций из библиотек (вызов library(...) на каждом удаленном узле избыточен).
  • Наша функция bindToEnv сама по себе напрямую меняет окружения передаваемых ей функций (чтобы они могли обращаться к значениям, которые мы переносим). Это может вызвать дополнительные проблемы с теми окружениями, к которым применялся карринг. Здесь можно найти некоторые способы обойти эту проблему.

Об этом стоит задуматься. Однако, думаю, вы решите, что добавление восьми строчек оберточного/стереотипного кода в полной мере достойно четырехкратного или более ускорения.

Также: по завершении работы не забудьте убрать ссылку на кластер:

# Аккуратное выключение кластера
if(!is.null(parallelCluster)) {
  parallel::stopCluster(parallelCluster)
  parallelCluster <- c()
}

На этом мы закончим. В следующей статье речь пойдет о том, как построить кластеры сокетов на нескольких машинах и на Amazon ec2.

Сама по себе функция bindToEnv довольно проста:

#' Копирование аргументов в окружение и перепривязка замыкания любой функции к bindTargetEnv.
#' 
#' http://winvector.github.io/Parallel/PExample.html - пример использования.
#' 
#' 
#' Используется для передачи данных вместе с функцией в ситуациях, подобных параллельному выполнению 
#' (когда глобальное окружение будет недоступно). Обычно вызывается внутри 
#' функции, задающей функцию-worker для передачи параллельным процессам
#' (чтобы у нас было замыкание, с которым можно работать).
#' 
#' @param bindTargetEnv - окружение, к которому осуществляется привязка
#' @param objNames - дополнительные имена, которые нужно найти в родительском окружении и привязать
#' @param doNotRebind - имена функций, замыкания которых НЕ нужно привязывать
bindToEnv <- function(bindTargetEnv=parent.frame(),objNames,doNotRebind=c()) {
  # Привязка значений к окружению
  # и передача всех функций в это окружение
  for(var in objNames) {
    val <- get(var,envir=parent.frame())
    if(is.function(val) && (!(var %in% doNotRebind))) {
      # замена замыкания функции в этом окружении на целевое (ОПАСНО)
      environment(val) <- bindTargetEnv
    }
    # привязка объекта к целевому окружению, только после возможного изменения
    assign(var,val,envir=bindTargetEnv)
  }
}

Ее также можно загрузить отсюда.

Один из недостатков использования параллелизации таким образом — всегда может понадобиться еще одна функция или данное. Один из способов обойти это — использовать команду R ls(), чтобы построить список имен, которые нужно передать. Особенно эффективно сохранять результаты ls() сразу после исходных файлов с функциями и важными глобальными переменными. Без какой-либо стратегии здесь добавление элементов в списки — боль.

Для больших масштабов: не особенно подробные инструкции по запуску нескольких машин-R-серверов на ec2 можно найти здесь.
Поделиться с друзьями
-->

Комментарии (0)