Всем привет! Эта статья о том как связать клиентское приложение Angular2 с Rails 5 сервером используя Websocket.
Rails
Для примера нам потребуется простейший ApplicationCable::Channel:
class ChatChannel < ApplicationCable::Channel
def subscribed
stream_from 'chat_channel'
end
def index
ActionCable.server.broadcast 'chat_channel', { messages: Message.serialize_all( Message.all ) }
end
def create( data )
Message.create( name: data[ 'name' ], text: data[ 'text' ] );
ActionCable.server.broadcast 'chat_channel', { messages: Message.serialize_all( Message.all ) }
end
def unsubscribed
end
end
Angular2
WebSocketService
Для начала нам потребуется сервис, который будет обеспечивать наше приложение обменом данных с сервером:
import { Injectable } from "@angular/core";
import { Subject, Observable, Subscription } from 'rxjs/Rx';
import { WebSocketSubject } from "rxjs/observable/dom/WebSocketSubject";
@Injectable()
export class WebSocketService {
private ws: WebSocketSubject<Object>;
private socket: Subscription;
private url: string;
public message: Subject<Object> = new Subject();
public opened: Subject<boolean> = new Subject();
public close():void{
this.socket.unsubscribe();
this.ws.complete();
}
public sendMessage( message:string ):void{
this.ws.next( message );
}
public start( url: string ):void{
let self = this;
this.url = url;
this.ws = Observable.webSocket( this.url );
this.socket = this.ws.subscribe( {
next: ( data:MessageEvent ) => {
if( data[ 'type' ] == 'welcome' ){
self.opened.next( true );
}
this.message.next( data );
},
error: () => {
self.opened.next( false );
this.message.next( { type: 'closed' } );
self.socket.unsubscribe();
setTimeout( () => {
self.start( self.url );
}, 1000 );
},
complete: () => {
this.message.next( { type: 'closed' } );
}
} );
}
}
В данном сервисе есть 3 приватные и 2 публичные переменные, а также 3 публичные функции.
private ws: WebSocketSubject<Object>;
private socket: Subscription;
private url: string;
ws — наблюдаемая переменная WebSocketSubject.
socket — переменная для подписки на ws.
url — ссылка на сокет.
public message: Subject<Object> = new Subject();
public opened: Subject<boolean> = new Subject();
message — наблюдаемая переменная в которую транслируются все данные из сокета.
opened — наблюдаемая переменная которая следит за открытием/закрытием соединения с сокетом.
public close():void{
this.socket.unsubscribe();
this.ws.complete();
}
Функция для закрытия сокета.
public sendMessage( message:string ):void{
this.ws.next( message );
}
Функция для отправки данных в сокет.
public start( url: string ):void{
let self = this;
this.url = url;
this.ws = Observable.webSocket( this.url );
this.socket = this.ws.subscribe( {
next: ( data:MessageEvent ) => {
if( data[ 'type' ] == 'welcome' ){
self.opened.next( true );
}
this.message.next( data );
},
error: () => {
self.opened.next( false );
this.message.next( { type: 'closed' } );
self.socket.unsubscribe();
setTimeout( () => {
self.start( self.url );
}, 1000 );
},
complete: () => {
this.message.next( { type: 'closed' } );
}
} );
}
Эта функция открывает соединение с сокетом, записывая его объект в наблюдаемую переменную и подписывается на трансляцию из нее. При потере связи каждую секунду пытается восстановить связь.
ChannelWebsocketService
Наследуемый сервис для подписки на каналы Rails5 Action Cable:
import { Injectable } from "@angular/core";
import { Subject } from "rxjs/Subject";
import { WebSocketService } from "./websocket.service";
@Injectable()
export class ChannelWebsocketService {
private socketStarted: boolean;
public observableData: Subject<Object> = new Subject();
public identifier:Object = {};
public identifierStr: string;
public subscribed: Subject<boolean> = new Subject();
constructor( private websocketService: WebSocketService ){
this.observeOpened();
this.observeMessage();
}
private static encodeIdentifier( identifier:string ):Object{
return JSON.parse( identifier );
}
private static getDataString( parameters:Object ):string{
let first = true,
result = '';
for ( let key in parameters ){
if( first ){
first = false;
result += `\"${ key }\":\"${ parameters[ key ] }\"`;
} else {
result += `, \"${ key }\":\"${ parameters[ key ] }\"`;
}
}
return `{ ${ result } }`;
}
private getSubscribeString():string{
this.identifierStr = ChannelWebsocketService.getDataString( this.identifier );
return JSON.stringify( {
command: 'subscribe',
identifier: this.identifierStr
} );
};
private isThisChannel( data:Object ):boolean {
if( data[ 'identifier' ] ){
let identifier = ChannelWebsocketService.encodeIdentifier( data[ 'identifier' ] );
if ( JSON.stringify( identifier ) === JSON.stringify( this.identifier ) ){
return true;
}
}
return false;
}
private observeMessage(){
let self = this;
this.websocketService.message.subscribe( ( data: Object ) => {
if( self.isThisChannel( data ) ){
if( data[ 'type' ] && data[ 'type' ] == 'confirm_subscription' ){
this.subscribed.next( true );
} else if ( data[ 'message' ] ){
this.observableData.next( data[ 'message' ] );
}
}
} );
}
private observeOpened(){
let self = this;
this.websocketService.opened.subscribe( ( data: boolean ) => {
self.socketStarted = data;
if( data ){
self.subscribe();
}
} );
}
private subscribe(){
this.websocketService.sendMessage( this.getSubscribeString() );
}
public send( data: Object ){
this.websocketService.sendMessage( JSON.stringify( {
command:'message',
identifier: this.identifierStr,
data: ChannelWebsocketService.getDataString( data )
} ) );
}
public unsubscribe(){
this.websocketService.sendMessage( JSON.stringify( {
command: 'unsubscribe',
identifier: this.identifierStr } ) );
this.subscribed.next( false );
}
}
В данном сервисе есть 2 приватные и 3 публичные переменные, а также 7 приватных и 2 публичные функции.
private socketStarted: boolean;
private identifierStr: string;
socketStarted — переменная в которую транслируется состояние подписки на сокет.
identifierStr — специально подготовленная строка идентификатор для Rails5 Action Cable канала.
public observableData: Subject<Object> = new Subject();
public identifier:Object = {};
public subscribed: Subject<boolean> = new Subject();
observableData — наблюдаемая переменная в которую записывается сообщение из сокета для канала.
identifier — объект идентификатор для Rails5 Action Cable канала.
subscribed — наблюдаемая переменная в которую записывается состояние подписки.
constructor( private websocketService: WebSocketService ){
this.observeOpened();
this.observeMessage();
}
...
private observeMessage(){
let self = this;
this.websocketService.message.subscribe( ( data: Object ) => {
if( self.isThisChannel( data ) ){
if( data[ 'type' ] && data[ 'type' ] == 'confirm_subscription' ){
this.subscribed.next( true );
} else if ( data[ 'message' ] ){
this.observableData.next( data[ 'message' ] );
}
}
} );
}
private observeOpened(){
let self = this;
this.websocketService.opened.subscribe( ( data: boolean ) => {
self.socketStarted = data;
if( data ){
self.subscribe();
}
} );
}
В конструкторе этого сервиса мы вызываем 2 функции: observeMessage и observeOpened, которые отслеживают присланные сокетом данные и состояние сокета соответственно.
private static encodeIdentifier( identifier:string ):Object{
return JSON.parse( identifier );
}
private static getDataString( parameters:Object ):string{
let first = true,
result = '';
for ( let key in parameters ){
if( first ){
first = false;
result += `\"${ key }\":\"${ parameters[ key ] }\"`;
} else {
result += `, \"${ key }\":\"${ parameters[ key ] }\"`;
}
}
return `{ ${ result } }`;
}
encodeIdentifier — статическая приватная функция декодирует строку идентификатора, которую вернул сокет для идентификации сообщения на принадлежность к каналу.
getDataString — преобразовывает объект в формат строки, который принимает Rails5 Action Cable.
private getSubscribeString():string{
this.identifierStr = ChannelWebsocketService.getDataString( this.identifier );
return JSON.stringify( {
command: 'subscribe',
identifier: this.identifierStr
} );
};
Возвращает строку для подписки на канал Rails5 Action Cable.
private isThisChannel( data:Object ):boolean {
if( data[ 'identifier' ] ){
let identifier = ChannelWebsocketService.encodeIdentifier( data[ 'identifier' ] );
if ( JSON.stringify( identifier ) === JSON.stringify( this.identifier ) ){
return true;
}
}
return false;
}
Определяет принадлежность сообщения от сокета к данному каналу.
private subscribe(){
this.websocketService.sendMessage( this.getSubscribeString() );
}
Подписывает на канал.
public send( data: Object ){
this.websocketService.sendMessage( JSON.stringify( {
command:'message',
identifier: this.identifierStr,
data: ChannelWebsocketService.getDataString( data )
} ) );
}
Отправляет данные в канал Rails5 Action Cable;
public unsubscribe(){
this.websocketService.sendMessage( JSON.stringify( {
command: 'unsubscribe',
identifier: this.identifierStr } ) );
this.subscribed.next( false );
}
Отписывает от канала.
ChatChannelService
Cервис, унаследованный от ChannelWebsocketService для подписки на канал ChatChannel:
import { Injectable } from "@angular/core";
import { ChannelWebsocketService } from "./channel.websocket.service";
import { WebSocketService } from "./websocket.service";
@Injectable()
export class ChatChannelService extends ChannelWebsocketService {
constructor( websocketService: WebSocketService ){
super( websocketService );
this.identifier = {
channel: 'ChatChannel'
};
}
}
В конструкторе этого сервиса переопределяется переменная identifier для идентификации канала.
ChatComponent
Компонент который используя ChatChannelService принимает/отправляет данные в канал.
Пример кода не привожу, он есть в GitHub, ссылка на который приведена ниже
Пример
Здесь можно скачать пример.
Для старта клиентского приложения переходим в папку «client» и вызываем:
npm install
gulp
Надеюсь эта статья поможет разобраться в данном вопросе.
Комментарии (6)
LastDragon
21.12.2016 09:05Самое сложное при работе с websocket-ом это восстановление после дисконектов (на мобилках он например может закрываться при смене вкладки и/или сворачивании браузера) и пропадающие пакеты — когда сокет умирает браузер далеко не сразу узнает об этом и до этого момента исходящие сообщения уходят в никуда. И если первая проблема решаема, то вот что делать с потерей данных мы так и не придумали :(
sentyaev
21.12.2016 09:20Добавьте пинг-понг (ask) в свой протокол общения с сервером.
Т.е. периодически спрашивайте сервер жив ли он.LastDragon
21.12.2016 09:38Код усложнится, но проблема полностью не исчезнет (а еще websocket сам это все умеет) — связь может потеряться сразу после получения понга и следующие сообщения точно так же уйдут в ничто. Поэтому у себя в чате просто забили, благо оно не сильно критично...
Если кто захочет поэкспериментировать:открываем сокет и вынимаем сетевой кабель.
sentyaev
21.12.2016 21:19а еще websocket сам это все умеет
Насколько я знаю не все так хорошо, это зависит от браузера.
Со стандартными сокетами те-же проблемы. Стандартное решение — в сообщение добавлять MessageID, соответственно сервер должен ответить, что сообщение принял.
Да, сложнее. Да, выглядит не очень, но работает.
railsfun
Спасибо, нужная публикация.