Я попросил нашу команду маркетинга нарисовать иллюстрацию и долго объяснял, что такое вебхуки

Не так давно передо мной встала задача реализовать работу вебхуков в Личном кабинете владельца кассы компании Дримкас. Как оказалось, в сети практически нет описания и туториалов, как это сделать. Я расскажу, как мы это реализовали без тяжелых кронов по БД.


Статья будет полезна для middle node.js-разработчиков.

Где используем вебхуки


Для понимания специфики, придется начать издалека.


Дримкас производит онлайн-кассы и облачный сервис Кабинет Дримкас. Все кассовые аппараты в реальном времени отправляют данные о продажах по интернету в налоговую — это требование нового закона. Подключаясь к Кабинету, владелец кассы получает удаленный доступ к этой статистике продаж и другие инструменты.


Кабинет Дримкас позволяет владельцу кассы из веб-интерфейса следить за продажами, работать с отчетами, создавать, редактировать и автоматически загружать на все кассы товарную базу, подключать внешние товароучетные системы.


Вебхуки нам понадобились, когда мы подключали к Кабинету интернет-магазины. Для онлайн-торговли тоже нужна касса, только бумажный чек не печатается. Мы решили создать для них инструмент, чтобы они могли из обычного json-а с данными о покупке записать данные о продаже в ФН и передать их в ОФД.


Так как операция фискализации может затянуться на длительное время, превышающее обычный HTTP запрос, нам потребовалось дать возможность узнавать статус этого чека. Каждый раз стучаться в Кабинет за статусом чека не выгодно ни нам, ни Интернет магазину. А с вебхуками убиваем сразу двух зайцев: Кабинет делает запрос только один раз, а интернет-магазин получит чек, как только он будет готов.


Когда мы начали их внедрять, решили дать доступ к этому функционалу сервисам интеграторов. С их помощью сторонние сервисы, которые подключены к Кабинету, получают уведомления о продажах, открытии/закрытии смен, создании и редактировании товаров, внесении и изъятии денег. Мы не остановились до сих пор, и все важные события мы сразу переводим на вебхуки.


Наши требования к вебхукам


  • Вебхуки должны вызываться при определенных событиях;
  • Должна быть возможность «подписываться» на определенные из них, если мы не хотим получать все события;
  • Вебхуки должны повторять запросы, пока сторонний сервис их не примет;
  • Вебхук удаляется, если его не приняли в течение суток;
  • Последний пункт нам нужен, когда сторонний сервис не отвечает на запрос. Мы предполагаем, что он упал, и отправка новых запросов может уронить его снова. Поэтому эти повторные запросы мы делаем через определённые промежутки времени.

Текущий стэк бэкенда


Мы пишем на node.js. В качестве веб фреймворка выбран koa. У нас две базы данных. Postrges с sequelize, где хранятся сильно связанные данные, например, кассы и пользователи. Для хранения несвязанных и неизменяемых данных — чеков, смен — мы используем MongoDB. Ещё повсеместно используются очереди на rabbitMQ, для сглаживания скачкообразных нагрузок. Плюс redis для кэша.


Реализация вебхуков


Определяем события для вызова вебхуков


Для начала определим места, где хотим вызывать вебхуки. На уровне модели мы можем пользоваться хуками в mongoose и в большинстве случаев в sequelize.


Исторически так сложилось, что в нашей sequelize-модели нельзя создать товар сразу с данными. Мы создаем пустой товар и сразу его изменяем, поэтому пришлось руками во всех контроллерах добавить обработчики вызовов вебхуков.


Когда нет такой проблемы, всё достаточно просто. Пример из модели mongoose:


schema.static('setStatus', async function (_id, status, data) {
// логика изменения статуса
const res = await this.update({ _id }, { … });

 await Webhook.send({ ... });

 return res;
});

Подписки на события


Чтобы определить понятие подписки на определенные события, мы используем битовые маски.


В бэкэнде мы храним всю информацию о типах событий одним числом, а фронту отправляем готовый json объект:


{
  "types": {
    "products": true,
    "receipts": false,
    "shifts": true,
    "encashments": false,
    "devices": false,
    "operations": true
  },
}

Чтобы упаковать число в json и извлечь его обратно, мы создаем виртуальные атрибуты в sequelize. В них устанавливаем геттеры и сеттеры. Виртуальные поля вычисляются на лету, изменяют на поля в таблице, но при этом не хранятся БД.


Упаковываем число в json и извлекаем его обратно
// Статические методы, которые хранятся в отдельном файле
import _ from 'lodash';
export const scopeBits = {
  products: 0,
  receipts: 1,
  shifts: 2,
  encashments: 3,
  devices: 4,
  operations: 5,
};
/**
 * Этот маппинг появился, потому что мы захотели, 
 * чтобы по вебхуками прилетало название модели 
 * и тип операции в UPPER CASE и в единственном числе.
 */
/* eslint-disable key-spacing */
const typeToTypes = {
  PRODUCT:      { products:     true },
  RECEIPT:      { receipts:     true },
  SHIFT:        { shifts:       true },
  ENCASHMENT:   { encashments:  true },
  DEVICE:       { devices:      true },
  OPERATION:    { operations:   true },
};
/* eslint-enable key-spacing */

export function formMask(scope) {
  if (_.isEmpty(scope)) {
    return 0;
  }

  return _.reduce(Object.keys(scope), (mask, key) => {
    if (scope[key]) {
      mask |= 1 << scopeBits[key];
    }

    return mask;
  }, 0);
}

export function formEvents(mask) {
  return _.reduce(scopeBits, (memo, bit, scope) => {
    if (mask & (1 << bit)) {
      memo[scope] = true;
    } else {
      memo[scope] = false;
    }

    return memo;
  }, {});
}

// В описании модели:
  subscribes: {
    type: DataTypes.INTEGER,
    allowNull: false,
  },
  types: {
    type: DataTypes.VIRTUAL(DataTypes.INTEGER, ['subscribes']),
    get() {
      return this.constructor.formEvents(this.get('subscribes'));
    },
    set(types) {
      this.setDataValue('subscribes', this.constructor.formMask(types));
    },
  },


CRUD для управления вебхуками


Пользователь управляет вебхуками из веб-интерфейса или через API. Поэтому нам нужны стандартные CRUD для этой модели.


Пример «создания», остальные методы аналогичные
import _ from 'lodash';

const editCols = ['url', 'types', 'isActive'];

export async function create(ctx) {
  const fields = _.pick(ctx.request.body.fields, editCols);

  fields.userId = ctx.state.user.id;
  const webhook = await Webhook.create(fields);

  ctx.body = { id: webhook.id };
  ctx.status = 201;
}


Подготовка к вызовам


Мы не вызываем вебхуки в статическом методе класса Webhook — это позволяет сберечь ресурсы основного сайта. Это работа воркеров — делать фоновые задачи, не мешая работе с REST-API.


Когда на сайте генерируется событие, мы оповещаем воркеров об этом:


Создание статического метода Webhook.send
import _ from 'lodash';
import { getClient } from '../../storage/redis';
import { stringify, getChannel } from '../../storage/rabbitmq';
/**
 * получаем вебхуки, которых нужно вызвать для этого события у этого юзера
 * types: { products: true, devices: false, ...}
 */
async function search({ userId, types }) {
  const mask = formMask(types);

  /** 
   * Ищем флаг нужного события через битовую операцию "и"
   * и проверяем результат с самим числом, таким образом заложились на будущее,
   * если вдруг захотим искать сразу по двум флагам
   */
  return Webhook.sequelize.query(`SELECT id, url, subscribes
    FROM "Webhook" WHERE subscribes & ? = ?
      AND "userId" = ?
      AND "isActive" = TRUE`,
    {
      type: Webhook.sequelize.QueryTypes.SELECT,
      replacements: [mask, mask, userId],
    },
  );
}
/**
 * Я не нашел в документации, как сделать этот запрос средствами sequelize
 * Поэтому здесь использован сырой SQL-запрос
*/
/**
 * Вставка в очередь задания «Дернуть вебхук»
 * type=PRODUCT|DEVICE|ENCASHMENT|RECEIPT|OPERATION|...
 * action=CREATE|UPDATE|DELETE
 */
export async function send({ userId, type, action, itemId }) {
  // поиск по Redis
  const client = getClient();
  const key = `webhooks:${userId}:${type}`;
  const isWebhooksExist = await client.existsAsync(key);
  let webhooks;

  if (!isWebhooksExist) {
    // поиск в Postgres
    const types = typeToTypes[type];

    webhooks = await search({ userId, types });
    // Кэшируем в Redis, даже если не нашли
    await client.setAsync(key, JSON.stringify(webhooks), 'EX', 10);
  } else {
    webhooks = JSON.parse(await client.getAsync(key));
  }

  _.each(webhooks, (w) => {
    const payload = stringify({
      url: w.url,
      itemId,
      action,
      type,
      timestamp: Date.now(),
    });
    /**
     * Ставим задачу в очередь. Устанавливаем время, какой URL вызвать и свойства:
     * тип сущности (товар, чек, касса, ...), тип операции (создание, удаление, ..)
     * и id сущности
     */
    getChannel().sendToQueue('kab-webhooks-delayed-0', payload, { persistent: true });
  });
}


Вкратце, что мы делаем: ищем в БД все вебхуки у данного пользователя, у которого есть подписка на текущее событие. Кэшируем их, даже если ничего не нашли — если пользователь загружает кучу товаров, будут лишние запросы в БД. Когда есть вебхук, кидаем в очередь задачу с временной меткой, ссылкой, идентификатором и типом события.


Тут есть нюанс: мы экономим ресурсы сайта, и кидаем в очередь только идентификатор объекта. Если есть возможность, лучше кидать сам объект. Когда создают объект и сразу удаляют его, в очередь падает два задания. Первое задание при выполнении не сможет вытащить из базы тело объекта. Если кидать всё тело объекта, таких проблем не будет.


Вызовы и повторные вызовы вебхуков


У нас в стеке используется очереди сообщений. Мы выбрали 5 временных промежутков, и на каждый создали очередь. Если вызов не удался при первой попытке, вебхук переходит в следующую очередь. Когда воркер получает на вход задачу, он откладывает его выполнение на требуемое количество времени от 0 миллисекунд до суток. Через 24 часа мы вызываем вебхук в последний раз и удаляем.



Пример вебхука, который не могут принять в течение суток.

Каждая следующая задача в очереди не может быть вызвана раньше текущей, так как добавилась туда позже. Поэтому когда мы взяли из очереди задачу и увидели, что вызывать вебхук ещё рано, мы не завершаем эту задачу, чтобы не получить следующую.


Вызовы и повторные вызовы вебхуков
import Bluebird from 'bluebird';
import request from 'request';
import { parse, getChannel, stringify } from '../../lib/storage/rabbitmq';

const requestPostAsync = Bluebird.promisify(request.post);

const times = {
  0: 0,
  '5sec': 5 * 1000,
  '1min': 1 * 60 * 1000,
  '1hour': 1 * 60 * 60 * 1000,
  '3hours': 3 * 60 * 60 * 1000,
  '1day': 24 * 60 * 60 * 1000,
};

const getBodyById = async ({ itemId, type, action }) => {
  /** Достаем из БД актуальное состояние сущности */
};

const handle = async (channel, msg, waitENUM, nextQueue) => {
  const task = parse(msg);
  const { url, itemId, type, action, timestamp } = task;
  const data = await getBodyById({ itemId, type, action });
  const estimatedTime = Date.now() - (new Date(timestamp).getTime());
  const wait = times[waitENUM];

  if (estimatedTime < wait) {
    await Bluebird.delay(wait - estimatedTime);
  }

  try {
    const response = await requestPostAsync(url, {
      body: {
        action,
        type,
        data,
      },
      headers: {
        'content-type': 'application/json',
      },
      json: true,
      timeout: 20 * 1000,
    });

    if (response.statusCode < 200
      || response.statusCode >= 300) {
      throw new Error();
    }
    channel.ack(msg);
  } catch (err) {
    if (nextQueue) {
      getChannel().sendToQueue(nextQueue, stringify(task));
    }
    channel.nack(msg, false, false);
  }
};

/* eslint-disable no-multi-spaces */
export default function startConsume(channel) {
  channel.prefetch(2);
  channel.consume('kab-webhooks-delayed-0', msg => handle(channel, msg, 0,
                  'kab-webhooks-delayed-1'), { noAck: false });
  channel.consume('kab-webhooks-delayed-1', msg => handle(channel, msg, '5sec',
                  'kab-webhooks-delayed-2'), { noAck: false });
  channel.consume('kab-webhooks-delayed-2', msg => handle(channel, msg, '1min',
                  'kab-webhooks-delayed-3'), { noAck: false });
  channel.consume('kab-webhooks-delayed-3', msg => handle(channel, msg, '1hour',
                  'kab-webhooks-delayed-4'), { noAck: false });
  channel.consume('kab-webhooks-delayed-4', msg => handle(channel, msg, '3hour',
                  'kab-webhooks-delayed-5'), { noAck: false });
  channel.consume('kab-webhooks-delayed-5', msg => handle(channel, msg, '1day',
                  ''), { noAck: false });}
/* eslint-enable no-multi-spaces */


Еще 4 факта


  • Случается, что задачи попадают в очередь в неправильном порядке — следующая задача в очереди должна быть выполнена раньше текущей. Для нас это не критично. Разница между ними не составит больше 20 секунд — таков наш таймаут запроса.
  • В качестве временных интервалов мы выбрали следующий набор значений: 0 секунд, 5 секунд, 1 минута, 1 час, 3 часа и 24 часа.
  • Запросы, которые не выполнились в течение 24 часов, не логгируются и не хранятся. Если сторонний сервис имеет downtime сутки, то ничего страшного в неполученных вебхуках нет, потому что проблемы там другого масштаба.
  • Если worker не ответит на полученное событие и просто оборвет соединение, то RabbitMQ снова добавит это сообщение в очередь. Таким образом, даже если приложение упало, сам вебхук никуда не пропадет.

Комментарии (0)