Большинство знает Kurento, как WebRTC медиа-сервер. Но в их репозитории на git-хабе можно найти много чего интересного. Например, библиотека работы с JSON-RPC, которая берет на себя контроль сессии и управление протоколом.

В этой статье я расскажу как использовать это решение на Java. Сделаем простейший сервер на Spring Boot и клиент на JavaScript.

Кому это надо или какую проблему решает библиотека Kurento JSON RPC?


Подобный стек технологий очень полезен при реализации видеоконференций или других ресурсов на базе WebRTC. Когда идет трансляция, пользователь соединяется с медиа-сервером и другими пользователями множеством отдельных подключений. К тому же эти подключения могут меняться “на лету”. Так устроен WebRTC.

Но вся информация о подключениях, потоках данных, авторизации и комнатах конференций должна идти по отдельному каналу (назовем его “служебный”). И самое важное — в двустороннем режиме. Метод запрос/ответ не подходит, т.к. надо быстро сообщать об изменениях в потоках и подключениях. Т.е. сервер должен отправлять клиенту данные даже тогда, когда тот не просит.

Так как каналы связи идут по разным подключениям и разными маршрутами, то перебой связи одного канала не должен останавливать работу других. О своих каналах и подключениях медиа-сервер и браузер заботятся сами, а вот “служебный” канал под нашей опекой.

Самый удобный вариант — это WebSocket (sockJS). Двухсторонняя связь поверх HTTPS. Остаётся обеспечить функцию переподключения при временной потере связи. Этим и занимается библиотека kurento-jsonrpc.

Главными особенностями решения от kurento являются простота создания сервера/клиента (благодаря Spring Boot) и полностью автоматическая работа с протоколом JSON-RPC.

Сразу после подключения зависимостей, настройки параметров, мы начинаем заниматься именно бизнес-логикой. Всю работу за контролем “id” запросов, формированием правильного сообщения по протоколу, парсингом и сериализацией берет на себя библиотека.

Также, в сообщения между сервером и клиентом библиотека встраивает параметр sessionID, что позволяет реализовать восстановление разорванного подключения без необходимости повторной регистрации.

И всё это под капотом. Нам предоставляются “полезные” данные: команды и параметры. Ну и приятный встроенный функционал контроля жизни подключения: ping-pong, session timeout, request timeout.

Информация


Java версия библиотеки находится тут:
https://github.com/Kurento/kurento-java/tree/master/kurento-jsonrpc
В репозиториях kurento можно найти также реализации на C++ и Node-JS.

Подключать зависимости мы будем через Maven, так что ссылка нужна только для ознакомления с исходниками.

Из документации есть “hello world” на английском и некоторые комментарии в исходниках:
https://github.com/Kurento/doc-kurento-jsonrpc

Сервер


Рассмотрим пример запуска JSON-RPC сервера через WebSocket на Spring Boot. В первую очередь создаем на https://start.spring.io/ новый проект и тут же подключаем в зависимости пакет “Spring Web” — это удобно для тестирования.

Пример настройки Spring initializr:



В файле pom.xml добавляем библиотеку: “kurento-jsonrpc-server”

<!-- https://mvnrepository.com/artifact/org.kurento/kurento-jsonrpc-server -->
<dependency>
    <groupId>org.kurento</groupId>
    <artifactId>kurento-jsonrpc-server</artifactId>
    <version>6.13.1</version>
</dependency>

Сразу настраиваем SSL. Для этого нужен файл хранилища ключей и сертификатов.
Так как мы работаем с kurento, то и сертификат можно взять у них: ссылка на github. Он самоподписанный, так что при тестировании в браузере надо будет “довериться” сертификату.

По ссылке надо скачать два файла keystore.jks и application.properties и положить их в папку проекта: /src/main/resources. Не забыть в файле properties заменить порт сервера (по умолчанию там переменная: ${demo.port}). Должно получиться так:

# EMBEDDED SERVER CONFIGURATION
server.port=8080
server.ssl.key-store=classpath:keystore.jks
server.ssl.key-store-password=kurento
server.ssl.key-store-type=JKS
server.ssl.key-alias=kurento-selfsigned


Во время разработки очень полезно выставить уровень логирования для kurento: “logging.level.org.kurento=DEBUG”
“logging.level.org.kurento.jsonrpc.internal.server=TRACE”


В логи будет попадать информация о состояниях подключений и обмене сообщениями. Kurento не сильно спамит в логи, только полезное.

Теперь сам Spring Boot


Настраивается очень просто:

  1. Добавить аннотацию: @ Import(JsonRpcConfiguration.class)
  2. Переопределить метод:
    registerJsonRpcHandlers(JsonRpcHandlerRegistry registry)
  3. Создать класс-обработчик сообщений:
    — наследник от DefaultJsonRpcHandler
    — переопределить метод: handleRequest(Transaction transaction, Request request)

Это самый необходимый минимум. Далее надо добавить дополнительные настройки и переопределить методы контроля различных событий. Код я «приправил» комментариями и вывел самые важные переменные и события в логи. В итоге получилось два основных класса.

Класс, который содержит main метод (если настраивали проект по скриншоту, то класс называется KurentoJsonrpcDemoApplication):

Код java: KurentoJsonrpcDemoApplication
@SpringBootApplication
@Import(JsonRpcConfiguration.class)
public class KurentoJsonrpcDemoApplication implements JsonRpcConfigurer {

    @Override
    public void registerJsonRpcHandlers(JsonRpcHandlerRegistry registry) {
        registry.addHandler(kurentoJsonrpcDemoHandler(), "/jsonrpc");
    }

    @Bean
    public JsonRpcHandler<?> kurentoJsonrpcDemoHandler() {
        KurentoJsonrpcDemoHandler handler = new KurentoJsonrpcDemoHandler();

        handler
                .withSockJS()               // Использовать SockJS
                .withLabel("DEMO_LABEL")    // Метка сокета. Выводится в логах с ошибками.
                .withAllowedOrigins("*")    // Разрешенные узлы для подключения
                .withPingWatchdog(true);    // Включить пинги в подключениях

        return handler;
    }

    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {

        // Если в течении этого времени нет активности сессии, то подключение закрывается.
        final long maxSessionIdleTimeout = 100000;

        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxSessionIdleTimeout(maxSessionIdleTimeout);
        return container;
    }

    public static void main(String[] args) {
        SpringApplication.run(KurentoJsonrpcDemoApplication.class, args);
    }

}


Важно не пропустить аннотацию “@Import(JsonRpcConfiguration.class)”. Она и создаёт сервер.

Теперь класс “KurentoJsonrpcDemoHandler” — обработчик сообщений и событий:

Код java: KurentoJsonrpcDemoHandler
public class KurentoJsonrpcDemoHandler extends DefaultJsonRpcHandler<JsonObject> {
	private static Logger log = LoggerFactory.getLogger(KurentoJsonrpcDemoHandler.class);

	/**
	 * Обработка входящих сообщений
	 */
	@Override
	public void handleRequest(Transaction transaction, Request<JsonObject> request) throws Exception {

	    // Информация о сессии
		Session kurentoSession = transaction.getSession();

		log.info("========== REQUEST ==========");
		log.info("[SESSION] isNew: {}", kurentoSession.isNew());
		log.info("[SESSION] SessionId: {}", kurentoSession.getSessionId());
		log.info("[SESSION] RegisterInfo: {}", kurentoSession.getRegisterInfo());
		log.info("[SESSION] Attributes: {}", 	kurentoSession.getAttributes());

		// Информация от запросе
		log.info("[REQUEST] id: {}", request.getId()); // id запроса от клиента
		log.info("[REQUEST] method: {}", request.getMethod()); // команда
		log.info("[REQUEST] params: {}", request.getParams()); // параметры json

	        // Ответ. Для примера возвращаем входящие данные.
		transaction.sendResponse(request.getParams());
	}

	/**
	 * Вызывается при подключении нового клиента
	 */
	@Override
	public void afterConnectionEstablished(Session session) throws Exception {
		log.info("========== ESTABLISHED ==========");
		log.info("[SESSION] SessionId: {}", session.getSessionId());
		session.setReconnectionTimeout(15000);
	}

	/**
	 * Вызывается при обрыве соединения
	 * String status - причина отключения.
	 */
	@Override
	public void afterConnectionClosed(Session session, String status) throws Exception {
		log.info("========== CLOSED ==========");
		log.info("[STATUS] {}", status);
		log.info("[SESSION] SessionId: {}", session.getSessionId());
	}

	/**
	 * Вызывается когда клиент подключился после разрыва связи (не новый клиент)
	 */
	@Override
	public void afterReconnection(Session session) throws Exception {
		log.info("========== RECONNECTION ==========");
		log.info("[SESSION] SessionId: {}", session.getSessionId());
	}
}


Всё — самый минимальный сервер готов. Можно запускать.

Приложение создаст сервер на порте 8080 и с сокетом на пути "/jsonrpc".
Если по этому сокету будут приходить сообщения в формате протокола JSON-RPC, то сервер напишет в логи информацию о сессии и запросе, затем вернет запрос обратно.

Сервер: Подробнее о настройках


Большую часть настроек сервера я указал в коде. Разберем их внимательнее.

При создании обработчика событий можно использовать следующие настройки:

  1. withSockJS() — Использовать SockJS. Без вызова этого метода будет использоваться обычный WebSocket.
  2. withLabel(«DEMO_LABEL») — Метка сокета. Простая строка. Эту метку можно затем узнать в самом обработчике KurentoJsonrpcDemoHandler: this.getLabel(). Удобно, что она выводится в логах.
  3. withAllowedOrigins("*") — как и в обычном WebSocket’е можно настраивать от каких ресурсов принимать подключения. “*” — разрешает любые входящие.
  4. withPingWatchdog(true) — Включить/выключить регулярную проверку соединений. Если подключиться и не присылать сообщения “ping”, то сервер разорвет соединение. Время, через которое будет разорвано соединение, присылает клиент с первой командой “ping”. Это значение умножается на 3, т.е. можно пропустить два пинга. Если клиент не присылает это значение, то по умолчанию 20 секунд — то есть сервер разорвет соединение через минуту (20*3).

Перегружать метод createWebSocketContainer() необязательно, но рекомендуется, т.к. позволяет получить доступ к некоторым настройкам сервлета.

Из важного по теме:

  1. setMaxSessionIdleTimeout(Long timeoutInMillis) — если сокет не проявляет никакой активности, то он будет закрыт через указанное время.
  2. setMaxTextMessageBufferSize и setMaxBinaryMessageBufferSize — установить размеры буферов сокета. Для нужд конференций обычно хватает значений по умолчанию (8 * 1024 байт). Но были случаи, когда приходилось увеличивать: на клиентской машине стояло две сетевые карты и несколько виртуальных сетей. В итоге SDPOffer не поместился в этот буфер.

Сервер: обработка событий и методы


Самый главный метод — это handleRequest (transaction, request).

Он вызывается когда приходит команда от клиента.

Важно помнить, что существуют служебные методы сервера: "terminate", "poll”, "execute", "connect", "ping", "closeSession", которые не рекомендуется использовать в своей логике, если только вы не делаете это специально для взаимодействия с сервером.

К примеру, если послать на сервер JSON строку вида: {«jsonrpc»:«2.0»,«method»:«ping»,«id»:1}, то сервер воспримет её как свою служебную команду и не будет вызывать метод handleRequest. Т.е. не сообщит о таком сообщении в обработчик.

Другие используемые сервером строки и коды можно посмотреть в файле: org.kurento.jsonrpc.internal.JsonRpcConstants.class

Методы, которые вызываются на события в соединении


afterConnectionEstablished — соединение установлено. Вызывается вместе с первым сообщением от клиента. Внимание: вызывается не тогда, когда соединение установилось фактически, а когда сервер опознал клиента как работающего с JSON-RPC т.е. после первого сообщения.

afterConnectionClosed — соединение закрыто. Как и с первым методом есть свои тонкости, т.е. не связано с фактическим состоянием подключения.

Вызывается когда:

  • клиент прислал команду “closeSession
  • клиент не присылал команду “ping” дольше установленного времени
  • истек таймаут бездействия на сервлете

afterReconnection — клиент переподключился. Вызывается когда клиент считался отключенным, но, вдруг прислал сообщение: “connect” с параметром “sessionId”, который был присвоен клиенту в предыдущем подключении. Подробнее о сессиях и подключении будет ниже.

handleUncaughtException и handleTransportError — не использовал в примере. По умолчанию эти методы пишут в логи сообщения об ошибках, вызванных отказами на транспортном уровне. Можно переопределить их, но пользы от этого я особой не вижу.

Сессии и подключения


Когда к сокету сервера кто-то подключился, сервер об этом не сообщает, но запускает контроль за временем бездействия подключения (это время мы указали setMaxSessionIdleTimeout) и контроль за временем между командами “ping” от клиента.

Если на сервер приходит строка по протоколу json-rpc, он её парсит и создаёт внутреннюю сессию org.kurento.jsonrpc.Session. Если была команда отличная от “connect”, то будет создана новая сессия и ей присваивается новый sessionID. Если же команда подключения, то в параметрах ищется “SessionId” и сравнивается с теми, которые уже есть на сервере.

Логическая схема обработки команд на сервере:



Таким образом, на стороне клиента при разрыве связи и повторном подключении, необходимо отправить на сервер команду “connect” с параметром “SessionId” от предыдущего подключения. Тогда сервер свяжет новое подключение со старой сессией и данные о клиенте не будут потеряны.

Клиент


В качестве клиента рассмотрим JavaScript библиотеку от Kurento. Для удобства буду использовать написанный выше сервер для получения страницы с клиентским gui и кодом.

Надо прописать зависимости в файле pom.xml:

Зависимости: pom.xml
<!-- https://mvnrepository.com/artifact/org.kurento/kurento-jsonrpc-js -->
<dependency>
    <groupId>org.kurento</groupId>
    <artifactId>kurento-jsonrpc-js</artifactId>
    <version>6.13.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.webjars/webjars-locator -->
<dependency>
    <groupId>org.webjars</groupId>
    <artifactId>webjars-locator</artifactId>
    <version>0.40</version>
</dependency>
	
<!-- https://mvnrepository.com/artifact/org.webjars.bower/sockjs-client -->
<dependency>
    <groupId>org.webjars.bower</groupId>
    <artifactId>sockjs-client</artifactId>
    <version>1.4.0</version>
</dependency>


Теперь создать два файла:

  1. /src/main/resources/static/kurento-client.html — страница для браузера, на которой будут кнопки управления.
  2. /jsonrpc-demo/src/main/resources/static/js/kurento-client.js — скрипт с клиентом.

Для инициализации клиента надо определить его параметры, указать ссылки на функции CallBack’и, которые вызываются при определенных событиях и в момент поступления команд с сервера.

Настройки клиента:

var configuration = {… }

Создать клиент:

var jsonRpcClientWs = new RpcBuilder.clients.JsonRpcClient(configuration);

Более понятно будет показать в коде с комментариями:

Код html: kurento-client.html
<!DOCTYPE html>
<html>
<head>
	<meta charset="utf-8">
	<script src="webjars/sockjs-client/dist/sockjs.js"></script>
	<script src="js/kurento-jsonrpc.js"></script>
	<script src="js/kurento-client.js"></script>
</head>

<body>
    <h1>Kurento JSON RPC Demo</h1>
    <h2>Client by Kurento</h2>
    <p>Press F12 to see logs in console.</p>

    <!-- Кнопка для отправки простого текста на сервер -->
    <input type="button" value="Send 'Hello.'" onclick="sendTextMessage('Hello.')">
    <br><br>

    <!-- Кнопка для закрытия сокета -->
    <input type="button" value="Close socket" onclick="closeSocket()">
    <br><br>

    <!-- Кнопка для переподключения -->
    <input type="button" value="Reconnect" onclick="reconnect()">

</body>
</html>


Код JavaScript: kurento-client.js
var jsonRpcClientWs = null; // клиент
var mySessionId; // id текущей/последней сессии.

window.onload = function() {

    // Настройки подключения
    var configuration = {

        // Время в миллисекундах между ping-pong сообщениями. 
        // Если не указать, то пинги будут отключены.
        heartbeat: 10000, 

        // true/false посылать сообщение "closeSession" перед отключением
        // или просто оборвать связь (сервер сразу узнает об отключении или по пингу)
        sendCloseMessage: true, 
        ws: {
            uri: 'https://' + location.host + '/jsonrpc', // URI до сервера
            useSockJS: true, // true (использовать SockJS) / false (использовать WebSocket)
            onconnected: connectCallback,         // callback на подключение
            ondisconnect: disconnectCallback,	     // callback на обрыв связи
            onreconnecting: reconnectingCallback, // callback на переподключение
            onreconnected: reconnectedCallback,   // callback переподключение удалось
            onerror: errorCallback                // callback на ошибку
        },
        rpc: {
            // время в миллисекундах на отправку запроса.
            // (через какое время придет ошибка, если сервер не доступен)
            requestTimeout: 10000,
            myOwnMethod: myOwnMethodCall // определяем свой метод
        }
    }

    // Создание подключения
    jsonRpcClientWs = new RpcBuilder.clients.JsonRpcClient(configuration);
};

window.onbeforeunload = function() {
    jsonRpcClientWs.close();
}

function connectCallback() {
    console.log("connect");
    // сразу после подключения сокета, запрашиваем подключение к сессии на сервере
    sendConnect(); 
}

function disconnectCallback() {
    console.log("disconnect");
}

function reconnectingCallback() {
    console.log("REconnectING");
}

function reconnectedCallback() {
    console.log("REconnectED");
    // запрашиваем подключение к сессии на сервере
    sendConnect();
}

function errorCallback(error) {
    console.log("error: ", error);
}

// отправить текстовое сообщение на сервер
function sendTextMessage(textMsg) {
    var method = "text-message";
    var params = { text: textMsg };

    jsonRpcClientWs.send(method, params, function(error, response) {
        if (error) {
            console.log("error:", error);
            return;
        }
        console.log("response:", response);
    });
}

// закрыть сокет
// вызывается нажатием кнопки на странице
function closeSocket() {
    console.log("closeSocket");
    jsonRpcClientWs.close();
}

// переподключиться
// вызывается нажатием кнопки на странице
function reconnect() {
    console.log("reconnect");
    jsonRpcClientWs.reconnect();
}

// Функция отправляет на сервер команду подключения с уже имеющимся ИД сессии.
function sendConnect(){
    var method = "connect";
    var params = {};

    if(mySessionId){
        params = { sessionId: mySessionId };
    }

    jsonRpcClientWs.send(method, params, function(error, response) {
        mySessionId = undefined;
        if (error) {
            console.log("error:", error);
            return;
        }
        console.log("response:", response);
        if (response.value == "OK"){
            mySessionId = response.sessionId;
        }
    });
}

// Свой метод.
function myOwnMethodCall(params, request){
    console.log("==== myOwnMethodCall ====");
    console.log("params: ", params);
    console.log("request: ", request);
}


Теперь можно получить клиент в браузере по ссылке: localhost:8080/kurento-client.html

После создания клиента, он подключится к сокету на сервере и начнет посылать команды “ping”. В клиенте методы на события подключения и обрыва связи срабатывают сразу.

Внутри библиотеки версии на JavaScript не контролируется сессия на сервере. Поэтому мы контролируем её сами, вызывая функцию sendConnect() сразу при получении событий onconnected и onreconnected. Для этого приходится хранить переменную с ИД нашей сессии на сервере.

Беглый взгляд в исходники клиента показал, что там контролируется дополнительный параметр “peerID”, но к серверу на Java он отношения не имеет, так что наш SessionId храним сами.

Как это работает или общение клиента с сервером


От клиента к серверу


После создания подключения и привязки этого подключения к сессии на сервере клиент может отправлять запросы и назначать callback функции, которые будут вызываться при получении ответа:

jsonRpcClientWs.send( method, params, callback( error, response ) );
где,

  • method — строка наименования команды. Если команда служебная, то сервер её обработает сам и никак не сообщит о ней управляющему коду. Если сервер не знает такой команды, то он вызовет обработчик handleRequest и передаст ему эту команду и её параметры.
  • params — это js объект с параметрами, которые содержат дополнительную информацию для работы этой команды.
  • callback( error, response ) — эта функция будет вызвана при получении ответа от сервера именно на этот запрос или в случае ошибки. В переменной response будет храниться ответ сервера.

От сервера к клиенту


Сервер может общаться с клиентом двумя способами: отвечать на запросы и оповещать клиента без запроса.

На сервере запросы мы обрабатываем в методе handleRequest. Оттуда можем сразу и ответить: transaction.sendResponse( jsonObject );
где, jsonObject — содержит в себе параметры и их значения. Создать его можно, например, так:

JsonObject params = new JsonObject();
params.addProperty("name", "Patison");
params.addProperty("size", 11);
params.addProperty("isGood", true);

Если надо ответить позже, то необходимо вызвать: transaction.startAsync();
Сервер не будет ждать ответа сейчас, а продолжит выполнение программы. Нам же придется где-то сохранить переменную transaction, чтобы ответить, когда будет нужно по логике.

Посылать сообщения клиенту без запроса (оповещать) можно из любого места, где есть доступ к заранее сохраненной сессии:

kurentoSession.sendNotification(method, params);
С параметрами мы уже знакомы: method — это имя команды, params — это jsonObject.

Сессии надо предварительно сохранить, например, в методе, который вызывается при подключении нового клиента: afterConnectionEstablished(Session session)

Как клиент принимает уведомления/команды от сервера


Получив уведомление/команду от сервера, клиентская библиотека ищет одноименную callback функцию в своих конфигурациях, которые мы ей передали в момент создания. В коде выше, для примера, я определил параметр и соответствующую функцию: myOwnMethod: myOwnMethodCall.

Теперь, когда клиент получит метод method===”myOwnMethod”, он выполнит функцию myOwnMethodCall(params, request).

Если библиотека не найдет соответствующей функции или параметра, то будет вызван errorCallback(error). Т.е. на каждую команду, которую мы придумаем для логики, необходимо создать функцию ее реализации и прописать в конфигурационной переменной клиента.

Заключение


На удивление много текста для очень простого клиент-серверного приложения.

Из-за скромной документации по библиотеке Kurento JSON RPC и очень ограниченному количеству информации о ней в Интернете, я потратил много времени чтобы разобраться как оно работает. В этой статье я постарался зафиксировать всё, что узнал, изучая исходники и экспериментируя.

Библиотека очень полезна для моих проектов. Она позволяет быстро создать сервер и клиент, не заботиться о реализации протокола (не надо заниматься парсингом и сериализацией). Только логика приложения.

Уверен, что этой библиотекой будут пользоваться другие люди и им уже не придется тратить время на понимание её работы.

Исходники примера из статьи