В данной статье делимся опытом внедрения решения на базе СУБД ClickHouse и сервисов Yandex Cloud. Мы не коснёмся тонких настроек ClickHouse или его масштабирования, но затронем достаточно интересные на наш взгляд темы:

  • как загружать данные из On-premise в облачный ClickHouse с использованием сервисов Yandex Cloud – Functions, Object Storage, Message Queue;

  • как обрабатывать/преобразовывать данные в облачном ClickHouse – очищать и строить витрины; какие «подводные камни» нам встретились на этом пути.

Исходная задача:

В On-premise в общую директорию с некоторой периодичностью поступают XML-файлы с массивами объектов. Каждый объект имеет ключ и содержит вложенные объекты/массивы объектов.

Объекты в файлах могут дублироваться: с одним и тем же ключом может поступить несколько как в одном, так и в нескольких разных XML-файлах. Имена файлов содержат timestamp их формирования в системе-источнике, но в общую директорию они могут поступать в любом порядке, не привязанному к этому timestamp.

Необходимо:

  • данные из XML-файлов сохранять в ClickHouse в исходной структуре (с вложенными массивами, объектами и т.д.) с минимальным отставанием по времени, т.е. данные в ClickHouse должны появляться/обновляться сразу после поступления файлов в общую директорию в On-premise;

  • данные «очищать» от дублей: среди объектов с одинаковым ключом нужно оставлять тот, у которого более ранний timestamp в имени файла;

  • строить/обновлять витрины, которые в данном случае представляют собой таблицы с определёнными агрегациями и преобразованиями исходных данных.

Теперь рассмотрим детальнее.

Данные из XML-файлов необходимо сохранять в ClickHouse в исходной структуре с минимальным отставанием по времени.

Пройдя интересный курс обучения «Построение корпоративной аналитической платформы» (кстати, рекомендуем тем, кто начинает изучать облачные технологии и ClickHouse) и увидев фишки вроде интеграции с Kafka, мы предположили, что для ClickHouse или его cli-клиента нет проблем распарсить XML-файл и разложить данные по столбцам. Но оказалось, что такое ClickHouse пока не умеет. Придётся самим парсить XML и делать вставку в ClickHouse.

Немного поспорили о том, что лучше использовать в данной ситуации. Выбирали между Airflow и сервисами Yandex Cloud, в итоге пришли ко второму, а именно — бóльшую часть логики решили сделать на Yandex Cloud Functions. Причиной тому послужили следующие факторы:

  • ClickHouse будет развернут в облаке Yandex Cloud как управляемый сервис, удобно иметь программную обработку данных в том же облаке рядом с БД;

  • Yandex Cloud Functions имеют нетарифицируемый объем (так называемый «free tier»), что удешевляет разработку, тестирование функционала и дальнейшую эксплуатацию. Не потребуется содержать в On-premise сервер с Airflow (а может и несколько серверов);

  • В Yandex Cloud много полезных сервисов, с которыми легко интегрируются Yandex Cloud Functions: Yandex Object Storage, Yandex Message Queue, Yandex DB и другие. Они также имеют свой «free tier»;

  • Легкость масштабирования Yandex Cloud Functions: мы становимся практически не ограничены в вычислительных ресурсах и параллельности;

  • Вся облачная инфраструктура описывается в Terraform (у Yandex Cloud есть provider) – так минимизируем затраты на развертывание, CICD/DevOps.

Эту часть задачи мы реализовали по следующей принципиальной схеме:

В On-premise запущен скрипт (его подробно не описываем, т.к. он очень простой), который непрерывно поллит общую директорию и каждый поступивший в неё XML-файл загружает (1) в Бакет Yandex Object Storage. Окончание загрузки файла в Бакет служит сигналом для запуска (2) функции «Диспетчер», которая кладёт (3) сообщение с информацией о файле в очередь, таким образом происходит троттлинг (сдерживание) нагрузки. Дальше функция «Парсер» читает (4) из очереди сообщения, скачивает файл из Бакета себе в оперативную память, парсит XML и вставляет (5) данные в хранилище. По окончанию обработки файл из Бакета удаляется (6), чтобы не занимать место и не тратить деньги впустую.

В ClickHouse все данные сохраняются в таблицу со структурой аналогичной схеме XML из файла. Всё благодаря богатому списку типов данных ClickHouse.

Пример реализации функции «Парсер» на Python в облаке:

# import зависимостей
import boto3

# подключение к облаку
boto_session = boto3.session.Session(
    aws_access_key_id=os.getenv('ACCESS_KEY_ID'),
    aws_secret_access_key=os.getenv('SECRET_ACCESS_KEY')
)

# подключение к объектному хранилищу
s3 = boto_session.client(
    service_name='s3',
    endpoint_url='https://storage.yandexcloud.net'
)

# подключение к ClickHouse
clickhouse = Client(host=os.getenv('CH_HOST'),
                    user=os.getenv('CH_USER'),
                    password=os.getenv('CH_PASSWORD'),
                    database=os.getenv('CH_DB'),
                    secure=True,
                    settings={'input_format_import_nested_json': 1})

# основной обработчик событий
# определяет, что вызов произошёл по сообщению из очереди,
# из тела сообщения берётся идентификатор файла,
# файл скачивается, парсится, данные вставляются в ClickHouse
def handler(event, context):
    if 'messages' in event:
        for message in event['messages']:
            # получение текста сообщения:
            task_json = json.loads(message['details']['message']['body'])
            
            # получение объекта из S3:
            get_object_response = s3.get_object(Bucket=task_json['bucket_id'], 
                                        Key=task_json['object_id'])
                                        
            # парсинг файла:
            parsed_data = parse(data=xmltodict.parse(get_object_response['Body'].read()), 
                                        filename=task_json['object_id'])
                                        
            # сохранение данных:
            store(data=parsed_data, file=task_json['object_id'])
            
            # удаление обработанного файла:
            s3.delete_object(Bucket=task_json['bucket_id'], Key=task_json['object_id'])
            
            
def parse(data, filename):
    # ... реализация парсинга
    
def store(data, filename):
    # ... реализация сохранения в ClickHouse

Декларация облачной инфраструктуры на Terraform:

# Подключение к Yandex cloud и хранение состояний в Yandex Object Storage
terraform {
  required_providers {
    yandex = {
      source = "yandex-cloud/yandex"
    }
  }
  backend "s3" {
    endpoint   = "storage.yandexcloud.net"
    bucket     = "neoflex-tf-state"
    region     = "ru-central1"
    key        = "terraform.tfstate"
    access_key = "xxx"
    secret_key = "xxx"

    skip_region_validation      = true
    skip_credentials_validation = true
  }
}

provider "yandex" {
  service_account_key_file = "key.json"
}

locals {
  folder_id    = "xxx"
  ch_secrets   = jsondecode(file("clickhouse.json"))
  cluster_name = "neoflex"
  network_id   = "xxx"
  subnet_id    = "xxx"
}

# Развертывание ClickHouse
resource "yandex_mdb_clickhouse_cluster" "neoflex" {
  name                    = local.cluster_name
  environment             = "PRESTABLE"
  network_id              = local.network_id
  folder_id               = local.folder_id
  service_account_id      = yandex_iam_service_account.sa.id
  sql_database_management = false
  sql_user_management     = false

  clickhouse {
    resources {
      resource_preset_id = "s2.large"
      disk_type_id       = "network-ssd"
      disk_size          = 100
    }
  }

  database {
    name = local.ch_secrets["CH_DB"]
  }

  user {
    name     = local.ch_secrets["CH_USER"]
    password = local.ch_secrets["CH_PASSWORD"]
    permission {
      database_name = local.ch_secrets["CH_DB"]
    }
    settings {
      max_memory_usage_for_user = 17179869184
      max_memory_usage          = 17179869184
    }
  }

  host {
    type             = "CLICKHOUSE"
    zone             = "ru-central1-a"
    subnet_id        = local.subnet_id
    assign_public_ip = true
  }

  maintenance_window {
    type = "WEEKLY"
    hour = 3
    day  = "MON"
  }
}

# Бакет для приёма файлов в облаке
resource "yandex_storage_bucket" "data-input" {
  access_key = yandex_iam_service_account_static_access_key.sa-static-key.access_key
  secret_key = yandex_iam_service_account_static_access_key.sa-static-key.secret_key
  bucket     = "neoflex-data-input"
}

# Очередь для троттлинга и dead letter queue
resource "yandex_message_queue" "dlq" {
  name = "dlq"

  access_key = yandex_iam_service_account_static_access_key.sa-static-key.access_key
  secret_key = yandex_iam_service_account_static_access_key.sa-static-key.secret_key
}

resource "yandex_message_queue" "neoflex" {
  name                       = "neoflex"
  visibility_timeout_seconds = 600
  receive_wait_time_seconds  = 20
  delay_seconds              = 0
  message_retention_seconds  = 86400 # 1 day

  redrive_policy = jsonencode({
    deadLetterTargetArn = yandex_message_queue.dlq.arn
    maxReceiveCount     = 3
  })

  access_key = yandex_iam_service_account_static_access_key.sa-static-key.access_key
  secret_key = yandex_iam_service_account_static_access_key.sa-static-key.secret_key
}

# Сервисная учётка для подключения к облачным ресурсам
resource "yandex_iam_service_account" "sa" {
  folder_id   = local.folder_id
  name        = "neoflex"
  description = "Neoflex service account to access resource in the folder"
}

resource "yandex_resourcemanager_folder_iam_member" "sa-editor" {
  folder_id = local.folder_id
  role      = "storage.admin"
  member    = "serviceAccount:${yandex_iam_service_account.sa.id}"
}

resource "yandex_iam_service_account_static_access_key" "sa-static-key" {
  service_account_id = yandex_iam_service_account.sa.id
  description        = "Static access key for object storage"
}

# Функция Диспетчер и триггер её запуска при поступлении файла в Бакет
data "archive_file" "dispatcher" {
  type        = "zip"
  output_path = "${path.module}/../packages/dispatcher.zip"

  source {
    content  = file("${path.module}/../src/dispatcher.py")
    filename = "dispatcher.py"
  }

  source {
    content  = file("${path.module}/../src/requirements.txt")
    filename = "requirements.txt"
  }
}

resource "yandex_function" "dispatcher" {
  name               = "dispatcher"
  description        = ""
  user_hash          = data.archive_file.dispatcher.output_base64sha256
  runtime            = "python39"
  entrypoint         = "dispatcher.handler"
  memory             = "128"
  execution_timeout  = "10"
  service_account_id = yandex_iam_service_account.sa.id
  folder_id          = local.folder_id
  environment = {
    YMQ_QUEUE_URL     = yandex_message_queue.neoflex.id
    ACCESS_KEY_ID     = yandex_iam_service_account_static_access_key.sa-static-key.access_key
    SECRET_ACCESS_KEY = yandex_iam_service_account_static_access_key.sa-static-key.secret_key
  }
  content {
    zip_filename = data.archive_file.dispatcher.output_path
  }
}

resource "yandex_function_scaling_policy" "dispatcher" {
  function_id = yandex_function.dispatcher.id
  policy {
    tag                  = "$latest"
    zone_instances_limit = 4
    zone_requests_limit  = 150
  }
}

resource "yandex_function_trigger" "data-dispatcher" {
  name        = "data-dispatcher"
  description = "Triggered dispatcher when new file is available on S3"
  folder_id   = local.folder_id
  object_storage {
    bucket_id = yandex_storage_bucket.data-input.id
    prefix    = "some-input-file"
    create    = true
    update    = true
  }

  function {
    id                 = yandex_function.dispatcher.id
    service_account_id = yandex_iam_service_account.sa.id
  }
  dlq {
    queue_id           = yandex_message_queue.dlq.arn
    service_account_id = yandex_iam_service_account.sa.id
  }
}

# Функция Процессор и триггер её запуска по сообщению из очереди Yandex Message Queue
data "archive_file" "processor" {
  type        = "zip"
  output_path = "${path.module}/../packages/processor.zip"

  source {
    content  = file("${path.module}/../src/processor.py")
    filename = "processor.py"
  }

  source {
    content  = file("${path.module}/../src/requirements.txt")
    filename = "requirements.txt"
  }
}

resource "yandex_function" "processor" {
  name               = "processor"
  description        = ""
  user_hash          = data.archive_file.processor.output_base64sha256
  runtime            = "python39"
  entrypoint         = "processor.handler"
  memory             = "512"
  execution_timeout  = "600"
  service_account_id = yandex_iam_service_account.sa.id
  folder_id          = local.folder_id
  environment = {
    ACCESS_KEY_ID     = yandex_iam_service_account_static_access_key.sa-static-key.access_key
    SECRET_ACCESS_KEY = yandex_iam_service_account_static_access_key.sa-static-key.secret_key
    CH_DB             = yandex_mdb_clickhouse_cluster.neoflex.database.*.name[0]
    CH_HOST           = yandex_mdb_clickhouse_cluster.neoflex.host[0].fqdn
    CH_PASSWORD       = yandex_mdb_clickhouse_cluster.neoflex.user.*.password[0]
    CH_RAW_TABLE      = local.ch_secrets["CH_RAW_TABLE"]
    CH_USER           = yandex_mdb_clickhouse_cluster.neoflex.user.*.name[0]
    DATA_BUCKET       = yandex_storage_bucket.data-input.id
  }
  content {
    zip_filename = data.archive_file.processor.output_path
  }
}

resource "yandex_function_scaling_policy" "processor" {
  function_id = yandex_function.processor.id
  policy {
    tag                  = "$latest"
    zone_instances_limit = 4
    zone_requests_limit  = 150
  }
}

resource "yandex_function_trigger" "processor" {
  name        = "processor"
  description = ""
  folder_id   = local.folder_id
  message_queue {
    queue_id           = yandex_message_queue.neoflex.arn
    service_account_id = yandex_iam_service_account.sa.id
    batch_size         = "1"
    batch_cutoff       = "10"
  }
  function {
    id                 = yandex_function.processor.id
    service_account_id = yandex_iam_service_account.sa.id
  }
  dlq {
    queue_id           = yandex_message_queue.dlq.arn
    service_account_id = yandex_iam_service_account.sa.id
  }
}

Осуществили загрузку данных в облачный ClickHouse сделали. Теперь данные необходимо «очищать» от дублей.

Какие варианты мы рассматривали при подборе решения этого пункта задачи:

  • Первый вариант: при парсинге XML-файла и перед вставкой данных в ClickHouse «на лету» проверять – не будет ли дублей и делать insert/upsert в зависимости от ситуации. Не подходит, так как ClickHouse – не OLTP, а OLAP и об этом явно сказано в документации. Обновление данных в таблице реализовано как мутация – достаточно тяжеловесная операция, которую не удастся выполнять часто/много/быстро. Зато ClickHouse очень эффективно работает при вставке (рекомендуем к прочтению статью-benchmark Faster ClickHouse Imports).

  • Второй вариант – специальный движок таблиц ReplacingMergeTree, который предназначен как раз для дедубликации данных по заданному ключу. Не подходит по двум причинам:

  1. На «очищенных» (дедублицированных) данных нужно строить и обновлять витрины в режиме, близком к online, а движок ReplacingMergeTree осуществляет манипуляции с данными в фоновом режиме и в неопределенный момент времени. Поэтому есть риск собрать витрину на задублированных данных, а требования к витринам высокие, поэтому результаты должны быть точными;

  2. ReplacingMergeTree предлагает дедубликацию по ключу сортировки (секция ORDER BY в DLL таблицы), которая в данном случае не применима, так как в рамках задачи нужно не просто удалять дубли по ключу, а выбирать из дублей подходящую строку (исходя из timestamp в имени файла);

  • Третий вариант, на котором мы и остановились, – загружать в буферную таблицу всё, что поступило в Бакет и дальше осуществлять merge буферной в «чистовую» таблицу. Этот вариант мы и опишем далее.

Буферная таблица, в которую вставляет «сырые» данные функция «Парсер», имеет поле со временем вставки, партиционирование по этому полю и срок жизни записей:

DROP TABLE IF EXISTS raw;
CREATE TABLE raw
(
	...
    insertTime DateTime('Europe/Moscow') DEFAULT now()
 
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(insertTime)
ORDER BY (...)
TTL toStartOfDay(insertTime) + INTERVAL 1 DAY;

Вообще, TTL в ClickHouse – отличная фича, которая одной строкой в DDL таблицы избавляет нас от самостоятельной организации очистки таблиц (от написания кастомных джобов, их контроля, запуска и мониторинга).

Используя Yandex Cloud Functions Trigger типа Timer, по расписанию запускается функция «Очиститель», которая сравнивает «сырые» и «чистовые» данные, определяет, что в сырых данных есть что-то новенькое и пора обновить чистовые данные. Схематично это выглядит так:

Примерно таким запросом выбираются партиции (2), которые требуют merge:

WITH rawParts AS (
	SELECT partKey, MAX(insertTime) maxInsertTime
	FROM raw
	WHERE insertTime > toStartOfMinute(now()) - INTERVAL 1 HOUR
	GROUP BY partKey),
cleanParts AS (
	SELECT partKey, MAX(insertTime) maxInsertTime
	FROM clean
	WHERE partKey IN (SELECT origOperDay FROM rawParts)
	GROUP BY partKey)
SELECT DISTINCT toYYYYMMDD(rawParts.partKey) AS partKey FROM rawParts LEFT JOIN parts USING partKey
WHERE rawParts.maxInsertTime > parts.maxInsertTime
ORDER BY partKey;

Фактически этот запрос получает из буферной таблицы список пополненных данными партиций за последний час и по времени самой свежей вставки сравнивает с партициями «чистовой» таблицы. Если обнаружено расхождение (есть подозрение на «свежие» данные, которые отсутствуют в «чистовой» таблице), то такие партиции попадают в результат запроса. В реальном случае выборка может быть гораздо сложнее. Функция «Очиститель», выполнив запрос (2), складывает результат построчно в очередь для отдельного «попартиционного» merge (3).

Сам «попартиционный» merge (5) в простейшем случае можно сделать следующим рядом запросов:

-- Предварительная очистка промежуточной таблицы:
ALTER TABLE tmp DROP PARTITION {partKey};

-- Вставка в промежуточную таблицу накопленных, свежих и сразу очищенных данных:
INSERT INTO tmp
SELECT *
FROM (
	SELECT * FROM raw r WHERE partkey = {partKey}
	UNION ALL
	SELECT * FROM clean WHERE partkey = {partKey}
	)
ORDER BY field1, field2, field3, ...
LIMIT 1 BY field1, field2, field3, ...;

-- Удаление партиции в чистовой таблице:
ALTER TABLE clean DROP PARTITION {partKey};

-- Перемещение партиции из промежуточной в чистовую таблицу:
ALTER TABLE tmp MOVE PARTITION {partKey} TO TABLE clean;

Данные сначала вставляются в промежуточную таблицу, а затем происходит подмена партиции в «чистовой» таблице, что делает весьма незаметным и быстрым её обновление.

Стоит отметить, что при вставке в промежуточную таблицу используется подзапрос, а с этим делом в ClickHouse не всё так уж просто: результат подзапроса должен помещаться в оперативную память одного сервера, ограниченную параметром max_memory_usage (или max_memory_usage_for_user). Если результат подзапроса не поместится в память, тогда весь запрос завершится с ошибкой:

DB::Exception: Memory limit (for query) exceeded

Поэтому для работоспособности такого решения потребовалось подобрать оптимальное сочетание объема одной партиции (читай – ключ партиционирования) и max_memory_usage (читай – объем оперативной памяти у хоста ClickHouse с учётом параллельности подобных запросов).

Упрощенный пример кода функции «Очиститель»:

# импорт используемых модулей
# import ...

# подключение к облаку
boto_session = boto3.session.Session(
    aws_access_key_id=os.getenv('ACCESS_KEY_ID'),
    aws_secret_access_key=os.getenv('SECRET_ACCESS_KEY')
)

# подключение к ClickHouse
clickhouse = Client(host=os.getenv('CH_HOST'),
                    user=os.getenv('CH_USER'),
                    password=os.getenv('CH_PASSWORD'),
                    database=os.getenv('CH_DB'),
                    secure=True)

# подключение к очереди Yandex Queue для отправки сообщений
ymq_queue = boto_session.resource(
    service_name='sqs',
    endpoint_url='https://message-queue.api.cloud.yandex.net',
    region_name='ru-central1'
).Queue(os.getenv('YMQ_QUEUE_URL'))


# основной обработчик событий
def handler(event, context):
    try:
        if event is not None and type(event) == dict and 'messages' in event:
            for message in event['messages']:
                if 'event_metadata' in message and \
                        'event_type' in message['event_metadata'] and \
                        message['event_metadata'][
                            'event_type'] == 'yandex.cloud.events.serverless.triggers.TimerMessage':
                    # запуск по событию, инициированному триггером с типом Timer
                    # поиск партиций для merge и отправка заданий в очередь
                    queue_partitions()
                else:
                    # запуск по сообщению из очереди
                    task_json = json.loads(message['details']['message']['body'])
                    partKey = str(task_json['partKey'])
                    # merge конкретной партиции
                    do_partition(partKey=partKey)

    except Exception as ex:
        logging.error(ex)

# поиск партиций
def queue_partitions():
    # запрос поиска партиций для merge (см. выше пример)
    query = '...' 
    rows_gen = clickhouse.execute_iter(query)
    # отправка каждой партиции в очередь отдельным сообщением
    for partKey in rows_gen:
        ymq_queue.send_message(MessageBody=json.dumps({
            'partKey': partKey[0]
        }))

# merge конкретной партиции (см. последовательность запросов выше)
def do_partition(partKey):
    # ...

Так это описывается на Terraform (декларация в добавок к той, что представлена ранее):

# Функция "Очиститель"
data "archive_file" "cleaner" {
  type        = "zip"
  output_path = "${path.module}/../packages/cleaner.zip"

  source {
    content  = file("${path.module}/../src/cleaner.py")
    filename = "cleaner.py"
  }

  source {
    content  = file("${path.module}/../src/requirements.txt")
    filename = "requirements.txt"
  }
}

resource "yandex_function" "cleaner" {
  name               = "cleaner"
  description        = ""
  user_hash          = data.archive_file.cleaner.output_base64sha256
  runtime            = "python39"
  entrypoint         = "cleaner.handler"
  memory             = "256"
  execution_timeout  = "300"
  service_account_id = yandex_iam_service_account.sa.id
  folder_id          = local.folder_id
  environment        = {
    CH_DB             = yandex_mdb_clickhouse_cluster.neoflex.database.*.name[0]
    CH_HOST           = yandex_mdb_clickhouse_cluster.neoflex.host[0].fqdn
    CH_PASSWORD       = yandex_mdb_clickhouse_cluster.neoflex.user.*.password[0]
    CH_USER           = yandex_mdb_clickhouse_cluster.neoflex.user.*.name[0]
    ACCESS_KEY_ID     = yandex_iam_service_account_static_access_key.sa-static-key.access_key
    SECRET_ACCESS_KEY = yandex_iam_service_account_static_access_key.sa-static-key.secret_key
    YMQ_QUEUE_URL     = yandex_message_queue.cleaner.id
    YANDEX_FOLDER_ID  = local.folder_id
    YDB_ENDPOINT      = yandex_ydb_database_serverless.neoflex.document_api_endpoint
  }
  content {
    zip_filename = data.archive_file.cleaner.output_path
  }
}

resource "yandex_function_scaling_policy" "cleaner" {
  function_id = yandex_function.cleaner.id
  policy {
    tag                  = "$latest"
    zone_instances_limit = 4
    zone_requests_limit  = 150
  }
}

# Триггер-таймер для запуска функции по расписанию
resource "yandex_function_trigger" "cleaner_cron" {
  name        = "cleaner-cron"
  description = ""
  folder_id   = local.folder_id
  timer {
    cron_expression = "*/5 * * * ? *"
  }
  function {
    id                 = yandex_function.cleaner.id
    service_account_id = yandex_iam_service_account.sa.id
  }
}

# Триггер для запуска функции по сообщению из очереди
resource "yandex_function_trigger" "cleaner_queue" {
  name        = "cleaner-queue"
  description = ""
  folder_id   = local.folder_id
  message_queue {
    queue_id           = yandex_message_queue.cleaner.arn
    service_account_id = yandex_iam_service_account.sa.id
    batch_size         = "1"
    batch_cutoff       = "10"
  }
  function {
    id                 = yandex_function.cleaner.id
    service_account_id = yandex_iam_service_account.sa.id
  }
}

# Очередь для сообщений с именами партиций, которые будут пересобираться
resource "yandex_message_queue" "cleaner" {
  name                       = "partitions2merge"
  visibility_timeout_seconds = 600
  receive_wait_time_seconds  = 20
  delay_seconds              = 0
  message_retention_seconds  = 86400 # 1 day

  redrive_policy = jsonencode({
    deadLetterTargetArn = yandex_message_queue.dlq.arn
    maxReceiveCount     = 3
  })

  access_key = yandex_iam_service_account_static_access_key.sa-static-key.access_key
  secret_key = yandex_iam_service_account_static_access_key.sa-static-key.secret_key
}

Итак, данные в облако загрузили и очистили, теперь на основе очищенных данных необходимо строить ряд витрин с агрегациями.

Тут схема, процесс и реализация абсолютно идентичны предыдущей части – есть исходная таблица и нужно на её данных строить/обновлять витрину. Стоить отметить только один нюанс, с которым мы столкнулись, –  это использование массивов и вложенных таблиц в ClickHouse.

Например, у нас есть исходная таблица со следующей структурой:

CREATE TABLE clean
(
	id Int64,
	positions Nested(
        id UInt16,
        value1 String,
		value2 String
    ),
	insertTime DateTime('Europe/Moscow') DEFAULT now()
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(insertTime)
ORDER BY (id);

Мы хотим выбрать все данные в плоском виде и делаем такой запрос:

SELECT id, p.id, p.value1, p.value2 
FROM clean ARRAY JOIN positions AS p

Это работает.

Дальше, например, у нас есть другая таблица с маппингами:

CREATE TABLE mappings
(
	from String,
	to String
) ENGINE = MergeTree()
ORDER BY (from);

И мы хотим применить маппинг к одному полю нашей таблицы с чистовыми данными:

SELECT p.value1, m.to value1_to
FROM clean ARRAY JOIN positions AS p
LEFT JOIN mappings AS m ON p.value1 = m.from;

Это тоже отлично работает.

Но если мы захотим сделать еще один маппинг по другому полю:

SELECT p.value1, m1.to value1_to, p.value2, m2.to value2_to
FROM clean ARRAY JOIN positions AS p
LEFT JOIN mappings AS m1 ON p.value1 = m1.from
LEFT JOIN mappings AS m2 ON p.value2 = m2.from

получим ошибку:

DB::Exception: Multiple JOIN does not support mix with ARRAY JOINs

Да, есть такое ограничение. И обойти его можно, используя подзапрос:

SELECT p.value1, value1_to, p.value2, m2.to value2_to FROM
	(
		SELECT p.value1, m1.to value1_to, p.value2
		FROM clean ARRAY JOIN positions AS p
		LEFT JOIN mappings AS m1 ON p.value1 = m1.from
	) t
LEFT JOIN mappings AS m2 ON p.value2 = m2.from

Но, как отмечено ранее, подзапросы накладывают существенные ограничения прежде всего на объем обрабатываемых запросом данных. И, если в исходной таблице будут триллионы строк, то такой запрос вряд ли сможет быть выполнен. Придётся упрощать исходную таблицу (чтобы не делать два раза JOIN), либо упрощать целевую таблицу (отказаться от дополнительного маппинга), либо обрабатывать исходную таблицу частями (как, например, сделано при «очистке» сырых данных) и искать оптимальное сочетание вышеуказанных параметров решения.

Заключение

В ClickHouse достаточно много различных  ограничений, в том числе синтаксических.  До поддержки полноценного SQL он, к сожалению, пока не дотягивает. Тот же ARRAY JOIN в одном запросе/подзапросе можно использовать только один раз.

При проектировании решений с использованием ClickHouse мы рекомендуем сразу детально продумывать все структуры таблиц, необходимых сквозному процессу: от таблиц с сырыми данных до конечных витрин, с учётом различных ограничений ClickHouse.

Чтобы получать финишные данные за наименьшее количество итераций с наименьшим количеством промежуточных таблиц и подзапросов, иногда стоит отказаться от оригинальной структуры исходных данных в виде массивов или вложенных таблиц, Иногда будет быстрее, например, применить маппинг или сделать дополнительное  вычисление еще на уровне обработки сырых данных перед вставкой в ClickHouse.

В общем, чем проще и полнее исходные данные (таблицы с данными), тем удобнее и дешевле в дальнейшем с ними работать и тем больше прелестей Full Scan открываются у ClickHouse.

Что касается плюсов применения ClickHouse, как нам видится, эта СУБД лучше всего пригодна в кейсах, когда в режиме online нужно сохранять колоссальные объёмы структурированных данных и иметь возможность из этих данных быстро получать несложные выборки (хотя бы без подзапросов и JOIN с крупными таблицами).

Вставка огромных объемов данных в ClickHouse – не проблема, как и простая выборка (с агрегацией) из большой таблицы, ведь изначально под такой тип задач ClickHouse и создавался в Yandex.Metrica. Кроме того, поддержка своеобразного SQL, хоть и весьма ограниченного, и классических способов подключения JDBC/ODBC является огромным плюсом.

Комментарии (6)


  1. f1user
    16.09.2022 13:16
    -2

    Спасибо, все по делу.


  1. Volrath
    16.09.2022 13:16

    А вот это точно правдиво?

    ReplacingMergeTree предлагает дедубликацию по ключу сортировки (секция ORDER BY в DLL таблицы), которая в данном случае не применима, так как в рамках задачи нужно не просто удалять дубли по ключу, а выбирать из дублей подходящую строку (исходя из timestamp в имени файла)

    timestamp из названия файла нельзя парсером подтащить в виде значения в поле insertTime? или просто использовать MATERIALIZE колонку с таймстемпом вставки и её потом использовать как аргумент. Да и без аргумента replacing движок оставляет только смую последнюю вставленную строку. Пробблема движка в том только, что удаляет он не сразу и при выборке из таблицы дубли могут ещё оставаться и их надо чистить селектом.

    Ну или я не так понял задачу))

    И одна опечатка -

    -- Перемещение партиции из промежуточной в чистовую таблицу:

    ALTER TABLE clean MOVE PARTITION {partKey} TO TABLE clean;

    Тут всё таки из tmp в clean, а не из clean в clean.

    В остальном очень интересно, спасибо.


    1. neoflex Автор
      16.09.2022 13:21

      Благодарим за обратную связь!

      > timestamp из названия файла нельзя парсером подтащить в виде значения в поле insertTime?

      Нет, всё-таки insertTime отражает время фактической вставки в таблицу с "сырыми" данными. По этому полю потом таблица очищается по TTL. Может так выйти, что в обработку попадёт файл с timestamp в имени = неделя назад (а TTL = 3 суток) и данные из этого файла уйдут в молоко.
      К слову, парсер вставляет timestamp из имени файла в отдельное поле (скажем, field3) и потом "Очиститель" делает вот так при сборке чистой партиции:
      ORDER BY field1, field2, field3, ...
      LIMIT 1 BY field1, field2, field3, ...;

      > Да и без аргумента replacing движок оставляет только самую последнюю вставленную строку.

      Нам как раз нужно было оставлять не самую свежую строку, а ту, у которой timestamp в имени файла самый ранний. При этом файлы могут поступать в обработку в любом порядке, т.е. не привязанном к timestamp из имени ...

      Опечатку поправили, спасибо за внимательность)


      1. Volrath
        16.09.2022 17:04

        Самую первую можно оставлять с "инвертированной" датой. Вычитать из какого-то таймстемпа в UInt64 в далёком будущем текущий и получится, что у каждой новой записи аргумент будет меньше, чем у самой первой. И она удалится.


  1. JohnSelfiedarum
    16.09.2022 15:16

    Почему, собственно, не ReplicatedMergeTree?


    1. neoflex Автор
      16.09.2022 17:16

      Подскажите, пожалуйста, в каком месте и для решения какого вопроса предлагаете использовать ReplicatedMergeTree?"