Всем доброго времени суток. Некоторое время назад мною была написана статья, где я детально описал процесс разработки демо игрового вебсокет сервера. На этот раз, я хотел бы поделиться более усовершенствованным и оптимизированным материалом на Kotlin и реактивном стеке.

Недостатки и преимущества исходного решения

Старый проект имеет ряд существенных недостатков для коммерческого/энтерпрайз использования — отсутствие прямого взаимодействия с некоторыми из компонентов SpringBoot. Например, невозможность прикрутить стандартную авторизацию(oidc) из коробки. Кроме того, решение весьма устарело, время и опыт толкает двигаться вперед и улучшать продукт.

Среди премуществ — вы имеете возможность напрямую познакомиться с внутренним устройством Netty, понять принципы его работы и влиять на его механизмы.

Концепт

Как и в прошлый раз концепт остается прежним, но с небольшими изменениями.

  1. На входе: Kotlin, Reactor‑Netty, Spring, Maven

  2. Цели те же — хотим авторизовываться, подключаться к игровой комнате и начать играть, двигая кружок на экране.

  3. На выходе: детальное описание работы сервера и его особенностей.

Стек

  • Kotlin 1.8

  • JDK 21

  • Spring Boot 3.2.2

  • Spring Webflux (Reactor-netty)

Сборка

Исходник проекта здесь

В качестве сборщика проекта и менеджера зависимостей используется Maven

pom.xml выглядит следующим образом:

<?xml version="1.0" encoding="ISO-8859-15"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <name>webflux-game-server</name>

    <groupId>com.tfkfan</groupId>
    <artifactId>webflux-game-server</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.2</version>
        <relativePath/>
    </parent>

    <properties>
        <kotlin.version>1.8.0</kotlin.version>
        <kotlin.coroutines.version>1.8.0-RC</kotlin.coroutines.version>
        <maven.compiler.version>21</maven.compiler.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlinx</groupId>
            <artifactId>kotlinx-coroutines-core</artifactId>
            <version>${kotlin.coroutines.version}</version>
        </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-stdlib</artifactId>
            <version>${kotlin.version}</version>
        </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-reflect</artifactId>
            <version>${kotlin.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>kotlin-maven-plugin</artifactId>
                <groupId>org.jetbrains.kotlin</groupId>
                <version>${kotlin.version}</version>
                <executions>
                    <execution>
                        <id>compile</id>
                        <phase>process-sources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <sourceDir>${project.basedir}/src/main/kotlin</sourceDir>
                            </sourceDirs>
                        </configuration>
                    </execution>
                    <execution>
                        <id>test-compile</id>
                        <goals>
                            <goal>test-compile</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <sourceDir>${project.basedir}/src/test/kotlin</sourceDir>
                            </sourceDirs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Все достаточно просто, все что нам нужно — webflux вместе с embedded netty сервером.

Сборка и запуск

./mvnw clean verify spring-boot:run

Клиент

Клиентское приложение находится в статике /src/main/resources/static. Краткое описание:

  • index.html — страница клиентской части приложения

  • app.js — отрисовка объектов и обработка сообщений

  • network.js — функционал для работы с сетью (websocket)

  • types.js — справочник типов сообщений

  • styles.css — таблица стилей приложения

Не будем останавливаться на клиенте, он достаточно примитивен и прост. Вместо этого уделим большее внимание серверной части, про что и писалась статья.

Сервер

Http‑порт по умолчанию — 8080. Для вебсокета выделен отдельный маршрут под этим портом: /websocket

Серверный код разделен на следующие пакеты:

  • config — Конфигурация приложения, пропертя, константы

  • event — Функционал для работы с внутриигровыми событиями

  • game — Внутрикомнатная игровая логика, игровые модели, игровые поля (сюда же должны входить и различные варианты оптимизированных контейнеров внутриигровых объектов, таких как Mesh, QuadTree и т. п.)

  • math — Математические манипуляции и объекты

  • network — Работа с сетью, основной подкапотный функционал

  • service — Сервисы, отвечающие за конкретную функцию в приложении, например, авторизация, менеджер сессий, менеджер комнат (он же матчмейкер) и др.

  • shared — Различные объекты, используемые повсеместно в приложении

Как видно из структуры проекта, большая часть игровой логики в итоге будет разделена между сервисами и игровыми комнатами. Больше не нужно писать декодеры, netty‑обработчики и пайплайны, код становится значительно проще.

Рассмотрим работу с сетью и пакет network.

Network. Работа с сетью и конфигурация сервера

Работа сервера начинается разумеется с Application.java, который в свою очередь поднимает встроенный в webflux netty-сервер.

Напомню, Netty — фреймворк для работы с сетью, в основе которого лежат неблокирующие операции. Тоесть, грубо говоря, когда приходит клиентский запрос, поток, в последствии обрабатывающий его, не ждет чтения из сокета, в отличие от блокирующих инструментов ввода‑вывода, а может быть занят чем либо еще и вызовется, когда сообщение будет прочитано и готово к обработке.

Влиять на конфикурацию самого Netty и его ресурсы вы можете с помощью переменных окружения

-Dreactor.netty.*

См. https://projectreactor.io/docs/netty/release/reference/index.html

Основной обработчик, работающий с вебсокет сообщениями:

@Service
class MainWebSocketHandler(
    private val webSocketSessionService: WebSocketSessionService
) : WebSocketHandler {
    private val objectMapper = Gson()
    private lateinit var roomService: RoomService

    companion object {
        val log: Logger = LogManager.getLogger(this::class.java)
    }

    override fun handle(webSocketSession: WebSocketSession): Mono<Void> {
        val input = webSocketSession.receive().share()
        val userSession = UserSession(webSocketSession.id, webSocketSession.handshakeInfo)
        val sessionHandler = UserSessionWebSocketHandler(
            userSession,
            webSocketSessionService, roomService
        )

        val receive = input
            .filter { it.type == WebSocketMessage.Type.TEXT }
            .map(this::toMessage)
            .doOnNext(sessionHandler::onNext)

        val send = webSocketSession.send(webSocketSessionService.onActive(userSession)
            .map {
                webSocketSession.textMessage(objectMapper.toJson(it))
            }
            .doOnError { handleError(webSocketSession, it) })

        val security = webSocketSession.handshakeInfo.principal.doOnNext {
            webSocketSessionService.onPrincipalInit(userSession, it)
        }
        return Flux.merge(receive, send, security)
            .doOnSubscribe { webSocketSessionService.onSubscribe(userSession, it) }
            .doOnTerminate { webSocketSessionService.onInactive(userSession) }
            .doOnError { handleError(webSocketSession, it) }
            .then()
    }

    private fun toMessage(webSocketMessage: WebSocketMessage): Message =
        objectMapper.fromJson(InputStreamReader(webSocketMessage.payload.asInputStream()), Message::class.java)

    private fun handleError(webSocketSession: WebSocketSession, exception: Throwable) {
        log.error("Error in ${webSocketSession.id} session", exception)
    }

    @Autowired
    fun setGameRoomManagementService(@Lazy roomService: RoomService) {
        this.roomService = roomService
    }
}

Как только произошел upgrade протокола — пользователь попадает в метод handle. В нем сразу создается объект пользовательской сессии UserSession, а также навешиваются ряд асинхронных лямбд, обрабатывающих как входящие сообщения, так и исходящие.

Стоит отметить обработчик handshake:

val security = webSocketSession.handshakeInfo.principal.doOnNext {
    webSocketSessionService.onPrincipalInit(userSession, it)
}

Здесь происходит отложенная инициализация принципалов, если авторизация прошла успешно, причем публикация принципала в Mono происходит на момент подписки клиента за пределами обработчика. На данный момент, иного механизма получения кредов из handshakeInfo, в том числе в методе handle, — нет, а свежий реактивный websocket‑security функционал на стадии разработки командой spring.

Рассмотрим класс UserSessionWebSocketHandler:

class UserSessionWebSocketHandler(
    private val userSession: UserSession,
    private val webSocketSessionService: WebSocketSessionService,
    private val roomService: RoomService
) {
    private val objectMapper = Gson()

    companion object {
        val log: Logger = LogManager.getLogger(this::class.java)
    }

    fun onNext(message: Message) {
        val messageData = if (message.data != null) message.data as Map<*, *> else null

        when (message.type) {
            GAME_ROOM_JOIN -> {
                    log.debug("Join attempt from {}", userSession.handshakeInfo.remoteAddress)
                    roomService.addPlayerToWait(
                        userSession,
                        objectMapper.fromJson(messageData.toString(), GameRoomJoinEvent::class.java)
                    )
            }

            INIT -> {
                roomService.getRoomByKey(userSession.roomKey)
                    .ifPresent { it.onPlayerInitRequest(userSession) }
            }

            PLAYER_KEY_DOWN -> {
                roomService.getRoomByKey(userSession.roomKey)
                    .ifPresent {
                        it.onPlayerKeyDown(
                            userSession,
                            objectMapper.fromJson(messageData.toString(), KeyDownPlayerEvent::class.java)
                        )
                    }
            }

        }
    }
}

Здесь происходит обработка входящих сообщений пользователя в зависимости от числового message.type.

Рассмотрим интерфейсы WebSocketSessionService, WebSocketMessagePublisher и их имплементацию:

interface WebSocketMessagePublisher {
    fun send(userSession: UserSession, message: Any)
    fun sendFailure(userSession: UserSession, message: Any)
    fun sendBroadcast(type: MessageType, message: String)
    fun sendBroadcast(message: Any)
    fun sendBroadcast(messageFunction: Function<UserSession, Any>)
    fun sendBroadcast(userSessions: Collection<UserSession>, message: Any)
    fun send(userSession: UserSession, function: Function<UserSession, Any>)
    fun sendBroadcast(userSessions: Collection<UserSession>, function: Function<UserSession, Any>)
}
interface WebSocketSessionService : WebSocketMessagePublisher {
    fun onActive(userSession: UserSession): Flux<Any>
    fun onSubscribe(userSession: UserSession, subscription: Subscription)
    fun onPrincipalInit(userSession: UserSession, principal: Principal)
    fun onInactive(userSession: UserSession)
    fun close(userSession: UserSession): Mono<Void>
    fun close(userSessionId: String): Mono<Void>
    fun closeAll():Mono<Void>
    fun roomIds():Mono<Collection<String>>
    fun sessionIds():Mono<Collection<String>>
    fun roomSessionIds(roomId:UUID):Mono<Collection<String>>
}
@Service
open class WebSocketSessionServiceImpl : WebSocketSessionService {
    companion object {
        val log = LogManager.getLogger(this::class.java)
    }

    private var sessionPublishers: MutableMap<String, Many<Any>> = HashMap()
    private var sessionSubscriptions: MutableMap<String, Subscription> = HashMap()
    private var sessions: MutableMap<String, UserSession> = HashMap()
    private lateinit var roomService: RoomService

    override fun sendBroadcast(message: Any) = sessionPublishers.values.forEach { it.tryEmitNext(message) }
    override fun close(userSession: UserSession): Mono<Void> {
        if (sessionSubscriptions.containsKey(userSession.id))
            sessionSubscriptions[userSession.id]!!.cancel()
        return Mono.empty()
    }

    override fun close(userSessionId: String): Mono<Void> {
        if (sessions.containsKey(userSessionId))
            return close(sessions[userSessionId]!!)
        return Mono.empty()
    }

    override fun closeAll(): Mono<Void> {
        sessions.values.forEach(this::close)
        return Mono.empty()
    }

    override fun roomIds(): Mono<Collection<String>> = Mono.fromCallable {
        roomService.getRoomIds()
    }

    override fun sessionIds(): Mono<Collection<String>> = Mono.fromCallable {
        sessions.keys.toList()
    }

    override fun roomSessionIds(roomId: UUID): Mono<Collection<String>> = Mono.fromCallable {
        roomService.getRoomSessionIds(roomId)
    }

    override fun sendBroadcast(messageFunction: Function<UserSession, Any>) =
        sessions.values.stream().forEach { this.send(it, messageFunction) }

    override fun sendBroadcast(userSessions: Collection<UserSession>, message: Any) =
        userSessions.forEach { send(it, message) }

    override fun send(
        userSession: UserSession,
        function: Function<UserSession, Any>
    ) =
        send(userSession, function.apply(userSession))

    override fun sendBroadcast(
        userSessions: Collection<UserSession>,
        function: Function<UserSession, Any>
    ) = userSessions.forEach { send(it, function.apply(it)) }

    override fun send(userSession: UserSession, message: Any) {
        val webSocketSessionId = userSession.id
        if (sessionPublishers.containsKey(webSocketSessionId)) sessionPublishers[webSocketSessionId]!!
            .tryEmitNext(message)
    }

    override fun sendFailure(userSession: UserSession, message: Any) =
        send(userSession, Message(FAILURE, message))

    override fun sendBroadcast(type: MessageType, message: String) =
        sendBroadcast(Message(MESSAGE, GameMessagePack(type.type, message)))

    override fun onActive(userSession: UserSession): Flux<Any> {
        log.debug("Client ${userSession.id} connected")
        /*        UriComponentsBuilder builder = UriComponentsBuilder.fromUri(session.getHandshakeInfo().getUri());
Map<String,String> queryParams = builder.build().getQueryParams().toSingleValueMap();*/
        val sink = Sinks.many().unicast().onBackpressureBuffer<Any>()
        sessionPublishers[userSession.id] = sink
        sessions[userSession.id] = userSession
        return sink.asFlux()
    }

    override fun onSubscribe(userSession: UserSession, subscription: Subscription) {
        sessionSubscriptions[userSession.id] = subscription
    }

    override fun onPrincipalInit(
        userSession: UserSession,
        principal: Principal
    ) {
        userSession.principal = principal
    }

    override fun onInactive(userSession: UserSession) {
        log.debug("Client ${userSession.id} disconnected")
        if (!sessionPublishers.containsKey(userSession.id)) return
        sessionPublishers.remove(userSession.id)
        sessions.remove(userSession.id)

        if (userSession.roomKey != null)
            roomService.getRoomByKey(userSession.roomKey!!)
                .ifPresent { it.onDisconnect(userSession) }
    }

    @Autowired
    fun setGameRoomManagementService(@Lazy roomService: RoomService) {
        this.roomService = roomService
    }
}

Данные сервисы служат, очевидно, для публикации сообщений и контроля сессий пользователя. Все это реализуется через классы Sinks.Many, которые сочетают внутри себя как Publisher так и Subscriber. В ранних версиях project reactor активно использовались Processor<T, R> с тем же назначением, в поздних версиях эти классы помечены как Deprecated.

Сессия

class UserSession(
    override val id: String,
    val handshakeInfo: HandshakeInfo
) : AbstractEntity<String>(id) {
    var player: Player? = null
    var roomKey: UUID? = null
    var principal: Principal? = null
    override fun toString(): String = "UserSession [id=" + id + "player=" + player + ", parentGameRoom=" + roomKey + "]"
}

Сессия пользователя хранит в себе все необходимые данные, в т.ч. Principal, полученный в момент успешной авторизации. Id пользовательской сессии берется из вебсокет сессии:

UserSession(webSocketSession.id, webSocketSession.handshakeInfo)

Сообщения

Формат сообщений остался прежним, поле type имеет ключевое значение

open class Message {
    var type = 0
    var data: Any? = null

    constructor()
    constructor(type: Int) : this(type, null)
    constructor(type: Int, data: Any?) {
        this.type = type
        this.data = data
    }

    override fun toString(): String = "Message [type=" + type + ", source=" + data.toString() + "]"
}

Пакеты исходящих сообщений разделены на:

  • Pack — главный предок всех пакетов. Обычно, это пакеты, отвечающие за кусок логики, не связанный с игровым процессом напрямую, в том числе и контейнер ошибки ExceptionPack.

  • InitPack — пакеты инициализации игровых объектов на поле боя

  • UpdatePack — пакеты обновления состояния объектов, рассылаемые всем игрокам

  • RemovePack — пакеты очистки и удаления части состояния игровых объектов.

  • PrivateUpdatePack — индивидуальные пакеты обновления состояния объектов

  • PrivateRemovePack — индивидуальные пакеты очистки и удаления части состояния игровых объектов.

  • PrivateInitPack — индивидуальные пакеты инициализации игровых объектов на поле боя

Game. Игровая логика и комнаты

Схема работы игровой комнаты
Схема работы игровой комнаты

Основу игровой логики представляет игровая комната — GameRoom

interface GameRoom : Runnable, Updatable, WebSocketMessagePublisher {
    fun onRoomCreated(userSessions: List<UserSession>)
    fun onRoomStarted()
    fun onBattleStarted()
    fun onDestroy(userSessions: List<UserSession>)
    fun onDisconnect(userSession: UserSession): UserSession
    fun sessions(): Collection<UserSession>
    fun currentPlayersCount(): Int
    fun getPlayerSessionBySessionId(userSession: UserSession): Optional<UserSession>
    fun key(): UUID
    fun key(key: UUID)
    fun close(): Collection<UserSession>
    fun onClose(userSession: UserSession)
    fun schedule(runnable: Runnable, delayMillis: Long):Boolean
    fun schedulePeriodically(runnable: Runnable, initDelay:Long, loopRate:Long):Boolean
}
abstract class AbstractGameRoom protected constructor(
    private var gameRoomId: UUID,
    private val schedulerService: Scheduler,
    protected val roomService: RoomService,
    protected val webSocketSessionService: WebSocketSessionService
) : GameRoom {
    private val roomFutureList: MutableList<Disposable> = ArrayList()
    companion object {
        val log: Logger = LogManager.getLogger(this::class.java)
    }

    private var sessions: MutableMap<String, UserSession> = HashMap()

    override fun onRoomCreated(userSessions: List<UserSession>) {
        for (playerSession in userSessions) {
            this.sessions[playerSession.id] = playerSession
            sendBroadcast(Message(MESSAGE, playerSession.player!!.id.toString() + " successfully joined"))
        }
    }

    override fun onDestroy(userSessions: List<UserSession>) {
        for (playerSession in userSessions) {
            this.sessions.remove(playerSession.id)
            sendBroadcast(Message(MESSAGE, playerSession.player!!.id.toString() + " left"))
        }
    }

    override fun schedule(runnable: Runnable, delayMillis: Long)=
        roomFutureList.add(schedulerService.schedule(runnable, delayMillis, TimeUnit.MILLISECONDS))

    override fun schedulePeriodically(runnable: Runnable, initDelay: Long, loopRate: Long)=
        roomFutureList.add(schedulerService.schedulePeriodically(runnable,initDelay, loopRate,TimeUnit.MILLISECONDS))

    override fun onDisconnect(userSession: UserSession): UserSession = sessions.remove(userSession.id)!!
    override fun send(userSession: UserSession, message: Any) =
        webSocketSessionService.send(userSession, message)

    override fun sendBroadcast(message: Any) =
        webSocketSessionService.sendBroadcast(sessions.values, message)

    override fun sendBroadcast(messageFunction: Function<UserSession, Any>) =
        webSocketSessionService.sendBroadcast(sessions.values, messageFunction)

    override fun sendFailure(userSession: UserSession, message: Any) {
        webSocketSessionService.sendFailure(userSession, message)
    }

    override fun sendBroadcast(type: MessageType, message: String) {
       webSocketSessionService.sendBroadcast(type, message)
    }

    override fun sendBroadcast(userSessions: Collection<UserSession>, message: Any) {
        webSocketSessionService.sendBroadcast(userSessions, message)
    }

    override fun send(userSession: UserSession, function: Function<UserSession, Any>) {
        webSocketSessionService.send(userSession, function)
    }

    override fun sendBroadcast(userSessions: Collection<UserSession>, function: Function<UserSession, Any>) {
        webSocketSessionService.sendBroadcast(userSessions, function)
    }

    override fun run() {
        try {
            update()
        } catch (e: Exception) {
            log.error("room update exception", e)
        }
    }

    override fun close(): Collection<UserSession> {
        val result: Collection<UserSession> = sessions.values
        sessions.values.forEach { this.onClose(it) }
        roomFutureList.forEach { it.dispose() }
        log.trace("Room {} has been closed", key())
        return result
    }

    override fun onClose(userSession: UserSession) {}
    override fun getPlayerSessionBySessionId(userSession: UserSession): Optional<UserSession> =
        if (sessions.containsKey(userSession.id)) Optional.of(sessions[userSession.id]!!)
        else Optional.empty()

    override fun sessions(): Collection<UserSession> = sessions.values
    override fun currentPlayersCount(): Int = sessions.size
    override fun key(): UUID = gameRoomId
    override fun key(key: UUID) {
        this.gameRoomId = key
    }
}

Что представляет собой в сущности stateful обертку вокруг Runnable-задачи, которая является итерацией внутриигрового цикла (см. метод update). Экземпляр комнаты отвечает за следующие функции:

  • Хранение, использование и утилизацию сессий игроков

  • Коммуникацию с менеджером комнат

  • Коммуникацию с клиентом

  • Вызов, обработку и планирование внутриигровых событий

  • Обработку игровых моделей

Логика работы игровой комнаты устроена на поочередном запуске узловых событий:

  1. onRoomCreated - вызывается сразу после создания комнаты и инициализации сессий игроков в ней.

  2. onRoomStarted - вызывается практически одновременно с onRoomCreated, когда игровая комната готова к работе.

  3. Старт игрового цикла и вызов onBattleStarted - по истечению отсрочки старта битвы, служащей для ожидания подключения всех игроков, реактивный Scheduler запускает вызов метода onBattleStarted и периодическое выполнение метода run игровой комнаты, а он в свою очередь оборачивает вызов метода update. Интервал обновления состояния задается в конфиге application.room.loop-rate. Параметр отсрочки старта задается в конфиге application.room.start-delay.

  4. Завершение игрового цикла и вызов onBattleEnded - вызывается по истечению времени битвы (задается в конфиге application.room.end-delay).

    Кроме того, вы можете прикрутить любое временное событие на сервере с помощью методов schedule и schedulePeriodically

    Полностью готовый класс игровой комнаты выглядит так:

    class DefaultGameRoom(
        val map: GameMap,
        gameRoomId: UUID,
        roomService: RoomService,
        webSocketSessionService: WebSocketSessionService,
        schedulerService: Scheduler,
        val gameProperties: GameProperties,
        val roomProperties: RoomProperties
    ) : AbstractGameRoom(gameRoomId, schedulerService, roomService, webSocketSessionService) {
        private val started = AtomicBoolean(false)
    
        override fun onRoomCreated(userSessions: List<UserSession>) {
            if (userSessions.isNotEmpty()) {
                userSessions.forEach {
                    val defaultPlayer = it.player as DefaultPlayer
                    defaultPlayer.position = Vector(100.0, 100.0)
                    map.addPlayer(defaultPlayer)
                }
            }
    
            super.onRoomCreated(userSessions)
    
            sendBroadcast {
                Message(
                    GAME_ROOM_JOIN_SUCCESS,
                    GameSettingsPack(
                        roomProperties.loopRate
                    )
                )
            }
    
            schedulePeriodically(
                this,
                roomProperties.initDelay,
                roomProperties.loopRate
            )
    
            schedule(
                { roomService.onBattleEnd(this) },
                roomProperties.endDelay + roomProperties.startDelay
            )
    
            log.trace("Room {} has been created", key())
        }
    
        override fun onRoomStarted() {
            started.set(false)
            sendBroadcast(
                Message(
                    GAME_ROOM_START,
                    GameRoomPack(
                        OffsetDateTime.now().plus(roomProperties.startDelay, ChronoUnit.MILLIS).toInstant().toEpochMilli()
                    )
                )
            )
            schedule({ this.onBattleStarted() }, roomProperties.startDelay)
            log.trace("Room {} has been started", key())
        }
    
        override fun onBattleStarted() {
            log.trace("Room {}. Battle has been started", key())
            started.set(true)
            sendBroadcast(
                Message(
                    GAME_ROOM_BATTLE_START, GameRoomPack(
                        OffsetDateTime.now().plus(roomProperties.endDelay, ChronoUnit.MILLIS).toInstant().toEpochMilli()
                    )
                )
            )
        }
    
        //room's game loop
        override fun update() {
            if (!started.get()) return
            for (currentPlayer in map.getPlayers()) {
                if (currentPlayer.isAlive) currentPlayer.update()
                val updatePack = currentPlayer.getPrivateUpdatePack()
                val playerUpdatePackList = map.getPlayers()
                    .map { it.getUpdatePack() }
    
                val session = currentPlayer.userSession
                send(
                    session, Message(
                        UPDATE,
                        GameUpdatePack(
                            updatePack,
                            playerUpdatePackList
                        )
                    )
                )
            }
        }
    
        fun onPlayerKeyDown(userSession: UserSession, event: KeyDownPlayerEvent) {
            if (!started.get()) return
            val player = userSession.player as DefaultPlayer
            if (!player.isAlive) return
            val direction = Direction.valueOf(event.inputId)
            player.updateState(direction, event.state)
        }
    
        fun onPlayerInitRequest(userSession: UserSession) {
            send(
                userSession, Message(
                    INIT,
                    GameInitPack(
                        (userSession.player as DefaultPlayer).getInitPack(),
                        roomProperties.loopRate,
                        map.alivePlayers()
                    )
                )
            )
        }
    
        override fun onDestroy(userSessions: List<UserSession>) {
            userSessions.forEach { userSession: UserSession ->
                map.removePlayer(
                    userSession.player as DefaultPlayer
                )
            }
            super.onDestroy(userSessions)
        }
    
        override fun onClose(userSession: UserSession) {
            send(userSession, Message(GAME_ROOM_CLOSE))
            super.onClose(userSession)
        }
    }

Обратите внимание на метод update:

//room's game loop
    override fun update() {
        if (!started.get()) return
        for (currentPlayer in map.getPlayers()) {
            if (currentPlayer.isAlive) currentPlayer.update()
            val updatePack = currentPlayer.getPrivateUpdatePack()
            val playerUpdatePackList = map.getPlayers()
                .map { it.getUpdatePack() }

            val session = currentPlayer.userSession
            send(
                session, Message(
                    UPDATE,
                    GameUpdatePack(
                        updatePack,
                        playerUpdatePackList
                    )
                )
            )
        }
    }

В нем, как стало ясно ранее, происходит цикличное обновление внутриигровых моделей. Разумеется, по смыслу, это должно происходить в объекте GameMap, отвечающем за игровое поле в целом, но для удобства восприятия контента статьи вынесено в игровую комнату. В этом цикле вы можете добавить все что требуется для игры, в том числе и NPC, выстрелы(как отдельные сущности), умения, эффекты и т. д. В данном случае обновляется только состояние игроков в классе DefaultPlayer:

override fun update() {
        velocity.x =
            if (isMoving && movingState[Direction.RIGHT] == true) ABS_PLAYER_SPEED else (if (isMoving && movingState[Direction.LEFT] == true) -ABS_PLAYER_SPEED else 0.0)
        velocity.y =
            if (isMoving && movingState[Direction.UP] == true) -ABS_PLAYER_SPEED else (if (isMoving && movingState[Direction.DOWN] == true) ABS_PLAYER_SPEED else 0.0)

        if (isMoving) position.sum(velocity)
}

Затем из обновленного состояния собираются PlayerUpdatePack, пересылаемые клиенту в цикле ниже в формате GameUpdatePack (свое состояние + коллекция состояний остальных игроков).

data class PlayerUpdatePack(
    val id: Long,
    val position: Vector
) : UpdatePack
data class PrivatePlayerUpdatePack(
    val id: Long
) : PrivateUpdatePack
data class GameUpdatePack(
    val player: PrivatePlayerUpdatePack,
    val players: Collection<PlayerUpdatePack>
) : UpdatePack

Matchmaker

Матчмейкером в проекте является RoomService:

interface RoomService{
    fun getRoomSessionIds(key: UUID?):Collection<String>
    fun getRoomIds():Collection<String>
    fun getRooms():Collection<DefaultGameRoom>
    fun getRoomByKey(key: UUID?): Optional<DefaultGameRoom>
    fun addPlayerToWait(userSession: UserSession, initialData: GameRoomJoinEvent)
    fun removePlayerFromWaitQueue(session: UserSession)  
    fun onBattleEnd(room: DefaultGameRoom)
    fun close(key: UUID?):Mono<Void>
}

@Service
open class RoomServiceImpl(
    private val playerFactory: PlayerFactory<GameRoomJoinEvent, DefaultPlayer, DefaultGameRoom>,
    private val applicationProperties: ApplicationProperties,
    private val schedulerService: Scheduler
) : RoomService {
    private lateinit var webSocketSessionService: WebSocketSessionService

    companion object {
        val log: Logger = LogManager.getLogger(this::class.java)
    }

    private val gameRoomMap: MutableMap<UUID, DefaultGameRoom> = mutableMapOf()
    private val sessionQueue: Queue<WaitingPlayerSession> = ArrayDeque()
    override fun getRoomSessionIds(key: UUID?): Collection<String> = getRoomByKey(key).map { room ->
        room.sessions().map { it.id }
    }.orElse(listOf())

    override fun getRoomIds(): Collection<String> = gameRoomMap.keys.map { it.toString() }
    override fun getRooms(): Collection<DefaultGameRoom> = gameRoomMap.values.toList()
    override fun getRoomByKey(key: UUID?): Optional<DefaultGameRoom> =
        if (key == null) Optional.empty() else Optional.ofNullable(gameRoomMap[key])

    override fun addPlayerToWait(userSession: UserSession, initialData: GameRoomJoinEvent) {
        sessionQueue.add(WaitingPlayerSession(userSession, initialData))
        webSocketSessionService.send(userSession, Message(GAME_ROOM_JOIN_WAIT))

        if (sessionQueue.size < applicationProperties.room.maxPlayers) return

        val gameMap = GameMap()
        val room = createRoom(gameMap)
        val userSessions: MutableList<UserSession> = ArrayList()
        while (userSessions.size.toLong() != applicationProperties.room.maxPlayers) {
            val waitingPlayerSession = sessionQueue.remove()
            val ps: UserSession = waitingPlayerSession.userSession
            val id: GameRoomJoinEvent = waitingPlayerSession.initialData
            val player: Player = playerFactory.create(gameMap.nextPlayerId(),  id, room, ps)
            ps.roomKey = room.key()
            ps.player = player
            userSessions.add(ps)
        }
        launchRoom(room, userSessions)
    }

    override fun removePlayerFromWaitQueue(session: UserSession) {
            sessionQueue.removeIf{waitingPlayerSession -> waitingPlayerSession.userSession == session }
    }

    private fun createRoom(gameMap: GameMap): DefaultGameRoom {
        val room = DefaultGameRoom(gameMap,
            UUID.randomUUID(), this, webSocketSessionService,
            schedulerService, applicationProperties.game,
            applicationProperties.room
        )
        gameRoomMap[room.key()] = room
        return room
    }

    private fun launchRoom(room: DefaultGameRoom, userSessions: List<UserSession>) {
        room.onRoomCreated(userSessions)
        room.onRoomStarted()
    }

    override fun onBattleEnd(room: DefaultGameRoom) {
        room.close()
        gameRoomMap.remove(room.key())
    }

    override fun close(key: UUID?): Mono<Void> {
        getRoomByKey(key).ifPresent {
            onBattleEnd(it)
        }
        return Mono.empty()
    }

    @Autowired
    fun setGameManager(@Lazy webSocketSessionService: WebSocketSessionService) {
        this.webSocketSessionService = webSocketSessionService
    }
}

Который отвечает за хранение, манипулирование экземплярами игровых комнат, позволяет добавить сессию игрока в очередь на ожидание, создать для группы игроков комнату, удалить игрока из очереди и обработать событие завершения матча.

Заключение

В результате запуска демки вы получите простенькое игровое поле с задатками на будущий мультиплеер в режиме Battle-royale.

Пару комментариев:

Все коллекции и контейнеры в демо не являются Thread-safe, однако в предыдущем проекте одним из читателей были блокировки в RoomService, влияющие на все комнаты. Используйте java.util.concurrent.* коллекции только там где это необходимо.

Авторизация спокойно реализуется путем добавления spring-starter-security в проект и настройкой oidc/oauth. Более детальные примеры настройки авторизации смотрите в generator-jhipster.

В данном примере используется JDK 21. Начиная с 19 версии появляется экспериментальные виртуальные потоки, что также сильно повышает производительность приложения на больших нагрузках. См. ApplicationConfiguration

Браузерный клиент приложения
Браузерный клиент приложения

Многим может показаться, что кода очень много для такого маленького приложения, но это всего лишь основа для возможно большего проекта. Большая часть подкапотных механизмов написана, вам остается обогатить свой сервер сугубо игровой логикой.

Будьте мощными специалистами, читайте, стремитесь к своим целям. Спасибо за внимание.

Комментарии (1)


  1. wiezmin
    16.03.2024 14:47
    +1

    Спасибо большое, очень интересно было почитать про реализацию именно игровых серверов. А то привыкаешь к своим крудам только, а там ещё целый мир есть:)