Исходный код, разобранный в статье, опубликован в этом репозитории

Частое явление в вебе — полная перезагрузка приложения при переходе между страницами. При этом соединение WebSocket разрывается, а новая подписка устанавливается только после начала выполнения загруженного JavaScript, что занимает как минимум секунду. Во время перезагрузки страницы сообщения, отправленные через WebSocket, не будут получены клиентской частью приложения.

Отказоустойчивая архитектура для Realtime
Отказоустойчивая архитектура для Realtime

Аналогичная проблема возникает и в микросервисной архитектуре при использовании gRPC-стримов для взаимодействия между микросервисами. Когда микросервис отключается, соединение теряется, и попытки доставки сообщений приводят к исключениям. Чтобы решить эти проблемы, был создан starter kit, который использует Redis в качестве очереди доставки сообщений получателю.

Код веб-сервера

Очередь событий была протестирована на тысяче сессий суммарно по 35 сессий одновременно. Интеграционный тест приложен в репозитории, при наличие мощной машины, на которой установлено более 16Гб оперативной памяти, количество одновременных сессий можно увеличить в настройках

Playwright E2E
Playwright E2E

Следующий код создает вебсокет-сервер, осуществляющий вещание сообщений, полученных из микросервисной архитектуры через grpc.streamService.makeClient

import { WSContext } from "hono/ws";

import { grpc } from "@modules/remote-grpc";
import { BroadcastRedis } from "@modules/remote-redis";
import { singleshot } from "functools-kit";
import { app, upgradeWebSocket } from "src/config/app";

const connectionManager = new BroadcastRedis("host-ws__redis-emit");

app.get("/api/v1/realtime/ws", upgradeWebSocket(() => {

  let isClosed = false;

  const makeConnection = singleshot((sessionId: string, ws: WSContext) => {
    connectionManager.listenEvent(sessionId, async (data) => {
      if (isClosed) {
        return false;
      }
      ws.send(JSON.stringify(data));
      return true;
    });

    connectionManager.listenDisconnect(sessionId, () => {
      if (!isClosed) {
        ws.close();
      }
    });
  });

  return {
    onMessage(event, ws) {
      const { sessionId } = JSON.parse(event.data.toString());
      makeConnection(sessionId, ws);
    },
    onClose: () => {
      isClosed = true;
    },
  }
}));

grpc.streamService.makeClient<{ side: string, value: string }>("MessageService", async (message) => {
  connectionManager.emit(message.data);
});

export default app;

При получении первого сообщения, вебсокет считывает sessionId, который со стороны браузера потребуется сохранить в sessionStorage так, чтобы он не менялся при обновлении страницы. Отправка сообщения клиенту осуществляется через connectionManager , тот самый буфер, упомянутый в первом абзаце статьи. Коннект создается ровно один раз с использованием функции высшего порядка singleshot

Код буфера WebSocket

Для тестирования приложения был создан echo сервис, каждую секунду отправляющий число секунд с 00:01. Когда веб-сервер отключается, сообщения пишутся в Redis, при повторном подключении, если оно было осуществлено в течение минуты, все сообщения из очереди отправляются в порядке поступления. Новые сообщения встают в конец очереди, сохраняя очередность

При восстановлении подключения к сети пропущенные сообщения доставляются клиенту
При восстановлении подключения к сети пропущенные сообщения доставляются клиенту

Следующий код осуществляет создание ориентированных словарей в Redis, которые осуществляют буферизацию событий так, чтобы клиент мог переподключиться в течение минуты и получить все пропущенные сообщения

import { singleton } from "di-singleton";
import {
  IPubsubWrappedFn,
  pubsub,
  Subject,
  memoize,
  randomString,
} from "functools-kit";
import BaseMap from "./BaseMap";
import { inject } from "src/core/di";
import RedisService from "src/services/base/RedisService";
import TYPES from "src/config/types";

type MessageListener<Data = any> = (data: Data) => Promise<boolean>;

type ConnectionEmit = keyof {
  "host-sse__redis-emit": never;
  "host-ws__redis-emit": never;
};

const TTL_ONLINE_SECONDS = 1 * 60;

export const BroadcastRedis = singleton(
  class {

    readonly redisService = inject<RedisService>(TYPES.redisService);

    _disconnectSubject = new Subject<string>();

    _listenerMap = new Map<string, IPubsubWrappedFn>();
    _emitMap = new Map<string, MessageListener>();

    getEmitQueue = memoize(
      ([id]) => id,
      (id: string) =>
        new class extends BaseMap(`${this.connectionEmitId}__${id}`) { }
    );

    constructor(readonly connectionEmitId: ConnectionEmit) {}

    listenEvent = async <Data = any>(
      id: string,
      emit: MessageListener<Data>
    ) => {
      const queue = this.getEmitQueue(id);
      this._emitMap.set(id, emit);
      if (!this._listenerMap.has(id))
        this._listenerMap.set(
          id,
          pubsub<Data>(
            async (data) => {
              const emit = this._emitMap.get(id);
              if (emit) {
                return await emit(data);
              }
              return false;
            },
            {
              onDestroy: async () => {
                this._listenerMap.delete(id);
                this._emitMap.delete(id);
                this._disconnectSubject.next(id);
                await queue.clear();
                this.getEmitQueue.clear(id);
              },
              queue: pubsub.fromMap(queue),
            }
          )
        );
    };

    listenDisconnect = (id: string, fn: () => void) => {
      this._disconnectSubject.filter((channelId) => channelId === id).once(fn);
    };

    _getTotalListeners = async () => {
      const redis = await this.redisService.getRedis();
      const pattern = `${this.connectionEmitId}__*`;
      const keys = await redis.keys(pattern);
      const orderKeys = keys.filter((key) => key.endsWith(":online"));
      const start = `${this.connectionEmitId}__`;
      const end = ':online';
      return [...new Set(orderKeys)].map((key) => key.slice(start.length, key.indexOf(end)));
    };

    _pushOnlineListener = async (id: string) => {
      const redis = await this.redisService.getRedis();
      await redis.setex(`${this.connectionEmitId}__${id}:online`, TTL_ONLINE_SECONDS, 'online');
    };

    emit = async <Data = any>(data: Data) => {
      for (const id of await this._getTotalListeners()) {
        if (this._listenerMap.has(id)) {
          continue;
        }
        const queue = this.getEmitQueue(id);
        await queue.setWithKeepExpire(randomString(), data);
      }
      for (const [id, fn] of this._listenerMap.entries()) {
        await this._pushOnlineListener(id);
        fn(data);
      }
    };
  }
);

export type TBroadcastRedis = InstanceType<typeof BroadcastRedis>;

export default BroadcastRedis;

Каждое соединения клиента с websocket представляет собой инстанцию анонимного класса, наследующую динамический класс фабрики BaseMap . Инстанции кешируются функцией высшего порядка memoize и создаются в runtime после первого обращения, сохраняя одну ссылку для одинаковых sessionId. Класс BroadcastRedis обернут в di-singleton , для каждого connectionEmitId всегда возвращается своя уникальная ссылка.

Код буфера gRPC

Дополнительно, был написан альтернативный канал связи, использующий Server-Sent Events (SSE) + HTTP/2. В отличие от WebSocket не требует отправки heartbeat пакетов, что удобно на Python, где основной поток может быть заморожен, что приведет к переподключениям WebSocket

HTTP/2 + SSE как альтернативный канал связи
HTTP/2 + SSE как альтернативный канал связи

Буфер gRPC используется в хост приложении и позволяет повторить запрос несколько раз, если отправитель или получатель не запущены. Так же, если клиент или сервер упадут, при следующем запуске очередь сообщений восстановится автоматически

import { inject } from "src/core/di";
import type ProtoService from "./ProtoService";
import type LoggerService from "./LoggerService";
import TYPES from "src/config/types";
import { CC_GRPC_MAP } from "src/config/params";
import get from "src/utils/get";
import * as grpc from "@grpc/grpc-js";
import {
  errorData,
  Subject,
  createAwaiter,
  queued,
  CANCELED_PROMISE_SYMBOL,
  singleshot,
  sleep,
  singlerun,
  IPubsubArray,
  PubsubArrayAdapter,
  randomString,
} from "functools-kit";

type Ctor = new (...args: any[]) => grpc.Client;
type ServiceName = keyof typeof CC_GRPC_MAP;

const CHANNEL_OK_SYMBOL = Symbol("channel-ok");
const CHANNEL_ERROR_SYMBOL = Symbol("channel-error");

const CHANNEL_RECONNECT_SYMBOL = Symbol("channel-reconnect");

interface IMessage<Data = object> {
  serviceName: string;
  clientId: string;
  userId: string;
  requestId: string;
  stamp: string;
  data: Data;
}

export type SendMessageFn<T = object> = (
  outgoing: IMessage<T>
) => Promise<void | typeof CANCELED_PROMISE_SYMBOL>;

const GRPC_READY_DELAY = 60_000;
const GRPC_MAX_RETRY = 15;

export interface IMakeClientConfig<T = object> {
  queue: IPubsubArray<[string, IMessage<T>]>;
}

export interface IMakeServerConfig<T = object> {
  queue: IPubsubArray<[string, IMessage<T>]>;
}

interface IAwaiter {
  resolve(): void;
}

export class StreamService {
  private readonly protoService = inject<ProtoService>(TYPES.protoService);
  private readonly loggerService = inject<LoggerService>(TYPES.loggerService);

  _serverRef = new Map<string, grpc.Server>();

  _makeServerInternal = <T = object>(
    serviceName: ServiceName,
    connector: (incoming: IMessage<T>) => Promise<void>,
    reconnect: (error: boolean) => void,
    attempt: number
  ): SendMessageFn<any> => {
    this.loggerService.log(
      `remote-grpc streamService _makeServerInternal connecting service=${serviceName} attempt=${attempt}`
    );
    const messageSubject = new Subject<IMessage>();

    // Много строк, базовая реализация взаимодействия с сервером gRPC 

    return async (outgoing: IMessage) => {
      await messageSubject.waitForListener();
      await messageSubject.next(outgoing);
    };
  };

  makeServer = <T = object>(
    serviceName: ServiceName,
    connector: (incoming: IMessage<T>) => Promise<void>,
    { queue = new PubsubArrayAdapter() }: Partial<IMakeServerConfig> = {}
  ): SendMessageFn<any> => {
    this.loggerService.log(
      `remote-grpc streamService makeServer connecting service=${serviceName}`
    );

    const reconnectSubject = new Subject<typeof CHANNEL_RECONNECT_SYMBOL>();
    const connectorFn = queued(connector) as typeof connector;

    const awaiterMap = new Map<string, IAwaiter>();

    let attempt = 0;
    let outgoingFnRef: SendMessageFn<any>;

    const makeBroadcast = singlerun(async () => {
      while (await queue.length()) {
        let isOk = true;
        try {
          const first = await queue.getFirst();
          if (!first) {
            break;
          }
          const [id, outgoingMsg] = first;
          const awaiter = awaiterMap.get(id);
          if (!awaiter) {
            this.loggerService.log(
              "remote-grpc streamService makeServer missing awaiter",
              { id, outgoingMsg }
            );
            continue;
          }
          const status = await Promise.race([
            outgoingFnRef(outgoingMsg),
            reconnectSubject.toPromise(),
          ]);
          if (status === CHANNEL_RECONNECT_SYMBOL) {
            this.loggerService.log(
              `remote-grpc streamService makeServer reconnect service=${serviceName}`
            );
            throw CHANNEL_ERROR_SYMBOL;
          }
          await awaiter.resolve();
        } catch {
          isOk = false;
        } finally {
          if (isOk) {
            await queue.shift();
          }
          await sleep(10);
        }
      }
    });

    {
      const makeConnection = () => {
        attempt += 1;
        reconnectSubject.next(CHANNEL_RECONNECT_SYMBOL);
        outgoingFnRef = this._makeServerInternal<T>(
          serviceName,
          connectorFn,
          singleshot(async () => {
            if (attempt >= GRPC_MAX_RETRY) {
              await queue.clear();
              throw new Error(
                `remote-grpc streamService makeServer max retry reached service=${serviceName}`
              );
            }
            makeConnection();
          }),
          attempt
        );
      };
      makeConnection();
    }

    const makeCommit = async (outgoing: IMessage) => {
      const [result, awaiter] = createAwaiter<void>();
      const id = randomString();
      awaiterMap.set(id, awaiter);
      await queue.push([id, outgoing]);
      await makeBroadcast();
      return await result;
    };

    const makeInit = singleshot(async () => {
      const resolveList: Promise<void>[] = [];
      for await (const [id] of queue) {
        const [resolve, awaiter] = createAwaiter<void>();
        awaiterMap.set(id, awaiter);
        resolveList.push(resolve);
      }
      await makeBroadcast();
      await Promise.all(resolveList);
    });

    return async (outgoing: IMessage) => {
      await makeInit();
      await makeCommit(outgoing);
      attempt = 0;
    };
  };

  // Код взаимодействия клиента организован аналогично
}

export default StreamService;

Дополнительно

Если необходимо организовать InMemory очередь без использования Redis , есть адаптер

import { singleton } from "di-singleton";
import { IPubsubWrappedFn, pubsub, Subject } from "functools-kit";

type MessageListener<Data = any> = (data: Data) => Promise<boolean>;

export const BroadcastMemory = singleton(
  class {
    _disconnectSubject = new Subject<string>();

    _listenerMap = new Map<string, IPubsubWrappedFn>();
    _emitMap = new Map<string, MessageListener>();

    constructor(readonly connectionPoolId: string) { }

    listenEvent = async <Data = any>(
      id: string,
      emit: MessageListener<Data>
    ) => {
      this._emitMap.set(id, emit);
      if (!this._listenerMap.has(id))
        this._listenerMap.set(
          id,
          pubsub<Data>(
            async (data) => {
              const emit = this._emitMap.get(id);
              if (emit) {
                return await emit(data);
              }
              return false;
            },
            {
              onDestroy: () => {
                this._listenerMap.delete(id);
                this._emitMap.delete(id);
                this._disconnectSubject.next(id);
              },
            }
          )
        );
    };

    listenDisconnect = (id: string, fn: () => void) => {
      this._disconnectSubject.filter((channelId) => channelId === id).once(fn);
    };

    emit = <Data = any>(data: Data) => {
      for (const fn of this._listenerMap.values()) {
        fn(data);
      }
    };
  }
);

export type TBroadcastMemory = InstanceType<typeof BroadcastMemory>;

export default BroadcastMemory;

Спасибо за внимание!

Комментарии (24)


  1. nin-jin
    12.12.2024 14:51

    Модель сообщений крайне плоха в условиях нестабильного соединения:

    • Сообщения надо везде буферизировать, пока нет возможности доставить.

    • Сообщения доставляются все по порядку, даже если в конце идёт сообщение, которое делает обработку предыдущих 100 бессмысленным.

    Куда лучше работает модель состояний, с которой не нужны Кафки и редиски, ибо хватит и пары простых серверов для обслуживания куда большего числа соединений.


    1. tripolskypetr Автор
      12.12.2024 14:51

      Конечно, в условиях, когда бюджет позволяет приоретизировать и очищать очередь сообщений, это будет лучше. Однако, позволить подобное по финансам может разве что условный яндекс, которому дают деньги. Или банк, который косвенно печатает деньги.

      Для малого и среднего бизнеса, который деньги зарабатывает, это неприемлимо, так как на вещь, которая по мнению владельца бизнеса работает сама по себе, будет влито полтора месяца инженерной работы

      В условиях нестабильного соединения, я бы предпочёл сразу брать Application Server, такой как Firebase (не для ядерной сверхдержавы), AppWrite или Supabase, где отправка уведомлений на устройства грамотно заложена из коробки.

      В идеале, не делать MPA вовсе: есть SPA


      1. nin-jin
        12.12.2024 14:51

        Вы не поняли, вам совсем не нужны очереди, это гораздо дешевле.


        1. tripolskypetr Автор
          12.12.2024 14:51

          При разработке Internet of things по WebSocket передаются команды, например, на открытие дверцы турникета. Если python клиент заморозит поток c websocket heartbeat, команда открытия турникета уйдет в молоко, когда запись в бд уже сделана


          1. nin-jin
            12.12.2024 14:51

            Ничего не понял. Передавайте не команды, а состояния "турникет открыт" и синхронизируйте его между всеми заинтересованными узлами, а не превращайте турникет в вибратор, присылая пачкой десяток команд на открытие-закрытие.


            1. tripolskypetr Автор
              12.12.2024 14:51

              Как обычно в IoT, владелец завода не понимает разницу между слесарем и кодером до тех пор, пока не упрется в срок договора. Далее приглашают инженера решать вопрос, потом дают ему 2 недели на увольнение и порочный круг Путинизма продолжается

              Эта статья именно для тех, кому дали донашивать архитектуру с турникетом-вибратором. При этом даже владелец бизнеса понимает, что код говно, но с темы нужно как-то выскакивать


          1. jonic
            12.12.2024 14:51

            А зачем делать запись в бд если турникет не открылся?


            1. tripolskypetr Автор
              12.12.2024 14:51

              В предидущей архитектуре не было заложено, что турникет отправляет обратное сообщение, что турникет открыт


              1. jonic
                12.12.2024 14:51

                Еще раз; зачем делать запись в бд если мы знаем что сообщение турникету не доставлено?


  1. YChebotaev
    12.12.2024 14:51

    А зачем такое? Обычно перезагрузка страницы означают что на ней самые свежие данные. А по вебсокетам отправляются только обновления, произошедшие после загрузки

    Но если прям сильно надо, то посмотрите на socket.io


    1. tripolskypetr Автор
      12.12.2024 14:51

      1. В socket.io не кеширует очередь, это лишь адаптер websocket/long polling.

      2. При разработке Internet of things по WebSocket передаются команды, например, на открытие дверцы турникета. Если python клиент заморозит поток c websocket heartbeat, команда открытия турникета уйдет в молоко, когда запись в бд уже сделана


  1. jonic
    12.12.2024 14:51

    Поздравляю, вы только что изобрели mqtt.


    1. tripolskypetr Автор
      12.12.2024 14:51

      Как по мне, зоопарк эти ваши AMPQ, MQTT, PPAP и тд. В итоге, дофига инструментов, а специалиста, который будет решать, зная платформу от и до, не будет. Его, как сказать, ИИ заменит, только вот не заработает в итоге программа


  1. hardtop
    12.12.2024 14:51

    Задачу бы описали, для чего это всё делалось. Но пока выглядит, как овер-инжиниринг. Зачем забивать канал связи? Зачем данные за 1 секунду назад после перезагрузки?


    1. tripolskypetr Автор
      12.12.2024 14:51

      В заголовке написано: MPA


      1. GarfieldX
        12.12.2024 14:51

        Вот честно. Нифига не понятно ведь. Прочитав комменты понял что не я один такой, а все. И это уже диагноз статье.

        Нужна вводная на пальцах. Все мы работаем в разных областях. И даже аббревиатуру можем не знать, что уж говорить про нюансы и свойственную проблематику. Проще надо быть и люди к вам потянутся :)


        1. tripolskypetr Автор
          12.12.2024 14:51

          NDA :-)


          1. Noah1
            12.12.2024 14:51

            Гениальный ответ! Универсальный даже. "В статье ничего не понятно? А вам и не надо, у нас NDA".


            1. tripolskypetr Автор
              12.12.2024 14:51

              Тут что-то случилось с твоим комментарием. Не хочешь написать ещё раз?


  1. headliner1985
    12.12.2024 14:51

    Что только не придумают, чтобы не использовать старый добрый серверный рендеринг) куда собственно опять всё и возвращается потихоньку.


    1. nighthtr
      12.12.2024 14:51

      На первый запрос да, но после, как всё-таки удобно рендерить только то, что нужно. Правда делать это нужно не как автор, а то получается какие-то "горе от ума".. ))


      1. tripolskypetr Автор
        12.12.2024 14:51

        Тут очень узкая специфика, из других комментов можно додумать контекст.


  1. Kravets2
    12.12.2024 14:51

    За Hono респект


    1. tripolskypetr Автор
      12.12.2024 14:51

      Уважуха брат, Hono база!