К маю 2026 года средний сбор аудитории в нашей системе занимал 2 часа 50 минут. Проблема состояла из двух частей. Первая: 96.5% этого времени процесс стоял в очереди. Вторая: даже когда процесс добирался до исполнения, расчёт занимал 6 минут — десятки задач внутри DAG выполнялись над множествами в миллионы строк, и основным инструментом были JOIN.

Мы последовательно устранили обе проблемы — 20 изменений, разбитых на шесть этапов. Ниже описан каждый шаг и итоговый результат.

Все числа «до» и «после» получены одним и тем же SQL-запросом по Postgres — это воспроизводимый замер.

Домен обобщён: конкретная компания не имеет значения. Это платформа, которая по булевым условиям («траты > 5000 ₽ за последние 3 месяца» И «покупки в категории Рестораны») собирает множество клиентских идентификаторов из 10-миллионной базы и доставляет его во внешние системы.

Глоссарий

Термин

Определение

Аудитория (сбор аудитории)

Множество клиентов, удовлетворяющих набору условий. «Собрать аудиторию» — вычислить это множество и доставить наружу.

DAG

Направленный ациклический граф задач: пересечение ждёт, пока досчитают финансы и гео.

TQ (движок очереди задач)

Планировщик поверх Postgres: хранит граф, состояния, зависимости, ретраи и результат каждой задачи.

Воркер

Обработчик одного типа задачи: FINANCIAL считает финансовый фильтр, HASH сопоставляет хеши, INTERSECT пересекает множества.

Колоночная СУБД

Аналитическая база данных, где происходит вся работа над множествами клиентов.

Процесс vs задача

Один сбор аудитории — один процесс, который разворачивается в десятки задач (узлов DAG).

Очередь (queue) vs исполнение (exec)

Задача сначала ждёт свободного слота (queue), потом выполняется (exec).

Основные типы воркеров, упоминаемые в статье:

Воркер

Что делает

FINANCIAL

Собирает клиентов по финансовым условиям: сумма трат, доходов, количество операций за период. Самый тяжёлый.

HASH

Сопоставляет внешний файл хешей (SHA512 телефонов, MD5 почт) с контактами клиентов в базе.

MCC

Фильтрует клиентов по кодам категорий торговых точек (merchant category codes).

INTERSECT

Пересекает множества клиентов из разных веток: клиент должен пройти все фильтры одновременно.

UNION

Объединяет множества клиентов: клиент должен пройти хотя бы один из фильтров.

SUBTRACT

Вычитает одно множество из другого — например, исключает клиентов, попавших в стоп-лист.

DELTA_AUDITORY

Вычисляет разницу между старой и новой версией аудитории при пересборе.

EXPORT

Материализует итоговое множество и отправляет его в Kafka / S3.

CLEAN_UP

Удаляет временные таблицы после завершения процесса.

1. Архитектура

Система построена на четырёх компонентах: REST-сервис, движок очереди задач (TQ), колоночная СУБД и транспортный слой (Kafka + S3).

Ключевое архитектурное решение — сбор аудитории устроен как DAG. Под параметры аудитории строится граф задач, сохраняется в Postgres и исполняется воркерами. DAG естественно описывает и параллелизм (финансы и гео считаются одновременно), и зависимости (пересечение ждёт завершения всех веток). Эту модель мы не меняли — она работает. Изменения касались всего, что построено вокруг.

Сквозной путь задачи

Граф строится DSL:

tqueue(type = "AUDITORY_GATHERING", continuous = true, priority = 1) {
    val fin  = task<FinancialParams, SetResult>("FINANCIAL")  { params { /* период, порог */ } }
    val mcc  = task<MccParams, SetResult>("MCC")              { params { /* коды, период */ } }
    val hash = task<HashParams, SetResult>("HASH")            { params { /* файл хешей */ } }

    val intersect = task<IntersectParams, SetResult>("INTERSECT") {
        dependsOn(fin, mcc, hash)
    }
    val export = task<ExportParams, Unit>("EXPORT") { dependsOn(intersect) }
    task<CleanUpParams, Unit>("CLEAN_UP") { dependsOn(export) }
}

Цикл планировщика — исходная версия

Каждые 5 секунд планировщик исполнял фиксированную последовательность:

каждые 5 секунд:
  1. обработать периодические задачи
  2. сторож: пометить TIMEOUT просроченные задачи
  3. сторож: пометить TIMEOUT слишком старые NEW
  4. сторож: освободить устаревшие резервации
  5. раздать готовые задачи + назначить ноду
  6. прочитать назначённые этой ноде задачи
  7. исполнить: IN_PROGRESS → воркер → успех/ошибка

Половина проблем находилась в шагах 1–4 — они выполнялись до того, как очередь доходила до полезной работы.

Исходная конфигурация

Параметр

Значение

Реплики сервиса

2

CPU / память на реплику

500m / 2 GiB (JVM -Xmx 1 GiB)

Максимум одновременных задач

5

Пул соединений к колоночной СУБД

7

Интервал опроса планировщика

5 с

Планировщик запуска сборов

каждые 15 с

Эти значения объясняют бóльшую часть того, что показали метрики. Особенно — лимит в 5 одновременных задач.

2. Диагностика

Как получены числа

Отдельной системы метрик для сборов не было — ни Grafana-дашборда, ни мониторинга времени аудиторий. Однако весь жизненный цикл задачи находился в Postgres: у каждой строки tq_process и tq_task есть date_created (попала в очередь), date_started (взята в работу) и date_finished (завершена). Этих трёх меток достаточно, чтобы разложить полное время на ожидание и исполнение одной агрегацией:

SELECT
    type AS process_type,
    COUNT(*) AS total_runs,
    ROUND(AVG(EXTRACT(EPOCH FROM (date_finished - date_created)))) AS avg_total_sec,
    ROUND(PERCENTILE_CONT(0.95) WITHIN GROUP (
        ORDER BY EXTRACT(EPOCH FROM (date_finished - date_created)))) AS p95_total_sec,
    ROUND(AVG(EXTRACT(EPOCH FROM (date_finished - date_started)))) AS avg_exec_sec,
    ROUND(AVG(EXTRACT(EPOCH FROM (date_started  - date_created)))) AS avg_queue_sec
FROM tq_process
WHERE date_created >= NOW() - INTERVAL '14 days'
  AND status = 'COMPLETED'
GROUP BY type;

2.1. 96.5% времени — ожидание

Тип процесса

Запусков

Полное время (avg)

Исполнение (avg)

Ожидание (avg)

AUDITORY_GATHERING

3 765

10 229 с

361 с

9 868 с

MATCHING_DELTA_EXCLUSION

16 800

2 939 с

1 с

2 938 с

OFFER_AUDITORY_LOADING

8 955

8 с

2 с

6 с

Средний сбор шёл 2 часа 50 минут. Расчёт занимал 6 минут. Всё остальное — ожидание в очереди. Вывод, определивший стратегию: оптимизация SQL в воркерах затрагивает только 3.5% времени процесса. Основная проблема — не скорость вычислений, а то, что очередь почти не доходит до них.

2.2. Одна очередь на всех

Лимит «5 одновременных задач» был общим на весь контур. Разброс стоимости воркеров:

Тип задачи

Исполнение (avg)

Ожидание (avg)

FINANCIAL

198 с

272 с

DELTA_AUDITORY

92 с

12 708 с

SUBTRACT

48 с

12 606 с

INTERSECT

38 с

10 610 с

HASH

13 с

12 187 с

UNION

2 с

12 146 с

FINANCIAL (198 с) и UNION (2 с) конкурировали за одни и те же пять слотов. Выдача задач — ORDER BY priority, date_created — означала, что один процесс с десятком готовых задач мог занять все слоты, пока остальные ждали. Отсюда queue ≈ 27 × exec.

2.3. Ночная волна

В 22:00 система запускала массу динамических пересборов — одновременно. Самые медленные процессы создавались около 22:00:40 и ждали до 32 000 секунд (≈ 9 часов). К графу добавлялись дельта-задачи:

Дельта-задача исполнялась 92 секунды, ожидание старта — 12 708 секунд. Соотношение 138:1.

2.4. Узкие места движка очереди

  • Опрос между уровнями DAG. delay(5s) в конце каждой итерации. Каждый следующий уровень графа ждал до 5 секунд даже при свободных ресурсах.

  • Сторожевые проверки на критическом пути. Три UPDATE-запроса выполнялись до раздачи задач. Замедление любого из них задерживало старт полезной работы.

  • Резервация в два шага. Сначала node_id (статус NEW), затем отдельно IN_PROGRESS. Между ними — окно NEW + node_id, открытое для состояния гонки.

  • Приоритет выставлялся повторно. continuous = true вызывал UPDATE tq_task SET priority = -1 при старте каждого воркера. Для графа из 50 задач — 50 одинаковых UPDATE.

  • NOT EXISTS по зависимостям на каждом цикле. Готовность задачи вычислялась вложенным запросом со сканированием тысяч строк tq_task_dependencies каждые 5 секунд.

2.5. Слишком мелкая нарезка DAG

Финансовая ветка дробилась по одному месяцу (режим EACH_MONTH), MCC и терминалы также разрезались по периодам. Результаты сводились дополнительными уровнями агрегации и пересечения:

Для 12 месяцев это давало 16 TQ-задач и 3 уровня DAG только на финансовую ветку. Каждый уровень — дополнительное ожидание в очереди.

2.6. Повторный счёт стабильных данных

Для каждой аудитории строился отдельный DAG с нуля. Сотня процессов с идентичными FINANCIAL(январь, порог 5000) — это сотня одинаковых запросов в колоночную СУБД.

3. Решения

Мы реализовали 20 изменений, сгруппированных по слоям. Ниже — что именно сделано. В следующем разделе — как это разворачивалось и какие дало результаты.

3.1. Ресурсы и конфигурация

CPU throttling → 2000m

Grafana показывала 54 000+ throttled periods: JVM регулярно приостанавливалась Kubernetes CFS-контроллером. Операции JDBI, корутины и сборщик мусора работали на 70–80% от возможной скорости.

# k8s values — единственное изменение:
resources:
  limits:
    cpu: 2000m    # было 500m
  requests:
    cpu: 500m     # не меняли — не влияет на scheduling

Эффект: исполнение ускорилось на 20–30%. Увеличение CPU limit не несёт риска.

Расширение пула и лимитов

Пул соединений к колоночной СУБД: 7 → 15. max-concurrent-tasks: 5 → 10. Добавлен явный лимит на очередь колоночной СУБД — ранее он отсутствовал.

database.clickhouse.datasource:
  maximumPoolSize: 15    # было 7

tqueue:
  max-concurrent-tasks: 10   # было 5
  queue-limits:
    KAFKA: 2
    CLICKHOUSE: 2
    COLUMNAR_DB: 8           # новый лимит

Разделение очередей по типу нагрузки

FINANCIAL (198 с) и UNION (2 с) больше не стояли в одной очереди:

tqueue:
  queue-limits:
    HEAVY: 3       # FINANCIAL, AGGREGATION — тяжёлая аналитика
    LIGHT: 10      # INTERSECT, UNION, HASH, MCC — лёгкие операции
    FILE_OP: 5     # EXPORT, CLEAN_UP — операции с файлами
    AUTO_GATHER: 2 # фоновые пересборы
// FINANCIAL, FINANCIAL_AGGREGATION → HEAVY
task.queueType = "HEAVY"
// INTERSECT, UNION, HASH, MCC → LIGHT
task.queueType = "LIGHT"
// EXPORT, CLEAN_UP → FILE_OP
task.queueType = "FILE_OP"

Эффект: UNION перестал ждать за FINANCIAL. Критический путь DAG сократился на 15–20%.

3.2. Справедливое планирование

Fair scheduling: ROW_NUMBER по процессам

Старый запрос ORDER BY priority, date_created LIMIT 5 отдавал все слоты одному процессу. Новый ранжирует задачи внутри процесса и раздаёт по одной на процесс:

WITH ranked AS (
  SELECT t.*,
         ROW_NUMBER() OVER (
             PARTITION BY t.process_id
             ORDER BY t.priority NULLS LAST, t.date_created) AS rn
  FROM tq_task t
  WHERE t.status = 'NEW' AND t.node_id IS NULL
    AND NOT EXISTS (/* незавершённые зависимости */)
    AND EXISTS (/* активный процесс */)
)
SELECT * FROM ranked
ORDER BY rn, priority NULLS LAST, date_created
LIMIT :fetchLimit FOR UPDATE SKIP LOCKED

Эффект: 9-часовой хвост в SLOW-бакете исчез. Добавлен индекс по (status, node_id, process_id).

Ограничение одновременных процессов

tqueue:
  max-concurrent-processes-by-type:
    AUDITORY_GATHERING: 5    # не более 5 процессов одновременно

Вместо 100 параллельных процессов — группы по 5, каждая завершается предсказуемо.

3.3. Устранение накладных расходов движка

Приоритет — один раз

UPDATE tq_task SET priority = -1 перенесён из execute() каждого воркера в момент создания процесса. Для графа из 50 задач — 1 UPDATE вместо 50.

Сторож — в отдельную корутину

Три сторожевых UPDATE-запроса вынесены в независимый цикл с интервалом 60 секунд:

// Отдельный корутин, не блокирует выдачу задач:
config.schedulingScope.launch {
    while (isActive) {
        delay(60.seconds)
        tqTaskWatchdog.checkForTimeouts(config.taskExecutionTimeout)
        tqTaskWatchdog.checkForStaleNewTasks(config.taskStaleNewTimeout)
        tqTaskWatchdog.checkForStaleReservedTasks(config.taskReservationTimeout)
    }
}

// Основной цикл — только scheduling:
while (isActive) {
    recurringTaskService.runSchedulingIteration()
    launchProcessingTasks()
    delay(config.interval)
}

Shared Task: одна задача на всех

В TQ добавлено понятие task_key = SHA256(type + canonical_params). Если два процесса создают задачу с одинаковым ключом — физически исполняется одна, результат подписывается обоим:

tq_shared_task
  id | task_key | owner_task_id | status | result

tq_task_shared_subscription
  process_id | task_id | shared_task_id

Эффект: количество строк в tq_task при ночной волне сократилось кратно, конкуренция за слоты — пропорционально.

3.4. Параллелизм

Параллельный consumer в сервисе доставки

Kafka consumer на стороне доставки был последовательным: FetchMessage → Consume (S3 + HTTP + DB, 10–30 с) → CommitOffset → следующий. При пачке из 10 сообщений в 22:00 это давало 100–300 секунд задержки.

Переписан на worker pool из N горутин с отслеживанием минимального неподтверждённого смещения:

jobs := make(chan kafkaMsg.Message[T], bufferSize)
for i := 0; i < workerCount; i++ {
    go func() {
        for msg := range jobs { consumer.Consume(msg) }
    }()
}
// reader loop: FetchMessage → jobs ← parseMessage
// offset tracker: commit(min uncommitted offset)

Эффект: пачка сообщений обрабатывается за время самого долгого, а не суммы всех.

Внутренний параллелизм для помесячных сборов

Режим EACH_MONTH на 12 месяцев создавал 16 TQ-задач (12 FINANCIAL + 3 INTERSECT + 1 финальный). Заменён одним воркером FINANCIAL_MONTHLY_ALL с параллельными корутинами и семафором:

@Worker("FINANCIAL_MONTHLY_ALL")
class FinancialMonthlyAllWorker : TQWorker<...> {
    override suspend fun run(task, tqApi) {
        val semaphore = Semaphore(5)  // не перегрузить пул колоночной СУБД
        val monthResults = coroutineScope {
            months.map { monthParams ->
                async(Dispatchers.IO) {
                    semaphore.withPermit {
    columnarRepository.gatherAuditory(fileId, monthParams)
                    }
                }
            }.awaitAll()
        }
        val final = intersectionRepository.intersectViaBitmap(monthResults)
        task.result = AuditoryGatheringResult(bitmapLocation = final, ...)
    }
}

Эффект: 16 TQ-задач → 1 задача. 3 уровня DAG → 0. Критический путь: ~600 с → ~210 с.

3.5. Батчинг фоновых задач

MATCHING_DELTA_EXCLUSION создавал 16 800 процессов за две недели — каждый с одной секундой работы и 49 минутами ожидания. Упаковано по 50 задач в один процесс:

// Было: 50 процессов по 1 задаче
// Стало: 1 процесс с 50 задачами внутри
val batchSize = 50
clientIds.chunked(batchSize).forEach { batch ->
    process.addTask(MatchingDeltaExclusionsTask(batch))
}

Эффект: количество записей в tq_task и tq_process сократилось на порядок.

3.6. Битовые карты и ранний выход

Roaring Bitmaps в колоночной СУБД

Ранее листовые воркеры записывали промежуточные результаты как строки (auditory_load_id, client_id), а INTERSECT/SUBTRACT/UNION выполняли JOIN по миллионам строк. Мы перевели операции над множествами на roaring bitmaps:

Листовой воркер пишет битовое множество параллельно со строками:

CREATE TABLE tmp_auditory_bitmap (
    auditory_load_id String,
    bitmap AggregateFunction(groupBitmap, UInt64),
    created_at DateTime DEFAULT now()
) ENGINE = AggregatingMergeTree ORDER BY auditory_load_id
  TTL created_at + INTERVAL 2 HOUR;

INSERT INTO tmp_auditory_bitmap
SELECT :fileId, groupBitmapState(toUInt64(client_id))
FROM financial_clients WHERE ...;

INTERSECT — bitmapAnd вместо JOIN:

-- Было: JOIN по client_id, 38 секунд
-- Стало: bitmapAnd, ~200 миллисекунд
INSERT INTO tmp_auditory_bitmap
SELECT :resultId, bitmapAnd(b1.bitmap, b2.bitmap)
FROM tmp_auditory_bitmap b1 CROSS JOIN tmp_auditory_bitmap b2
WHERE b1.auditory_load_id = :id1 AND b2.auditory_load_id = :id2;

SUBTRACT — bitmapAndnot, UNION — bitmapOr. Экспорт: arrayJoin(bitmapToArray(client_bitmap)).

Процессор обрабатывает 64 бита за инструкцию. Для 10 млн клиентов пересечение — ~156 000 операций. Сжатое множество на 10 млн клиентов — 5–15 МБ. client_id хеширован в UInt64 через cityHash64; коллизии на реальных данных — ниже 0.0001%, проверено до внедрения.

Эффект: INTERSECT 38 с → < 1 с; SUBTRACT 48 с → < 1 с; UNION 2 с → < 100 мс.

Cardinality-aware DAG

Если один из фильтров возвращает пустой результат, дальнейшие операции над множеством не нужны: A ∩ ∅ = ∅. Добавлено две оптимизации:

Preflight count() перед полным gather:

val count = repository.estimateCount(params)
if (count == 0L) {
    task.result = AuditoryGatheringResult(isEmpty = true, rowCount = 0, ...)
    return  // полный gather пропущен
}

Short-circuit в INTERSECT и UNION:

// INTERSECT:
if (deps.any { it.isEmpty }) {
    task.result = AuditoryGatheringResult(isEmpty = true)
    return
}
// UNION:
val nonEmpty = deps.filterNot { it.isEmpty }
if (nonEmpty.size == 1) { task.result = nonEmpty.single(); return }

Внедрение шло осторожно: shadow mode (логирование без изменения поведения), затем включение под feature flag.

Эффект: разреженные аудитории (редкие MCC, специфическая география) — 198 с → 5–10 с. Плотные аудитории — без изменений, кроме накладных расходов на estimateCount.

3.7. Кеширование результатов

Content-Addressable Segment Cache

Кеш по каноническому хешу параметров задачи:

CREATE TABLE tq_segment_cache (
    params_hash VARCHAR PRIMARY KEY,
    file_id     VARCHAR NOT NULL,
    table_name  VARCHAR NOT NULL DEFAULT 'tmp_auditory',
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    expires_at  TIMESTAMPTZ NOT NULL
);
val hash = params.canonicalHash()  // SHA256 от нормализованных параметров
val cached = segmentCacheRepository.findValid(hash)
if (cached != null) {
    task.result = AuditoryGatheringResult(fileId = cached.fileId, ...)
    return  // 5 мс вместо 198 с
}
val location = gatherAuditory(task)
segmentCacheRepository.save(hash, location.fileId, ttl = 24.hours)

Ключ включает версию исходных данных, чтобы исключить возврат устаревшего результата:

cache_key = SHA256(task_type + canonical_params + source_data_version)

Ночной прогрев кеша

Ленивый кеш не помогает первому запросу. Добавлен проактивный прогрев: фоновая задача в 02:00 вычисляет 50 наиболее востребованных параметров за последние 14 дней:

@Recurring(id = "PrecomputeSegmentsJob", cron = "0 2 * * *")
fun precomputePopularSegments() {
    val popular = segmentCacheRepository.findMostUsed(limit = 50, since = 14.days)
    popular.forEach { (hash, params) ->
        val fileId = repository.gatherAuditory(generateFileId(), params)
        segmentCacheRepository.save(hash, fileId, ttl = 25.hours)
    }
}
segment-precompute:
  maxSegments: 50
  maxDuration: 2h
  maxParallelism: 2
  ttl: 25h

Эффект: повторяющиеся параметры FINANCIAL получают результат из кеша мгновенно. Нагрузка перенесена с пикового окна на ночь.

3.8. Динамические пересборы

Автоматические пересборы запускались планировщиком каждые 15 секунд и конкурировали с ручными за одни слоты. Внедрено пять изменений:

  1. Отдельная очередь AUTO_GATHER с 2 слотами. Фоновые пересборы не вытесняются ручными.

  2. Deadline-aware приоритет:

val urgency = when {
    deadline < now + 1.hours  -> -20   // критично
    deadline < now + 6.hours  -> -10   // срочно
    deadline < now + 24.hours -> -5    // планово
    else                      ->  2    // фоново
}
  1. Ночное окно. Пересборы с deadline > 24 ч — только в 22:00–06:00. Дневные слоты остаются ручным и срочным.

  2. Pre-gather. За 6 часов до активации оффера аудитория вычисляется заранее:

fun preGatherBeforeActivation() {
    offerRepository.findActivatingWithin(6.hours)
        .filter { !auditoryRepository.isFreshEnough(it.auditoryId, 12.hours) }
        .forEach { offer ->
            gatheringService.run(AuditoryGatheringRequest(fromJob = true, deadline = offer.activateAt))
        }
}
  1. Пропуск дельты при малом изменении:

val deltaSize = estimateDelta(currentAuditory, newParams)
if (deltaSize < 0.01 * currentSize) return currentAuditory

4. Как разворачивали и что получилось

20 изменений нельзя внедрять одновременно — при ошибке придётся откатывать всё. Мы разбили работу на шесть этапов. Каждый этап: деплой → суточный замер тем же measurement-запросом → сравнение с предыдущим baseline.

Этап 1: Ресурсы

Что деплоили: CPU limit 500m → 2000m.

Почему первым: увеличение CPU limit не может ничего сломать.

Через 24 часа: throttled periods на Grafana упали на 85%. avg_exec_sec снизился на 20–30% по всем типам задач. avg_queue_sec практически не изменился — сама по себе мощность не лечит очередь.

Этап 2: Конфигурация очереди

Что деплоили: пул соединений 7 → 15, max-concurrent-tasks 5 → 10, лимит на очередь колоночной СУБД.

Риск: низкий. Удвоение одновременных задач могло перегрузить базу. Отслеживали connection errors и latency.

Через 24 часа: avg_queue_sec упал с 9 868 до ~5 000 с. Ошибок соединений — ноль.

Этап 3: Разделение очередей + справедливое планирование

Что деплоили: четыре типа очередей, fair scheduling SQL, ограничение в 5 одновременных процессов.

Риск: средний. Оконная функция — новый запрос на критическом пути. Потребовался индекс по (status, node_id, process_id) и EXPLAIN ANALYZE на прод-объёме.

Через 24 часа: avg_queue_sec упал с ~5 000 до ~1 800 с. P95 > 20 000 с перестал появляться. Лёгкие задачи начали стартовать сразу после готовности.

Этап 4: Расчистка критического пути движка

Что деплоили: приоритет один раз, сторож в отдельную корутину, Shared Task с дедупликацией.

Риск: средний. Shared Task — новое понятие в ядре TQ.

Через 24 часа: накладные расходы планировщика снизились. getAvailableTasks — быстрее, сторожевые UPDATE не блокируют выдачу. Эффект ~10–15% к avg_total.

Этап 5: Параллелизм + батчинг

Что деплоили: параллельный Kafka consumer, FINANCIAL_MONTHLY_ALL, батчинг MATCHING_DELTA_EXCLUSION.

Риск: выше предыдущих. Offset tracking при параллельном consume и внутренний параллелизм требовали аккуратного тестирования.

Через 24 часа: EACH_MONTH — критический путь ~600 → ~210 с. Пачка из 10 сообщений Kafka обрабатывается за время самого долгого. tq_process — на порядок меньше строк.

Этап 6: Битовые карты + кеш + динамические пересборы

Что деплоили: roaring bitmaps, cardinality-aware DAG, segment cache, ночной прогрев, пять улучшений пересборов.

Риск: самый высокий. Требовалась миграция схемы колоночной СУБД и cityHash64. Строковый fallback оставлен под feature flag.

Через 24 часа: INTERSECT 38 с → < 1 с, SUBTRACT 48 с → < 1 с, UNION 2 с → < 100 мс. Повторяющиеся FINANCIAL — попадание в кеш. Динамические пересборы больше не конкурируют с ручными.

Итоговые цифры

Метрика

Было (май 2026)

Стало (после этапа 6)

Среднее полное время

10 229 с (≈ 2 ч 50 мин)

~10 с

Среднее исполнение

361 с (≈ 6 мин)

~8 с

Среднее ожидание в очереди

9 868 с (≈ 2 ч 44 мин)

~2 с

P95 полного времени

28 189 с

~30 с

Максимальное (ночной хвост)

32 851 с

~60 с

По типам сборов

Тип сбора

Было

Стало

Простой MCC/TERMINAL

~200 с

~3 с

FINANCIAL WHOLE_PERIOD

~600 с

~10 с

FINANCIAL EACH_MONTH 12 мес

~800 с

~20 с

Самый большой сбор + пересбор

10 000–30 000 с

~60 с

Вклад по слоям

Слои влияют на одни и те же участки критического пути, поэтому их вклад нельзя механически перемножить. Каждый слой подтверждён независимым замером:

  1. Ресурсы + конфигурация: queue_sec: 9 868 → ~5 000 с. ~×2.

  2. Разделение очередей + fair scheduling: avg_total: 10 229 → ~3 000 с. ~×1.7.

  3. Расчистка движка: avg_total ещё −10–15%.

  4. Параллелизм + батчинг: критический путь финансов ~600 → ~210 с.

  5. Битовые карты + кеш + пересборы: INTERSECT 38 → < 1 с, FINANCIAL → кеш. avg_exec: 361 → ~8 с.

  6. Совокупно: исполнение ускорено ~×45 (361 → ~8 с), очередь практически устранена (9 868 → ~2 с). Полное время — с 2 ч 50 мин до нескольких секунд.

Замечание про выгрузку

Физическая выгрузка многомиллионной аудитории в CSV занимает больше нескольких секунд. Все числа выше относятся к вычислению множества клиентов и его размера. Материализация и доставка идут асинхронно.

5. Что из этого можно применить в других системах

  1. Измеряйте queue‑vs‑exec до оптимизации SQL. 96.5% времени — ожидание. Пока очередь не починена, ускорение воркеров даёт эффект лишь на 3.5% бюджета.

  2. Разделяйте приём нагрузки и исполнение. Неконтролируемый залп запросов создаёт длинный хвост независимо от скорости отдельных задач. Порции, окна и backpressure — самостоятельный слой защиты.

  3. Убирайте дорогие проверки с горячего пути. NOT EXISTS на каждом тике планировщика → явный статус READY по событию. Сторожевые проверки — в отдельный цикл.

  4. Используйте битовое представление множеств. Операции над аудиториями — объединение, пересечение, разность. Roaring bitmaps: десятки секунд → миллисекунды. Плата — хеширование идентификаторов и поддержка со стороны СУБД.

  5. Считайте стабильное заранее. Content‑addressable cache и ночной прекомпьют устраняют повторный счёт одинаковых условий. Плата — свежесть данных и TTL.

  6. Внедряйте слоями и измеряйте каждый этап. Никакого «big bang». Каждое изменение — отдельный деплой, суточный замер, сравнение с baseline. 20 изменений — 20 замеров.

Комментарии (0)