![image](https://habrastorage.org/webt/hf/xh/dl/hfxhdlv7katnrbytwzqmhjy3xom.png)
В предыдущей статье речь шла об общем решении для вебсокетов в Angular, где мы на основе WebSocketSubject построили шину с реконнектом и сервисом для использования в компонентах. Подобная реализация подходит для большинства простых случаев, например, приема и отправки сообщений в чате и т.д., но её возможностей может быть недостаточно там, где нужно построить нечто более гибкое и контролируемое. В этой статье я раскрою некоторые особенности при работе с вебсокетами и расскажу о тех требованиях, с которыми сталкивался сам и, возможно, столкнетесь вы.
Часто в больших проектах с высокой посещаемостью перед фронтендом встают задачи, которые в других обстоятельствах привычнее видеть на бэкенде. В условиях жесткой экономии серверных ресурсов часть проблем мигрирует на территорию фронтенда, по этой причине в проект закладывается максимум расширяемости и контроля.
Вот список основных требований для вебсокет-клиента, которые будут рассматриваться в этой статье:
- Автоматический «умный» реконнект;
- Режим дебага;
- Система подписок на события на основе RxJs;
- Прием и парсинг бинарных данных;
- Проецирование (маппинг) получаемой информации на модели;
- Контроль над изменениями моделей по мере прихода новых событий;
- Игнорирование произвольных событий и отмена игнорирования.
Рассмотрим каждый пункт подробнее.
Реконнект/Дебаг
Про реконнект я писал в предыдущей статье, поэтому просто процитирую часть текста:
Реконнект, или организация переподключения к серверу, это первостепенный фактор при работе с вебсокетами, т.к. обрывы сети, падения сервера или другие ошибки, вызывающие обрыв коннекта способны обрушить работу приложения.
Важно учесть, что попытки переподключения не должны быть слишком частыми и не должны продолжаться до бесконечности, т.к. такое поведение способно подвесить клиент.
Сам по себе вебсокет не умеет восстанавливать соединение при обрыве. Следовательно, если перезагрузился или упал сервер, или же у пользователя переподключился интернет, то для продолжения работы нужно переподключиться и вебсокету.
В этой статье для реконнекта и дебага будет использоваться Reconnecting WebSocket, который содержит нужный функционал и прочие опции, такие как смена url вебсокета между переподключениями, выбор произвольного конструктора WebSocket и т.д. Также подойдут и другие альтернативные решения. Реконнект же из предыдущей статьи не подходит, т.к. он написан под WebSocketSubject, который в этот раз не применяется.
Система подписок на события на основе RxJs
Для использования вебсокетов в компонентах нужно подписываться на события и отписываться от них, когда потребуется. Для этого воспользуемся распространенным дизайн-паттерном Pub/Sub.
«Издатель-подписчик (англ. publisher-subscriber или англ. pub/sub)?—?поведенческий шаблон проектирования передачи сообщений, в котором отправители сообщений, именуемые издателями (англ. publishers), напрямую не привязаны программным кодом отправки сообщений к подписчикам (англ. subscribers). Вместо этого сообщения делятся на классы и не содержат сведений о своих подписчиках, если таковые есть. Аналогичным образом подписчики имеют дело с одним или несколькими классами сообщений, абстрагируясь от конкретных издателей.»
Подписчик обращается к издателю не напрямую, а через промежуточную шину?—?сервис вебсокетов. Также должна быть возможность подписаться на несколько событий с одинаковым типом возвращаемых данных. Для каждой подписки создается собственный Subject, который добавляется к объекту listeners, что позволяет адресовать события вебсокета нужным подпискам. При работе с RxJs Subject, возникают некоторые сложности с отписками, поэтому создадим несложный сборщик мусора, который будет удалять сабжекты из объекта listeners в случае, когда у них отсутствуют observers.
Прием и парсинг бинарных данных
WebSocket поддерживает передачу бинарных данных, файлов или стримов, что часто используется в больших проектах. Это выглядит примерно следующим образом:
0x80, <длина?—?один или несколько байт>, <тело сообщения>
Чтобы не создавать ограничений на длину передаваемого сообщения и в то же время не расходовать байты нерационально, разработчики протокола использовали следующий алгоритм. Каждый байт в указании длины рассматривается по отдельности: старший указывает на то, последний ли это байт (0) или же за ним идут другие (1), а младшие 7 битов содержат передаваемые данные. Следовательно, когда появляется признак бинарного дата-фрейма 0x80, то берется следующий байт и откладывается в отдельную «копилку». Потом следующий байт, если у него установлен старший бит, тоже переносится в «копилку» и так далее до тех пор, пока не встретится байт с нулевым старшим битом. Этот байт?—?последний в указателе длины и также складывается в «копилку». Теперь из байтов в «копилке» убираются старшие биты, и остаток объединяется. Вот это и будет длина тела сообщения?—?7-битные числа без старшего бита.
Механизм парсинга и бинарного стрима на фронтенде сложен и связан с маппингом данных на модели. Этому можно посвятить отдельную статью. В этот раз разберем простой вариант, а сложные случаи оставим для следующих публикаций, если будет интерес к теме.
Проецирование (маппинг) получаемой информации на модели
Независимо от типа передачи принимаемое требуется безопасно читать и изменять. Нет единого мнения как это лучше делать, я придерживаюсь теории модели данных, так как считаю её логичной и надежной для программирования в ООП-стиле.
«Модель данных?—?это абстрактное, самодостаточное, логическое определение объектов, операторов и прочих элементов, в совокупности составляющих абстрактную машину доступа к данным, с которой взаимодействует пользователь. Эти объекты позволяют моделировать структуру данных, а операторы?—?поведение данных.»
Всевозможные популярные шины, которые не дают представление объекта, как класса, в котором определяется поведение, структура и т.д., создают путаницу, хуже контролируются и иногда обрастают тем, что им не свойственно. Например, класс собаки при любых условиях должен описывать собаку. Если собаку воспринимать в виде набора полей: хвост, цвет, морда и т.д., то у собаки может вырасти лишняя лапа, а вместо головы появиться другая собака.
![image](https://habrastorage.org/webt/sq/fa/xu/sqfaxulbeyaffws-46gxho3-dcg.jpeg)
Контроль над изменениями моделей по мере прихода новых событий
В этом пункте опишу задачу, с которой столкнулся при работе над веб-интерфейсом мобильного приложения спортивных ставок. API приложения работало через вебсокеты, через которые получали: обновление коэффициентов, добавление и удаление новых типов ставок, уведомления о начале или окончании матча и т.д.?—?итого около трёхсот событий вебсокета. Во время матча ставки и информация непрерывно обновляются, иногда 2–3 раза в секунду, таким образом проблема заключалась в том, что вслед за ними без промежуточного контроля обновлялся и интерфейс.
Когда пользователь следил за ставкой с мобильного устройства, и в это же время на его дисплее обновлялись списки, то ставка исчезала из поля видимости, поэтому пользователю приходилось искать отслеживаемую ставку заново. Такое поведение повторялось на каждое обновление.
![image](https://habrastorage.org/webt/tq/5p/e8/tq5pe8qxiepdjj7dx5-jry10bjs.gif)
Для решения потребовалась иммутабельность для объектов, которые отображались на экране, но при этом коэффициенты ставок должны были меняться, неактуальные ставки приобретать неактивный вид, а новые не добавляться до тех пор, пока пользователь не проскроллит экран. На бэкенде устаревшие варианты не хранились, поэтому такие линии требовалось запоминать и помечать флагом «deleted», для чего было создано промежуточное хранилище данных между вебсокетом и подпиской, что обеспечило контроль за изменениями.
В новом сервисе также создадим слой-заместитель и в этот раз воспользуемся Dexie.js?—?обертку над IndexedDB API, но подойдет любая другая виртуальная или браузерная БД. Допустимо использование Redux.
Игнорирование произвольных событий и отмена игнорирования
В одной компании часто существуют сразу несколько однотипных проектов: мобильная и веб версии, версии с различными настройками для разных групп пользователей, расширенные и урезанные варианты одного и того же приложения.
Часто все они используют единую кодовую базу, поэтому иногда требуется отключать ненужные события в рантайме или при DI, не удаляя подписки и снова включать, т.е. игнорировать часть из них, чтобы не обрабатывать ненужные события. Это простая, но полезная функция, которая добавляет гибкости шине Pub/Sub.
Начнем с описания интерфесов:
export interface IWebsocketService { // публичный интерфейс сервиса
addEventListener<T>(topics: string[], id?: number): Observable<T>;
runtimeIgnore(topics: string[]): void;
runtimeRemoveIgnore(topics: string[]): void;
sendMessage(event: string, data: any): void;
}
export interface WebSocketConfig { // конфиг при DI
url: string;
ignore?: string[];
garbageCollectInterval?: number;
options?: Options;
}
export interface ITopic<T> { // Топик для Pub/Sub
[hash: string]: MessageSubject<T>;
}
export interface IListeners { // объект с топиками
[topic: string]: ITopic<any>;
}
export interface IBuffer { // бинарный буфер из ws.message
type: string;
data: number[];
}
export interface IWsMessage { // ws.message
event: string;
buffer: IBuffer;
}
export interface IMessage { // Для демо
id: number;
text: string;
}
export type ITopicDataType = IMessage[] | number | string[]; // типизируем callMessage в сервисе
Отнаследуем Subject, чтобы создать сборщик мусора:
export class MessageSubject<T> extends Subject<T> {
constructor(
private listeners: IListeners, // объект с топиками
private topic: string, // текущий топик
private id: string // id сабжекта
) {
super();
}
/*
* переопределяем стандартный next,
* теперь на очередное обращение при отсутствии подписок,
* будет вызываться garbageCollect
*/
public next(value?: T): void {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
if (!this.isStopped) {
const {observers} = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) {
copy[i].next(value);
}
if (!len) {
this.garbageCollect(); // выносим мусор
}
}
}
/*
* garbage collector
* */
private garbageCollect(): void {
delete this.listeners[this.topic][this.id]; // удаляем Subject
if (!Object.keys(this.listeners[this.topic]).length) { // удаляем пустой топик
delete this.listeners[this.topic];
}
}
}
В отличие от прошлой реализации websocket.events.ts сделаем частью модуля вебсокетов
export const WS_API = {
EVENTS: {
MESSAGES: 'messages',
COUNTER: 'counter',
UPDATE_TEXTS: 'update-texts'
},
COMMANDS: {
SEND_TEXT: 'set-text',
REMOVE_TEXT: 'remove-text'
}
};
Для конфигурирования при подключении модуля создадим websocket.config:
import { InjectionToken } from '@angular/core';
export const config: InjectionToken<string> = new InjectionToken('websocket');
Создадим модель для Proxy:
import Dexie from 'dexie';
import { IMessage, IWsMessage } from './websocket.interfaces';
import { WS_API } from './websocket.events';
class MessagesDatabase extends Dexie { // это стандартное использование Dexie с typescript
public messages!: Dexie.Table<IMessage, number>; // id is number in this case
constructor() {
super('MessagesDatabase'); // имя хранилища
this.version(1).stores({ // модель стора
messages: '++id,text'
});
}
}
Простой парсер моделей, в реальных условиях его лучше разделить на несколько файлов:
export const modelParser = (message: IWsMessage) => {
if (message && message.buffer) {
/* парсим */
const encodeUint8Array = String.fromCharCode
.apply(String, new Uint8Array(message.buffer.data));
const parseData = JSON.parse(encodeUint8Array);
let MessagesDB: MessagesDatabase; // IndexedDB
if (message.event === WS_API.EVENTS.MESSAGES) { // IMessage[]
if (!MessagesDB) {
MessagesDB = new MessagesDatabase();
}
parseData.forEach((messageData: IMessage) => {
/* создаем транзакцию */
MessagesDB.transaction('rw', MessagesDB.messages, async () => {
/* создаем, если запись отсутствует */
if ((await MessagesDB.messages
.where({id: messageData.id}).count()) === 0) {
const id = await MessagesDB.messages
.add({id: messageData.id, text: messageData.text});
console.log(`Addded message with id ${id}`);
}
}).catch(e => {
console.error(e.stack || e);
});
});
return MessagesDB.messages.toArray(); // возвращаем массив IMessage[]
}
if (message.event === WS_API.EVENTS.COUNTER) { // counter
return new Promise(r => r(parseData)); // промис с счетчиком
}
if (message.event === WS_API.EVENTS.UPDATE_TEXTS) { // text
const texts = [];
parseData.forEach((textData: string) => {
texts.push(textData);
});
return new Promise(r => r(texts)); // промис с массивом строк
}
} else {
console.log(`[${Date()}] Buffer is "undefined"`);
}
};
WebsocketModule:
@NgModule({
imports: [
CommonModule
]
})
export class WebsocketModule {
public static config(wsConfig: WebSocketConfig): ModuleWithProviders {
return {
ngModule: WebsocketModule,
providers: [{provide: config, useValue: wsConfig}]
};
}
}
Начнем создавать сервис:
private listeners: IListeners; // список топиков
private uniqueId: number; // соль для id подписки
private websocket: ReconnectingWebSocket; // объект вебсокета
constructor(@Inject(config) private wsConfig: WebSocketConfig) {
this.uniqueId = -1;
this.listeners = {};
this.wsConfig.ignore = wsConfig.ignore ? wsConfig.ignore : [];
// коннектимся
this.connect();
}
ngOnDestroy() {
this.websocket.close(); // убиваем вебсокет при дестрое
}
Метод connect:
private connect(): void {
// ReconnectingWebSocket config
const options = {
connectionTimeout: 1000, // таймаут реконнекта, если не задано
maxRetries: 10, // попытки реконнекта, если не задано
...this.wsConfig.options
};
// Коннектимся
this.websocket = new ReconnectingWebSocket(this.wsConfig.url, [], options);
this.websocket.addEventListener('open', (event: Event) => {
// соединение открыто
console.log(`[${Date()}] WebSocket connected!`);
});
this.websocket.addEventListener('close', (event: CloseEvent) => {
// соединение закрыто
console.log(`[${Date()}] WebSocket close!`);
});
this.websocket.addEventListener('error', (event: ErrorEvent) => {
// ошибка соединения
console.error(`[${Date()}] WebSocket error!`);
});
this.websocket.addEventListener('message', (event: MessageEvent) => {
// диспатчим события в подписки
this.onMessage(event);
});
setInterval(() => {
// дублируем сборщик мусора
this.garbageCollect();
}, (this.wsConfig.garbageCollectInterval || 10000));
}
Дублируем сборщик мусора, будет проверять подписки по таймауту:
private garbageCollect(): void {
for (const event in this.listeners) {
if (this.listeners.hasOwnProperty(event)) {
const topic = this.listeners[event];
for (const key in topic) {
if (topic.hasOwnProperty(key)) {
const subject = topic[key];
// удаляем Subject если нет подписок
if (!subject.observers.length) {
delete topic[key];
}
}
}
Удаляем топик, если пуст
if (!Object.keys(topic).length) {
delete this.listeners[event];
}
}
}
}
Смотрим в какую подписку слать событие:
private onMessage(event: MessageEvent): void {
const message = JSON.parse(event.data);
for (const name in this.listeners) {
if (this.listeners.hasOwnProperty(name) && !this.wsConfig.ignore.includes(name)) {
const topic = this.listeners[name];
const keys = name.split('/'); // если подписаны на несколько событий
const isMessage = keys.includes(message.event);
const model = modelParser(message); // получаем промис с моделями
if (isMessage && typeof model !== 'undefined') {
model.then((data: ITopicDataType) => {
// отправляем в Subject
this.callMessage<ITopicDataType>(topic, data);
});
}
}
}
}
Шлем событие в Subject:
private callMessage<T>(topic: ITopic<T>, data: T): void {
for (const key in topic) {
if (topic.hasOwnProperty(key)) {
const subject = topic[key];
if (subject) {
// отправляем подписчику
subject.next(data);
} else {
console.log(`[${Date()}] Topic Subject is "undefined"`);
}
}
}
}
Создаем топик Pub/Sub:
private addTopic<T>(topic: string, id?: number): MessageSubject<T> {
const token = (++this.uniqueId).toString();
const key = id ? token + id : token; // уникальный id для токена
const hash = sha256.hex(key); // SHA256-хэш в качестве id топика
if (!this.listeners[topic]) {
this.listeners[topic] = <any>{};
}
return this.listeners[topic][hash] = new MessageSubject<T>(this.listeners, topic, hash);
}
Подписка на одно или несколько событий:
public addEventListener<T>(topics: string | string[], id?: number): Observable<T> {
if (topics) {
// подписка на одно или несколько событий
const topicsKey = typeof topics === 'string' ? topics : topics.join('/');
return this.addTopic<T>(topicsKey, id).asObservable();
} else {
console.log(`[${Date()}] Can't add EventListener. Type of event is "undefined".`);
}
}
Здесь все намеренно упрощено, но можно преобразовывать в бинарные сущности, как и в случае с сервером. Отсылка команд на сервер:
public sendMessage(event: string, data: any = {}): void {
// если соединение активно, шлем имя события и информацию
if (event && this.websocket.readyState === 1) {
this.websocket.send(JSON.stringify({event, data}));
} else {
console.log('Send error!');
}
}
Добавляем события в игнорлист в рантайме:
public runtimeIgnore(topics: string[]): void {
if (topics && topics.length) {
// добавляем в игнорлист
this.wsConfig.ignore.push(...topics);
}
}
Удаляем события из игнорлиста:
public runtimeRemoveIgnore(topics: string[]): void {
if (topics && topics.length) {
topics.forEach((topic: string) => {
// ищем событие в списке топиков
const topicIndex = this.wsConfig.ignore.findIndex(t => t === topic);
if (topicIndex > -1) {
// снова слушаем собтия
this.wsConfig.ignore.splice(topicIndex, 1);
}
});
}
}
Подключаем модуль вебсокетов:
@NgModule({
declarations: [
AppComponent
],
imports: [
BrowserModule,
ReactiveFormsModule,
WebsocketModule.config({
url: environment.ws, // или "ws://mywebsocketurl"
// список игнорируемых событий
ignore: [WS_API.EVENTS.ANY_1, WS_API.EVENTS.ANY_2],
garbageCollectInterval: 60 * 1000, // интервал сборки мусора
options: {
connectionTimeout: 1000, // таймаут реконнекта
maxRetries: 10 // попытки реконнекта
}
})
],
providers: [],
bootstrap: [AppComponent]
})
export class AppModule {
}
Используем в компонентах:
@Component({
selector: 'app-root',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit, OnDestroy {
private messages$: Observable<IMessage[]>;
private messagesMulti$: Observable<IMessage[]>;
private counter$: Observable<number>;
private texts$: Observable<string[]>;
public form: FormGroup;
constructor(
private fb: FormBuilder,
private wsService: WebsocketService) {
}
ngOnInit() {
this.form = this.fb.group({
text: [null, [
Validators.required
]]
});
// get messages
this.messages$ = this.wsService
.addEventListener<IMessage[]>(WS_API.EVENTS.MESSAGES);
// get messages multi
this.messagesMulti$ = this.wsService
.addEventListener<IMessage[]>([
WS_API.EVENTS.MESSAGES,
WS_API.EVENTS.MESSAGES_1
]);
// get counter
this.counter$ = this.wsService
.addEventListener<number>(WS_API.EVENTS.COUNTER);
// get texts
this.texts$ = this.wsService
.addEventListener<string[]>(WS_API.EVENTS.UPDATE_TEXTS);
}
ngOnDestroy() {
}
public sendText(): void {
if (this.form.valid) {
this.wsService
.sendMessage(WS_API.COMMANDS.SEND_TEXT, this.form.value.text);
this.form.reset();
}
}
public removeText(index: number): void {
this.wsService.sendMessage(WS_API.COMMANDS.REMOVE_TEXT, index);
}
}
Сервис готов к использованию.
Пример из статьи хоть не является универсальным решением для каждого проекта, но демонстрирует один из подходов работы с вебсокетами в больших и сложных приложениях. Вы можете взять его на вооружение и модифицировать в зависимости от текущих задач.
Полную версию сервиса можно найти на GitHub.
По всем вопросам можете обращаться в комментарии, ко мне в Телеграм или на канал Angular там же.
nohuhu
Всё это очень похоже на JSON-RPC с добавлением лишних сущностей.