Вступление
Все мы знаем вебсокеты, любим или не очень и можем написать их со Spring MVC.
А что на счет реактивного приложения?
В этой статье будет рассмотрено как создать вебсокеты с помощью Spring WebFlux.
Контент:
- Как сконфигурировать вебсокет.
- Как прочитать сообщение?
- Как отправить сообщение?
- Демо проект.
- Рекомендации как обезопасить вебсокеты.
Для создания проекта использовалась страница start.spring.io с зависимостью Reactive Web.
Как сконфигурировать вебсокет
Начнем с конфигурации, чтобы наглядно продемонтрировать почему нам нужен тот или иной компонет.
Первое что нужно сделать это написать конфигурацию для реактивного вебсокета.
Для этого нужно два бина: HandlerMapping и HamdlerAdapter.
HandlerAdapter создается довольно таки легко:
@Bean
public HandlerAdapter wsHandlerAdapter() {
return new WebSocketHandlerAdapter();
}
Зачем нужен этот бин?
Адаптер внутри себя инициализирует вебсокет сервис и использует (еще не написанную нами) имплементацию WebSocketHandler. Другими словами, этот бин включает вебсокет в Spring WebFlux.
Следующий бин это HandlerMapping. С помощью него мы можем настроить url и отвечающий за него WebSocketHandler, а так же порядок инициализации бина. Спринг будет знать как настоить нужный url и как его обрабатывать благодяря этой конфигурации. Наш url будет выглядить так http://localhost:8080/push, так как ниже мы настраиваем путь как /push.
@Bean
public HandlerMapping handlerMapping() {
// url
String path = "/push";
// here webSocketHandler not defined yet
Map<String, WebSocketHandler> map = Map.of(path, webSocketHandler);
// -1 is order
return new SimpleUrlHandlerMapping(map, -1);
}
Конечно можно создавать сколько угодно url и WebSocketHandler.
Теперь можно создать имплементацию WebSocketHandler. С этим классом мы можем обрабатываь объект WebSocketSession, другими словами читать и отправлять сообщения.
На данный момент мы оставим Mono.empty(), но не волнуйтесь, ниже будет полноценная имплементация.
@Component
public class DefaultWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return Mono.empty();
}
}
Полный код конфигурации.
@Configuration
public class WebSocketConfig {
private WebSocketHandler webSocketHandler;
@Autowired
public WebSocketConfig(WebSocketHandler webSocketHandler) {
this.webSocketHandler = webSocketHandler;
}
@Bean
public HandlerMapping handlerMapping() {
String path = "/push";
Map<String, WebSocketHandler> map = Map.of(path, webSocketHandler);
return new SimpleUrlHandlerMapping(map, -1);
}
@Bean
public HandlerAdapter wsHandlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
Как прочитать сообщение?
Логика чтения может быть реализована в имплементации интерфейса WebSocketHandler.
Нам нужно использовать объект WebSocketSession и просто вызывать метод receive(), чтобы получить стрим и обработать его.
@Component
public class DefaultWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
session.receive().subscribe(message => {
// process message here
});
return Mono.empty();
}
}
Как отправить сообщение?
Перед тем как продолжить изменять WebSocketHandler нужно создать какуй-то простенький дата класс, который мы будем слать клиенту.
public class Event {
private String name;
private int count;
public Event() {}
public Event(String name, int count) {
this.name = name;
this.count = count;
}
// Getters, Setters, etc.
}
Так же нужно создать сервис, который сможет возвращать ивенты в виде реактивного стрима (как требует WebSocketSession).
Главное: нужно добавлсять новые ивенты в стрим.
Так что теперь может создать наш интерфейс:
public interface EventUnicastService {
/**
* Add message to stream
* @param next - message which will be added to stream
*/
void onNext(Event next);
Flux<Event> getMessages();
}
Имплементация будет использовать EmitterProcessor, который подходит под все наши требования. Этот процессор может потреблять наши ивенты и раздавать подписчикам.
@Service
public class EventUnicastServiceImpl implements EventUnicastService {
private EmitterProcessor<Event> processor = EmitterProcessor.create();
@Override
public void onNext(Event next) {
processor.onNext(next);
}
@Override
public Flux<Event> getMessages() {
return processor.publish().autoConnect();
}
}
Здесь:
- publish() создает ConnectableFlux, который позволяет подписаться большому количеству объектов на Flux;
- autoConnect() соеденяется с ConnectableFlux, когда кто-то выщывает метод subscribe у стрима.
Так же можно использовать метод replay(int), который кэширует указанное количество елементов и возвращает новым подписчикам.
Отправка сообщений будет выглядить вот так:
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> messages = unicastService.getMessages()
.flatMap(o -> {
try {
return Mono.just(objectMapper.writeValueAsString(o)); // <- convert object to json
} catch (JsonProcessingException e) {
return Mono.error(e);
}
})
.map(session::textMessage);
return session.send(messages);
}
Как видно из кода выше, перед тем как отправить объект клиенту, его нужно сконвертировать в json. Spring WebFlux не предоставляет механизм для десериализации объекта в json для вебсокетов.
В примере используется Jackson, а именно ObjectMapper#writeValueAsString, так что на выходе мы имеем json строку.
Теперь можно объеденить чтение и отправку в DefaultWebSocketHandler.
@Component
public class DefaultWebSocketHandler implements WebSocketHandler {
private EventUnicastService eventUnicastService;
private ObjectMapper objectMapper;
@Autowired
public DefaultWebSocketHandler(EventUnicastService eventUnicastService, ObjectMapper objectMapper) {
this.eventUnicastService = eventUnicastService;
this.objectMapper = objectMapper;
}
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> messages = session.receive()
// .doOnNext(message -> { read message here or in the block below })
.flatMap(message -> {
// or read message here
return eventUnicastService.getMessages();
})
.flatMap(o -> {
try {
return Mono.just(objectMapper.writeValueAsString(o));
} catch (JsonProcessingException e) {
return Mono.error(e);
}
}).map(session::textMessage);
return session.send(messages);
}
}
Демо
Event generator
Для демо будет создан простой ивент генератор, который будет просто инкременитировать счетчик каждую секунду и отправлять ивент.
@Component
public class EventGenerator {
private AtomicInteger counter = new AtomicInteger(0);
private EventUnicastService eventUnicastService;
@Autowired
public EventGenerator(EventUnicastService eventUnicastService) {
this.eventUnicastService = eventUnicastService;
}
@Scheduled(initialDelay = 1000, fixedDelay = 1000)
public void generateEvent() {
int count = counter.getAndIncrement();
Event event = new Event("event", count);
eventUnicastService.onNext(event);
}
}
Не забываем включить Scheduler в приложении.
@SpringBootApplication
@EnableScheduling // <- enable scheduling!!!
public class WebfluxwebsocketsApplication
Клиент
Для клиента будет использоваться Angular.
Что нужно сделать:
Создать проект.
ng new projectname
Изменить app.component.ts.
import {Component, OnInit} from '@angular/core';
import {webSocket} from 'rxjs/webSocket';
@Component({
selector: 'app-root',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
title = 'websocket-angular';
messages: string[] = [];
// create WebSocket subject
private subject = webSocket('ws://localhost:8080/push');
ngOnInit(): void {
this.subject.next({message: 'message'}); // <- ping first message
this.subject.subscribe(message => { // <- listen messages from server
const event = message as Event
this.messages.push(event.name + ' #' + event.count);
});
}
}
Здесь мы используем rxjs websockets.
Чтобы увидеть наши сообщения надо обновить app.component.html.
<div style="text-align:center">
<h1>
Welcome to {{ title }}!
</h1>
</div>
<ul>
<li *ngFor="let m of messages">{{m}}</li>
</ul>
Теперь можно запустить приложение и открыть в браузере localhost:4200.
Результат:
Рекомендации как обезопасить вебсокеты
- Используйте вебсокеты через SSL/TLS (wss:// протокол).
- Используйте валидацию данных клиета и сервера.
- Используйте вебсокет подключение после авторизации.
- Генерируйте уникальный ключ который можно получить по http и использовать в подключении к вебсокетам.
- Используйте Origin заголовок.
Выводы
В статье показана базовые кофигурация, чтение и отправка сообщений используя реактивные вебсокеты, которое было рассмотрено на примере демо проекта. Так же можно увидеть, что реализация упирается в знание Project Reactor.
Примеры кода можно найти в GitHub https://github.com/MaxNeutrino/snippets/tree/master/webflux-websocket
SergeyZV
А как лолькально протестировать wss без наличия сертификата?
MaxNeutrino Автор
Не знаю можно так или нет. Однако, для локального тестирования, можно сделать самоподписанный сертификат с помощью openssl и добавить в исключения браузера.