Всем привет! Популярность интернет коммерции постоянно растет, как и доля информатизации всех смежных с торговлей видов деятельности. Вместе с этим растет и сложность обработки информации. Каждый заказ, сделанный клиентом интернет магазина, порождает за собой большое количество интеграций с различными сервисами. Такими сервисами могут быть сервисы обработки платежей, доставки, системы учета и лояльности. Каждый заказ должен быть оплачен, учтен, собран и доставлен, а также доступен для последующего анализа. Эту, и так не простую ситуацию, усложняет и тот факт, что пользователь интернет магазина не хочет долго и мучительно чего-то ждать при оформлении заказа. Отклик от интернет магазина должен быть быстрым, ведь каждая миллисекунда задержки увеличивает шанс потери клиента, а в последствии и прибыли. В этой статье я хочу рассказать про брокер сообщений RabbitMQ и как с его помощью можно организовать процесс обработки заказов используя Node.js и TypeScript. Добро пожаловать под кат.
Необходимая теория
Думаю, многие слышали про RabbitMQ, ведь первая open source версия этого брокера сообщений, основанного на протоколе AMQP, была выпущена аж в 2007 году. Брокер сообщений необходим для связи разных компонентов системы в единое целое, как клей необходим для реанимации разбитой вазы. С помощью брокера сообщений можно реализовать асинхронную обработку событий, поступающих в систему. Как раз такая асинхронная обработка заказов и нужна интернет магазину. Но для начала необходимо разобраться с основными компонентами RabbitMQ. У этого брокера есть три основных компонента, с помощью которых мы будем выстраивать процесс обработки:
- Message. Это минимальная единица информации в рамках брокера сообщений и нашего сервиса обработки, которая может быть обработана. Сам RabbitMQ хранит сообщения в бинарном виде, но для нашей системы и для статьи это не важно. Сообщения мы будем принимать и обрабатывать в виде JSON. Так же стоит упомянуть, что сообщения в RabbitMQ имеют заголовки. Они схожи с заголовками http запросов. Это ассоциативный массив, в который можно записать необходимую информацию.
- Message queue. Это очередь, в которой RabbitMQ хранит сообщения. На очередь сообщений могут быть подписаны один или несколько потребителей (consumer). Каждое сообщение из очереди rabbit распределяет по потребителям, используя алгоритм round-robin.
- Exchange. Это, как понятно из названия, точка обмена. К этой точке могут быть привязаны очереди или другие обменники. Точка обмена не хранит сообщения, основная ее функция — это маршрутизация сообщений в одну или несколько очередей, или такие же точки обмена. Каждая очередь или обменник привязывается по ключу маршрутизации (routing key). В RabbitMQ есть несколько разных типов обменников, которые влияют на то, как именно exchange будет маршрутизировать поступившее в него сообщение.
Для того, чтобы описать, как работают разные типы обменников, необходимо разобраться, что из себя представляют ключи маршрутизации. Ключ маршрутизации есть как у привязки (binding) очереди к обменнику, так и у самого сообщения. Routing key -это просто строка, поделенная на блоки. Каждый блок разделен точкой. Например, “notify.sendEmail.sendSms”. При этом для ключа маршрутизации сообщения можно задавать шаблоны с использованием специальных символов # и *. * — говорит что после точки может идти один любой блок, а вот после # может идти любое количество блоков. Например “notify.sendSms.*” или “notify.#”. Теперь можно переходить к типам точек обмена.
Есть четыре типа обменников:
- Fanout. Логика маршрутизации данного exchange'a проста, он перенаправляет поступившее сообщение во все очереди или обменники, которые привязаны к нему.
- Direct. Этот exchange перенаправляет сообщение в зависимости от того, совпадает ли routing key сообщения с routing key привязки.
- Topic. Exchange этого типа также как и Direct маршрутизирует сообщение в зависимости от routing key. Но в качестве ключа маршрутизации может выступать шаблон.
- Headers. Этот exchange, в отличие от остальных, использует для маршрутизации заголовки сообщений. При этом очереди к обменнику биндятся также с помощью ассоциативного массива. Логику, по которой обменник будет маршрутизировать сообщения, можно менять с помощью специального ключа “x-match“, который задается в ассоциативном массиве привязки. Ключу можно задать два значения all или any. Если значение all, то заголовки сообщения должны полностью совпадать с ассоциативным массивом привязки, если значение any, то значение должно совпадать хотя бы у одного ключа.
Это основные компоненты RabbitMQ. Более подробно об этих компонентах можно почитать в спецификации протокола AMQP. Далее мы будем проектировать и реализовывать систему обработки заказов на примере TypeScript, попутно разбираясь с настройками каждого компонента.
Проектирование
Для упрощения примера будем считать, что для успешной обработки интернет заказа у нас должен быть следующий функционал:
- Сохранять поступающие заказы
- Отправлять sms клиенту с номером заказа, а также статусом заказа
- Отправлять сообщение в службу курьерской доставки о новом заказе из нашего интернет магазина, если клиент выбрал этот способ доставки
Но мало реализовать этот функционал, ведь наш интернет магазин планирует расширять функционал и предоставлять в дальнейшем больше разных возможностей своим клиентам (а такое происходит всегда). Например, уведомлять клиента по email или предоставлять на выбор несколько способов доставки заказа. Из этого следует, что нам нужно спроектировать систему таким образом, чтобы добавлять функционал было просто.
Также стоит упомянуть, что я буду использовать шаблон отложенных сообщений для того, чтобы была возможность, при недоступности внешнего сервиса, повторять логику несколько раз. Про этот шаблон можно почитать тут
Чтобы более наглядно представить конечную цель, я нарисую схему.
Давайте разберемся по порядку, как устроен процесс обработки заказа на этой схеме. Схема разбита на блоки и разные цвета. Блоки белого цвета обозначают внешние сервисы, которые мы рассматривать не будем. Блоки серого цвета обозначают элементы RabbitMQ. Очереди и обменники. Зеленым цветом отражены блоки бизнес логики, которые необходимо реализовывать. Также каждый блок, имеющий отношение к нашей логике, пронумерован. Цифры обозначают процесс и подпроцесс в соответствии с порядком.
Первым делом, сообщение по HTTP API попадает в наш сервис. После этого мы должны присвоить номер заказу, сохранить заказ в базе данных со статусом “новый” и отправить ответ об успешном создании заказа, с его номером, обратно. Клиент, получив сообщение об успешном создании заказа, идет заниматься своими делами. Отправив положительный ответ, мы отправляем объект заказа в exchange постобработки, из которого он попадает в worker формирования routing key. Этот воркер, получив объект заказа из очереди, на его основе (есть ли в заказе email или телефон клиента, какой способ доставки был выбран) должен сформировать ключ маршрутизации заказа. Сформировав routing key, worker отправляет сообщение обратно в exchange постобработки, но теперь ключ маршрутизации у заказа изменился и обменник может отправить его уже по нужному маршруту. В зависимости от ключа, заказ может быть отправлен в exchange, который отвечает за уведомления, exchange интеграций или сразу в оба. А далее по такой же логике в очереди и воркеры.
Воркеры отправки sms и службы доставки будут пытаться обработать сообщение несколько раз. Количество таких попыток можно передать в переменной окружения. Но бесконечно обрабатывать сообщение не стоит, ведь ошибка может крыться в самом сообщении или логике воркера. Поэтому после превышения количества допустимых попыток сообщение будет удаляться из очередей и отправляться в хранилище ошибок, из которого его можно будет повторно отправить обратно на нужный уровень обработки.
Реализация
Для проверки реализации потребуется сам rabbit. Я рекомендую использовать для этой цели docker и официальный образ брокера. Установить и запустить контейнер можно следующей командой.
docker run -d --name rabbit -p 5672:5672 -e rabbitmq:3.7.15-management-alpine
Это образ с web интерфейсом, доступным на порту 15672, для удобной отладки.
Реализовывать задуманное будем с помощью TypeScript и библиотеки amqplib (реализация клиента RabbitMQ для Node.js) поэтому для начало необходимо описать несколько интерфейсов. Опишем интерфейсы заказа и сообщений, которые мы будем отправлять в rabbit.
// Интерфейс товара заказа
export interface Product {
id: string;
name: string;
price: number;
}
// Общий интерфейс заказа
export interface Order {
clientName: string;
city: string;
email?: string;
phone?: string;
products: Product[];
totalSum: number;
deliveryAddress?: string;
}
// Интерфейс заказа у которого есть номер телефона клиента
export interface OrderWithPhone extends Order {
phone: string;
}
// Интерфейс заказа у которого есть адрес доставки
export interface OrderWithDeliveryAddress extends Order {
deliveryAddress: string;
}
// Types Guard'ы для определения какой заказ к нам пришел
export const isOrderWithPhone = (order: Order): order is OrderWithPhone => Boolean(order.phone);
export const isOrderWithDeliveryAddress = (order: Order): order is OrderWithDeliveryAddress =>
Boolean(order.deliveryAddress);
// Сообщение в рамках системы.
export interface Message<O extends Order> {
errors: string[];
retry: number;
order: O;
// Интерфейс сообщения которое будет отправленно в хранилище ошибок
export interface FailOrder extends Message<Order> {
exchange: string;
routingKey: string;
}
Теперь нужно описать интерфейс конфигурации очередей и обменников, на основе которой будем строить структуру обработки в rabbit.
import { Types, ExchangeTypes } from '../constants';
import { Options } from 'amqplib';
// Типы объектов RabbitMQ которые мы будем использовать для конфигурации
export enum Types {
QUEUE = 'queue',
EXCHANGE = 'exchange',
}
// Типы обменников которые будем использовать
export enum ExchangeTypes {
TOPIC = 'topic',
}
// Интерфейс описания очереди
export interface Queue {
name: string;
options: Options.AssertQueue;
}
// Интерфейс описания обменника
export interface Exchange {
name: string;
type: ExchangeTypes;
}
// Интерфейс описания привязки
export interface Binding {
type: Types;
destination: string;
source: string;
routingKey: string;
}
// Интерфейс конфигурации RabbitMQ
export interface PipelineConfig {
queues: Queue[];
exchanges: Exchange[];
bindings: Binding[];
}
Описав основные компоненты системы, опишем конфигурацию, которая была нарисована на схеме с помощью объекта.
Queues
export default [
// Очередь для сообщений для которых нужно сгенерировать routingKey
{
name: 'generateRoutingKey',
options: {
durable: true,
},
},
// Очередь отправки sms
{
name: 'sendSms',
options: {
durable: true,
},
},
// Очередь интеграции со службой доставки
{
name: 'delivery',
options: {
durable: true,
},
},
// Отложенная очередь для сообщений которые ожидают повторной отправки sms
{
name: 'sendSmsHold',
options: {
durable: true,
deadLetterExchange: 'notify',
deadLetterRoutingKey: 'sendSms',
messageTtl: 60000,
},
},
// Отложенная очередь для сообщений которые ожидают повторной отправки в службу доставки
{
name: 'deliveryHold',
options: {
durable: true,
deadLetterExchange: 'integrates',
deadLetterRoutingKey: 'delivery',
messageTtl: 60000,
},
},
];
При описании очередей используются следующие опции для очереди
- durable. По умолчанию все сообщения очереди хранятся в памяти. Следовательно, при перезагрузке брокера сообщения пропадут. Для избежания этого можно использовать эту опцию. С этой настройкой rabbit будет сбрасывать сообщения на диск. Но тут есть один нюанс. Чтобы сообщения сохранились после рестарта брокера, мало этой настройки, нужно, чтобы сообщения отправлялись в очередь с опцией persistent.
- messageTtl. Время жизни сообщения. Задаётся в миллисекундах
- deadLetterExchange. Имя обменника, куда сообщение отправится из очереди при истечении ее срока жизни
- deadLetterRoutingKey. RoutingKey, с которым сообщение будет отправлено в обменник из предыдущей опции
Exchanges
import { ExchangeTypes } from '../constants';
export default [
{
name: 'postprocessing',
type: ExchangeTypes.TOPIC,
},
{
name: 'notify',
type: ExchangeTypes.TOPIC,
},
{
name: 'integrates',
type: ExchangeTypes.TOPIC,
},
];
Bindings
import { Types } from '../constants';
export default [
{
type: Types.EXCHANGE,
destination: 'notify',
source: 'postprocessing',
routingKey: '#.notify.#',
},
{
type: Types.EXCHANGE,
destination: 'integrates',
source: 'postprocessing',
routingKey: '#.integrates.#',
},
{
type: Types.QUEUE,
destination: 'generateRoutingKey',
source: 'postprocessing',
routingKey: 'generateRoutingKey',
},
{
type: Types.QUEUE,
destination: 'sendSms',
source: 'notify',
routingKey: '#.sendSms.#',
},
{
type: Types.QUEUE,
destination: 'delivery',
source: 'integrates',
routingKey: '#.delivery.#',
},
{
type: Types.QUEUE,
destination: 'sendSmsHold',
source: 'notify',
routingKey: 'sendSmsHold',
},
{
type: Types.QUEUE,
destination: 'deliveryHold',
source: 'integrates',
routingKey: 'deliveryHold',
},
];
Полная конфигурация
import { PipelineConfig } from '../interfaces';
import exchanges from './exchanges';
import queues from './queues';
import bindings from './bindigs';
export const pipelineConfig: PipelineConfig = {
exchanges,
queues,
bindings,
};
Для подключения к rabbit напишем класс.
import { connect, Connection, Channel } from 'amqplib';
export class RabbitConnect {
private _uri: string;
private _connection: Connection;
private _chanel: Channel;
constructor() {
// Строка подключения к rabbit будет браться из окружения
this._uri = process.env.RABBIT_URI || 'amqp://localhost';
}
protected async connect() {
this._connection = await connect(this._uri);
this._chanel = await this._connection.createChannel();
}
protected async disconnect() {
await this._chanel.close();
return this._connection.close();
}
protected get chanel() {
return this._chanel;
}
}
Напишем класс Pipeline, который при старте будет создавать всю необходимую инфраструктуру в rabbit по описанной ранее конфигурации.
import { RabbitConnect } from './RabbitConnect';
import { PipelineConfig } from './interfaces';
import { Types } from './constants';
export class Pipeline extends RabbitConnect {
private _pipeline: PipelineConfig;
constructor(pipelineConfig: PipelineConfig) {
super();
this._pipeline = pipelineConfig;
}
public async create() {
try {
await this.connect();
// Создаем очереди
const createQueues = this._pipeline.queues.map(queue =>
this.chanel.assertQueue(queue.name, queue.options),
) as PromiseLike<any>[];
// Создаём обменники
const createExchanges = this._pipeline.exchanges.map(exchange =>
this.chanel.assertExchange(exchange.name, exchange.type),
) as PromiseLike<any>[];
await Promise.all([...createQueues, ...createExchanges]);
// После создания необходимых компонентов создаём биндинги
const createBindings = this._pipeline.bindings.map(binding => {
if (binding.type === Types.QUEUE) {
return this.chanel.bindQueue(binding.destination, binding.source, binding.routingKey);
}
return this.chanel.bindExchange(binding.destination, binding.source, binding.routingKey);
});
await Promise.all(createBindings);
return this.disconnect();
} catch (error) {
console.error(error);
throw new Error(error);
}
}
}
Теперь напишем абстрактный класс воркера с общим для всех воркеров функционалом, от которого можно будет наследоваться.
import { RabbitConnect } from './RabbitConnect';
import { Message, Order, FailOrder } from './interfaces';
import { ConsumeMessage } from 'amqplib';
export interface WorkerParams {
maxRetry?: number; // Максимальное количество повторов обработки
active: string; // Имя активной очереди
exchange: string; // Имя обменника из которого пришло сообщение
holdKey?: string; // Ключ маршрутизации для отложенной очереди
}
export abstract class Worker<M extends Order> extends RabbitConnect {
private _maxRetry: number;
private _active: string;
private _holdKey: string | undefined;
protected exchange: string;
private _currentMessage: Message<M>;
private _currentConsumeMessage: ConsumeMessage;
constructor({ active, holdKey, exchange, maxRetry }: WorkerParams) {
super();
this._maxRetry = maxRetry || 0;
this._active = active;
this._holdKey = holdKey;
this.exchange = exchange;
}
public async subscribe() {
await this.connect();
this.chanel.consume(this._active, this.checkMessage.bind(this));
}
// Метод проверки для сообщений у которых превышен лимит повторов
private async checkMessage(message: ConsumeMessage) {
this._currentConsumeMessage = message;
const orderMessage: Message<M> = JSON.parse(message.content.toString());
if (orderMessage.retry >= this._maxRetry) {
await this.sendToErrorStorage('Превышен лимит попыток');
}
this._currentMessage = orderMessage;
// Если количество попыток не превышено вызываем метод с бизнес логикой
await this.handler(orderMessage.order || orderMessage);
}
// Метод отправки сообщения в хранилище ошибок
protected async sendToErrorStorage(error: string) {
const message: FailOrder = {
order: this._currentMessage.order,
errors: [...this._currentMessage.errors, error],
retry: this._currentMessage.retry + 1,
exchange: this.exchange,
routingKey: this._active
};
console.log('Отправка в хранилище ошибок', message);
this.ack();
}
// Метод отправки сообщения в отложенную очередь
protected async hold(error: string) {
if (!this._holdKey) {
return;
}
const orderMessage = {
order: this._currentMessage.order,
errors: [...this._currentMessage.errors, error],
retry: this._currentMessage.retry + 1,
};
const orderData = Buffer.from(JSON.stringify(orderMessage));
return this.chanel.publish(this.exchange, this._holdKey, orderData);
}
// Метод подтверждения удачной обработки сообщения
protected async ack() {
return this.chanel.ack(this._currentConsumeMessage);
}
protected abstract handler(message: M): void;
}
По умолчанию rabbit требует подтверждение уcпешной обработки сообщения от воркера. Для этого у канала подключения есть метод ack. Если воркер не смог обработать сообщение, то существует метод nack, который говорит rabbit'у, чтобы он отправил сообщение другому воркеру.
Теперь мы можем написать несколько простых воркеров из схемы.
Воркер генерации ключа маршрутизации.
import { Worker } from '../Worker';
import {
isOrderWithPhone,
isOrderWithDeliveryAddress,
Order,
Message,
} from '../interfaces';
import { Keys } from '../constants';
export class GenerateRoutingKey extends Worker<Order> {
constructor() {
super({
active: 'generateRoutingKey',
exchange: 'postprocessing',
});
}
protected async handler(order: Order) {
try {
const routingKey: string[] = [];
if (isOrderWithPhone(order)) {
routingKey.push(Keys.SEND_SMS);
}
if (isOrderWithDeliveryAddress(order)) {
routingKey.push(Keys.SEND_TO_DELIVERY);
}
const message: Message<Order> = {
retry: 0,
errors: [],
order,
};
await this.chanel.publish(
this.exchange,
routingKey.join('.'),
Buffer.from(JSON.stringify(message)),
);
await this.ack();
} catch (error) {
console.error(error);
await this.sendToErrorStorage(error);
}
}
}
Воркеры отправки sms.
import { Worker } from '../Worker';
import { OrderWithPhone } from '../interfaces';
export class SendSms extends Worker<OrderWithPhone> {
constructor() {
super({
active: 'sendSms',
exchange: 'notify',
holdKey: 'sendSmsHold',
maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5,
});
}
protected async handler(message: OrderWithPhone) {
try {
console.log('Отправка sms на номер: ', message.phone);
this.ack();
} catch (error) {
console.error(error);
await this.hold(error);
}
}
}
Воркер интеграции со службой доставки.
import { Worker } from '../Worker';
import { OrderWithDeliveryAddress } from '../interfaces';
export class Delivery extends Worker<OrderWithDeliveryAddress> {
constructor() {
super({
active: 'delivery',
exchange: 'interates',
holdKey: 'deliveryHold',
maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5,
});
}
protected async handler(message: OrderWithDeliveryAddress) {
try {
console.log('Отправка заказа в службу доставки на адрес: ', message.deliveryAddress);
this.ack();
} catch (error) {
console.error(error);
await this.hold(error);
}
}
}
Точка входа в приложение.
import { Pipeline } from './Pipeline';
import { pipelineConfig } from './pipeline';
import { GenerateRoutingKey } from './workers/GenerateRoutingKey';
import { SendSms } from './workers/SendSms';
import { Delivery } from './workers/Delivery';
(async () => {
try {
const pipeline = new Pipeline(pipelineConfig);
const generateRoutingKey = new GenerateRoutingKey();
const sendSms = new SendSms();
const delivery = new Delivery();
await pipeline.create();
await Promise.all([generateRoutingKey.subscribe(), sendSms.subscribe(), delivery.subscribe()]);
} catch (error) {
console.error(error);
process.exit(1);
}
})();
Приводить пример кода класса записи заказа в базу и генерации номера интернет заказа я не буду. Это выходит за рамки данной статьи. Для проверки кода можно воспользоваться веб интерфейсом rabbit'а, отправив в обменник posrprocessing json заказа.
Заключение
Такая схема построения обработки интернет заказа позволяет легко масштабировать систему. Нам не составит труда добавить в эту схему несколько очередей и воркеров, чтобы добавить нужный функционал. Например, можно добавить отправку уведомлений на email или отправку заказа для учета в 1С. Преобразованная схема будет выглядеть так:
Надеюсь, вам понравилась статья. Буду рад любым комментариям и критике. Весь представленный код можно найти на github
Комментарии (4)
otchgol
04.10.2019 08:23RabbitMQ всем хорош, но не очень любит большое количество динамических подключений. На мой субъективный взгляд он просто повисает. Хотя возможно он и очухивается через какое-то время. Перезапуск его лечит и, мне лично, это кажется возможным вариантом решения. Надеюсь нагрузочное тестирование проводилось и этот аспект был учтен.
apapacy
04.10.2019 12:33В данном случае подключений не так уж много (единицы) — т.к. к нему же не браузеры или мобильные устройства по веб-сокету подключаются, а подключаются только сервисы бэкэнда.
Но то о чем вы говорите — подвисание — вернее остановка очредей дело как я слышал от разработчиков и читал в статьях — распространенный случай и дело тут не в физическом исчерпании ресурсов а в протоколе. Могут быть различные сочетания не предусмотренных ситуаций. Например:
1) воркер получил сообщение из очереди — воркер выполнил (или не выполнил бизнес-задачу) — воркер свалился не подтвердив получение события — rabbitmq обнаружил разрыв соединения и вернул событие в очередь — очередной воркер получил сообщение… процесс идет по кругу
2) воркер получил сообщение и подвис в момент обработки выполнив (или не выполнив) бизнес-задачу и не подтвердив получение — событие не обработано и не ставится в очередь до разрыва соединения
3) воркер получил сообщение и подвис в момент обработки выполнив бизнес-задачу и подвисает не подтвердив получение — событие не обработано и rabbitmq по таймауту возвращает событие в очередь — следующий воркер получает то же сообщение, повторно выполняет бизнес-задачу и подвисает (т.к. тот же набор данных) — то таймауту rabbitmq возвращает событие в очередь… процесс идет по кругу
В зависимости от сочетаний настроек и багов могут проявляться: остановки очередей, не выполнение заданий, повторное выполнение заданий
apapacy
В целом, хорошо. Однако, я немного, с Вашего позволения, покритикую не конкретно Ваше решение, а саму идею кастомной реализации job queue.
Если предположить что Вы разрабатываете библиотеку реализации job queue — то после того как она станет уровня продакшин — ее можно юзать всем и во всех проектах. Если же это как часть обычного коммерческого проекта — уверяю Вас, что устранение багов (утраченные сообщения, дублирующиеся сообщения и, особенно, остановка очередей — визитная карточка rabbitmq) ожидает Вас и Вашу команду.
В этом смысле есть готовые решения см. обзор решений https://habr.com/ru/post/458608/. Если почитать issue этих решений на git.hub, Вы сможете примерно узнать в начале какого пути Вы находитесь.
Кстати, очень не хватает отлаженной библиотеки job queue на rabbitmq, и Ваша библиотека вполне может стать ею.
3ongleip Автор
Спасибо за комментарий и положительную оценку! Я согласен с Вашей критикой кастомной реализации job queue. Действительно есть много отличных готовых решений и делать еще одно не совсем уместно. Эта статья скорее недосказанный (поскольку я хотел сделать упор именно на rabbit и typescript) вариант реализации event sourcing.