Worker, SharedWorker, WebSocket, PUSH, IPC-вызовы в Electron и PWA-приложении
В данном цикле статей мы рассмотрим задачу синхронизации состояния приложения между окнами. В качестве подопытного у нас будет приложение на Electron, работающее в offline/online-режимах, которое также может запускаться в PWA-режиме.
Дисклеймер
Меня зовут Владимир Завертайлов. В последние лет 15 я занимаюсь в основном управлением it-компанией. Программирование воспринимаю как хобби. Поэтому прошу заранее простить, если какие-то из приведенных примеров кода или концепций будут наивными.
Две недели назад я ничего из этого вообще не умел :)
Дано
Итак. У нас на руках есть довольно богатое приложение, написанное на TypeScript, React + Redux. Запускать мы его умеем в среде Electron (это платформа на основе браузера Chrome, чтобы делать полноценные портабельные приложения — хоть для Mac, хоть для Windows или Linux — у нас все это есть). Так же есть версия для PWA, использующая большую часть кода десктоп-приложения.
Синхронизация стейта двух окон через Main-процесс при Drag&Drop
Приложение умеет работать в offline-режиме и синхронизироваться с облачным хранилищем (мы использовали протокол GRPC). Открывать несколько окон, синхронизировать состояние между ними. Позволяет делать Drag & Drop между окнами. Кроме того, есть богатая поддержка хоткеев. И хитрые операции, в том числе Redo / Undo.
Что плохо
Огромное количество boilerplate-кода из-за Redux.
Архитектурная ошибка: каждое окно хранит в стейте полную копию всей базы. И полагается на то, что стейт — есть истина. Итог: на больших базах (несколько десятков тысяч записей) окна открываются несколько секунд.Пожалуйста, никогда не делайте так! Безопаснее думать о state как о кеше, который нужен вам для синхронной отрисовки компонентов.
Много слоев абстракции. Сообщения между слоями недостаточно строго стандартизированы. Слушатели могут подключаться в самых разных, порой неожиданных, местах.
-
Протокол GRPC довольно сложно расширять. А его подход с подстановкой дефолтных значений вместо не переданных параметров — вообще рассадник трудноуловимых багов.
Что хорошо
Операции TimeTravel (Redo / Undo) легко делать на Redux: достаточно просто сохранить состояние в стек и путешествовать по нему.
Код довольно стабильный. Все детские болячки в нем решены. Синхронизация работает хорошо.
Автотесты, в том числе интеграционные. Имеются. CI работает как надо.
В общем, все довольно по-взрослому. Кроме работы со стейтом и Redux-портянок. Хочется чего-то… Понять бы конкретно чего…
Целевое состояние
Убрать бойлерплейт-код по-максимуму. Стейт сделать динамическим, с ленивой загрузкой по мере надобности.
Сообщения между слоями типизированы. Слои абстракции и передача сообщений между слоями максимально сокращены. Вся логика межпроцессного взаимодействия спрятана под капот. Я хочу, чтобы стейт обновлялся сам, если чего-то пришло от сервера, или случилось в другом окне!
Попробовать заменить GRPC на REST/GrapQL/MessagePack. С одной стороны хочется оставить бинарный протокол. Но мне не нравятся схемы GRPS. С другой — использовать web-сокеты, для реалтайм обновления от сервера через PUSH. С третей — нас очень просят сделать публичный REST-api, и это есть в планах на этот год. Нужно выбрать.
PWA-приложение: сделать возможность работы в нескольких окнах.
А еще мне хотелось получить такой код, от которого бы душа радовалась. Это — важно. Помчались!
Пинарик
В качестве экспериментального подопытного компонента я выбрал Пинарик — простой трекер привычек, который несколько раз пользователи просили добавить в наше приложение на CustDev-интервью. Это табличка со списком привычек, которые ты отслеживаешь каждый день. И делаешь пометку “сделал/не сделал/так себе сделал”. Если делаешь что-то полезное каждый день, это сразу бросается в глаза.
Пинарик. 2 окна. "Гирлянда". Синхронный стейт. Подопытный компонент в программистском дизайне.
Вообще в сторах полно таких приложений, но нас настойчиво просят добавить такую панель в SingularityApp. Окей. Будем планировать рефакторинг стора на этом компоненте.
Worker и SharedWorker
Браузеры умеют запускать фоновые процессы в worker-ах. Вы пишете какой-то скрипт, кладете его в отдельный файл и дальше просто создаете в основном окне объект Worker, натравливая его на этот скрипт:
const worker = new Worker(“worker.js”);
SharedWorker работает еще интереснее: несколько экземпляров окон могут разделить между собой один и тот же SharedWorker. Первое, что напрашивается — вынести стейт в SharedWorker и обращаться к нему.
Однако эта идея мне не понравилась. Мы не можем шарить память между процессами браузера (и это правильно). Все что мы можем — это отправлять сообщения в воркер и принимать сообщения из него. Ассинхронно. Либо передать какой-то подготовленный объект из потока worker-а в поток окна. Очевидно, что этот метод нам тоже не подходит, поскольку в момент передачи worker потеряет объект у себя.
Интерфейсы отправки сообщений у Worker и SharedWorker хоть и похожи, но слегка разные. Worker имеет массу ограничений, в частности — не может сам взаимодействовать DOM-деревом (логично, у него нет своего окна).
Более того, SharedWorker не может даже в консоль ничего написать, что делает его отладку особенно утомительной. Поэтому имеет смысл:
Универсализировать отправку сообщений в Worker или Shared Worker
Отладить все на Worker
Переключиться на SharedWorker
Хинт: для отладки SharedWorker в браузере используйте chrome://inspect/#workers. Найдите свой SharedWorker, кликните Inspect. Так можно посмотреть консоль воркера.
В альтернативных браузерах — не подскажу. На крайний случай просто делайте http-запрос на какой-нибудь localhost:3000?<log>
, логи от которого вам доступы. Но вообще это лучше один раз написать, отладить и забыть. Если вы не мазохист, конечно.
Сборка Worker и SharedWorker с TypeScript для Electron. Отладка.
Файл со скриптом для Worker должен быть отдельный. Мы используем TypeScript, поэтому не можем отдать напрямую какой-то js-файл, без трансплаера (ts->js — кстати, почему TypeScript нет в редакторе кода Habr?
). Я пробовал два подхода;
Сделать отдельную конфигурацию webpack, собирающую воркеры. Этот подход мы используем в боевом приложении — нам там нужно все пожатое и оптимизированное (только не надо пытаться резать worker на чанки — но это и ежу понятно ????).
-
Если используете сборщик Electron Forge — кажется, единственный вариант — сделать отдельную точку входа (Entry Point) в Electron. Удобно для отладки, так как почти адекватно работает hot reload (почти, но на самом деле — нет; если вы в отладке воркера и не хотите чинить фантомы — не полагайтесь на хотрелод: некоторые объекты, вроде WebSocket или EventListner в момент хотрелода могут быть проинициированы повторно, например. А могут и не быть. Лучше вообще исключите файлы воркеров из вотчера webpack. Сложно на словах, на деле – просто добавь
watchOptions: { ignored:папка с воркерами}
в конфиги вепбака).Для этого в package.json в секции
@electron-forge/plugin-webpack
добавляем в entryPoints нужные нам воркеры:
// Основное окно
{
name: 'MySuperApp',
html: './assets/index.ejs',
js: './src/index.tsx',
preload: {
// Preload используем общий, для всех entryPoint
js: "./src/preload.ts"
}
},
// Worker. Альтернативный вариант: собирать отдельно через webpack
{
js: "./src/workers/worker_sync.ts",
name: "Worker",
},
{
js: "./src/workers/worker_service.ts",
name: "Service Worker",
},
В итоге, после yarn start
, в папке с билдами у меня появятся две новых точки входа (worker_sync и worker_service), а изменения в коде будут в реалтайме пытаться приехать в воркер (но не все так солнечно, конечно).
Стандартизация транспорта. Worker, PostMessage, Emitter, Promise, TimeOut
Итак. Взаимодействие с Worker, штатно, работает через механизм отправки сообщений. Способы отправки немного различаются для Worker и SharedWorker. Впрочем это легко инкапсулировать.
if (this.worker instanceof Worker) {
this.worker.postMessage(message); // Веб-воркер
} else if (this.worker instanceof SharedWorker) {
this.worker.port.postMessage(message); // Пошареный воркер
} else if (this.worker instanceof ServiceWorker) {
// Сервис-воркер. Это не сработает для Chrome < 51. Но и нехер:
// https://bugs.chromium.org/p/chromium/issues/detail?id=543198
this.worker.postMessage(message);
}
Похоже на UDP. Выстрелил и забыл. Что чертовски неудобно, если хочется получить от Worker что-то в ответ. Как быть?
Ну понятно, как. Пронумеровать все сообщения и ждать ответ на конкретное. Я использовал uuid
для подписи сообщений, поскольку у меня куча окон и не хотелось бы, чтобы одни окна получили сообщения, адресованные другим.
Тут очень хорошо помогает паттерн Emitter
. Это объект, который можно попросить слушать события конкретного типа (например по id). Либо постоянно, либо один раз. В случае взаимодействия с воркером в режиме запрос->ответ нам будет достаточно получать событие один раз и отключаться от Emitter.
А что будет, если Worker долго не отвечает? Можно улучшить ситуацию, используя таймаут для ожидания сообщения. Более того, вместо работы с сообщениями удобнее работать с Promise. Если мы не дождались ответа от Worker, Promise переводим в состояние Reject.
Звучит сложновато, в коде все это гораздо проще.
/**
* Отправляет в воркер, ждет ответа на него
* @param message
* @returns Promise обработки сообщения в воркере
*/
public postMessage<TResult = unknown, TInput = unknown>(message: TInput): Promise<TResult> {
// Подписываем сообщения uuidv4. Оборачиваем в формат [uid, message]. Отправляем в воркер.
// Эмиттер ждет разового ответа на сообщение с заданным id.
// Как получает сообщение — реджектит или резовит промис, в зависимости от того, была ли воркере ошибка
// Кроме того, запускаем таймер. Если сработал таймаут и не было ответа от воркера — реджектим промис, кидаем эксэпшин
// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;
const messageId = uuidv4();
const messageToSend = [messageId, message];
return new Promise((resolve, reject) => {
self.emitter.once(messageId, (error, result) => {
if (error) reject(error); else resolve(result);
return result;
})
self.postMessageToWorker(messageToSend);
setTimeout(() => {
if (hasListeners(self.emitter, messageId)) {
reject(new Error(`Timeout exceed, ${workerExceedTimeout} sec`))
self.emitter.off(messageId, resolve)
}
}, workerExceedTimeout * 1000)
})
}
Конечно, нам заранее нужно подготовить сам Emitter и передавать в него все те события, что приходят от Wokrer. Тут снова немножко разный транспорт для Worker и SharedWorker
Инициализация клиента, эмиттера и отправка сообщений в эмиттер
private worker: Worker | SharedWorker | ServiceWorker | WebSocket;
private emitter: EventEmitter = new EventEmitter();
constructor(worker: Worker | SharedWorker | ServiceWorker | WebSocket) {
this.worker = worker;
// На события от воркеров тригегеррим onMessage, который в свою очередь рергает эмиттер
if (this.worker instanceof SharedWorker) {
this.worker.port.addEventListener("message", this.onMessage.bind(this), false);
this.worker.port.start();
} else {
this.worker.addEventListener('message', this.onMessage.bind(this))
}
}
// Добавляем возможность отработки любых событий внутри класса
onMessageReceive(e: MessageEvent) { return e } // Обработка выходящего события. Для перегрузки в дочерних классах.
onResultReceive(result: IWorkerActionResult, e: MessageEvent) { return e } // Получено событие с результатом
/**
* Слушает все сообщения. Если соответствуют нашему формату [messageId, error, result] — отправляем сообщение в эмиттер
* @param e
* @returns
*/
private onMessage(e: MessageEvent): void {
console.log("I RECEIVE MESSAGE");
e = this.onMessageReceive(e);
const message = e.data
if (!Array.isArray(message) || message.length < 2) {
// Игнорируем. Сообщение не нашего формата
return;
}
const messageId = message[0];
const error = message[1];
const result = message[2];
this.onResultReceive(result as IWorkerActionResult, e);
this.emitter.emit(messageId, error, result);
}
Объяснение для менеджеров
Message: “Катя, отправь Полину домой”.
Promise: “Хорошо”.
“Ой” и “Отправила” — возможные исходы промиса (Resolve / Reject).
Красный список — это Emitter с номерами событий (поручил-контролирую).
Будильник — это SetTimeout.
А уши у парня большие и красные потому, что у ему еще кучу всего другого слушать приходится.
Тем временем, на стороне Worker
Во-первых, сделаем универсальным отправку сообщений из Worker/SharedWorker на клиента. В коде упомянут ServiceWorker, но мы не используем их на данный момент, кажется, там нестабильный API и это скорее — задел на будущее.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function postMessageToClient(sourceEvent: any, msg: any) {
if (sourceEvent.source) {
// shared worker -> renderer
sourceEvent.source.postMessage(msg);
} else if (typeof self.postMessage !== 'function') {
// service worker -> renderer
sourceEvent.ports[0].postMessage(msg);
} else {
// web worker -> renderer
self.postMessage(msg);
}
}
Особенность SharedWorker
: для отправки сообщений на клиент мы должны дождаться события connect
и далее использовать его source
для общения с клиентом. Во всех последующих сообщениях поле source
уже не прилетает.
constructor(worker: Worker | SharedWorker | ServiceWorker | WebSocket) {
this.worker = worker;
// На события от воркеров тригегеррим onMessage, который в свою очередь рергает эмиттер
if (this.worker instanceof SharedWorker) {
this.worker.port.addEventListener("message", this.onMessage.bind(this), false);
this.worker.port.start();
} else {
this.worker.addEventListener('message', this.onMessage.bind(this))
}
}
События с результатом запроса к Worker, которые мы отправляем на клиент, должны быть подписаны, чтобы Emitter отработал как надо. Допустим, в таком формате, [messageId, error|null, result].
/**
* Формирует и отправляет message с ошибкой или результатом операции [messageId, error|null, result]
* @param {*} e событие, через которое можно отправлять данные в обратную сторону
* @param {*} messageId
* @param {*} error
* @param {*} result
*/
private postOutgoingMessage(e: MessageEvent, messageId: string, error: Error = null, result: unknown = null) {
console.log("postOutgoingMessage", e, messageId, error, result);
if (error) {
postMessageToClient(e,
[messageId, {
message: error.message
}]);
} else {
postMessageToClient(e, [messageId, null, result]);
}
}
Теперь остается слушать входящие в Worker события, вызывать обработчик с нужной нам бизнес-логикой (в данном случае я объявил абстрактный метод onMessage, который должен быть реализован конкретным Worker) и отправлять результат обратно.
Обработка входящих в worker сообщений
// eslint-disable-next-line @typescript-eslint/no-explicit-any
abstract onMessage(message: IWorkerAction, e: MessageEvent): any;
/**
* Обработка входящих в worker сообщений
*
*/
private onIncomingMessage(e: MessageEvent, transportEvent: MessageEvent = null) {
// В случае Shared Worker, транспорт, через который надо отправлять события обратно,
// хранится событии connect.
if (!transportEvent)
transportEvent = e;
const data = e.data;
if (!Array.isArray(data) || data.length !== 2) {
// Сообщение не нашего типа. Пропускаем.
// todo: тут бы какую-то метку более понятную сделать.
// Иначе можно перебивать сообщения через этот канал, и просто сообщения
return;
}
const messageId = data[0];
const message = data[1];
try {
const result = this.onMessage(message, transportEvent);
// eslint-disable-next-line @typescript-eslint/no-this-alias
const _this = this;
if (!isPromise(result)) {
// Обычный результат, не промис
_this.postOutgoingMessage(transportEvent, messageId, null, result);
} else {
// Колбэк вернул промис.
// Дожидаемся результата, отправляем либо ошибку, либо результат вычислений промиса
result.then(function (finalResult: unknown) {
_this.postOutgoingMessage(transportEvent, messageId, null, finalResult);
}, function (finalError: Error) {
_this.postOutgoingMessage(transportEvent, messageId, finalError);
});
}
} catch (error) {
// Ошибка -- вызов колбэка упал
this.postOutgoingMessage(transportEvent, messageId, error, null);
}
}
Промежуточные итоги
Теперь клиенты (окна) могут общаться с Worker или SharedWorker, получая Promise в качестве результата.
Достаточно:
-
Создать файл, реализующий бизнес-логику Worker
export class SyncWorker extends PromiseWorker { … onMessage(message: IWorkerAction, e: MessageEvent) { … return som value } … } const worker = new SyncWorker();
-
Подключить Worker или SharedWorker в контексте Renderer. Кстати, Electron Forge определяет самостояетльно константы, вроде
WORKER_SYNC_WEBPACK_ENTRY
для точек монтирования// Отлаживать удобнее через простые воркеры, т.к. у них без магии работает консоль // const worker = new Worker(WORKER_SYNC_WEBPACK_ENTRY); const worker = new SharedWorker(WORKER_SYNC_WEBPACK_ENTRY); Server = new PromiseWorkerClient(worker); … Server.postMessage(query)
Т.е. делать запросы, получать ответы и обработать ошибки, если что-то пошло не так.
Ограничения
Понятно, что не надо пытаться в качестве данных и результатов передавать колбеки, символы или DOM-деревья. Есть разумное ограничение на формат передаваемых между контекстами данных: Structured Clone Algoritm.
В данной реализации нет гарантированного способа дождаться на клиенте, что скрип Worker успешно загрузился.
Полный код примеров из этой статьи доступен в github
Что дальше
Спасибо что дочитали. Довольно сложно выкраивать время для статей. Если материал будет полезным — дальше мы научим Worker работать с источниками данных. Для экспериментов я использовал Rest, WebSocket и IndexedDB, т.к. делать поддержку GRPC на экспериментальном подпроекте слишком громоздко. Сделаем стор на MobX и научим его реагировать на изменения конкретных коллекций, как в соседних окнах, так и на сервере, посмотрим, как и куда встраивать Optimistic Update и запустим Time Travel.
Успехов!
tmnhy
Врадимир, что такое "Дискраймер"?
zevvssibirix Автор
Спасибо! Fixed.