Исходный код, разобранный в статье, опубликован в этом репозитории
Частое явление в вебе — полная перезагрузка приложения при переходе между страницами. При этом соединение WebSocket разрывается, а новая подписка устанавливается только после начала выполнения загруженного JavaScript, что занимает как минимум секунду. Во время перезагрузки страницы сообщения, отправленные через WebSocket, не будут получены клиентской частью приложения.
Аналогичная проблема возникает и в микросервисной архитектуре при использовании gRPC-стримов для взаимодействия между микросервисами. Когда микросервис отключается, соединение теряется, и попытки доставки сообщений приводят к исключениям. Чтобы решить эти проблемы, был создан starter kit, который использует Redis в качестве очереди доставки сообщений получателю.
Код веб-сервера
Очередь событий была протестирована на тысяче сессий суммарно по 35 сессий одновременно. Интеграционный тест приложен в репозитории, при наличие мощной машины, на которой установлено более 16Гб оперативной памяти, количество одновременных сессий можно увеличить в настройках
Следующий код создает вебсокет-сервер, осуществляющий вещание сообщений, полученных из микросервисной архитектуры через grpc.streamService.makeClient
import { WSContext } from "hono/ws";
import { grpc } from "@modules/remote-grpc";
import { BroadcastRedis } from "@modules/remote-redis";
import { singleshot } from "functools-kit";
import { app, upgradeWebSocket } from "src/config/app";
const connectionManager = new BroadcastRedis("host-ws__redis-emit");
app.get("/api/v1/realtime/ws", upgradeWebSocket(() => {
let isClosed = false;
const makeConnection = singleshot((sessionId: string, ws: WSContext) => {
connectionManager.listenEvent(sessionId, async (data) => {
if (isClosed) {
return false;
}
ws.send(JSON.stringify(data));
return true;
});
connectionManager.listenDisconnect(sessionId, () => {
if (!isClosed) {
ws.close();
}
});
});
return {
onMessage(event, ws) {
const { sessionId } = JSON.parse(event.data.toString());
makeConnection(sessionId, ws);
},
onClose: () => {
isClosed = true;
},
}
}));
grpc.streamService.makeClient<{ side: string, value: string }>("MessageService", async (message) => {
connectionManager.emit(message.data);
});
export default app;
При получении первого сообщения, вебсокет считывает sessionId
, который со стороны браузера потребуется сохранить в sessionStorage
так, чтобы он не менялся при обновлении страницы. Отправка сообщения клиенту осуществляется через connectionManager
, тот самый буфер, упомянутый в первом абзаце статьи. Коннект создается ровно один раз с использованием функции высшего порядка singleshot
Код буфера WebSocket
Для тестирования приложения был создан echo сервис, каждую секунду отправляющий число секунд с 00:01. Когда веб-сервер отключается, сообщения пишутся в Redis, при повторном подключении, если оно было осуществлено в течение минуты, все сообщения из очереди отправляются в порядке поступления. Новые сообщения встают в конец очереди, сохраняя очередность
Следующий код осуществляет создание ориентированных словарей в Redis, которые осуществляют буферизацию событий так, чтобы клиент мог переподключиться в течение минуты и получить все пропущенные сообщения
import { singleton } from "di-singleton";
import {
IPubsubWrappedFn,
pubsub,
Subject,
memoize,
randomString,
} from "functools-kit";
import BaseMap from "./BaseMap";
import { inject } from "src/core/di";
import RedisService from "src/services/base/RedisService";
import TYPES from "src/config/types";
type MessageListener<Data = any> = (data: Data) => Promise<boolean>;
type ConnectionEmit = keyof {
"host-sse__redis-emit": never;
"host-ws__redis-emit": never;
};
const TTL_ONLINE_SECONDS = 1 * 60;
export const BroadcastRedis = singleton(
class {
readonly redisService = inject<RedisService>(TYPES.redisService);
_disconnectSubject = new Subject<string>();
_listenerMap = new Map<string, IPubsubWrappedFn>();
_emitMap = new Map<string, MessageListener>();
getEmitQueue = memoize(
([id]) => id,
(id: string) =>
new class extends BaseMap(`${this.connectionEmitId}__${id}`) { }
);
constructor(readonly connectionEmitId: ConnectionEmit) {}
listenEvent = async <Data = any>(
id: string,
emit: MessageListener<Data>
) => {
const queue = this.getEmitQueue(id);
this._emitMap.set(id, emit);
if (!this._listenerMap.has(id))
this._listenerMap.set(
id,
pubsub<Data>(
async (data) => {
const emit = this._emitMap.get(id);
if (emit) {
return await emit(data);
}
return false;
},
{
onDestroy: async () => {
this._listenerMap.delete(id);
this._emitMap.delete(id);
this._disconnectSubject.next(id);
await queue.clear();
this.getEmitQueue.clear(id);
},
queue: pubsub.fromMap(queue),
}
)
);
};
listenDisconnect = (id: string, fn: () => void) => {
this._disconnectSubject.filter((channelId) => channelId === id).once(fn);
};
_getTotalListeners = async () => {
const redis = await this.redisService.getRedis();
const pattern = `${this.connectionEmitId}__*`;
const keys = await redis.keys(pattern);
const orderKeys = keys.filter((key) => key.endsWith(":online"));
const start = `${this.connectionEmitId}__`;
const end = ':online';
return [...new Set(orderKeys)].map((key) => key.slice(start.length, key.indexOf(end)));
};
_pushOnlineListener = async (id: string) => {
const redis = await this.redisService.getRedis();
await redis.setex(`${this.connectionEmitId}__${id}:online`, TTL_ONLINE_SECONDS, 'online');
};
emit = async <Data = any>(data: Data) => {
for (const id of await this._getTotalListeners()) {
if (this._listenerMap.has(id)) {
continue;
}
const queue = this.getEmitQueue(id);
await queue.setWithKeepExpire(randomString(), data);
}
for (const [id, fn] of this._listenerMap.entries()) {
await this._pushOnlineListener(id);
fn(data);
}
};
}
);
export type TBroadcastRedis = InstanceType<typeof BroadcastRedis>;
export default BroadcastRedis;
Каждое соединения клиента с websocket представляет собой инстанцию анонимного класса, наследующую динамический класс фабрики BaseMap
. Инстанции кешируются функцией высшего порядка memoize
и создаются в runtime после первого обращения, сохраняя одну ссылку для одинаковых sessionId
. Класс BroadcastRedis обернут в di-singleton
, для каждого connectionEmitId
всегда возвращается своя уникальная ссылка.
Код буфера gRPC
Дополнительно, был написан альтернативный канал связи, использующий Server-Sent Events (SSE) + HTTP/2. В отличие от WebSocket не требует отправки heartbeat пакетов, что удобно на Python, где основной поток может быть заморожен, что приведет к переподключениям WebSocket
Буфер gRPC
используется в хост приложении и позволяет повторить запрос несколько раз, если отправитель или получатель не запущены. Так же, если клиент или сервер упадут, при следующем запуске очередь сообщений восстановится автоматически
import { inject } from "src/core/di";
import type ProtoService from "./ProtoService";
import type LoggerService from "./LoggerService";
import TYPES from "src/config/types";
import { CC_GRPC_MAP } from "src/config/params";
import get from "src/utils/get";
import * as grpc from "@grpc/grpc-js";
import {
errorData,
Subject,
createAwaiter,
queued,
CANCELED_PROMISE_SYMBOL,
singleshot,
sleep,
singlerun,
IPubsubArray,
PubsubArrayAdapter,
randomString,
} from "functools-kit";
type Ctor = new (...args: any[]) => grpc.Client;
type ServiceName = keyof typeof CC_GRPC_MAP;
const CHANNEL_OK_SYMBOL = Symbol("channel-ok");
const CHANNEL_ERROR_SYMBOL = Symbol("channel-error");
const CHANNEL_RECONNECT_SYMBOL = Symbol("channel-reconnect");
interface IMessage<Data = object> {
serviceName: string;
clientId: string;
userId: string;
requestId: string;
stamp: string;
data: Data;
}
export type SendMessageFn<T = object> = (
outgoing: IMessage<T>
) => Promise<void | typeof CANCELED_PROMISE_SYMBOL>;
const GRPC_READY_DELAY = 60_000;
const GRPC_MAX_RETRY = 15;
export interface IMakeClientConfig<T = object> {
queue: IPubsubArray<[string, IMessage<T>]>;
}
export interface IMakeServerConfig<T = object> {
queue: IPubsubArray<[string, IMessage<T>]>;
}
interface IAwaiter {
resolve(): void;
}
export class StreamService {
private readonly protoService = inject<ProtoService>(TYPES.protoService);
private readonly loggerService = inject<LoggerService>(TYPES.loggerService);
_serverRef = new Map<string, grpc.Server>();
_makeServerInternal = <T = object>(
serviceName: ServiceName,
connector: (incoming: IMessage<T>) => Promise<void>,
reconnect: (error: boolean) => void,
attempt: number
): SendMessageFn<any> => {
this.loggerService.log(
`remote-grpc streamService _makeServerInternal connecting service=${serviceName} attempt=${attempt}`
);
const messageSubject = new Subject<IMessage>();
// Много строк, базовая реализация взаимодействия с сервером gRPC
return async (outgoing: IMessage) => {
await messageSubject.waitForListener();
await messageSubject.next(outgoing);
};
};
makeServer = <T = object>(
serviceName: ServiceName,
connector: (incoming: IMessage<T>) => Promise<void>,
{ queue = new PubsubArrayAdapter() }: Partial<IMakeServerConfig> = {}
): SendMessageFn<any> => {
this.loggerService.log(
`remote-grpc streamService makeServer connecting service=${serviceName}`
);
const reconnectSubject = new Subject<typeof CHANNEL_RECONNECT_SYMBOL>();
const connectorFn = queued(connector) as typeof connector;
const awaiterMap = new Map<string, IAwaiter>();
let attempt = 0;
let outgoingFnRef: SendMessageFn<any>;
const makeBroadcast = singlerun(async () => {
while (await queue.length()) {
let isOk = true;
try {
const first = await queue.getFirst();
if (!first) {
break;
}
const [id, outgoingMsg] = first;
const awaiter = awaiterMap.get(id);
if (!awaiter) {
this.loggerService.log(
"remote-grpc streamService makeServer missing awaiter",
{ id, outgoingMsg }
);
continue;
}
const status = await Promise.race([
outgoingFnRef(outgoingMsg),
reconnectSubject.toPromise(),
]);
if (status === CHANNEL_RECONNECT_SYMBOL) {
this.loggerService.log(
`remote-grpc streamService makeServer reconnect service=${serviceName}`
);
throw CHANNEL_ERROR_SYMBOL;
}
await awaiter.resolve();
} catch {
isOk = false;
} finally {
if (isOk) {
await queue.shift();
}
await sleep(10);
}
}
});
{
const makeConnection = () => {
attempt += 1;
reconnectSubject.next(CHANNEL_RECONNECT_SYMBOL);
outgoingFnRef = this._makeServerInternal<T>(
serviceName,
connectorFn,
singleshot(async () => {
if (attempt >= GRPC_MAX_RETRY) {
await queue.clear();
throw new Error(
`remote-grpc streamService makeServer max retry reached service=${serviceName}`
);
}
makeConnection();
}),
attempt
);
};
makeConnection();
}
const makeCommit = async (outgoing: IMessage) => {
const [result, awaiter] = createAwaiter<void>();
const id = randomString();
awaiterMap.set(id, awaiter);
await queue.push([id, outgoing]);
await makeBroadcast();
return await result;
};
const makeInit = singleshot(async () => {
const resolveList: Promise<void>[] = [];
for await (const [id] of queue) {
const [resolve, awaiter] = createAwaiter<void>();
awaiterMap.set(id, awaiter);
resolveList.push(resolve);
}
await makeBroadcast();
await Promise.all(resolveList);
});
return async (outgoing: IMessage) => {
await makeInit();
await makeCommit(outgoing);
attempt = 0;
};
};
// Код взаимодействия клиента организован аналогично
}
export default StreamService;
Дополнительно
Если необходимо организовать InMemory
очередь без использования Redis
, есть адаптер
import { singleton } from "di-singleton";
import { IPubsubWrappedFn, pubsub, Subject } from "functools-kit";
type MessageListener<Data = any> = (data: Data) => Promise<boolean>;
export const BroadcastMemory = singleton(
class {
_disconnectSubject = new Subject<string>();
_listenerMap = new Map<string, IPubsubWrappedFn>();
_emitMap = new Map<string, MessageListener>();
constructor(readonly connectionPoolId: string) { }
listenEvent = async <Data = any>(
id: string,
emit: MessageListener<Data>
) => {
this._emitMap.set(id, emit);
if (!this._listenerMap.has(id))
this._listenerMap.set(
id,
pubsub<Data>(
async (data) => {
const emit = this._emitMap.get(id);
if (emit) {
return await emit(data);
}
return false;
},
{
onDestroy: () => {
this._listenerMap.delete(id);
this._emitMap.delete(id);
this._disconnectSubject.next(id);
},
}
)
);
};
listenDisconnect = (id: string, fn: () => void) => {
this._disconnectSubject.filter((channelId) => channelId === id).once(fn);
};
emit = <Data = any>(data: Data) => {
for (const fn of this._listenerMap.values()) {
fn(data);
}
};
}
);
export type TBroadcastMemory = InstanceType<typeof BroadcastMemory>;
export default BroadcastMemory;
Спасибо за внимание!
Комментарии (24)
YChebotaev
12.12.2024 14:51А зачем такое? Обычно перезагрузка страницы означают что на ней самые свежие данные. А по вебсокетам отправляются только обновления, произошедшие после загрузки
Но если прям сильно надо, то посмотрите на socket.io
tripolskypetr Автор
12.12.2024 14:51В socket.io не кеширует очередь, это лишь адаптер websocket/long polling.
При разработке Internet of things по WebSocket передаются команды, например, на открытие дверцы турникета. Если python клиент заморозит поток c websocket heartbeat, команда открытия турникета уйдет в молоко, когда запись в бд уже сделана
jonic
12.12.2024 14:51Поздравляю, вы только что изобрели mqtt.
tripolskypetr Автор
12.12.2024 14:51Как по мне, зоопарк эти ваши AMPQ, MQTT, PPAP и тд. В итоге, дофига инструментов, а специалиста, который будет решать, зная платформу от и до, не будет. Его, как сказать, ИИ заменит, только вот не заработает в итоге программа
hardtop
12.12.2024 14:51Задачу бы описали, для чего это всё делалось. Но пока выглядит, как овер-инжиниринг. Зачем забивать канал связи? Зачем данные за 1 секунду назад после перезагрузки?
tripolskypetr Автор
12.12.2024 14:51В заголовке написано: MPA
GarfieldX
12.12.2024 14:51Вот честно. Нифига не понятно ведь. Прочитав комменты понял что не я один такой, а все. И это уже диагноз статье.
Нужна вводная на пальцах. Все мы работаем в разных областях. И даже аббревиатуру можем не знать, что уж говорить про нюансы и свойственную проблематику. Проще надо быть и люди к вам потянутся :)
tripolskypetr Автор
12.12.2024 14:51NDA :-)
Noah1
12.12.2024 14:51Гениальный ответ! Универсальный даже. "В статье ничего не понятно? А вам и не надо, у нас NDA".
tripolskypetr Автор
12.12.2024 14:51Тут что-то случилось с твоим комментарием. Не хочешь написать ещё раз?
headliner1985
12.12.2024 14:51Что только не придумают, чтобы не использовать старый добрый серверный рендеринг) куда собственно опять всё и возвращается потихоньку.
nighthtr
12.12.2024 14:51На первый запрос да, но после, как всё-таки удобно рендерить только то, что нужно. Правда делать это нужно не как автор, а то получается какие-то "горе от ума".. ))
tripolskypetr Автор
12.12.2024 14:51Тут очень узкая специфика, из других комментов можно додумать контекст.
nin-jin
Модель сообщений крайне плоха в условиях нестабильного соединения:
Сообщения надо везде буферизировать, пока нет возможности доставить.
Сообщения доставляются все по порядку, даже если в конце идёт сообщение, которое делает обработку предыдущих 100 бессмысленным.
Куда лучше работает модель состояний, с которой не нужны Кафки и редиски, ибо хватит и пары простых серверов для обслуживания куда большего числа соединений.
tripolskypetr Автор
Конечно, в условиях, когда бюджет позволяет приоретизировать и очищать очередь сообщений, это будет лучше. Однако, позволить подобное по финансам может разве что условный яндекс, которому дают деньги. Или банк, который косвенно печатает деньги.
Для малого и среднего бизнеса, который деньги зарабатывает, это неприемлимо, так как на вещь, которая по мнению владельца бизнеса работает сама по себе, будет влито полтора месяца инженерной работы
В условиях нестабильного соединения, я бы предпочёл сразу брать Application Server, такой как Firebase (не для ядерной сверхдержавы), AppWrite или Supabase, где отправка уведомлений на устройства грамотно заложена из коробки.
В идеале, не делать MPA вовсе: есть SPA
nin-jin
Вы не поняли, вам совсем не нужны очереди, это гораздо дешевле.
tripolskypetr Автор
При разработке Internet of things по WebSocket передаются команды, например, на открытие дверцы турникета. Если python клиент заморозит поток c websocket heartbeat, команда открытия турникета уйдет в молоко, когда запись в бд уже сделана
nin-jin
Ничего не понял. Передавайте не команды, а состояния "турникет открыт" и синхронизируйте его между всеми заинтересованными узлами, а не превращайте турникет в вибратор, присылая пачкой десяток команд на открытие-закрытие.
tripolskypetr Автор
Как обычно в IoT, владелец завода не понимает разницу между слесарем и кодером до тех пор, пока не упрется в срок договора. Далее приглашают инженера решать вопрос, потом дают ему 2 недели на увольнение и порочный круг Путинизма продолжается
Эта статья именно для тех, кому дали донашивать архитектуру с турникетом-вибратором. При этом даже владелец бизнеса понимает, что код говно, но с темы нужно как-то выскакивать
jonic
А зачем делать запись в бд если турникет не открылся?
tripolskypetr Автор
В предидущей архитектуре не было заложено, что турникет отправляет обратное сообщение, что турникет открыт
jonic
Еще раз; зачем делать запись в бд если мы знаем что сообщение турникету не доставлено?