Данная статья обобщает базовые шаги по установке и началу работы с PySpark Structured Streaming при участии брокера сообщений Kafka. Предполагается, что читатель уже знаком с языком программирования Python и сервисом Kafka.

При помощи PySpark Structured Streaming можно быстро разрабатывать масштабируемые сервисы обработки данных в реальном времени. Такой подход позволяет в короткие сроки сделать выгодное предложение клиенту, вовремя заметить аномалию в системе или же отображать актуальные данные. Масштабируемость обеспечивается фреймворком Spark. Модуль Structured Streaming позволяет разрабатывать программы обработки данных в реальном времени, используя синтаксис SQL.

Здесь будут рассмотрены только самые простые примеры работы с Kafka посредством PySpark: чтение/запись в Kafka, а также разбор и сохранение поступающих сообщений в формате JSON и AVRO. Для подробного ознакомления можно прочитать статьи на официальном сайте:

Требования к окружению

Все примеры были отлажены на подсистеме Ubuntu 20.04, Python 3.10.1, Spark 3.2.1, Kafka 3, OpenJDK 11.0.11 (необходима для работы Spark).

1.1) Подготовка виртуальной среды

Создаём пустую директорию с названием проекта, где инициализируем виртуальную среду (команда в терминале):

python -m venv .venv

Активируем в терминале при помощи следующей команды (из директории, где была создана виртуальная среда):

source .venv/bin/activate

Устанавливаем PySpark:

pip install wheel pyspark

Чтобы проверить, что всё установилось корректно, можно запустить интерпретатор PySpark:

pyspark

Должен запуститься интерпретатор Python, где доступен Spark.

Приветствующее сообщение Spark
Приветствующее сообщение Spark

1.2) Запуск сервиса Kafka

Для примера Kafka будет запущена локально. Это можно сделать, выполнив действия из официальной инструкции. Если умеете пользоваться Docker, то можно запустить её при помощи данного docker-compose или же использовать любой удобный для вас образ.

2) Чтение сообщений из топика Kafka

Создаём любой файл *.py. Для примера это будет example1.py.

Для работы со Spark необходимо инициализировать объект SparkSession:

from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName('quickstart-streaming-kafka')
         .getOrCreate())
spark.sparkContext.setLogLevel('WARN')

При инициализации SparkSession вы можете указывать различные имена в методе appName(), которые  позволяют позже опознавать свои приложения (например, при мониторинге всех запущенных приложений Spark на кластере).

Последняя строка опциональна, она влияет на вывод логов работы Spark. Установив значение "WARN", не будут выводиться информационные логи.

Укажем источник данных – топик Kafka:

source = (spark
          .readStream
          .format('kafka')
          .option('kafka.bootstrap.servers', 'localhost:9092')
          .option('subscribe', 'input00')
          .load())

Используя объект SparkSession, можно указывать источник данных, благодаря чему получим объект DataFrame, который и будет представлять получаемые данные. Источники в Spark делятся на два типа:

  • batch – разовая работа, когда вам заранее известен объём данных (для этого используется свойство read у SparkSession);

  • stream – потоковая работа, когда приложение запущено и обрабатывает поступающие данные (для этого используется свойство readStream) в реальном времени. В данном режиме Spark с некоторой периодичностью формирует batch из поступающих данных.

Чтобы считывать из Kafka, необходимо в формате format() передать строку "kafka". Список обязательных опций при работе с данным форматом:

  • kafka.bootstrap.servers – адрес сервера брокера (можно несколько – через запятую внутри строки);

  • subscribe – топик, откуда будут считываться новые сообщения (можно несколько – через запятую внутри строки).

Посмотрим схему получаемых сообщений, используя метод printSchema() у получившегося DataFrame:

source.printSchema()

Вывод в терминале:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

Краткая информация по каждому полю:

  1. key – ключ, который используется в Kafka для определения партиции, в которую должно было попасть сообщение;

  2. value – непосредственно сами данные, которые необходимо обрабатывать;

  3. topic – название топика, откуда было получено сообщение (источники в Spark позволяют подписываться сразу на несколько топиков, их можно идентифицировать по данному полю);

  4. partition и offset – партиция и смещение служат указателем, где именно в топике хранится сообщение;

  5. timestamp и timestampType – информация о времени, когда поступило сообщение в топик.

Можно заметить, что поля "key" и "value" имеют двоичный формат. И если поступит сообщение со значением "Hello, World!", то оно будет представлено массивом байтов:

-------------------------------------------
Batch: 1
-------------------------------------------
+----+--------------------+-------+---------+------+--------------------+-------------+
| key|               value|  topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-------+---------+------+--------------------+-------------+
|[30]|[48 65 6C 6C 6F 2...|input00|        0|     3|2022-03-11 08:28:...|            0|
+----+--------------------+-------+---------+------+--------------------+-------------+

В Spark можно выполнять выборку с применением различных выражений, например, приведение типов. Используя метод selectExpr(), где можно указать выводимые поля в DataFrame, выведем и преобразуем поле "value" к строке, а также выведем "offset":

df = (source
      .selectExpr('CAST(value AS STRING)', 'offset'))

Напишем простой вывод данных из источника в консоль:

console = (df
           .writeStream
           .format('console'))

У любого DataFrame можно вызвать свойство writeStream, после чего указать формат вывода.  В нашем случае это вывод в консоль (полный список поддерживаемых форматов доступен в документации). Таким образом, получится объект DataStreamWriter, который осталось только запустить:

console.start().awaitTermination()

Метод start() запускает поток на выполнение. Но для того, чтобы программа не остановилась, необходимо поставить её в ожидание. Это можно сделать методом awaitTermination(), который будет ожидать, пока поток не будет терминирован.

Посмотрим на вывод, записав сообщение "Hello, World!" в топик "input00":

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------+------+
|        value|offset|
+-------------+------+
|Hello, World!|     5|
+-------------+------+

Готово! Теперь мы можем читать сообщения из Kafka.

Полный код примера
def main():
    from pyspark.sql import SparkSession

    spark = (SparkSession
             .builder
             .appName('streaming-kafka')
             .getOrCreate())
    spark.sparkContext.setLogLevel('WARN')

    source = (spark
              .readStream
              .format('kafka')
              .option('kafka.bootstrap.servers', 'localhost:9092')
              .option('subscribe', 'input00,input01')
              .load())
    source.printSchema()

    df = (source
          .selectExpr('CAST(value AS STRING)', 'offset'))

    console = (df
               .writeStream
               .format('console')
               .queryName('console output'))
    console.start().awaitTermination()


if __name__ == '__main__':
    main()

3) Запуск скрипта

Для запуска Spark приложений используется spark-submit. Чтобы взаимодействовать с Kafka, необходимо добавить зависимость "spark-sql-kafka-0-10_2.12" (актуальная версия для Spark 3.2.1 на момент написания статьи) к аргументу "--packages" (если есть доступ к сети Интернет или пакет уже имеется в кэше) или "--jars" (если имеется локально на запускаемой машине, необходимо указать путь).

spark-submit принимает множество параметров, о которых можно прочитать в документации. Эта утилита доступна в виртуальной среде Python, где был установлен пакет PySpark, поэтому её можно запускать данной командой в терминале при активированной виртуальной среде (должна быть установлена Java):

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 example1.py

4) Пишем в топик Kafka

Вывод в топик Kafka написать не многим сложнее, чем вывод в консоль.

Дополняя предыдущий пример, напишем ещё один выход для df:

query = (df
         .writeStream
         .format('kafka')
         .queryName('kafka-output')
         .option('kafka.bootstrap.servers', 'localhost:9092')
         .option('topic', 'output00')
         .option('checkpointLocation', './.local/checkpoint'))

Для вывода в Kafka необходимо в метод format() записать "kafka" и указать в опциях:

  • kafka.bootstrap.servers – адрес брокер сервера (можно несколько –  через запятую внутри строки);

  • topic –  в какой топик записывать данные (только один);

  • checkpointLocation – указать директорию, где будут записываться/читаться чекпоинты. Чекпоинты сохраняют указатель на месте, где остановилась обработка источника, что может пригодиться при перезапуске программы (продолжит с того места, где остановился). Настраивается на DataStreamWriter. Более подробно про чекпоинты можно прочитать в документации.

Ещё добавочно здесь появилась строка с методом queryName(). Это необязательно, но так как у нас появился второй поток в одной программе, следует их все проименовать, чтобы различать при мониторинге. Запустим оба потока:

console.start()
query.start().awaitTermination()

Вывод:

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+------+
|              value|offset|
+-------------------+------+
|Hello, second sink!|     0|
+-------------------+------+

Сообщение в топик Kafka также записан:

Сообщение в выходном топике Kafka
Сообщение в выходном топике Kafka

ВАЖНО: у выходного DataFrame обязательно должно быть поле "value", иначе программа упадёт с ошибкой. Именно это поле и будет записано в "value" сообщения для Kafka. Поле "key" может отсутствовать в DataFrame.

Благодаря тому, что у каждого потока есть свое название, мы можем их с лёгкостью отличать в Spark Web-UI, который запускается параллельно с развёртыванием Spark приложения (по умолчанию доступно по localhost:4040).

Spark Web-UI
Spark Web-UI
Окно статистики выполнения запросов Spark
Окно статистики выполнения запросов Spark

Отлично, теперь мы можем записывать новые сообщения в топик Kafka!

Полный код примера
def main():
    from pyspark.sql import SparkSession

    spark = (SparkSession
             .builder
             .appName('streaming-kafka')
             .getOrCreate())
    spark.sparkContext.setLogLevel('WARN')

    source = (spark
              .readStream
              .format('kafka')
              .option('kafka.bootstrap.servers', 'localhost:9092')
              .option('subscribe', 'input00')
              .load())

    df = (source
          .selectExpr('CAST(value AS STRING)', 'offset'))

    console = (df
               .writeStream
               .format('console')
               .queryName('console-output'))

    query = (df
             .writeStream
             .format('kafka')
             .queryName('kafka-output')
             .option('kafka.bootstrap.servers', 'localhost:9092')
             .option('topic', 'output00')
             .option('checkpointLocation', './.local/checkpoint'))

    console.start()
    query.start().awaitTermination()


if __name__ == '__main__':
    main()

5) Сообщение в формате JSON

Передавая данные, мы всегда ждём чего-то большего, чем просто одну строку. Для этого можно использовать текстовые форматы данных, например, JSON. Spark имеет функции, которые могут преобразовать строку в struct, то есть в структуру данных.

Будем обрабатывать следующую структуру данных:

{
    "name": String,
    "age": Int
}

Заводим источник source, который вычитывает сообщения из топика Kafka, при этом поле "value" приводим к типу String (как описано в разделе 2).

В Spark есть функция from_json(), которая может приводить поле из строки к структуре. При этом ей необходимо передать схему сообщения, которая может быть типа DataType, а также String. Воспользуемся первым вариантом:

from pyspark.sql import types as t

schema = t.StructType(
    [
        t.StructField('name', t.StringType(), True),
        t.StructField('age', t.IntegerType(), True),
    ],
)

Все типы можно найти в пакете pyspark.sql.types. Комбинируя таким образом типы данных, можно описывать необходимую схему.

Теперь нужно применить функцию from_json() к полю, которое содержит строку с JSON:

from pyspark.sql import functions as f

df = (df
      .select(f.from_json('value', schema).alias('data')))
print("df schema")
df.printSchema()

Вывод схемы:

df schema
root
 |-- data: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- age: integer (nullable = true)

Первым аргументом функции from_json() было передано название поля, вторым аргументом – схема данных. Метод alias() изменяет название получившегося поля (аналогично слову AS в SQL).

Добавим немного логики: будем фильтровать по полю "age", что больше или равно 18 при помощи метода where(). Так как при парсинге мы получаем структуру, то при обращении к конкретному полю структуры необходимо обращаться через точку ("data.age"):

df = df.where(df['data.age'] >= 18)

Структуру можно легко развернуть при помощи метода select():

console = df.select('data.*')
print("console schema")
console.printSchema()

Вывод схемы:

console schema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

Не стоит переживать по поводу строк с выводом в консоль, как print() и printSchema(), так как в сам запрос обработки Spark они не попадут, выполнятся только в момент инициализации запроса (то есть при поступлении новых данных эти выводы отрабатываться не будут).

Добавим вывод в консоль:

console = (console
           .writeStream
           .format('console')
           .queryName('console-output'))

Для сохранения данных в Kafka, подготовим также JSON. Для этого в Spark есть функция to_json(), принимающая ColumnOrName, где можно указать поле, которое содержит структуру:

df = df.withColumn('value', f.to_json('data'))

query = (df
         .writeStream
         .format('kafka')
         .queryName('kafka-output')
         .option('kafka.bootstrap.servers', 'localhost:9092')
         .option('topic', 'output00')
         .option('checkpointLocation', './.local/checkpoint'))

Готово!

Полный код примера
def main():
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as f

    from schema_example3 import schema

    spark = (SparkSession
             .builder
             .appName('streaming-kafka')
             .getOrCreate())
    spark.sparkContext.setLogLevel('WARN')

    source = (spark
              .readStream
              .format('kafka')
              .option('kafka.bootstrap.servers', 'localhost:9092')
              .option('subscribe', 'input00')
              .load())

    df = (source
          .selectExpr('CAST(value AS STRING)', 'offset'))

    df = (df
          .select(f.from_json('value', schema).alias('data')))
    print("df schema")
    df.printSchema()

    df = df.where(df['data.age'] >= 18)

    console = df.select('data.*')
    print("console schema")
    console.printSchema()

    console = (console
               .writeStream
               .format('console')
               .queryName('console-output'))

    df = df.withColumn('value', f.to_json('data'))

    query = (df
             .writeStream
             .format('kafka')
             .queryName('kafka-output')
             .option('kafka.bootstrap.servers', 'localhost:9092')
             .option('topic', 'output00')
             .option('checkpointLocation', './.local/checkpoint'))

    console.start()
    query.start().awaitTermination()


if __name__ == '__main__':
    main()

Схема данных:

from pyspark.sql import types as t

schema = t.StructType(
    [
        t.StructField('name', t.StringType(), True),
        t.StructField('age', t.IntegerType(), True),
    ],
)

Запишем три JSON строки в топик "input00":

{"name":"Ivan","age":15}
{"name":"Vladimir","age":33}
{"name":"Dmitry","age":24}

Из них только две записи проходят условие фильтрации. Вывод в консоле:

+--------+---+
|    name|age|
+--------+---+
|Vladimir| 33|
|  Dmitry| 24|
+--------+---+

Состояние топика "output00":

Состояние выходного топика Kafka
Состояние выходного топика Kafka

6) Сообщение в формате AVRO

Работа с AVRO не входит в основной модуль работы со Spark. Для этого необходимо, как и с Kafka, подтягивать дополнительный пакет при запуске приложения:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1,org.apache.spark:spark-avro_2.12:3.2.1 example4.py

Для сериализации и десериализации сообщений формата AVRO в Spark есть две функции: to_avro() и from_avro(), которые хранятся в модуле pyspark.sql.avro:

from pyspark.sql.avro import functions as fa

Переведём структуру данных из прошлого примера в формат AVRO и сохраним как файл *.avsc:

{
  "type": "record",
  "name": "Person",
  "namespace": "example",
  "fields": [
    {"name": "name","type": ["string", "null"]},
    {"name": "age","type": ["long", "null"]}
  ]
}

Для метода from_avro() необходимо передать колонку и схему в формате строки. Получить схему можно обычным способом из файла:

schema = open('schema_example4.avsc', 'r').read()

Относительно примера с JSON остаётся только заменить функцию десериализации колонки, где хранится сообщение (в отличии от JSON, AVRO хранится в двоичном формате, а значит нам не нужно изначально приводить колонку "value" к строковому типу):

df = (source
      .select(fa.from_avro('value', schema).alias('data')))

На выходе to_avro() указывать схему опционально. Но лучше всё же её указать, иначе могут возникнуть проблемы при десериализации (например, если порядок колонок был нарушен):

df = df.withColumn('value', fa.to_avro('data', schema))

Таким образом, мы можем сериализовать и десериализовать сообщения в формате AVRO.

Полный код примера
def main():
    from pyspark.sql import SparkSession
    from pyspark.sql.avro import functions as fa

    schema = open('schema_example4.avsc', 'r').read()

    spark = (SparkSession
             .builder
             .appName('streaming-kafka')
             .getOrCreate())
    spark.sparkContext.setLogLevel('WARN')

    source = (spark
              .readStream
              .format('kafka')
              .option('kafka.bootstrap.servers', 'localhost:9092')
              .option('subscribe', 'input00')
              .load())

    df = (source
          .select(fa.from_avro('value', schema).alias('data')))

    print("df schema")
    df.printSchema()

    df = df.where(df['data.age'] >= 18)

    console = df.select('data.*')
    print("console schema")
    console.printSchema()

    console = (console
               .writeStream
               .format('console')
               .queryName('console-output'))

    df = df.withColumn('value', fa.to_avro('data', schema))

    query = (df
             .writeStream
             .format('kafka')
             .queryName('kafka-output')
             .option('kafka.bootstrap.servers', 'localhost:9092')
             .option('topic', 'output00')
             .option('checkpointLocation', './.local/checkpoint'))

    console.start()
    query.start().awaitTermination()


if __name__ == '__main__':
    main()

Схема данных в формате AVRO:

{
  "type": "record",
  "name": "Person",
  "namespace": "com.neoflex.example",
  "fields": [
    {"name": "name","type": ["string", "null"]},
    {"name": "age","type": ["long", "null"]}
  ]
}

Для публикации сообщений в формате AVRO в Kafka был написан и приложен к примерам Producer. Все примеры из статьи доступны в репозитории.

Заключение

Цель статьи – научиться писать простейшие процессы потоковой обработки с применением фреймворка PySpark, взаимодействовать с Kafka и читать/писать сообщения в форматах JSON и AVRO. Spark позволяет использовать синтаксис SQL, благодаря чему программы получаются удобочитаемы за счёт отсутствия необходимости изучать синтаксис очередного фреймворка, ведь SQL достаточно широко известен.

Таким образом, используя брокер сообщения Kafka и PySpark Structured Streaming, вы сможете описывать свои системы обработки данных любой сложности и масштабируемости.

Автор статьи: Дмитрий Жданов, специалист бизнес-направления Fast Data компании Neoflex.

Комментарии (1)


  1. procfg
    05.07.2022 08:41

    Спасибо за статью)