Вступление

  Всем привет. Меня зовут Андрей, я работаю в компании SegmentStream, занимаюсь вопросами архитектуры, инфраструктуры и написанием кода. Сегодня я расскажу как мы адаптировали Cube под наши нужды. В рамках статьи я не буду подробно рассказывать про сам Cube, если вы не знакомы с Cube, то лучше сначала немного почитать про него на официальном сайте cube.dev/docs. Если говорить про SegmentStream, то мы разрабатываем SaaS-платформу для оптимизации рекламных кампаний на основе машинного обучения. Звучит просто, но я рекомендую почитать, там много интересных вещей.

  Также, в статье будет достаточное количество кода на TypeScript, который можно при желании не стесняться заимствовать. Я постараюсь оставлять объяснения к нему.

Зачем нам Cube

  Итак, в начале 2022 года мы начали процесс релиза новой панели управления для наших пользователей, новая панель должна была скрыть многие детали реализации и сделать использование системы интуитивно понятным. 

  Так как мы работаем с данными, то нужно иметь возможность эти данные как-то визуализировать. До этого клиенты могли смотреть отчёты через Google Data Studio, но проблема в том, что это был полностью сторонний инструмент, и даже его интеграция на сайт была через iframe. 

  Но это ещё половина беды. Oсновная проблема была в том, что отчёты приходилось настраивать под каждого клиента, и, очевидно, с ростом количества клиентов это вылилось бы в определённые трудности.

  Оценив все плюсы и минусы, мы решили, что нужно переезжать на что-то своё, что-то, что будет стоить адекватных денег, и что будет закрывать все наши потребности. А исходные данные у нас следующие:

  1. Мы храним данные в BigQuery, при этом, по большей части, в BigQuery клиента (данные-то клиенту принадлежат) и доступ получаем через OAuth2.0;

  2. Данные, необходимые для отчётов, разбиты на 2 здоровенные таблицы по 20-30 миллионов строк с 10 различными ключами и 30 метриками каждая;

  3. Обработка данных стоит денег в BigQuery, так что нужно, по возможности, избегать запросов в BigQuery;

  4. Данные в этих двух таблицах обновляются по запросу от нас.

  Посмотрев существующие решения, мы остановились на Cube. Нам понравилось, что его можно запустить у себя и что расширять его можно на TypeScript, который является нашим основным языком разработки. Вот так Cube выглядит на архитектурном уровне (картинка позаимствована из блога Cube):

Компоненты следующие:

  • API отвечает за обработку входящих запросов;

  • Cube Refresh Worker отвечает за обновление данных по расписанию;

  • Redis используется как кеш и шина;

  • Кластер Cube Store отвечает за хранение преагрегаций и обработку запросов к ним.

  Тут имеет смысл добавить пару слов про преагрегации. Преагрегации это особый компактный формат представления данных, который хранится отдельно от основных данных и скорость чтения которых существенно выше.


  Cube Store выгружает эти самые преагрегации из таблицы в файлы формата parquet и ходит в них во время запросов.

  В общем, нам это всё очень понравилось и мы начали интегрировать Cube.

Немного теории про Cube

  Если у вас уже есть опыт работы с Cube, то этот раздел, вероятно, можно пропустить, если нет, то настоятельно рекомендую прочитать, потому что по ходу повествования будут появлятсья понятия термины, относящиеся к Cube.

  В Cube данные описываются при помощи кубов (внезапно), куб представляет из себя описание того, как нужно извлекать данные из источника, например:

cube(`Users`, {
 sql: `SELECT * FROM users`,
 
 measures: {
   count: {
     sql: `id`,
     type: `count`,
   },
 },
 
 dimensions: {
   city: {
     sql: `city`,
     type: `string`,
   },
 
   companyName: {
     sql: `company_name`,
     type: `string`,
   },
 },
});

  В коде выше мы определили базовый sql-запрос, 1 measure и 2 dimension. Если говорить простым языком, то measure это то, что мы хотим посчитать, а dimension - то, по чему хотим группировать. То есть, если мы хотим получить колличество пользователей в каждом городе, то нужно будет отправить в Cube следующий запрос:

{
  dimensions: ['Users.city'],
  measures: ['Users.count']
}

  Но ходить каждый раз в источник данных не всегда удобно, иногда дорого, да и зачем, если можно заранее посчитать количество пользователей в каждом городе и сохранить. Этот механизм называется преагрегацицей и заключается в том, что Cube предварительно материализует данные в соответствии с кубом, это позволяет существенно ускорить запросы.

  Преагрегаци определяются в кубе и выглядят как-то так:

cube(`Orders`, {
 sql: `SELECT * FROM users`,
 
 ...,
 
 preAggregations: {
    usersByCity: {
     dimensions: [CUBE.city],
     measures: [CUBE.count],
   },
 },
});

  Так, как мы используем BigQuery, то имеет смысл немного рассказать про преагрегации в контексте работы с BigQuery. У Cube сущетсвует несколько стратегий создания преагрегаций:

  • Хранить в отдельной таблице в базе;

  • Стримом выгрузить эту отдельную таблицу из базы и отправить на хранение в Cube Store;

  • Использовать механизм exportBucket, при помощи которого BigQuery сам выгружает таблицу в файл и присылает в CubeStore ссылку на эти файлы.

  Конечно же, последний вариант является самим быстрым с точки зрения построения преагрегаци, но в следующем разделе будет дана ремарка почему мы его не смогли использовать и остановились на втором варианте.

  Как Cube понимает, что пора обновлять данные? Для этого в преагрегацию необходимо добавить параметр refreshKey:

refreshKey: {
  every: `30 seconds`
  sql: `SELECT MAX(created_at) FROM users`,
}

  Cube раз в 30 секунд будет выполнять этот запрос и при изменении результата будет пересчитывать преагрегации.

OAuth2.0

  В Cube из коробки есть драйвер для BigQuery, вот только он умеет работать только с сервис-аккаунтами, а у нас практически все клиенты через OAuth 2.0 сидят. Что делать? Расширить драйвер, конечно же.

class SegmentStreamBigQueryDriver extends BigQueryDriver {
 config: BigQueryDriverOptions;
 constructor(config: BigQueryDriverOptions) {
   super(config);
 
   this.config = config;
   this.pinoLogger = pinoLogger;
 
   if (config.oauthCredentials) {
     const refreshClient = new UserRefreshClient();
 
     refreshClient.fromJSON({
       type: 'authorized_user',
       client_id: config.oauthCredentials.clientId,
       client_secret: config.oauthCredentials.clientSecret,
       refresh_token: config.oauthCredentials.refreshToken,
       project_id: config.projectId,
     });
 
     this.bigquery.authClient.cachedCredential = refreshClient;
   }
 }
}

  Кстати, exportBucket работать с OAuth 2.0 не будет. Дело в том, что для подписи ссылок GCP использует ключ сервис-аккаунта, такие вот дела.

Авторизация пользователей

  Теперь встаёт вопрос: как нам передать с фронта креды пользователя от BQ, которые мы должны хранить и не отдавать никуда? Cube умеет в авторизацию через JWT, но вот только нам нужно не просто проверить, что запрос пришёл от добросовестного пользователя, нам нужно ещё и креды от BigQuery передать.

  Мы не стали долго думать и просто засунули в JWT креды, предварительно зашифровав их. Но через некоторое время словили 431 Request Header Fields Too Large и побежали ставить костыли и пошли добавлять флаги к ноде на увеличенный размер заголовков. Думаю, что сделать вы это и сами легко сможете, а интерфейс выглядит вот так:

interface EncodedCredentials {
 content: string;
 iv: string;
}
interface SecurityContext {
 projectId: string;
 datasetId: string;
 oauthCredentials?: EncodedCredentials;
 encodedServiceAccount?: EncodedCredentials;
}

  Кто прочитал код, тот может заметить, что мы ещё и сервис-аккаунты поддержали. Да, мы решили, что нужно переводить пользователей на сервис-аккаунты и заканчивать гонять терабайты данных через нашу инфраструктуру (BigQuery умеет экспортировать таблицу в файл в GCP, но как было сказано выше, умеет он это только с сервис-аккаунтом).

  Итого, вся эта красота стала выглядеть следующим образом:

  1. Фронт идёт в API и получает JWT для Cube, в JWT зашиты креды от BigQuery, у JWT, конечно же, есть подпись и срок жизни;

  2. С полученным JWT и сформированным запросом фронт идёт в Cube;

  3. Cube проверяет подпись JWT, расшифровывает креды и создаёт инстанс драйвера на основе SecurityContext;

  4. Cube формирует ответ и отдаёт его клиенту.

  В случае с Cube Refresh Worker (сервис, который следит, что все данные актуальны и обновляет преагрегаты) мы из API запрашиваем всё то же самое, но без стадии зашивания в JWT.

Преагрегации без поля updated_at

  Преагрегации это очень крутая вещь, особенно вместе с Cube Store: штукенция на Rust, которая хранит их на диске и умеет доставать оттуда данные в обход BigQuery. Cube умеет строить преагрегации с разной гранулярностью и партиционировать их, но чтобы партиционирование и обновление партиций работало корректно, нужно правильно указать ключ преагрегации, но ключика-то у нас нет.

  К счастью, ребята из команды по работе с данными вовремя подсказали нам один хак:

granularity: `day`,
partitionGranularity: `week`,
buildRangeStart: { sql: `SELECT DATE_SUB(CURRENT_DATETIME(), INTERVAL 92 DAY)` },
buildRangeEnd: { sql: `SELECT CURRENT_DATETIME()` },
refreshKey: {
 every: `5 minutes`,
 sql: `SELECT COALESCE(max(last_modified_time), '2020-01-01T00:00:00.000Z') as last_updated_at
       FROM \`INFORMATION_SCHEMA.PARTITIONS\`
       where table_name = 'conversions'
         AND partition_id != '__NULL__'
         AND ${FILTER_PARAMS.Attribution.date.filter(
           "TIMESTAMP(PARSE_DATE('%Y%m%d', partition_id))",
         )}`,
}

  В BigQuery можно сделать запрос на метаданные и получить время модификации конкретной партиции (таблицы мы партиционируем).

  А через пару недель к нам пришли клиенты с жалобами на спам запросы… И это, честно говоря, было ожидаемо: каждые 5 минут Cube делал для каждой преагрегации и для каждой партиции запрос, после чего за сутки натекало неплохо так. А ещё BigQuery тратит 10 мегабайт на такой запрос (почему - большой вопрос), и за месяц может накапать под терабайт.

  Итак, проблему нужно решать. Фактически, нам вообще не нужны эти проверки, мы и так прекрасно знаем, когда была обновлена конкретная таблица (мы же её сами и обновляем), значит, нужно как-то ограничить походы Cube на BigQuery. Cube в бекенд с таким вопросом ходить не умеет, значит нужно пойти в Cube и подтюнить драйвер так, чтобы он умел кешировать запросы в redis, а кеш мы будем уже сами сбрасывать. Ниже приведён код того, что мы сделали. Мы заодно замокали проверки buildRangeStart и buildRangeEnd, потому что для этого в BigQuery лезть совсем не нужно.

 async runQueryJob<T = QueryRowsResponse>(
   initBigQueryQuery: Query,
   options: any,
   withResults = true,
 ): Promise<T> {
   // Parenthesized expression cannot be parsed as an expression, struct constructor, or subquery
   const bigQueryQuery = {
     ...initBigQueryQuery,
     query: this.replaceILIKE(initBigQueryQuery.query || ''), // BigQuery doesn't support ILIKE
   };
 
   if (bigQueryQuery.query === 'SELECT CURRENT_DATETIME()') {
     return [
       [
         {
           f0_: {
             value: new Date().toISOString(),
           },
         },
       ],
     ] as unknown as T;
   }
 
   if (bigQueryQuery.query === 'SELECT DATE_SUB(CURRENT_DATETIME(), INTERVAL 92 DAY)') {
     return [
       [
         {
           f0_: {
             value: new Date(Date.now() - 1000 * 60 * 60 * 24 * 92).toISOString(),
           },
         },
       ],
     ] as unknown as T;
   }
 
   if (this.cacheRenewewer && bigQueryQuery.query?.includes(`INFORMATION_SCHEMA.PARTITIONS`)) {
     const tableName = bigQueryQuery.query.match(/table_name = '([a-zA-Z0-9]*)'/);
     const { projectId, datasetId } = this.config;
     if (tableName && tableName[1] && projectId && datasetId) {
       const postfix = `${bigQueryQuery.params?.join('_').replace(/[-:\.]/, '_') || ''}_${
         tableName[1]
       }`;
       const cachedResult = await this.cacheRenewewer.getRecord(projectId, datasetId, postfix);
 
       if (cachedResult) {
         return cachedResult;
       }
 
       this.pinoLogger.info(
         {
           projectId,
           datasetId,
           postfix,
           tableName: tableName[1],
           params: bigQueryQuery.params,
         },
         'Request pre-aggregation last_updatet_at from DB',
       );
 
       const [preAggregationJob] = await this.bigquery
         .dataset(this.config.datasetId)
         .createQueryJob(bigQueryQuery);
       const result = (await this.waitForJobResult(
         preAggregationJob,
         options,
         withResults,
       )) as any as T;
       await this.cacheRenewewer.setRecord(projectId, datasetId, postfix, result);
       return result;
     }
   }
 
   const [job] = await this.bigquery.dataset(this.config.datasetId).createQueryJob(bigQueryQuery);
   return (await this.waitForJobResult(job, options, withResults)) as any as T;
 }

  Код не самый тривиальный, так что лучше будет немного пояснить, что здесь вообще происходит: запросы на текущее время мы отдаём сами (странно, что в Cube нельзя это сделать без костылей), все результаты преагрегации мы кидаем в кеш и отдаём из него, а после того, как мы на стороне нашего бекенда обновили таблицы у пользователя, то мы сбрасываем кеш. Проблема решена, ура!

Утекающая память

  Думаю, что многим знакомы вот такие картинки.

   

  Мы тоже не стали исключением и получили удовольствие наблюдать такие картинки после подключения Cube к крупным клиентам. Понятно, что мы сами себе злые Буратино и нужно использовать сервис-аккаунты с exportBucket, но на тот момент у нас был только OAuth2.0. Да и, в конце концов, стримы не должны есть 15 гигабайт. Пошли делать console.log дебаг и нашли проблему, но уже не в Cube, а в драйвере BigQuery. Дело в том, что стриминг в драйвере BigQuery вообще не стриминг и ест память. Пришлось писать свою обёртку. Фактически, мы просто сделали стримы через пагинацию.

import { Readable } from 'stream';
import { BigQuery, Dataset, Job, QueryResultsOptions } from '@google-cloud/bigquery';
 
class BigQueryReadStream extends Readable {
 nextQuery: QueryResultsOptions | null;
 rowsBuffer: any[] = [];
 constructor(protected job: Job, protected limitPerPage: number) {
   super({
     objectMode: true,
   });
 
   this.nextQuery = {
     autoPaginate: false,
     maxResults: limitPerPage,
   };
 }
 
 protected manualPaginationCallback(
   err: Error | null,
   rows: any[] | null | undefined,
   nextQuery: QueryResultsOptions | null | undefined,
 ) {
   if (err) {
     return this.destroy(err);
   }
 
   this.nextQuery = nextQuery || null;
   this.rowsBuffer.push(...(rows || []));
   this.push(this.rowsBuffer.shift() || null);
 }
 
 protected readFromBQ() {
   if (!this.nextQuery) {
     return this.push(null);
   }
   this.job.getQueryResults(this.nextQuery, this.manualPaginationCallback.bind(this));
 }
 
 async _read() {
   if (this.rowsBuffer.length === 0) {
     this.readFromBQ();
   } else {
     this.push(this.rowsBuffer.shift());
   }
 }
}
 
interface BigQueryParams {
 bigquery: BigQuery;
 dataset?: string;
}
 
export const createQueryStream = async (
 bigquery: BigQueryParams,
 query: string,
 params: unknown[],
 limitPerPage = 1000,
): Promise<Readable> => {
 const instance: BigQuery | Dataset = bigquery.dataset
   ? bigquery.bigquery.dataset(bigquery.dataset)
   : bigquery.bigquery;
 
 // Run the query as a job
 const [job] = await instance.createQueryJob({
   query,
   params,
   parameterMode: 'positional',
   useLegacySql: false,
 });
 
 // For all options, see https://cloud.google.com/bigquery/docs/reference/v2/jobs/getQueryResults
 const queryResultsOptions = {
   // Retrieve zero resulting rows.
   maxResults: 0,
 };
 
 // Wait for the job to finish.
 await job.getQueryResults(queryResultsOptions);
 
 return new BigQueryReadStream(job, limitPerPage);
};

  Ну и интегрировать написанное в сам Cube это было проще простого:

 public async stream(query: string, values: unknown[]): Promise<any> {
   const streamUUID = randomUUID();
   this.pinoLogger.info(
     {
       streamUUID,
       query,
       projectId: this.config.projectId,
       datasetId: this.config.datasetId,
     },
     `Starting stream ${streamUUID}: ${query}`,
   );
 
   const stream = await createQueryStream(
     {
       bigquery: this.bigquery,
       dataset: this.config.datasetId,
     },
     query,
     values,
     this.config.limitPerPage || 1000,
   );
   const rowStream = new HydrationStream({ streamUUID }, (message: string, params: any) =>
     this.pinoLogger.info(params, message),
   );
   stream.pipe(rowStream);
   return {
     rowStream,
   };
 }
}

Custom dimensions

  Как вы помните, в рамках куба можно определить набор dimensions и  преагрегаций, но что если пользователю нужно самому задавать dimensions? Схема-то для всех общая, а потребности у пользователей разные. Мы назвали это custom dimensions.

  Для решения этой задачи в Cube тоже есть решение, но придётся чуть поработать напильником. Решение называется “Dynamic Schema Creation” и оно нам подходит, но есть нюанс.

  Нюанс заключается в том, что security context используется только в момент компиляции схемы, соответственно, нужно связать security context и версию схемы:

function schemaVersion(context: Context) {
 const securityContext = extractSecurityContextFromContext(context);
 const customDimensions = (securityContext.customDimensions || [])
   .map(
     customDimension =>
       `${customDimension.key}:${customDimension.sql}:${customDimension.type || ''}`,
   )
   .join(':');
 const customDimensionsHash = crypto.createHash('sha256');
 customDimensionsHash.update(customDimensions);
 return `${getAppIdFromBQConfig(
   securityContext.projectId,
   securityContext.datasetId,
 )}__${customDimensionsHash.digest('hex')}`;
}

  Отлично, теперь при изменении набора пользовательских dimensions версия схемы будет меняться, осталось только эти dimensions запихнуть в схему.

  Сначала извлечём их:

 const { securityContext } = COMPILE_CONTEXT;
 const customDimensions = securityContext.customDimensions || [];

  А потом добавим в схему:

  dimensions: {    
    ...customDimensions.reduce((customDimensionsObject, customDimension) => {
       if (!customDimension.sql) {
         return customDimensionsObject;
       }
 
       customDimensionsObject[customDimension.key] = {
         sql: customDimension.sql.replace(/Report\.([a-zA-Z]+)/g, (_, dimension) => {
           return Report[dimension];
         }),
         type: customDimension.type || 'string',
       };
 
       return customDimensionsObject;
     }, {})
  }

  Готово, теперь можно использовать custom dimensions при формировании запросов.

За границами преагрегации

  Внезапно оказалось, что за границами преагрегации (вне buildRangeStart и buildRangeEnd) ничего нет, пустота, одни нули. Почему? А вот так, Cube видит, что есть преагрегация для запроса и идёт туда, не обращая внимания на то, что там есть не все данные.

  Что делать? А ничего, если запрос выходит за границы преагрегации, то придётся как-то без преагрегации жить. Для этого нужно добавить фейковый measure:

notPreaggregatedMeasure: {
  sql: `0`,
  type: `number`,
}

  На стороне клиента, если есть понимание, что запрос выходит за границы, то добавляем эту метрику, и запрос улетит в БД, ну или в кэш.

Сontinue wait

  Иногда при запросе данных можно получить ошибку вида “продолжайте ждать, всё будет, но чуть позже”. В большинстве случаев это означает, что Cube что-то там строит на своей стороне.

  Но проблема в том, что во-первых, это может затянуться, а во-вторых, если мы знаем, что преагрегации строятся (очень хочется попросить команду Cube дать эндпоинт со статусом в разрезе security context) мы же можем отдать пользователю запросы напрямую (через notPreaggregatetMeasure) и показать красивую крутилку о том, что мы что-то там строим.

  В общем, этот ответ тоже можно использовать с пользой.

  Кстати (вообще, некстати), к проблеме отсутствия эндпоинта со статусом. Как получить время обновления данных? Напрямую никак. Но можно чуть поковырять ответ и сделать вот так:

lastRefreshTime: new Date(
  Math.max(
    ...row
      .serialize()
      .loadResponse.results.map(result => new Date(result.lastRefreshTime).valueOf()),
  )
)

Шардинг Cube Worker

  В самом начале я написал, что SegmentStream это SaaS. Значит количество SecurityContext, фактически, неограничено, либо достаточно большое. Но Cube Worker не умеет обрабатывать часть SecurityContext, соответственно, могут появиться ощутимые задержки на обновление данных (export bucket у нас есть далеко не у всех клиентов). Что делать? Шардировать. Как? Классическое консистентное хеширование.

  Для консистетного хеширования нужно, чтобы каждый воркер знал свой номер и знал общее число воркеров. Для этого нам как нельзя лучше подходит StatefulSet в K8S. Нам нужно передать в каждый воркер его порядковый номер, шаблон имени и общее число реплик:

         env {
           name = "TOTAL_REPLICAS"
           value = local.replicas
         }
 
         env {
           name = "POD_GENERATED_NAME"
           value = "cubestore-workers-"
         }
 
         env {
           name = "POD_NAME"
           value_from {
             field_ref {
               field_path = "metadata.name"
             }
           }
         }

  После чего при помощи библиоткеки hashring осуществить фильтрацию нужных контекстов:

export const podName = process.env.POD_NAME || 'cube-refresh-worker-0';
export const podGeneratedName = process.env.POD_POD_GENERATED_NAME_NAME || 'cube-refresh-worker-';
export const totalRepicas = Number(process.env.TOTAL_REPLICAS || 1);
 
const hashring = new HashRing(
 new Array(totalRepicas).fill(0).map((_, i) => `${podGeneratedName}${i}`),
);
 
const filterSecurityContext = (context: Context) => {
 const workerForContext = hashring.get(contextToAppId(context));
 return workerForContext === podName;
};
 
const apiClient = new ApiClient(process.env.API_URL || '');
 
const cube = {
 dbType: 'bigquery',
 externalDbType: 'cubestore',
 cacheAndQueueDriver: 'redis',
 scheduledRefreshContexts: async () => {
   const scheduledRefreshContexts = !dev
     ? (await apiClient.getAllSecurityContexts()).map((bq: SecurityContext) => ({
         securityContext: bq,
       }))
     : undefined;
 
   if (!scheduledRefreshContexts) {
     pinoLogger.info(`Unable to fetch contexts`);
     return [];
   }
 
   pinoLogger.info(`Fetched ${scheduledRefreshContexts.length} contexts`);
 
   const finalContexts = scheduledRefreshContexts.filter(filterSecurityContext);
 
   return finalContexts;
 },
};

  В результате, мы получаем возможность неограниченно горизонтально скейлить Cube Worker и существенно ускорить рассчёт преагрегаций.

SQL API

  А напоследок, поговорим о самом вкусном - SQL API в Cube. Cube стабильно работал в продакшене: графики строились, таблицы рисовались. Но что нам опять не понравилось? Всё описанное выше отлично работает, но мы захотели ещё больше гибкости. К сожалению, часть про формирование SQL на основе запросов тянет на отдельную статью, так что если интересно,  пишите в комментариях, а здесь расскажу про основы.

  Во-первых, внезапно, 50к строк и мегабайты данных гонять на фронт не очень нормально, нужно получать с бэкенда уже отсортированные данные с пагинацией.

  Во-вторых, хочется иметь возможность получать с бэкенда кастомные метрики, например, разницу между какими-то двумя метриками или дифф по датам, при этом бекенд должен уметь всё это сортировать ещё по любому выбранному полю.

  Через REST API делать такое нельзя. Там даже нельзя запросить разницу между двумя метриками, что, в принципе, и нормально, этот API для таких вещей и не предназначен.

  Фактически, SQL API это API, которое совместимо с драйвером PostgreSQL и при помощи которого можно строить практически любые запросы. Приведу пример, как мы формируем запрос:

const firstQuery = `SELECT
     ${[
    `${
        hasDimension ? dimension : "'none'"
    } as "${firstIntervalPrefix}dimension"`,
    ...selectMeasures,
    ...selectAdditionalMetrics,
].join(',\n')}
     FROM Report WHERE Report.date > '{{ DATE_FROM }}' AND  Report.date < '{{ DATE_TO }}' ${
    filter ? `AND ${converFilterIntoSQL(filter)}` : ''
}
     ${hasDimension ? `GROUP BY "${firstIntervalPrefix}dimension"` : ''}
   `;

if (!secondDateRange) {
    return firstQuery
        .replace(/\{\{ DATE_FROM \}\}/, firstDateRange.from)
        .replace(/\{\{ DATE_TO \}\}/, firstDateRange.to);
}

  Сами метрики определяются подобным образом:

const reportAttributionFieldsMap = new Map([
 [ReportAttributionMetric.cost, 'MEASURE(Report.cost)'],
 [ReportAttributionMetric.clicks, 'MEASURE(Report.clicks)'],
 [ReportAttributionMetric.impressions, 'MEASURE(Report.impressions)'],
 [ReportAttributionMetric.users, 'MEASURE(Report.visitors)'],
 [ReportAttributionMetric.sessions, 'MEASURE(Report.visits)'],
 [ReportAttributionMetric.timeOnSite, 'MEASURE(Report.timeOnSite)'],
]);

  Что на счёт additionalMetrics, так это фактически набор переменных и действий над этими переменными, где в качестве переменных выступают дефолтные метрики.

  Не было бы лимита в 50к строк - было бы совсем хорошо.

  А вот с получением метаданных у нас возникла проблема. Как вы помните, у Cube нет эндпоинта для получения метаданных по преагрегациям, а из SQL API это не вытащишь (по крайней мере в документации этого нет). Здесь пришлось использовать уже привычный REST API и обогащать результат полученными метаданными:

const restResult = await cubeJSClient
    .query(
        queries.dashboard.pingQuery.query([now, oneWeekAgo], conversionsIds),
    )
    .catch(() => undefined);

const sqlResult = await this.cubejsSQLAPI.requestTable(
    projectId,
    conversionsIds,
    {
        ...params,
        disablePreaggregation:
            params.disablePreaggregation ||
            restResult.buildingPreAggregations ||
            false,
    },
);

return {
    lastRefreshTime: restResult?.lastRefreshTime || new Date(),
    dimension: params.dimension,
    buildingPreAggregations: restResult?.buildingPreAggregations || false,
    rows: sqlResult.rows,
    totalRows: sqlResult.totalRows,
};

  Да, выглядит достаточно необычно, но пока мы ничего лучше не придумали.

  Ниже можно увидеть схему, как в итоге всё это работает.

 Вместо заключения

  Спасибо за то, что потратили время и прочитали эту статью, я надеюсь, что это было полезно. Уверен, что многое осталось за кадром, так что не стесняйтесь задавать любые вопросы в комментариях, с удовольствием на них отвечу.

  Подводя итог можно сказать, что Cube очень удобный и полезный инструмент для построения отчётов и работы с данными. Да, в нашем случае “из коробки” он работает не так хорошо, как хотелось бы, и, ещё много чего нужно доделать, но при желании это всё реализуемо, и использование Cube снимает очень много вопросов. И кстати, всё описанное с успехом работает как в k8s, так и в Cube Cloud.

  Отдельное спасибо команде Cube за оперативные ответы, своевременные фиксы багов и ревью это статьи.

  А вот такой красивый фронт у нас получился под бекенд Cube SQL API:

Комментарии (0)