Всем привет!

В этой статье мы посмотрим на два варианта решения "типичной" задачи потоковой обработки с учетом состояния при помощи фреймворка Apache Flink.

Если вы только начинаете знакомство с Flink, здесь можно найти обильное количество материалов.

Постановка задачи

Итак, давайте представим базу данных (пусть это будет PostgreSQL) состоящую из трех таблиц:

  1. Clients - общая информация по клиенту:

Имя поля

Тип поля

Описание

clientId

int

уникальный идентификатор клиента

name

string

имя клиента

surname

string

фамилия клиента

patronymic

optional[string]

отчество клиента (необязательное поле)

sex

string

пол

  1. ClientCompany - информация о компании клиента:

Имя поля

Тип поля

Описание

clientId

int

уникальный идентификатор клиента

companyId

int

уникальный идентификатор компании

companyName

string

название компании

  1. Payment - информация о платежах клиента:

Имя поля

Тип поля

Описание

clientId

int

уникальный идентификатор клиента

amount

int

сумма платежа

tmMs

long

время свершения платежа

Будем получать обновления по всем таблицам через CDC-канал (пусть это будет Debezium) в топик Kafka в формате Json. Задачей нашей Job-ы будет слушать топики и выдавать обновления по клиенту (например: клиент сменил компанию) во внешнюю систему (в рамках статьи в качестве внешней системы будем использовать консоль вывода).

Изображение
Рисунок 1 - Схема бизнесс процесса

Начинаем программировать

Наш технологический стек:

  1. Scala v2.12.10

  2. Flink v1.16.0

  3. Circe v0.14.3

Мы разберем 2 варианта решения:

  1. При помощи Table API

  2. При помощи классического State Processor API

Прежде чем писать основную логику нам потребуется реализовать несколько общих шагов, которыми мы будем пользоваться в обеих реализациях.

Первое что нам потребуется - это научиться читать топики Kafka, в этом нам поможет уже готовый Flink Kafka Connector:

sealed trait KafkaConsumerSource {

  def configure[A: Decoder: ClassTag](bootstrapServers: String, topicName: String): KafkaSource[A] = {
    KafkaSource
      .builder()
      .setBootstrapServers(bootstrapServers)
      .setGroupId(s"$topicName-group")
      .setTopics(topicName)
      .setDeserializer(new KafkaJsonRecordDeserializer[A])
      .setStartingOffsets(OffsetsInitializer.earliest())
      .build()
  }
}

object KafkaConsumerSource extends KafkaConsumerSource {

  def configureKafkaDataSource[A: Decoder: ClassTag](
      topicName: String
  )(implicit env: StreamExecutionEnvironment,
    typeInformation: TypeInformation[A]): DataStream[A] = {
    val kafkaSource = KafkaConsumerSource.configure[A](Kafka.BootstrapServers, topicName)
    env.fromSource[A](kafkaSource, WatermarkStrategy.noWatermarks[A](), s"$topicName-flow")
  }
}

class KafkaJsonRecordDeserializer[A: Decoder: ClassTag] extends KafkaRecordDeserializationSchema[A] {

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[A]): Unit = {
    val value = new String(record.value())
    val obj   = Serde.toObj(value).toOption // в случае неудачи при десериализации, ошибка просто игнорируется и событие отбрасывается (в реальных проектах так делать не стоит)
    obj.foreach(out.collect)
  }

  override def getProducedType: TypeInformation[A] = {
    TypeExtractor
      .getForClass(classTag[A].runtimeClass.asInstanceOf[Class[A]])
      .asInstanceOf[TypeInformation[A]]
  }
}

В сериализации/десериализации сообщений нам поможет Circe:

object Serde {

  def toObj[A: Decoder](json: String): Either[circe.Error, A] = decode[A](json)

  def toJson[A: Encoder](obj: A): String = obj.asJson.toString()
}

Мы немного упростим себе задачу, сымитируем поведение CDC-канала, отправляя сообщения требуемого формата напрямую в топик Kafka:

case class Client (before: Option[Client.Value], after: Option[Client.Value], op: String)

object Client {
  case class Value(clientId: Int, name: String, surname: String, patronymic: Option[String], sex: String)
}

case class ClientCompany (before: Option[ClientCompany.Value], after: Option[ClientCompany.Value], op: String)

object ClientCompany {
  case class Value(clientId: Int, companyId: Int, companyName: String)
}

case class Payment (before: Option[Payment.Value], after: Option[Payment.Value], op: String)

object Payment {
  case class Value(clientId: Int, amount: Int, tmMs: Long)
}

Теперь мы готовы приступить к основной части.

Использование Flink Table API

Итак, прежде всего нам нужно получить необходимое окружение, поскольку мы будем работать в потоковом режиме, заручимся таковым:

implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

Далее получим окружение для доступа к Table API :

implicit val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

Формируем Kafka Source при помощи кода который мы подготовили выше:

val clients: DataStream[Client] = KafkaConsumerSource.configureKafkaDataSource[Client]("clients")

Теперь нам остался всего один шаг до получения представления с которым нам предстоит работать - Dynamic Tables. Чтобы сделать этот шаг максимально простым, воспользуемся возможностями Scala и расширим DataStream методом toStreamTable:

implicit class DataStreamOps[A: RowTransformer](val stream: DataStream[A]) {

  def toStreamTable(primaryKey: String*)(implicit env: StreamTableEnvironment): Table = {
    val transformer = RowTransformer[A]
    stream
      .map(transformer.toRow(_))
      .flatMap(_.toSeq)(transformer.typeInformation)
      .toChangelogTable(
        env,
        Schema
          .newBuilder()
          .primaryKey(primaryKey: _*)
          .build(),
        ChangelogMode.upsert()
      )
  }
}

На вход функции подается список имен полей, которые составят первичный ключ нашей таблицы. Как и в большинстве фреймворков наименьшей единицей для работы с таблицами является строка (Row). Нужен способ преобразования всех наших сущностей в тип Row, продолжим пользоваться возможностями Scala, на этот раз тем что Scala позволяет писать код в функциональном стиле, а конкретно мы воспользуемся Type Class-ми. Наш Type Class будет иметь следующий вид:

sealed trait RowTransformer[A] {
  def toRow(entity: A): Option[Row]
  protected val fieldsName: Array[String]
  implicit def typeInformation: TypeInformation[Row]
}

Представим реализацию для одной из таблиц (для остальных алгоритм будет аналогичен):

implicit case object ClientRow extends RowTransformer[Client] {
  override def toRow(entity: Client): Option[Row] = {
    val kind   = kindOf(entity.operation)
    val record = entity.before.orElse(entity.after)
    record.map(r => Row.ofKind(kind, Int.box(r.clientId), r.name, r.surname, r.patronymic.orNull, r.sex))
  }

  override implicit def typeInformation: TypeInformation[Row] = {
    Types.ROW_NAMED(fieldsName, Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING)
  }

  override protected val fieldsName: Array[String] = Array("clientId", "name", "surname", "patronymic", "sex")
}

Теперь мы можем преобразовывать DataStream в Table одним действием:

val clients: Table = KafkaConsumerSource.configureKafkaDataSource[Client]("clients")
  .toStreamTable("clientId")

Это далеко не единственный способ создания таблиц, если мы изначально хотим работать с данными как с таблицей, необязательно сперва представлять данные в виде DataStream:

tableEnv.createTable(
  "companies",
  TableDescriptor
    .forConnector("kafka")
    .schema(
      Schema
        .newBuilder()
        .column("clientId", DataTypes.INT().notNull())
        .column("companyId", DataTypes.INT().notNull())
        .column("companyName", DataTypes.STRING().notNull())
        .primaryKey("clientId")
        .build()
    )
    .option(KafkaConnectorOptions.TOPIC, Seq("companies").asJava)
    .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, Kafka.BootstrapServers)
    .option(KafkaConnectorOptions.PROPS_GROUP_ID, "companies-group")
    .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.EARLIEST_OFFSET)
    .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
    .build()
)

val companies: Table = tableEnv.sqlQuery("SELECT * FROM companies")

companies.print("Companies")

И конечно же куда без SQL:

tableEnv.executeSql(
  """
    |CREATE TABLE payments (clientId INT NOT NULL, amount INT NOT NULL, tmMs BIGINT NOT NULL)
    |WITH (
    | 'connector' = 'kafka',
    | 'topic' = 'payments',
    | 'properties.bootstrap.servers' = 'localhost:9092',
    | 'properties.group.id' = 'payments-group',
    | 'scan.startup.mode' = 'earliest-offset',
    | 'format' = 'debezium-json'
    |)
    |""".stripMargin)

val payments: Table = tableEnv.sqlQuery("SELECT * FROM payments")

payments.print("Payments")

Мы окончательно подготовились к написанию основной логики. Для нашей задачи будем использовать inner join, т.е. при получении нового события мы будем выдавать результат во внешнюю систему только в случае наличия данных по ключу во всех таблицах.

clients
  .join(companies, $"client.clientId" === $"company.clientId")
  .join(payments, $"client.clientId" === $"payment.clientId")
  .select("client.clientId", "name", "surname", "companyId", "companyName", "amount", "timestamp")
  .toChangelogStream(Schema.derived(), ChangelogMode.upsert())
  .print("Portfolio")
Изображение
Рисунок 2 - Dynamic Tables

Использование State Processor API

Снова начнем с окружения, в этом варианте нам будет достаточно только окружения для работы в потоковом режиме:

implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

В прошлом варианте мы приводили все наши сущности к типу Table, сейчас нам нужно сделать нечто похожее:

case class PortfolioState(
   id: Int,
   client: Option[Client.Value],
   company: Option[ClientCompany.Value],
   payment: Option[Payment.Value]) {

  def complete: Option[Portfolio] = {
    for {
      client  <- client
      company <- company
      payment <- payment
    } yield {
      Portfolio(
        client.clientId,
        client.name,
        client.surname,
        company.companyId,
        company.companyName,
        payment.amount,
        payment.timestamp
      )
    }
  }
}

Мы описали свой собственный аналог таблицы, добавив в него метод complete формирующий результат только в случае наличия данных по всем трем сущностям (аналог inner join). Опишем способ трансформации исходной сущности в наш тип (для остальных сущностей будет аналогично):

object PortfolioState {

  def apply(client: Client): PortfolioState = {
    val id = client.before.map(_.clientId).getOrElse(client.after.map(_.clientId).get)
    PortfolioState(id, client.before.orElse(client.after), None, None)
  }
}

Итоговый результат:

val clients: DataStream[PortfolioState] = KafkaConsumerSource.configureKafkaDataSource[Client]("clients").map(PortfolioState(_))

Последнее чего нам не хватает - это обработчика событий, который будет поддерживать актуальное состояние и выдавать результаты:

class StateProcessor extends KeyedProcessFunction[Int, PortfolioState, Portfolio] {

  private var state: ValueState[PortfolioState] = _

  override def open(parameters: Configuration): Unit = {
    state = getRuntimeContext.getState(
      new ValueStateDescriptor[PortfolioState]("state", createTypeInformation[PortfolioState])
    )
  }

  override def processElement(
     value: PortfolioState,
     ctx: KeyedProcessFunction[Int, PortfolioState, Portfolio]#Context,
     ut: Collector[Portfolio]
  ): Unit = {
    Option(state.value()).fold {
      state.update(value) // если state == null, инициализируем его входным значением
    } { currentState =>
      val updatedState = currentState.copy(
        client = value.client.orElse(currentState.client),
        company = value.company.orElse(currentState.company),
        payment = value.payment.orElse(currentState.payment)
      )
      updatedState.complete.foreach(out.collect) // если данных хватает, то выдаем результат
      state.update(updatedState)
    }
  }
}

Соберем все вместе:

clients
  .union(companies) // объединяем все три потока в один
  .union(payments)
  .keyBy(_.id) // шардиурем данные по полю id (clientId)
  .process(new PortfolioStateProcessor())
  .print("Portfolio")

Итоги

Мы рассмотрели 2 разных подхода. Даже несмотря на всю простоту постановки, подход с применением Table API для задач отслеживания изменений и моментальной выдачи результата выглядит привлекательнее. Мы получаем возможность описывать желаемый результат на привычном SQL-подобном синтаксисе, находясь в бесконечном потоке событий.

Полный код проекта можно найти здесь

Комментарии (0)