image


В данный момент занимаюсь задачей стриминга (и преобразования) данных. В некоторых кругах
такой процесс известен как ETL, т.е. извлечение, преобразование и загрузка информации.


Весь процесс включает в себя участие следующих сервисов Google Cloud Platform:


  • Pub/Sub — сервис для realtime стриминга данных
  • Dataflow — сервис для преобразования данных (может
    работать как в realtime так и в batch режиме)
  • BigQuery — сервис для хранения данных в виде таблиц
    (поддерживает SQL)

0. Текущее положение дел

В данный момент имеется рабочий вариант стриминга на вышеуказанных сервисах, однако в
качестве шаблона используется один из стандартных.


Проблема заключается в том, что данный шаблон обеспечивает передачу данных 1 к 1, т.е. на
входе в Pub/Sub имеем строку формата JSON, на выходе имеем BigQuery таблицу с полями,
которые соответствуют ключам объектов на верхнем уровне входного JSON.


1. Постановка задачи

Создать Dataflow шаблон, который позволил бы на выходе получать таблицу или таблицы
согласно заданных условий. Например, мы хотим создать отдельную таблицу для каждого
значения определенного ключа входного JSON. Необходимо учесть тот факт, что некоторые
объекты входного JSON могут содержать в качестве значения вложенный JSON, т.о. необходимо
иметь возможность создавать BigQuery таблицы с полями типа RECORD для хранения вложенных
данных.


2. Подготовка к решению

Для создания Dataflow шаблона используется Apache Beam SDK, который, в свою очередь,
поддерживает Java и Python в качестве языка программирования. Надо сказать, что
поддерживается только версия Python 2.7.x, что немного меня удивило. Более того, поддержка
Java несколько шире, т.к. для Python например не доступен некоторый функционал и более
скромный список встроенных коннекторов. Кстати можно писать свои коннекторы.


Однако, в силу того, что с Java я не знаком, использовал Python.


До начала создания шаблона необходимо иметь следующее:


  1. формат входного JSON и он не должен меняться во времени
  2. схему или схемы BigQuery таблиц, в которые будут стримиться данные
  3. количество таблиц, в которые будет стримиться выходной поток данных

Заметим, что после создания шаблона и запуска на его базе Dataflow Job эти параметры можно
изменить только путем создания нового шаблона.


Скажем пару слов о данных ограничениях. Все они исходят из того, что нет возможности
создать динамический шаблон, который мог бы принимать на вход любую строку, парсить ее
согласно внутренней логике и затем наполнять динамически создаваемые таблицы с динамически
созданной схемой. Весьма вероятно, что такая возможность существует, но в рамках данных
инструментов мне не удалось воплотить в жизнь такую схему. Насколько я понимаю весь
пайплайн строится до выполнения его в runtime и в силу этого нет возможности менять его на
лету. Возможно кто-то поделится своим решением.


3. Решение

Для более полного понимания процесса стоит привести схему так называемого пайплайна
из документации Apache Beam.


image


В нашем случае (будем использовать разделение на несколько таблиц):


  • input — данные поступают из PubSub в Dataflow Job
  • Transform #1 — данные преобразуются из строки в Python словарь, на выходе имеем
    PCollection #1
  • Transform #2 — данные тегируются, для дальнейшего разделения по отдельным таблицам, на
    выходе имеем PCollection #2 (на самом деле кортеж PCollection)
  • Transform #3 — данные из PCollection #2 записываются в таблицы с использованием схем
    таблиц

В процессе написания собственного шаблона я активно вдохновлялся этими примерами.


Код шаблона с комментариями (оставил комментарии так же и от предыдущих авторов):
# coding=utf-8
from __future__ import absolute_import

import logging
import json
import os

import apache_beam as beam
from apache_beam.pvalue import TaggedOutput
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json

# Имя GCP проекта
gcp_project = ''

# Имя Pub/Sub топика
topic_name = ''

# Pub/Sub топик в формате 'projects/ИМЯ_GCP_ПРОЕКТА/topics/ИМЯ_ТОПИКА'
input_topic = 'projects/%s/topics/%s' % (gcp_project, topic_name)

# Имя BigQuery датасета
bq_dataset = 'segment_eu_test'

# Имя папки с файлами схем таблиц
schema_dir = './'

class TransformToBigQuery(beam.DoFn):
    # Данный класс и метод отвечает за преобразование данных в формат, понятный коннектору 
    # BigQuery IO в данном случае это python dict
    def process(self, element, *args, **kwargs):
        body = json.loads(element)

        # Тут может быть любая логика преобразования данных, главное чтобы на выходе был
        # python dict и его схема совпадала с той схемой, каким тегом будем помечать 
        # данное сообщение

        yield body

class TagDataWithReqType(beam.DoFn):
    # Данный класс отвечает за тегирование потока, т.е. разделение потока согласно тегу и
    # возвращает кортеж из стольки потоков, сколько элементов в массиве переданном в 
    # методе with_outputs + default
    def process(self, element, *args, **kwargs):
        req_type = element.get('имя_ключа')

        types = (
            'type1',
            'type2',
            'type3',
        )

        if req_type in types:
            yield TaggedOutput(req_type, element)
        else:
            yield element

def run():

    # Парсим файлы со схемами таблиц вида имя_файла.json в папке schema_dir, количество 
    # этих файлов и будет количеством тегов для тегирования (разделения) потока
    schema_dct = {}
    for schema_file in os.listdir(schema_dir):
        filename_list = schema_file.split('.')

        if filename_list[-1] == 'json':
            with open('%s/%s' % (schema_dir, schema_file)) as f:
                schema_json = f.read()
                schema_dct[filename_list[0]] = 
                    json.dumps({'fields': json.loads(schema_json)})

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions()
    p = beam.Pipeline(options=pipeline_options)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True

    # Read from PubSub into a PCollection.
    input_stream = p | beam.io.ReadFromPubSub(input_topic)

    # Transform stream to BigQuery IO format
    stream_bq = input_stream 
        | 'transform to BigQuery' >> beam.ParDo(TransformToBigQuery())

    # Tag stream by schema name
    tagged_stream =         stream_bq         | 'tag data by type' 
        >> beam.ParDo(TagDataWithReqType()).
            with_outputs(*schema_dct.keys(), main='default')

    # Stream unidentified data to default table
    tagged_stream.default | 'push to default table' >> beam.io.WriteToBigQuery(
        '%s:%s.default' % (
            gcp_project,
            bq_dataset,
        ),
        schema=parse_table_schema_from_json(schema_dct.get('default')),
    )

    # Stream data to BigQuery tables by number of schema names
    for name, schema in schema_dct.iteritems():
        tagged_stream[name] | 'push to table %s' % name >> beam.io.WriteToBigQuery(
            '%s:%s.%s' % (
                gcp_project,
                bq_dataset,
                name),
            schema=parse_table_schema_from_json(schema),
        )

    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    logger = logging.getLogger(__name__)
    run()

Теперь пройдемся по коду и дадим пояснения, но прежде стоит сказать, что основная
сложность в написании данного шаблона в том, чтобы мыслить категорией "потока данных", а
не конкретного сообщения. Так же необходимо понимать, что Pub/Sub оперирует сообщениями и
именно из них мы будем получать информацию для тегирования потока.


pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(StandardOptions).streaming = True

Т.к. Apache Beam Pub/Sub IO коннектор используется только в streaming mode необходимо
добавить PipelineOptions() (хотя по факту опции не используются) иначе создание шаблона
падает с исключением. Необходимо сказать об опциях запуска шаблона. Они могу быть
статические и так называемые "runtime". Вот ссылка на документацию по этой теме. Опции позволяют создавать шаблон не указывая заранее параметры, а передавая их при запуске Dataflow Job из шаблона, но мне так и не удалось это имплементировать, вероятно в силу того, что данный коннектор не поддерживает RuntimeValueProvider.


# Read from PubSub into a PCollection.
input_stream = p | beam.io.ReadFromPubSub(input_topic)

Тут все ясно из комментария, читаем поток из топика. Стоит добавить, что поток можно брать
как из топика так и из подписки (subscription). Если в качестве входа указан топик, то
автоматически будет создана временная подписка на данный топик. Синтаксис тоже довольно
понятный, входной поток данных beam.io.ReadFromPubSub(input_topic) направляется в наш
пайплайн p.


# Transform stream to BigQuery IO format
stream_bq = input_stream | 'transform to BigQuery' >> beam.ParDo(TransformToBigQuery())

Тут происходит Transform #1 и наши входные данные преобразуются из python string в
python dict, и на выходе мы получаем PCollection #1. В синтаксисе появляется >>. На
самом деле текст в кавычках это имя потока (должно быть уникально), а так же комментарий,
который добавится к блоку на графе в веб интерфейсе GCP Dataflow. Рассмотрим подробнее
переопределенный класс TransformToBigQuery.


class TransformToBigQuery(beam.DoFn):
    # Данный класс и метод отвечает за преобразование данных в формат, понятный коннектору
    # BigQuery IO в данном случае это python dict
    def process(self, element, *args, **kwargs):
        body = json.loads(element)

        # Тут может быть любая логика преобразования данных, главное чтобы на выходе был
        # python dict и его схема совпадала с той схемой, каким тегом будем помечать
        # данное сообщение, в результате мы должны вернуть python dict

        yield body

В переменной element будет находиться одно сообщение из подписки PubSub. Как видно из
кода, в нашем случае это должен быть валидный JSON. В классе должен быть обязательно
переопределен метод process, в котором и следует сделать необходимые преобразования
входной строки, чтобы получить на выходе данные, которые будут соответствовать схеме
таблицы в которую эти данные будут загружены. Т.к. наш поток в данном случае является
непрерывным, unbounded в терминах Apache Beam, то необходимо возвращать его с помощью
yield, а не return, как для конечного потока данных. В случае конечного потока можно
(и нужно) дополнительно настроить windowing и triggers


# Tag stream by schema name
tagged_stream =     stream_bq     | 'tag data by type' 
    >> beam.ParDo(TagDataWithReqType()).with_outputs(*schema_dct.keys(), main='default')   

Данный код направляет PCollection #1 на Transform #2 где будет происходить тегирование
(разделение) потока данных. В переменной schema_dct в данном случае словарь, где ключ — имя файла схемы без расширения, это и будет тегом, а значение — валидный JSON схемы
таблицы BigQuery для данного тега. Стоит отметить, что схема должна быть передана именно в
виде {'fields': СХЕМА} где СХЕМА это схема таблицы BigQuery в виде JSON (можно
экспортировать из веб интерфейса).


main='default' это имя тэга потока, в который пойдут
все сообщения, не подпадающие под условия тегирования. Рассмотрим класс
TagDataWithReqType.


class TagDataWithReqType(beam.DoFn):
    # Данный класс отвечает за тегирование потока, т.е. разделение потока согласно тегу и
    # возвращает кортеж из стольки потоков, сколько элементов в массиве переданном в
    # методе with_outputs + default
    def process(self, element, *args, **kwargs):
        req_type = element.get('имя_ключа')

        types = (
            'type1',
            'type2',
            'type3',
        )

        if req_type in types:
            yield TaggedOutput(req_type, element)
        else:
            yield element

Как видно, тут тоже переопределен класс process. В переменной types содержатся имена
тегов и они должны совпадать по количеству и имени с количеством и именами ключей словаря
schema_dct. Хотя у метода process есть возможность принимать аргументы, я так и не
смог их передать. Причину пока не выяснял.


На выходе получим кортеж потоков в количестве тегов, а именно количество наших
предопределенных тегов + дефолтный поток, который тегировать не удалось.


# Stream unidentified data to default table
tagged_stream.default | 'push to default table' >> beam.io.WriteToBigQuery(
    '%s:%s.default' % (
        gcp_project,
        bq_dataset,
    ),
    schema=parse_table_schema_from_json(schema_dct.get('default')),
)

Transform #… (на самом деле на схеме его нет, это "ответвление") — пишем дефолтный поток
в дефолтную таблицу.


tagged_stream.default — берется поток с тегом default, альтернативный синтаксис — tagged_stream['default']


schema=parse_table_schema_from_json(schema_dct.get('default')) — тут определяется схема
таблицы. Необходимо учесть, что файл default.json с валидной схемой BigQuery таблицы
должен находиться в текущей schema_dir = './' директории.


Поток попадет в таблицу с именем default.


Если таблица с таким именем (в данном датасете данного проекта) не существует, то она
будет автоматически создана из схемы благодаря параметру по умолчанию
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED


# Stream data to BigQuery tables by number of schema names
for name, schema in schema_dct.iteritems():
    tagged_stream[name] | 'push to table %s' % name >> beam.io.WriteToBigQuery(
        '%s:%s.%s' % (
            gcp_project,
            bq_dataset,
            name),
        schema=parse_table_schema_from_json(schema),
    )

Transform #3, тут должно быть все понятно тем, кто читал статью с самого начала и владеет
синтаксисом python. Разделяем кортеж потоков циклом и каждый поток пишем в свою таблицу со
своей схемой. Следует напомнить, что имя потока должно быть уникально — '%s:%s.%s' % (gcp_project, bq_dataset, name).


Теперь должно быть понятно как это работает и можно создавать шаблон. Для этого нужно
выполнить в консоли (не забываем активировать venv в случае его наличия) или из IDE:


python ИМЯ_ФАЙЛА.py /
    --runner DataflowRunner /
    --project dreamdata-test /
    --staging_location gs://STORAGE_NAME/STAGING_DIR /
    --temp_location gs://STORAGE_NAME/TEMP_DIR /
    --template_location gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME

При этом должен быть организован доступ к Google Account, например через экспорт
переменной окружения GOOGLE_APPLICATION_CREDENTIALS или другим способом.


Пару слов о --runner. В данном случае DataflowRunner говорит о том, что данный код
будет запускаться как шаблон для Dataflow Job. Еще есть возможность указать
DirectRunner, он будет использован по умолчанию при отсутсвии опции --runner и код
будет работать как Dataflow Job, но локально, что очень удобно для отладки.


Если не возникло ошибок, то gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME и будет
созданным шаблоном. Стоит сказать, что в gs://STORAGE_NAME/STAGING_DIR так же запишутся
служебные файлы, которые необходимы для успешной работы Datafow Job созданного на базе
шаблона и удалять их не нужно.


Далее необходимо создать Dataflow Job с использованием данного шаблона, вручную или любым
другим способом (CI например).


4. Выводы

Таким образом удалось организовать стриминг потока из PubSub в BigQuery применяя
необходимые трансформации данных для целей дальнейшего хранения, преобразования и
использования данных.


Основные ссылки



В данной статье возможны неточности и даже ошибки, буду благодарен за конструктивную
критику. В конце хочется добавить, что на самом деле здесь использованы далеко не все
возможности Apache Beam SDK, но такой цели и не стояло.

Комментарии (0)