Вступление


Все мы знаем вебсокеты, любим или не очень и можем написать их со Spring MVC.


А что на счет реактивного приложения?


В этой статье будет рассмотрено как создать вебсокеты с помощью Spring WebFlux.


Контент:


  1. Как сконфигурировать вебсокет.
  2. Как прочитать сообщение?
  3. Как отправить сообщение?
  4. Демо проект.
  5. Рекомендации как обезопасить вебсокеты.

Для создания проекта использовалась страница start.spring.io с зависимостью Reactive Web.


Как сконфигурировать вебсокет


Начнем с конфигурации, чтобы наглядно продемонтрировать почему нам нужен тот или иной компонет.


Первое что нужно сделать это написать конфигурацию для реактивного вебсокета.


Для этого нужно два бина: HandlerMapping и HamdlerAdapter.


HandlerAdapter создается довольно таки легко:


    @Bean
    public HandlerAdapter wsHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }

Зачем нужен этот бин?


Адаптер внутри себя инициализирует вебсокет сервис и использует (еще не написанную нами) имплементацию WebSocketHandler. Другими словами, этот бин включает вебсокет в Spring WebFlux.


Следующий бин это HandlerMapping. С помощью него мы можем настроить url и отвечающий за него WebSocketHandler, а так же порядок инициализации бина. Спринг будет знать как настоить нужный url и как его обрабатывать благодяря этой конфигурации. Наш url будет выглядить так http://localhost:8080/push, так как ниже мы настраиваем путь как /push.


    @Bean
    public HandlerMapping handlerMapping() {
    // url
        String path = "/push";

    // here webSocketHandler not defined yet
        Map<String, WebSocketHandler> map = Map.of(path, webSocketHandler);

    // -1 is order
        return new SimpleUrlHandlerMapping(map, -1);
    }

Конечно можно создавать сколько угодно url и WebSocketHandler.


Теперь можно создать имплементацию WebSocketHandler. С этим классом мы можем обрабатываь объект WebSocketSession, другими словами читать и отправлять сообщения.


На данный момент мы оставим Mono.empty(), но не волнуйтесь, ниже будет полноценная имплементация.


@Component
public class DefaultWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
    return Mono.empty();
    }
}

Полный код конфигурации.


@Configuration
public class WebSocketConfig {

    private WebSocketHandler webSocketHandler;

    @Autowired
    public WebSocketConfig(WebSocketHandler webSocketHandler) {
        this.webSocketHandler = webSocketHandler;
    }

    @Bean
    public HandlerMapping handlerMapping() {
        String path = "/push";
        Map<String, WebSocketHandler> map = Map.of(path, webSocketHandler);
        return new SimpleUrlHandlerMapping(map, -1);
    }

    @Bean
    public HandlerAdapter wsHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

Как прочитать сообщение?


Логика чтения может быть реализована в имплементации интерфейса WebSocketHandler.


Нам нужно использовать объект WebSocketSession и просто вызывать метод receive(), чтобы получить стрим и обработать его.


@Component
public class DefaultWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
    session.receive().subscribe(message => {
        // process message here
    });
    return Mono.empty();
    }
}

Как отправить сообщение?


Перед тем как продолжить изменять WebSocketHandler нужно создать какуй-то простенький дата класс, который мы будем слать клиенту.


public class Event {

    private String name;

    private int count;

    public Event() {}

    public Event(String name, int count) {
        this.name = name;
        this.count = count;
    }

        // Getters, Setters, etc.
}

Так же нужно создать сервис, который сможет возвращать ивенты в виде реактивного стрима (как требует WebSocketSession).


Главное: нужно добавлсять новые ивенты в стрим.


Так что теперь может создать наш интерфейс:


public interface EventUnicastService {

    /**
     * Add message to stream
     * @param next - message which will be added to stream
     */
    void onNext(Event next);

    Flux<Event> getMessages();
}

Имплементация будет использовать EmitterProcessor, который подходит под все наши требования. Этот процессор может потреблять наши ивенты и раздавать подписчикам.


@Service
public class EventUnicastServiceImpl implements EventUnicastService {

    private EmitterProcessor<Event> processor = EmitterProcessor.create();

    @Override
    public void onNext(Event next) {
        processor.onNext(next);
    }

    @Override
    public Flux<Event> getMessages() {
        return processor.publish().autoConnect();
    }
}

Здесь:


  • publish() создает ConnectableFlux, который позволяет подписаться большому количеству объектов на Flux;
  • autoConnect() соеденяется с ConnectableFlux, когда кто-то выщывает метод subscribe у стрима.

Так же можно использовать метод replay(int), который кэширует указанное количество елементов и возвращает новым подписчикам.


Отправка сообщений будет выглядить вот так:



    @Override
    public Mono<Void> handle(WebSocketSession session) {
    Flux<WebSocketMessage> messages = unicastService.getMessages()
        .flatMap(o -> {
                    try {
                        return Mono.just(objectMapper.writeValueAsString(o)); // <- convert object to json
                    } catch (JsonProcessingException e) {
                        return Mono.error(e);
                    }
                })
            .map(session::textMessage);

    return session.send(messages);
    }

Как видно из кода выше, перед тем как отправить объект клиенту, его нужно сконвертировать в json. Spring WebFlux не предоставляет механизм для десериализации объекта в json для вебсокетов.


В примере используется Jackson, а именно ObjectMapper#writeValueAsString, так что на выходе мы имеем json строку.


Теперь можно объеденить чтение и отправку в DefaultWebSocketHandler.


@Component
public class DefaultWebSocketHandler implements WebSocketHandler {

    private EventUnicastService eventUnicastService;

    private ObjectMapper objectMapper;

    @Autowired
    public DefaultWebSocketHandler(EventUnicastService eventUnicastService, ObjectMapper objectMapper) {
        this.eventUnicastService = eventUnicastService;
        this.objectMapper = objectMapper;
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Flux<WebSocketMessage> messages = session.receive()
                // .doOnNext(message -> { read message here or in the block below })
                .flatMap(message -> {
                    // or read message here
                    return eventUnicastService.getMessages();
                })
                .flatMap(o -> {
                    try {
                        return Mono.just(objectMapper.writeValueAsString(o));
                    } catch (JsonProcessingException e) {
                        return Mono.error(e);
                    }
                }).map(session::textMessage);
        return session.send(messages);
    }
}

Демо


Event generator


Для демо будет создан простой ивент генератор, который будет просто инкременитировать счетчик каждую секунду и отправлять ивент.


@Component
public class EventGenerator {

    private AtomicInteger counter = new AtomicInteger(0);

    private EventUnicastService eventUnicastService;

    @Autowired
    public EventGenerator(EventUnicastService eventUnicastService) {
        this.eventUnicastService = eventUnicastService;
    }

    @Scheduled(initialDelay = 1000, fixedDelay = 1000)
    public void generateEvent() {
        int count = counter.getAndIncrement();
        Event event = new Event("event", count);
        eventUnicastService.onNext(event);
    }
}

Не забываем включить Scheduler в приложении.


@SpringBootApplication
@EnableScheduling // <- enable scheduling!!!
public class WebfluxwebsocketsApplication

Клиент


Для клиента будет использоваться Angular.


Что нужно сделать:


Создать проект.


ng new projectname

Изменить app.component.ts.


import {Component, OnInit} from '@angular/core';
import {webSocket} from 'rxjs/webSocket';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  title = 'websocket-angular';

  messages: string[] = [];

    // create WebSocket subject
  private subject = webSocket('ws://localhost:8080/push');

  ngOnInit(): void {
    this.subject.next({message: 'message'}); // <- ping first message
    this.subject.subscribe(message => {       // <- listen messages from server
      const event = message as Event
      this.messages.push(event.name + ' #' + event.count);
    });
  }
}

Здесь мы используем rxjs websockets.


Чтобы увидеть наши сообщения надо обновить app.component.html.


<div style="text-align:center">
  <h1>
    Welcome to {{ title }}!
  </h1>
</div>
<ul>
  <li *ngFor="let m of messages">{{m}}</li>
</ul>

Теперь можно запустить приложение и открыть в браузере localhost:4200.


Результат:



Рекомендации как обезопасить вебсокеты


  1. Используйте вебсокеты через SSL/TLS (wss:// протокол).
  2. Используйте валидацию данных клиета и сервера.
  3. Используйте вебсокет подключение после авторизации.
  4. Генерируйте уникальный ключ который можно получить по http и использовать в подключении к вебсокетам.
  5. Используйте Origin заголовок.

Выводы


В статье показана базовые кофигурация, чтение и отправка сообщений используя реактивные вебсокеты, которое было рассмотрено на примере демо проекта. Так же можно увидеть, что реализация упирается в знание Project Reactor.


Примеры кода можно найти в GitHub https://github.com/MaxNeutrino/snippets/tree/master/webflux-websocket