Привет, Хабр! Мы — команда DATA ОАТС в билайн. В этой статье расскажем о кейсе, когда стандартный Spark JDBC не справился с параллельным чтением огромной таблицы из ClickHouse, и мы написали свой «мини-движок». Под катом — разбор ограничений, схема с пулом потоков на экзекуторах и опыт, который может пригодиться не только для ClickHouse.

Введение

Когда работаешь с большими объёмами данных, обычно рассчитываешь, что Spark обеспечит базовый параллелизм «из коробки». Указываешь параметры подключения — и данные читаются в несколько потоков.

Сейчас есть два способа читать данные ClickHouse из Spark:

  1. Нативный коннектор ClickHouse.

  2. Spark JDBC.

Но в нашем случае всё оказалось не так просто:

  • Нативный коннектор ClickHouse к Spark появился только в версиях 3.3+, а у нас был Spark 3.0.1.

  • При попытке читать подзапросы по JDBC оказалось, что встроенный механизм параллелизма нельзя применить к нашему кейсу.

Как работает параллелизм в Spark JDBC

Spark позволяет читать данные через JDBC в несколько потоков:

  • Указываем колонку для разбиения (partitionColumn), нижнюю и верхнюю границы (lowerBound, upperBound) и число партиций (numPartitions).

  • Spark делит диапазон значений и генерирует несколько SQL-запросов вида col BETWEEN ... AND … .

  • Каждый запрос уходит на разные экзекуторы. В результате получается вот так:

стандартное разбиение при параллельном чтении по jdbc.png

Это работает и для подзапросов, если передать их в dbtable=(SELECT ...)t. В результирующем наборе есть колонка, по которой можно делить диапазон.

Пример:

 jdbcDF = spark.read \
        .format("jdbc") \
        .option("driver", "org.postgresql.Driver") \
        .option("url", "jdbc:postgresql://dbserver") \
        .option("user", os.environ['user']) \
        .option("password", os.environ['pass']) \
        .option("dbtable", "(SELECT id, value FROM my_table WHERE active=1) t") \
        .option("numPartitions", "10") \
        .option("partitionColumn", "id") \
        .option("lowerBound", "1") \
        .option("upperBound", "10000") \
        .load()

Ограничения встроенного механизма:

  1. Работает только по одной колонке — Spark умеет делить диапазон арифметически.

  2. Колонка должна быть числовой или датой/timestamp.

  3. Колонка должна присутствовать в итоговом SELECT. Если она преобразована (через CAST, SUM, JOIN и т.д.) или вовсе отсутствует, Spark её использовать не сможет.

  4. В Spark 3.0.1 это реализовано через старый DataSource V1 — меньше гибкости и баги с типами.

Почему не сработало у нас

Мы выгружали очень большую таблицу в ClickHouse с несколькими уровнями партиционирования.

  • На верхнем уровне одна партиция могла весить десятки гигабайт.

  • Стандартный Spark умел резать только по одной колонке (например, дате), но этого было мало: данные внутри партиции всё равно оставались слишком большими.

  • Подзапрос был необходим — нужно было приводить типы, так как Spark 3.0.1 неверно их интерпретировал.

Итого:

  • Встроенный механизм параллельного чтения Spark JDBC нам не подходил.

  • Нужно было самим управлять разбиением и параллельностью.

Наше решение

Мы написали собственный механизм параллельного чтения поверх JDBC.

Идея:

  • Разбиваем данные на батчи вручную — например, по дате + региону, а не только по одной колонке.

  • Для каждого батча формируем свой WHERE и выполняем отдельный JDBC-запрос.

  • Потоки запускаются на экзекуторах, а не на драйвере.

  • Результаты собираются в общий DataFrame.

Важно: мы можем дробить даже очень тяжёлые партиции на управляемые куски, чего Spark встроенными средствами не умеет.

Архитектура

Для начала нужно определиться, каким образом проводить разбиение данных. Для нас удобнее всего было бить по партициям, но условия ограничены только вашей фантазией. Обращаемся к Clickhouse и читаем партиции:

   val query =
          s"""
             |(
             | SELECT
             |   partition
             | FROM system.parts
             | WHERE database = 'default'
             |   AND table = 'big_table'
             |   AND active
             |)
             |""".stripMargin

    val dfPartitions = spark.read
      .format("jdbc")
      .option("url", clickhouseParams.url)
      .option("dbtable", query)
      .option("user", clickhouseParams.user)
      .option("password", clickhouseParams.password)
      .option("driver", "com.clickhouse.jdbc.ClickHouseDriver")
      .load()

После — формируем батчи через функцию groupPartitions:

    val partitionBatches = groupPartitions(dfPartitions, numBatches) // numBatches - аналог numPartitions
    log.info(s"Создано ${partitionBatches.length} батчей для обработки.")

Затем запускаем пул потоков:

    val threadPool = Executors.newFixedThreadPool(maxParallelism)
    implicit val ec: scala.concurrent.ExecutionContext =
      scala.concurrent.ExecutionContext.fromExecutor(threadPool)

    val futures = partitionBatches.zipWithIndex.map { case (batch, idx) =>
      Future {
        val df = readBatch(batch, cutoffTimestamp)
        tempDfsSync.synchronized {
          tempDfs = df :: tempDfs
        }
      }
    }


    Await.ready(Future.sequence(futures), Duration(8, HOURS))

 Функция readBatch строит SQL с условиями для конкретного батча:

  // формирование whereClause в зависимости от кейса

    val query = s"""
      (SELECT
         col1, col2, ...
       FROM big_table
       WHERE $whereClause
      ) AS t
    """

И отправляет запрос по jdbc:

    try {
      val df = readJdbcWithRetry(clickhouseParams, query)

      if (df.isEmpty) spark.emptyDataFrame else df
    } catch {
      case e: Exception =>
        log.error(s"Ошибка при чтении батча: ${e.getMessage}")
        spark.emptyDataFrame
    }

А для стабильности используем retry:

    def readJdbcWithRetry(params: ClickHouseParameters, query: String): DataFrame = {
      def attempt(retryCount: Int): DataFrame = {
        Try {
          spark.read.format("jdbc")
            .option("url", params.url)
            .option("dbtable", query)
            .option("user", params.user)
            .option("password", params.password)
            .option("driver", "com.clickhouse.jdbc.ClickHouseDriver")
            .load()
        } match {
          case Success(df) => df
          case Failure(e) if retryCount < 3 =>
            Thread.sleep(math.pow(2, retryCount).toLong * 5000)
            attempt(retryCount + 1)
          case Failure(e) => throw e
        }
      }
      attempt(0)
    }

Подводные камни

Размер батчей:

  • Слишком мелкие → много соединений, ClickHouse перегружается.

  • Слишком крупные → экзекуторы ловят OOM. Оптимальные значения нужно подбирать под доступные ресурсы.

Настройки подключения:

  • fetchSize (у нас хорошо работало 50000).

  • Таймауты (чтобы запросы не зависали).

  • Ретраи (чтобы единичная ошибка не ломала весь процесс).

Лимиты параллельности:

  • Слишком много потоков → ClickHouse задыхается.

  • Слишком мало → теряется смысл параллельности. Мы ограничили пул параметром maxParallelism.

Альтернативы:

  • Обновить Spark до 3.3+ и использовать нативный коннектор ClickHouse — он решает эту задачу изящнее, но у нас это было невозможно.

  • Использовать стандартный JDBC с partitionColumn, если колонка подходит и её хватает для равномерного разбиения.

  • Concurrent JDBC reads (Gabriel Pinto, Medium) — пример похожего подхода, где автор запускает несколько независимых JDBC-запросов. Но наш кейс сложнее: нужно было дробить многослойные партиции и контролировать ресурсы.

Итоги

  • Spark JDBC умеет параллелить только простые сценарии, но в сложных случаях (подзапросы, многослойные партиции) этого мало.

  • В Spark 3.0.1 мы не могли использовать нативный коннектор.

  • Мы сделали свой механизм: батчи по нескольким условиям, пул потоков на экзекуторах, ретраи и объединение результатов.

Что получили:

  • Реальный параллелизм — нагрузка ушла на экзекуторы.

  • Гибкость в управлении ресурсами и батчами.

  • Стабильную схему для инкрементальной загрузки больших таблиц.

Подход требует аккуратной настройки, но показывает: даже в условиях ограничений Spark можно эффективно работать с огромными объёмами данных.

Комментарии (2)


  1. atues
    21.10.2025 10:14

    Только у меня картинки не отображаются?


    1. Beeline_tech Автор
      21.10.2025 10:14

      Поправили, теперь должно быть видно!