Пару месяцев назад я начала изучать Spark, и в какой-то момент столкнулась с проблемой сохранения вычислений Structured Streaming в базе данных Cassandra.

В данном посте я привожу простой пример создания и использования Cassandra Sink для Spark Structured Streaming. Я надеюсь, что пост будет полезен тем, кто недавно начал работать со Spark Structured Streaming и задается вопросом, как выгружать результаты вычислений в базу данных.

Идея приложения очень проста — получить и распарсить сообщения из кафки, выполнить простые трансформации в спарке и сохранить результаты в кассандре.

Плюсы Structured Streaming


О Structured Streaming можно подробно почитать в документации. Если коротко, то Structured Streaming — это хорошо масштабируемый механизм обработки потоковой информации, который основан на движке Spark SQL. Он позволяет использовать Dataset / DataFrame для агрегирования данный, вычисления оконных функций, соединений и т. д. То есть Structured Streaming позволяет использовать старый добрый SQL для работы с потоками данных.

В чем проблема?


Стабильный релиз Spark Structured Streaming вышел в 2017 году. То есть, это довольно новый API, в котором реализован базовый функционал, но некоторые вещи придется делать самим. Например, в Structured Streaming есть стандартные функции для записи выходных данных в файл, кафку, консоль или память, но для того чтобы сохранить данные в базу придется использовать имеющийся в Structured Streaming приемник foreach и реализовать интерфейс ForeachWriter. Начиная с версии Spark 2.3.1, реализовать такой функционал можно только на Scala и Java.

Я предполагаю, что читатель уже знает, как Structured Streaming работает в общих чертах, знает, как реализовать нужные трансформации и теперь готов выгрузить полученные результаты в базу. Если некоторые из вышеперечисленных шагов неясны, официальная документация может послужить хорошей отправной точкой в изучении Structured Streaming. В данной статье, я бы хотела бы сфокусироваться на последнем шаге, когда вам нужно сохранить результаты в базе данных.

Ниже, я опишу пример реализации Cassandra sink для Structured Streaming и поясню как запустить его в кластере. Полный код доступен здесь.

Когда я впервые столкнулась с вышеуказанной проблемой, вот этот проект оказался очень полезным. Однако он может показаться немного сложным, если читатель только начал работать со Structured Streaming и ищет простой пример того как выгрузить данные в кассандру. Кроме того, проект написан для работы в локальном режиме и требует некоторых изменений для запуска в кластере.

Также хочу привести примеры того, как сохранить данные в MongoDB и любую другую базу, использующую JDBC .

Простое решение


Чтобы выгрузить данные во внешнюю систему, необходимо использовать приемник foreach. Подробнее об этом можно почитать здесь. Если вкратце, то необходимо реализовать интерфейс ForeachWriter. То есть необходимо определить, как открыть соединение, как обработать каждую порцию данных и как закрыть соединение в конце обработки. Исходный код выглядит следующим образом:

class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
  // This class implements the interface ForeachWriter, which has methods that get called 
  // whenever there is a sequence of rows generated as output
  val cassandraDriver = new CassandraDriver();
  def open(partitionId: Long, version: Long): Boolean = {
    // open connection
    println(s"Open connection")
    true
  }
  def process(record: org.apache.spark.sql.Row) = {
    println(s"Process new $record")
    cassandraDriver.connector.withSessionDo(session =>
      session.execute(s"""
       insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt)
       values('${record(0)}', '${record(1)}', '${record(2)}')""")
    )
  }
  def close(errorOrNull: Throwable): Unit = {
    // close the connection
    println(s"Close connection")
  }
}

Определение CassandraDriver и структуру выходнрй таблицы я опишу позже, а пока давайте более подробно рассмотрим, как работает приведенный выше код. Чтобы подключиться к касандре из спарка, я создаю объект CassandraDriver, который обеспечивает доступ к CassandraConnector – коннектору разработанному DataStax. CassandraConnector отвечает за открытие и закрытие соединения с базой, поэтому я просто вывожу отладочные сообщения в open и close методах класса CassandraSinkForeach.

Приведенный выше код вызывается из основного приложения следующим образом:

val sink = parsed
    .writeStream
    .queryName("KafkaToCassandraForeach")
    .outputMode("update")
    .foreach(new CassandraSinkForeach())
    .start()

CassandraSinkForeach создается для каждой строки данных, таким образом каждая рабочая нода вставляет свою часть строк в базу данных. Т.е, каждая рабочая нода выполняет val cassandraDriver = new CassandraDriver (); Так выглядит CassandraDriver:

class CassandraDriver extends SparkSessionBuilder {
  // This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor.
  // It extends SparkSessionBuilder so to use the same SparkSession on each node.
  val spark = buildSparkSession
  import spark.implicits._
  val connector = CassandraConnector(spark.sparkContext.getConf)
  // Define Cassandra's table which will be used as a sink
  /* For this app I used the following table:
       CREATE TABLE fx.spark_struct_stream_sink (
       fx_marker text,
       timestamp_ms timestamp,
       timestamp_dt date,
       primary key (fx_marker));
  */
  val namespace = "fx"
  val foreachTableSink = "spark_struct_stream_sink"
}

Давайте посмотрим более подробно на объект spark . Код для SparkSessionBuilder выглядит следующим образом:

class SparkSessionBuilder extends Serializable {
  // Build a spark session. Class is made serializable so to get access to SparkSession in a driver and executors. 
  // Note here the usage of @transient lazy val 
  def buildSparkSession: SparkSession = {
    @transient lazy val conf: SparkConf = new SparkConf()
    .setAppName("Structured Streaming from Kafka to Cassandra")
    .set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com")
    .set("spark.sql.streaming.checkpointLocation", "checkpoint")
    @transient lazy val spark = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()
    spark
  }
}

На каждой рабочей ноде SparkSessionBuilder предоставляет доступ к SparkSession, который был создан на драйвере. Чтобы сделать такой доступ возможным, необходимо сериализовать SparkSessionBuilder и использовать transient lazy val, который позволяет системе сериализации игнорировать объекты conf и spark при инициализации программы и до момента обращения к объектам. Таким образом, при запуске программы buildSparkSession сериализуется и отправляется каждой рабочей ноде, но объекты conf и spark разрешаются только в момент когда к ним обращается рабочая нода.

Теперь давайте посмотрим на основной код приложения:

object KafkaToCassandra extends SparkSessionBuilder {
  // Main body of the app. It also extends SparkSessionBuilder.
  def main(args: Array[String]) {
    val spark = buildSparkSession
    import spark.implicits._
    // Define location of Kafka brokers:
    val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092"
    /*Here is an example massage which I get from a Kafka stream. It contains multiple jsons separated by \n 
    {"timestamp_ms": "1530305100936", "fx_marker": "EUR/GBP"}
    {"timestamp_ms": "1530305100815", "fx_marker": "USD/CHF"}
    {"timestamp_ms": "1530305100969", "fx_marker": "EUR/CHF"}
    {"timestamp_ms": "1530305100011", "fx_marker": "USD/CAD"}
    */
    // Read incoming stream
    val dfraw = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker)
    .option("subscribe", "currency_exchange")
    .load()
    val schema = StructType(
      Seq(
        StructField("fx_marker", StringType, false),
        StructField("timestamp_ms", StringType, false)
      )
    )
    val df = dfraw
    .selectExpr("CAST(value AS STRING)").as[String]
    .flatMap(_.split("\n"))
    val jsons = df.select(from_json($"value", schema) as "data").select("data.*")
    // Process data. Create a new date column
    val parsed = jsons
      .withColumn("timestamp_dt", to_date(from_unixtime($"timestamp_ms"/1000.0, "yyyy-MM-dd HH:mm:ss.SSS")))
      .filter("fx_marker != ''")
    // Output results into a database
    val sink = parsed
    .writeStream
    .queryName("KafkaToCassandraForeach")
    .outputMode("update")
    .foreach(new CassandraSinkForeach())
    .start()
    sink.awaitTermination()
  }
}

Когда приложение отправляется на исполнение, buildSparkSession сериализуется и отправляется рабочим нодам, однако conf и spark объекты остаются неразрешенными. Затем драйвер создает spark объект внутри KafkaToCassandra и распределяет работу между рабочими нодами. Каждая рабочая нода читает данные из кафки, делает простые преобразования над полученной порцией записей, и когда рабочая нода готова записать результаты в базу, она разрешает conf и spark объекты, тем самым получая доступ к SparkSession, созданной на драйвере.

Как собрать и запустить приложение?


Когда я перешла из PySpark в Scala, мне потребовалось некоторое время, чтобы понять, как собрать приложение. Поэтому, я включила Maven pom.xml в свой проект. Читатель может собрать приложение с помощью Maven, выполнив команду mvn package. После приложение можно отправить на исполнение используя

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar

Для того чтобы собрать и запустить приложение, необходимо заменить имена моих AWS-машин своими (т.е. заменить все, что похоже на ec2-xx-xxx-xx-xx.compute-1.amazonaws.com).

Spark и Structured Streaming в частности – новая для меня тема, поэтому буду очень признательна читателям за комментарии, обсуждение и исправления.

Комментарии (0)