Давно уже витала в воздухе необходимость реализовать запуск регулярных Spark задач через Oozie, но всё руки не доходили и вот наконец свершилось. В этой статье хочу описать весь процесс, возможно она упростит Вам жизнь.


Содержание


  • Задача
  • Оборудование и установленное ПО
  • Написание Spark задачи
  • Написание workflow.xml
  • Написание coordinator.xml
  • Размещение проекта на hdfs
  • Запуск регулярного выполнения
  • Заключение

Задача


Мы имеем следующую структуру на hdfs:


hdfs://hadoop/project-MD2/data
hdfs://hadoop/project-MD2/jobs
hdfs://hadoop/project-MD2/status

В директорию data ежедневно поступают данные и раскладываются по директориям в соответствие с датой. Например, данные за 31.12.2017 запишутся по следующему пути: hdfs://hadoop/project/data/2017/12/31/20171231.csv.gz.


Формат входных данных

  • Разделитель строк: “\n”
  • Разделитель столбцов: “;”
  • Способ сжатия: gzip
  • Количество столбцов: 5 (device_id; lag_A0; lag_A1; flow_1; flow_2)
  • Заголовок в первой строке отсутствует
  • Данные за предыдущие сутки гарантированно записывается в соответствующую директорию в интервал времени с 00:00 до 03:00 следующих суток.

В директории jobs располагаются задачи, которые имеют непосредственное отношение к проекту. Нашу задачу мы также будем размещать в этом каталоге.
В директорию status должна сохраняться статистика по количеству пустых полей (со значением null) за каждый день в формате json. Например, для данных за 31.12.2017 должен будет появиться файл hdfs://hadoop/project-MD2/status/2017/12/31/part-*.json


Примет json файла:

{
   "device_id_count_empty" : 0, 
   "lag_A0_count_empty" : 10, 
   "lag_A1_count_empty" : 0, 
   "flow_1_count_empty" : 37, 
   "flow_2_count_empty" : 100
}

Оборудование и установленное ПО


В нашем распоряжение есть кластер из 10 машин, каждая из которых имеет 8-и ядерный процессор и оперативную память в размере 64 Гб. Общий объём жёстких дисков на всех машинах 100 Тб. Для запуска задач на кластере отведена очередь PROJECTS.


Установленное ПО:

  • Apache Hadoop 2.7.3 (Hortonworks)
  • Apache Spark 2.0.0
  • Apache Oozie 4.2.0
  • Scala 2.11.11
  • Sbt 1.0.2

Написание Spark задачи


Создадим структуру проекта, это можно очень просто сделать в любой среде разработки, поддерживающей scala или из консоли, как показано ниже:


mkdir -p daily-statistic/project
echo "sbt.version = 1.0.2" > daily-statistic/project/build.properties
echo "" > daily-statistic/project/plugins.sbt
echo "" > daily-statistic/build.sbt
mkdir -p daily-statistic/src/main/scala

Замечательно, теперь добавим плагин для сборки, для этого в файле daily-statistic/project/plugins.sbt добавляем следующую строку:


addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

Добавим описание проекта, зависимости и особенности сборки в файл daily-statistic/build.sbt:


name := "daily-statistic"

version := "0.1"

scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.0.0" % "provided"
)

assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

Перейдём в директорию daily-statistic и выполним команду sbt update, для обновления проекта и подтягивания зависимостей из репозитория.
Создаём Statistic.scala в директории src/main/scala/ru/daily


Код задачи:

package ru.daily

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

object Statistic extends App {

   // инициализация   
   implicit lazy val spark: SparkSession = SparkSession.builder()
     .appName("daily-statistic")
     .getOrCreate()

   import spark.implicits._

   val workDir = args(0)
   val datePart = args(1)
   val saveDir = args(2)

   try {

      val date = read(s"$workDir/$datePart/*.csv.gz")
         .select(
            '_c0 as "device_id",
            '_c1 as "lag_A0",
            '_c2 as "lag_A1",
            '_c3 as "flow_1",
            '_c4 as "flow_2"
         )

         save(s"$saveDir/$datePart", agg(date))

   } finally spark.stop()

   // чтение исходных данных   
   def read(path: String)(implicit spark: SparkSession): DataFrame = {

      val inputFormat = Map("header" -> "false", "sep" -> ";", "compression" -> "gzip")

      spark.read
         .options(inputFormat)
         .csv(path)
   }

   // построение агрегата
   def agg(data: DataFrame):DataFrame = data
      .withColumn("device_id_empty", when('device_id.isNull, lit(1)).otherwise(0))
      .withColumn("lag_A0_empty", when('lag_A0.isNull, lit(1)).otherwise(0))
      .withColumn("lag_A1_empty", when('lag_A1.isNull, lit(1)).otherwise(0))
      .withColumn("flow_1_empty", when('flow_1.isNull, lit(1)).otherwise(0))
      .withColumn("flow_2_empty", when('flow_2.isNull, lit(1)).otherwise(0))
      .agg(
         sum('device_id_empty) as "device_id_count_empty",
         sum('lag_A0_empty) as "lag_A0_count_empty",
         sum('lag_A1_empty) as "lag_A1_count_empty",
         sum('flow_1_empty) as "flow_1_count_empty",
         sum('flow_2_empty) as "flow_2_count_empty"
      )

   // сохранение результата
   def save(path: String, data: DataFrame): Unit = data.write.json(path)

} 

Собираем проект командой sbt assembly из директории daily-statistic. После успешного завершения сборки в директории daily-statistic/target/scala-2.11 появится пакет с задачей daily-statistic-0.1.jar.


Написание workflow.xml


Для запуска задачи через Oozie нужно описать конфигурацию запуска в файле workflow.xml. Ниже приведён пример для нашей задачи:


<workflow-app name="project-md2-daily-statistic" xmlns="uri:oozie:workflow:0.5">

   <global>
      <configuration>
         <property>
            <name>oozie.launcher.mapred.job.queue.name</name>
            <value>${queue}</value>
         </property>
      </configuration>
   </global>

   <start to="project-md2-daily-statistic" />

   <action name="project-md2-daily-statistic">
      <spark xmlns="uri:oozie:spark-action:0.1">
         <job-tracker>${jobTracker}</job-tracker>
         <name-node>${nameNode}</name-node>
         <master>yarn-client</master>
         <name>project-md2-daily-statistic</name>
         <class>ru.daily.Statistic</class>
         <jar>${nameNode}${jobDir}/lib/daily-statistic-0.1.jar</jar>
         <spark-opts>
            --queue ${queue}
            --master yarn-client
            --num-executors 5
            --conf spark.executor.cores=8
            --conf spark.executor.memory=10g
            --conf spark.executor.extraJavaOptions=-XX:+UseG1GC
            --conf spark.yarn.jars=*.jar
            --conf spark.yarn.queue=${queue}
         </spark-opts>
         <arg>${nameNode}${dataDir}</arg>
         <arg>${datePartition}</arg>
         <arg>${nameNode}${saveDir}</arg>
       </spark>

       <ok to="end" />
       <error to="fail" />

   </action>

   <kill name="fail">
      <message>Statistics job failed [${wf:errorMessage(wf:lastErrorNode())}]</message>
   </kill>

   <end name="end" />

</workflow-app>

В блоке global устанавливается очередь, для MapReduce задачи которая будет находить нашу задачи и запускать её.
В блоке action описывается действие, в нашем случае запуск spark задачи, и что нужно делать при завершении со статусом ОК или ERROR.
В блоке spark определяется окружение, конфигурируется задача и передаются аргументы. Конфигурация запуска задачи описывается в блоке spark-opts. Параметры можно посмотреть в официальной документации
Если задача завершается со статусом ERROR, то выполнение переходит в блок kill и выводится кратное сообщение об ошибки.
Параметры в фигурных скобках, например ${queue}, мы будем определять при запуске.


Написание coordinator.xml


Для организации регулярного запуска нам потребуется ещё coordinator.xml. Ниже приведён пример для нашей задачи:


<coordinator-app name="project-md2-daily-statistic-coord" frequency="${frequency}" start="${startTime}" end="${endTime}" timezone="UTC" xmlns="uri:oozie:coordinator:0.1">
    <action>
        <workflow>
            <app-path>${workflowPath}</app-path>
            <configuration>
                <property>
                    <name>datePartition</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), "yyyy/MM/dd")}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>

Здесь из интересного, параметры frequency, start, end, которые определяют частоту выполнения, дату и время начала выполнения задачи, дату и время окончания выполнения задачи соответственно.
В блоке workflow указывается путь к директории с файлом workflow.xml, который мы зададим позднее при запуске.
В блоке configuration определяется значение свойства datePartition, которое в данном случае равно текущей дате в формате yyyy/MM/dd минус 1 день.


Размещение проекта на hdfs

Как уже было сказано ранее нашу задачу мы будем размещать в директории hdfs://hadoop/project-MD2/jobs:


hdfs://hadoop/project-MD2/jobs/daily-statistic/lib/daily-statistic-0.1.jar
hdfs://hadoop/project-MD2/jobs/daily-statistic/workflow.xml
hdfs://hadoop/project-MD2/jobs/daily-statistic/coordinator.xml
hdfs://hadoop/project-MD2/jobs/daily-statistic/sharelib

Здесь в принципе всё понятно без комментариев за исключением директории sharelib. В эту директорию мы положим все библиотеки, которые использовались в процессе создания зашей задачи. В нашем случае это все библиотеки Spark 2.0.0, который мы указывали в зависимостях проекта. Зачем это нужно? Дело в том, что в зависимостях проекта мы указали "provided". Это говорит системе сборки не нужно включать зависимости в проект, они будут предоставлены окружением запуска, но мир не стоит на месте, администраторы кластера могут обновить версию Spark. Наша задача может оказаться чувствительной к этому обновлению, поэтому для запуска будет использоваться набор библиотек из директории sharelib. Как это конфигурируется покажу ниже.


Запуск регулярного выполнения


И так всё готово к волнительному моменту запуска. Мы будем запускать задачу через консоль. При запуске нужно задать значения свойствам, которые мы использовали в xml файлах. Вынесем эти свойства в отдельный файл coord.properties:


# описание окружения
nameNode=hdfs://hadoop
jobTracker=hadoop.host.ru:8032

# путь к директории с файлом coordinator.xml
oozie.coord.application.path=/project-MD2/jobs/daily-statistic

# частота в минутах (раз в 24 часа)
frequency=1440
startTime=2017-09-01T07:00Z
endTime=2099-09-01T07:00Z

# путь к директории с файлом workflow.xml
workflowPath=/project-MD2/jobs/daily-statistic

# имя пользователя, от которого будет запускаться задача
mapreduce.job.user.name=username
user.name=username

# директория с данными и для сохранения результата
dataDir=/project-MD2/data 
saveDir=/project-MD2/status
jobDir=/project-MD2/jobs/daily-statistic 

# очередь для запуска задачи
queue=PROJECTS

# использовать библиотеке из указанной директории на hdfs вместо системных
oozie.libpath=/project-MD2/jobs/daily-statistic/sharelib
oozie.use.system.libpath=false

Замечательно, тереть всё готово. Запускаем регулярное выполнение командой:


oozie job -oozie http://hadoop.host.ru:11000/oozie -config coord.properties -run

После запуска в консоль выведется job_id задачи. Используя этот job_id можно посмотреть информацию о статусе выполнения задачи:


oozie job -info {job_id}

Остановить задачу:


oozie job -kill {job_id}

Если Вы не знаете job_id задачи, то можно найти его, показав все регулярные задачи для вашего пользователя:


oozie jobs -jobtype coordinator -filter user={user_name}

Заключение


Получилось немного затянуто, но на мой взгляд лучше подробная инструкция чем квест-поиск по интернету. Надеюсь описанный опыт будет Вам полезен, спасибо за внимание!

Комментарии (3)


  1. paveltro
    29.09.2017 11:10

    Класс!


  1. Yo1
    29.09.2017 11:11

    все хорошо, но что за задача то? зачем читаются эти файлы?
    как я понимаю Oozie будет запускать каждый джоб через spark-submit скрипт, т.е. стартовать на каждый джоб отдельный jvm, соответственно каждый джоб будет иметь свой sparkSession. многим задачам такое не подойдет…


    1. entony Автор
      29.09.2017 11:33

      В данном случае описана часть задачи для сбора статистики по приходящим данным. Основной упор я делал на описание запуска. Да у каждой задачи будет свой sparkSession.