Worker, SharedWorker, WebSocket, PUSH, IPC-вызовы в Electron и PWA-приложении

В данном цикле статей мы рассмотрим задачу синхронизации состояния приложения между окнами. В качестве подопытного у нас будет приложение на Electron, работающее в offline/online-режимах, которое также может запускаться в PWA-режиме.

Дисклеймер

Меня зовут Владимир Завертайлов. В последние лет 15 я занимаюсь в основном управлением it-компанией. Программирование воспринимаю как хобби. Поэтому прошу заранее простить, если какие-то из приведенных примеров кода или концепций будут наивными.

Две недели назад я ничего из этого вообще не умел :)

Дано

Итак. У нас на руках есть довольно богатое приложение, написанное на TypeScript, React + Redux. Запускать мы его умеем в среде Electron (это платформа на основе браузера Chrome, чтобы делать полноценные портабельные приложения — хоть для Mac, хоть для Windows или Linux — у нас все это есть). Так же есть версия для PWA, использующая большую часть кода десктоп-приложения. 

Синхронизация стейта двух окон через Main-процесс при Drag&Drop

Приложение умеет работать в offline-режиме и синхронизироваться с облачным хранилищем (мы использовали протокол GRPC). Открывать несколько окон, синхронизировать состояние между ними. Позволяет делать Drag & Drop между окнами. Кроме того, есть богатая поддержка хоткеев. И хитрые операции, в том числе Redo / Undo.

Что плохо

  1. Огромное количество boilerplate-кода из-за Redux.

  2. Архитектурная ошибка: каждое окно хранит в стейте полную копию всей базы. И полагается на то, что стейт — есть истина. Итог: на больших базах (несколько десятков тысяч записей) окна открываются несколько секунд.Пожалуйста, никогда не делайте так! Безопаснее думать о state как о кеше, который нужен вам для синхронной отрисовки компонентов. 

  3. Много слоев абстракции. Сообщения между слоями недостаточно строго стандартизированы. Слушатели могут подключаться в самых разных, порой неожиданных, местах.

  4. Протокол GRPC довольно сложно расширять. А его подход с подстановкой дефолтных значений вместо не переданных параметров — вообще рассадник трудноуловимых багов.

    Текущее состояние: пора рефакторить State
    Текущее состояние: пора рефакторить State

Что хорошо

  1. Операции TimeTravel (Redo / Undo) легко делать на Redux: достаточно просто сохранить состояние в стек и путешествовать по нему.

  2. Код довольно стабильный. Все детские болячки в нем решены. Синхронизация работает хорошо.

  3. Автотесты, в том числе интеграционные. Имеются. CI работает как надо.

В общем, все довольно по-взрослому. Кроме работы со стейтом и Redux-портянок. Хочется чего-то… Понять бы конкретно чего…

Целевое состояние

  1. Убрать бойлерплейт-код по-максимуму. Стейт сделать динамическим, с ленивой загрузкой по мере надобности.

  2. Сообщения между слоями типизированы. Слои абстракции и передача сообщений между слоями максимально сокращены. Вся логика межпроцессного взаимодействия спрятана под капот. Я хочу, чтобы стейт обновлялся сам, если чего-то пришло от сервера, или случилось в другом окне!

  3. Попробовать заменить GRPC на REST/GrapQL/MessagePack. С одной стороны хочется оставить бинарный протокол. Но мне не нравятся схемы GRPS. С другой — использовать web-сокеты, для реалтайм  обновления от сервера через PUSH. С третей — нас очень просят сделать публичный REST-api, и это есть в планах на этот год. Нужно выбрать.

  4. PWA-приложение: сделать возможность работы в нескольких окнах.

А еще мне хотелось получить такой код, от которого бы душа радовалась. Это — важно. Помчались!

Пинарик

В качестве экспериментального подопытного компонента я выбрал Пинарик — простой трекер привычек, который несколько раз пользователи просили добавить в наше приложение на CustDev-интервью. Это табличка со списком привычек, которые ты отслеживаешь каждый день. И делаешь пометку “сделал/не сделал/так себе сделал”. Если делаешь что-то полезное каждый день, это сразу бросается в глаза. 

Пинарик. 2 окна. "Гирлянда". Синхронный стейт. Подопытный компонент в программистском дизайне.

Вообще в сторах полно таких приложений, но нас настойчиво просят добавить такую панель в SingularityApp. Окей. Будем планировать рефакторинг стора на этом компоненте.

Worker и SharedWorker

Браузеры умеют запускать фоновые процессы в worker-ах. Вы пишете какой-то скрипт, кладете его в отдельный файл и дальше просто создаете в основном окне объект Worker, натравливая его на этот скрипт:

    const worker = new Worker(“worker.js”);

SharedWorker работает еще интереснее: несколько экземпляров окон могут разделить между собой один и тот же SharedWorker.  Первое, что напрашивается — вынести стейт в SharedWorker и обращаться к нему. 

Однако эта идея мне не понравилась. Мы не можем шарить память между процессами браузера (и это правильно). Все что мы можем — это отправлять сообщения в воркер и принимать сообщения из него. Ассинхронно. Либо передать какой-то подготовленный объект из потока worker-а в поток окна. Очевидно, что этот метод нам тоже не подходит, поскольку в момент передачи worker потеряет объект у себя.

Интерфейсы отправки сообщений у Worker и SharedWorker хоть и похожи, но слегка разные. Worker имеет массу ограничений, в частности — не может сам взаимодействовать DOM-деревом (логично, у него нет своего окна). 

Более того, SharedWorker не может даже в консоль ничего написать, что делает его отладку особенно утомительной. Поэтому имеет смысл:

  1. Универсализировать отправку сообщений в Worker или Shared Worker

  2. Отладить все на Worker

  3. Переключиться на SharedWorker

Хинт: для отладки SharedWorker в браузере используйте chrome://inspect/#workers. Найдите свой SharedWorker, кликните Inspect. Так можно посмотреть консоль воркера.

В альтернативных браузерах — не подскажу. На крайний случай просто делайте http-запрос на какой-нибудь localhost:3000?<log>, логи от которого вам доступы. Но вообще это лучше один раз написать, отладить и забыть. Если вы не мазохист, конечно.

Сборка Worker и SharedWorker с TypeScript для Electron. Отладка.

Файл со скриптом для Worker должен быть отдельный. Мы используем TypeScript, поэтому не можем отдать напрямую какой-то js-файл, без трансплаера (ts->js — кстати, почему TypeScript нет в редакторе кода Habr?). Я пробовал два подхода;

  1. Сделать отдельную конфигурацию webpack, собирающую воркеры. Этот подход мы используем в боевом приложении — нам там нужно все пожатое и оптимизированное (только не надо пытаться резать worker на чанки — но это и ежу понятно ????).

  2. Если используете сборщик Electron Forge — кажется, единственный вариант — сделать отдельную точку входа (Entry Point) в Electron. Удобно для отладки, так как почти адекватно работает hot reload (почти, но на самом деле — нет; если вы в отладке воркера и не хотите чинить фантомы — не полагайтесь на хотрелод: некоторые объекты, вроде WebSocket или EventListner в момент хотрелода могут быть проинициированы повторно, например. А могут и не быть. Лучше вообще исключите файлы воркеров из вотчера webpack. Сложно на словах, на деле – просто добавь watchOptions: { ignored:папка с воркерами}в конфиги вепбака).

    Для этого в package.json в секции @electron-forge/plugin-webpack добавляем в  entryPoints нужные нам воркеры:

// Основное окно
{
	name: 'MySuperApp',
  html: './assets/index.ejs',
  js: './src/index.tsx',
  preload: {
  	// Preload используем общий, для всех entryPoint
		js: "./src/preload.ts"
	}
},
// Worker. Альтернативный вариант: собирать отдельно через webpack
{
	js: "./src/workers/worker_sync.ts",
	name: "Worker",
},
{
	js: "./src/workers/worker_service.ts",
	name: "Service Worker",
},

В итоге, после yarn start, в папке с билдами у меня появятся две новых точки входа (worker_sync и worker_service), а изменения в коде будут в реалтайме пытаться приехать в воркер (но не все так солнечно, конечно).

Electron Forge дает быстрый старт, но структура билда выглядит на троекчу. 
Hot Reload работает из коробки. Но зачем?
Electron Forge дает быстрый старт, но структура билда выглядит на троекчу. Hot Reload работает из коробки. Но зачем?

Стандартизация транспорта. Worker, PostMessage, Emitter, Promise, TimeOut

Итак. Взаимодействие с Worker, штатно, работает через механизм отправки сообщений. Способы отправки немного различаются для Worker и SharedWorker. Впрочем это легко инкапсулировать.

if (this.worker instanceof Worker) {
    this.worker.postMessage(message); // Веб-воркер
} else if (this.worker instanceof SharedWorker) {
    this.worker.port.postMessage(message); // Пошареный воркер
} else if (this.worker instanceof ServiceWorker) {
    // Сервис-воркер. Это не сработает для Chrome < 51. Но и нехер:
    // https://bugs.chromium.org/p/chromium/issues/detail?id=543198
    this.worker.postMessage(message);
}

Похоже на UDP. Выстрелил и забыл. Что чертовски неудобно, если хочется получить от Worker что-то в ответ. Как быть? 

Окно отправляет сообщение в Worker. А будет ли ответ? Не факт.
Объяснение для менеджеров: это как своей коллеге стикер с заданием на экран повесить. 
Сделает ли — не факт.
Окно отправляет сообщение в Worker. А будет ли ответ? Не факт. Объяснение для менеджеров: это как своей коллеге стикер с заданием на экран повесить. Сделает ли — не факт.

Ну понятно, как. Пронумеровать все сообщения и ждать ответ на конкретное. Я использовал uuid для подписи сообщений, поскольку у меня куча окон и не хотелось бы, чтобы одни окна получили сообщения, адресованные другим.

Тут очень хорошо помогает паттерн Emitter. Это объект, который можно попросить слушать события конкретного типа (например по id). Либо постоянно, либо один раз. В случае взаимодействия с воркером в режиме запрос->ответ нам будет достаточно получать событие один раз и отключаться от Emitter.

Promise + Emitter + Timeout
Promise + Emitter + Timeout

А что будет, если Worker долго не отвечает? Можно улучшить ситуацию, используя таймаут для ожидания сообщения. Более того, вместо работы с сообщениями удобнее работать с Promise. Если мы не дождались ответа от Worker, Promise переводим в состояние Reject. 

Звучит сложновато, в коде все это гораздо проще. 

/**
 * Отправляет в воркер, ждет ответа на него
 * @param message 
 * @returns Promise обработки сообщения в воркере
 */
 public postMessage<TResult = unknown, TInput = unknown>(message: TInput): Promise<TResult> {

    // Подписываем сообщения uuidv4. Оборачиваем в формат [uid, message]. Отправляем в воркер.
    // Эмиттер ждет разового ответа на сообщение с заданным id. 
    // Как получает сообщение — реджектит или резовит промис, в зависимости от того, была ли воркере ошибка
    // Кроме того, запускаем таймер. Если сработал таймаут и не было ответа от воркера — реджектим промис, кидаем эксэпшин

    // eslint-disable-next-line @typescript-eslint/no-this-alias
    const self = this;
    const messageId = uuidv4();
    const messageToSend = [messageId, message];

    return new Promise((resolve, reject) => {
        self.emitter.once(messageId, (error, result) => {
            if (error) reject(error); else resolve(result);
            return result;
        })
        self.postMessageToWorker(messageToSend);
        setTimeout(() => {
            if (hasListeners(self.emitter, messageId)) {
                reject(new Error(`Timeout exceed, ${workerExceedTimeout} sec`))
                self.emitter.off(messageId, resolve)
            }
        }, workerExceedTimeout * 1000)
    })
}

Конечно, нам заранее нужно подготовить сам Emitter и передавать в него все те события, что приходят от Wokrer. Тут снова немножко разный транспорт для Worker и SharedWorker

Инициализация клиента, эмиттера и отправка сообщений в эмиттер
private worker: Worker | SharedWorker | ServiceWorker | WebSocket;
private emitter: EventEmitter = new EventEmitter();

constructor(worker: Worker | SharedWorker | ServiceWorker | WebSocket) {
    this.worker = worker;
    // На события от воркеров тригегеррим onMessage, который в свою очередь рергает эмиттер
    if (this.worker instanceof SharedWorker) {
        this.worker.port.addEventListener("message", this.onMessage.bind(this), false);
        this.worker.port.start();
    } else {
        this.worker.addEventListener('message', this.onMessage.bind(this))
    }
}
// Добавляем возможность отработки любых событий внутри класса        
onMessageReceive(e: MessageEvent) { return e } // Обработка выходящего события. Для перегрузки в дочерних классах.
onResultReceive(result: IWorkerActionResult, e: MessageEvent) { return e } // Получено событие с результатом
/**
 * Слушает все сообщения. Если соответствуют нашему формату [messageId, error, result] — отправляем сообщение в эмиттер
 * @param e 
 * @returns 
 */
private onMessage(e: MessageEvent): void {
    console.log("I RECEIVE MESSAGE");
    e = this.onMessageReceive(e);
    const message = e.data
    if (!Array.isArray(message) || message.length < 2) {
        // Игнорируем. Сообщение не нашего формата
        return;
    }
    const messageId = message[0];
    const error = message[1];
    const result = message[2];
    this.onResultReceive(result as IWorkerActionResult, e);
    this.emitter.emit(messageId, error, result);
}

Объяснение для менеджеров

Раз уж нарисовал — вставлю! Какой-нибудь профессор в лекцию утащит :)
Раз уж нарисовал — вставлю! Какой-нибудь профессор в лекцию утащит :)

Message: “Катя, отправь Полину домой”.
Promise: “Хорошо”.
“Ой” и “Отправила” — возможные исходы промиса (Resolve / Reject).
Красный список — это Emitter с номерами событий (
поручил-контролирую).
Будильник — это SetTimeout.
А уши у парня большие и красные потому, что у ему еще кучу всего другого слушать приходится.

Тем временем, на стороне Worker

Во-первых, сделаем универсальным отправку сообщений из Worker/SharedWorker на клиента. В коде упомянут ServiceWorker, но мы не используем их на данный момент, кажется, там нестабильный API и это скорее — задел на будущее.

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function postMessageToClient(sourceEvent: any, msg: any) {    
    if (sourceEvent.source) { 
        // shared worker -> renderer
        sourceEvent.source.postMessage(msg);
    } else if (typeof self.postMessage !== 'function') { 
        // service worker -> renderer
        sourceEvent.ports[0].postMessage(msg);
    } else { 
        // web worker -> renderer
        self.postMessage(msg);
    }
}

Особенность SharedWorker: для отправки сообщений на клиент мы должны дождаться события connect и далее использовать его source для общения с клиентом. Во всех последующих сообщениях поле source уже не прилетает.

constructor(worker: Worker | SharedWorker | ServiceWorker | WebSocket) {
    this.worker = worker;
    // На события от воркеров тригегеррим onMessage, который в свою очередь рергает эмиттер
    if (this.worker instanceof SharedWorker) {
        this.worker.port.addEventListener("message", this.onMessage.bind(this), false);
        this.worker.port.start();
    } else {
        this.worker.addEventListener('message', this.onMessage.bind(this))
    }
}

События с результатом запроса к Worker, которые мы отправляем на клиент, должны быть подписаны, чтобы Emitter отработал как надо. Допустим, в таком формате, [messageId, error|null, result].

/**
 * Формирует и отправляет message с ошибкой или результатом операции [messageId, error|null, result]
 * @param {*} e событие, через которое можно отправлять данные в обратную сторону
 * @param {*} messageId
 * @param {*} error
 * @param {*} result
 */
 private postOutgoingMessage(e: MessageEvent, messageId: string, error: Error = null, result: unknown = null) {
    console.log("postOutgoingMessage", e, messageId, error, result);
    if (error) {
        postMessageToClient(e,
            [messageId, {
                message: error.message
            }]);
    } else {
        postMessageToClient(e, [messageId, null, result]);
    }
}

Теперь остается слушать входящие в Worker события, вызывать обработчик с нужной нам бизнес-логикой (в данном случае я объявил абстрактный метод onMessage, который должен быть реализован конкретным Worker) и отправлять результат обратно.

Обработка входящих в worker сообщений
// eslint-disable-next-line @typescript-eslint/no-explicit-any
abstract onMessage(message: IWorkerAction, e: MessageEvent): any;

/**
 * Обработка входящих в worker сообщений
 *
 */
private onIncomingMessage(e: MessageEvent, transportEvent: MessageEvent = null) {

    // В случае Shared Worker, транспорт, через который надо отправлять события обратно, 
    // хранится событии connect.
    if (!transportEvent)
        transportEvent = e;
    const data = e.data;

    if (!Array.isArray(data) || data.length !== 2) {
        // Сообщение не нашего типа. Пропускаем.
        // todo: тут бы какую-то метку более понятную сделать. 
        // Иначе можно перебивать сообщения через этот канал, и просто сообщения            
        return;
    }
    const messageId = data[0];
    const message = data[1];

    try {
        const result = this.onMessage(message, transportEvent);
        // eslint-disable-next-line @typescript-eslint/no-this-alias
        const _this = this;
        if (!isPromise(result)) { 
            // Обычный результат, не промис
            _this.postOutgoingMessage(transportEvent, messageId, null, result);
        } else {
            // Колбэк вернул промис.
            // Дожидаемся результата, отправляем либо ошибку, либо результат вычислений промиса
            result.then(function (finalResult: unknown) {
                _this.postOutgoingMessage(transportEvent, messageId, null, finalResult);
            }, function (finalError: Error) {
                _this.postOutgoingMessage(transportEvent, messageId, finalError);
            });
        }
    } catch (error) {
        // Ошибка -- вызов колбэка упал
        this.postOutgoingMessage(transportEvent, messageId, error, null);
    }
}

Промежуточные итоги

Теперь клиенты (окна) могут общаться с Worker или SharedWorker, получая Promise в качестве результата.

Достаточно:

  1. Создать файл, реализующий бизнес-логику Worker 

    export class SyncWorker extends PromiseWorker {
    …
    onMessage(message: IWorkerAction, e: MessageEvent) {
    	…
    return som value
    }
    …
    }
    const worker = new SyncWorker();
  2. Подключить Worker или SharedWorker в контексте Renderer. Кстати, Electron Forge определяет самостояетльно константы, вроде WORKER_SYNC_WEBPACK_ENTRY для точек монтирования

    // Отлаживать удобнее через простые воркеры, т.к. у них без магии работает консоль
    // const worker = new Worker(WORKER_SYNC_WEBPACK_ENTRY);
    const worker = new SharedWorker(WORKER_SYNC_WEBPACK_ENTRY);
    Server = new PromiseWorkerClient(worker);
    …
    Server.postMessage(query)

Т.е. делать запросы, получать ответы и обработать ошибки, если что-то пошло не так.

Низкоуровневая схема работы с Worker / Shared Worker
Низкоуровневая схема работы с Worker / Shared Worker

Ограничения

  1. Понятно, что не надо пытаться в качестве данных и результатов передавать колбеки, символы или DOM-деревья. Есть разумное ограничение на формат передаваемых между контекстами данных: Structured Clone Algoritm.

  2. В данной реализации нет гарантированного способа дождаться на клиенте, что скрип Worker успешно загрузился.

Полный код примеров из этой статьи доступен в github

Что дальше

Спасибо что дочитали. Довольно сложно выкраивать время для статей. Если материал будет полезным — дальше мы научим Worker работать с источниками данных. Для экспериментов я использовал Rest, WebSocket и IndexedDB, т.к. делать поддержку GRPC на экспериментальном подпроекте слишком громоздко. Сделаем стор на MobX и научим его реагировать на изменения конкретных коллекций, как в соседних окнах, так и на сервере, посмотрим, как и куда встраивать Optimistic Update и запустим Time Travel.

Успехов!

Комментарии (2)


  1. tmnhy
    21.01.2022 08:55

    Врадимир, что такое "Дискраймер"?


    1. zevvssibirix Автор
      21.01.2022 09:58
      +1

      Спасибо! Fixed.