Когда поступает HTTP-запрос, «обычное» веб-приложение сопоставляет запрос с конкретным потоком из пула потоков. Этот назначенный поток остается с запросом до тех пор, пока ответ не будет возвращен в сокет запроса. Попутно нам может потребоваться получить данные из некоторой веб-службы или базы данных, прочитать или записать в файл или выполнить другие вызовы ввода-вывода, во время которых поток блокируется и должен ждать, пока не получит ответ. Для приложений с высокой частотой запросов пул потоков может в какой-то момент исчерпаться, и тогда новые запросы больше не будут обрабатываться.

Здесь нам может помочь реактивное программирование. Вместо того, чтобы иметь большой пул потоков и модель «поток на запрос», реактивное приложение имеет только один поток на каждое ядро ​​ЦП, которое продолжает работать, и, если оно попадает в операцию ввода-вывода, оно разгружает эту операцию и работает над чем-то еще до тех пор, пока IO завершено. Мы говорим, что такое приложение неблокирующее. 

Подход появился, когда группа компаний объединилась в инициативе Reactive Streams, чтобы определить ключевые принципы и четыре интерфейса JVM. После этого они практически каждый пошли своим путем, чтобы создать реактивную библиотеку на основе этих соглашений. Одна из этих библиотек, Project Reactor, является основой, на которой Spring построил свою реактивную веб-платформу Spring WebFlux. 

Этот реактивный стек позволяет нам создавать неблокирующие веб-приложения в структуре, которая выглядит знакомой с точки зрения классов, методов и аннотаций, если вы работали с Spring MVC, но фактическая реализация методов может быть с довольно сложной кривой обучения. 

Взгляните на контроллер ниже.

@RestController
@RequestMapping("/cats")
class CatController(
    @Autowired val catRepository: CatRepository
) {
    @PutMapping("{id}")
    fun updateProduct(@PathVariable(value = "id") id: Long, @RequestBody newCat: Cat): Mono<ResponseEntity<Cat?>> {
        return catRepository
            .findById(id)
            .flatMap { existingCat ->
                catRepository.save(
                    existingCat.copy(
                        name = newCat.name,
                        type = newCat.type,
                        age = newCat.age
                    )
                )
            }
            .map { updatedCat -> ResponseEntity.ok(updatedCat) }
            .defaultIfEmpty(ResponseEntity.notFound().build())
    }
}

Как видите, полюбившийся нам императивный стиль заменен декларативной парадигмой с множеством комбинаторов. Наш код внезапно состоит из функциональных цепочек вызовов, которые передают информацию от издателей подписчикам. Кроме того, теперь нам внезапно нужно иметь дело с абстракциями, такими как Mono и Flux, на всех уровнях нашего приложения, и, как таковая, наша кодовая база становится связанной с реактивной библиотекой, которую мы используем. В конце концов, кажется, что мы должны оставить все, что мы знаем, только для того, чтобы получить неблокирующие вкусности. Кажется, это высокая цена, можем ли мы поступить иначе?

Да мы можем! Kotlin предоставляет нам языковую функцию, называемую Coroutines (корутины), которые концептуально представляют собой легкие потоки, которые позволяют выполнять асинхронное выполнение кода, даже если код по-прежнему выглядит так, как мы привыкли к последовательному выполнению. Мы можем реагировать, не подвергая нашу кодовую базу полной декларативной переработке, потому что наши друзья из Spring проделали очень хорошую работу по интеграции Kotlin, и особенно корутин, в свою структуру. Интеграция позволяет нам использовать корутины на уровне общедоступного API, в то время как мы по-прежнему используем Reactor под капотом.

Заинтересовались? В этой статье мы разработаем простой веб-API с использованием Spring WebFlux и постараемся максимально использовать специальные расширения Kotlin в Spring WebFlux. Эти расширения включают coRouter (подсказка, 'co' для корутин) DSL возвращает в наш маршрутизатор значение типа Flow, и использует адаптированные методы WebClient с 'await'.
Вместо модели на основе аннотаций @RestController, @RequestMapping и @Service мы используем функциональную веб-платформу, представляющую новую модель программирования, в которой мы используем функции для маршрутизации (Router) и обработки запросов (Handler).

Мы следуем за входящим запросом и поэтому начинаем извне. Запрос начинается с функций маршрутизатора, которые являются функциональной альтернативой контроллера @RequestMapping. Затем запрос передается функциональному варианту сервиса, который мы называем обработчиком. Наконец, мы приходим к уже знакомому репозиторию. Этот последний слой имеет то же имя, что и в Spring MVC, но технология, лежащая в его основе, совсем другая, чтобы сделать все неблокирующим. Если вы хотите придерживаться своих контроллеров и сервисов, тогда это также полностью поддерживается WebFlux, и большинство статей действительно демонстрирует этот подход. Однако я хотел бы изучить подход, который является более идиоматическим для Kotlin. Все примеры кода можно найти в этом репозитории GitHub.

Наш пример

В качестве примера мы создаем API для нашей собственной CMS (Cat Management System), которая может выполнять операции создания, чтения, обновления и удаления (CRUD). Ниже вы приведен обзор маршрутов, которые мы определим, и возможные ответы, которые возвращает приложение.

HTTP метод

Route

HTTP Статус ответа

Описание

GET

/api/cats

200 Ok

Все cat возвращены

GET

/api/cats/{id}

200 Ok

Требуется cat с существующим идентификатором

GET

/api/cats/{id}

404 Not Found

Запрашен cat с несуществующим идентификатором

POST

/api/cats

201 Created

Новый cat создан успешно

POST

/api/cats

400 Bad Request

Невозможно создать новый cat

PUT

/api/cats/{id}

200 Ok

Существующий cat обновлен

PUT

/api/cats/{id}

400 Bad Request

Cat не может быть обновлен

PUT

/api/cats/{id}

404 Not Found

Запрос на обновление несуществующего cat

DELETE

/api/cats/{id}

204 No Content

Существующий cat теперь удален

DELETE

/api/cats/{id}

404 Not Found

Запрошено удаление несуществующего cat

Настройка проекта

Начнем с создания нового проекта Spring Boot с помощью Spring Initializr. Мы говорим, что это проект Gradle, использующий Kotlin и упакованный в виде jar-файла. Мы также добавляем необходимые зависимости: Spring Reactive Web, который включает WebFlux и Netty, Spring Data R2DBC для наших репозиториев и H2 для создания простой базы данных в памяти для тестирования нашего приложения.

Затем нам нужно добавить еще две тестовые зависимости для Mockk и SpringMockk:

dependencies {
    ...
    testImplementation("io.mockk:mockk:1.10.2")
    testImplementation("com.ninja-squad:springmockk:3.0.1")
    ...
}

Они не являются строго обязательными для запуска нашего приложения, но используются в тестах. Я рекомендую вам проверить Maven Central на наличие последних версий.

Router

Функции Router (маршрутизатор) принимают аргумент типа ServerRequest и возвращают ServerResponse. Это варианты WebFlux для Spring MVC RequestEntity и ResponseEntity, соответственно. Мы используем router DSL Kotlin для определения наших маршрутов:

@Configuration
class CatRouterConfiguration(
    private val catHandler: CatHandler
) {
    @Bean
    fun apiRouter() = coRouter {
        "/api/cats".nest {
            accept(APPLICATION_JSON).nest {
                GET("", catHandler::getAll)

                contentType(APPLICATION_JSON).nest {
                    POST("", catHandler::add)
                }

                "/{id}".nest {
                    GET("", catHandler::getById)
                    DELETE("", catHandler::delete)

                    contentType(APPLICATION_JSON).nest {
                        PUT("", catHandler::update)
                    }
                }
            }
        }
    }
}

Функция coRouter создает RouterFunction на основе дальнейших вложенных операторов. Вы можете использовать функцию расширения String.nest для группировки маршрутов с общим префиксом пути. Подобные группировки могут быть сделаны на основе заголовков accept и contentType, а также других предикатов. Фактические маршруты добавляются через функции, которые соответствуют HTTP методам: GETPOSTPUTDELETE и другие. Фактической обработкой занимается Handler.

Handler

Реализации HandlerFunction представляет функции, которые принимают запросы и генерируют ответы на них. Подобно методам сервиса, соответствующие функции обработчика сгруппированы в класс Handler с использованием специфичного для Kotlin DSL. Эти функции читают и анализируют переменные пути и тела запросов, обращаются к репозиториям и создают объект ServerResponse для передачи в router

suspend fun getById(req: ServerRequest): ServerResponse {
    val id = Integer.parseInt(req.pathVariable("id"))
    val existingCat = catRepository.findById(id.toLong())

    return existingCat?.let {
        ServerResponse
            .ok()
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValueAndAwait(it)
    } ?: ServerResponse.notFound().buildAndAwait()
}

suspend fun add(req: ServerRequest): ServerResponse {
    val receivedCat = req.awaitBodyOrNull(CatDto::class)

    return receivedCat?.let {
        ServerResponse
            .ok()
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValueAndAwait(
                catRepository
                    .save(it.toEntity())
                    .toDto()
            )
    } ?: ServerResponse.badRequest().buildAndAwait()
}

suspend fun update(req: ServerRequest): ServerResponse {
    val id = req.pathVariable("id")

    val receivedCat = req.awaitBodyOrNull(CatDto::class) 
        ?: return ServerResponse.badRequest().buildAndAwait()

    val existingCat = catRepository.findById(id.toLong()) 
        ?: return ServerResponse.notFound().buildAndAwait()

    return ServerResponse
        .ok()
        .contentType(MediaType.APPLICATION_JSON)
        .bodyValueAndAwait(
            catRepository.save(
                receivedCat.toEntity().copy(id = existingCat.id)
            ).toDto()
        )
}

suspend fun delete(req: ServerRequest): ServerResponse {
    val id = req.pathVariable("id")

    return if (catRepository.existsById(id.toLong())) {
        catRepository.deleteById(id.toLong())
        ServerResponse.noContent().buildAndAwait()
    } else {
        ServerResponse.notFound().buildAndAwait()
    }
}

Здесь мы также видим расширения Kotlin, которые Spring встроил в WebFlux. По соглашению, builder методы ServerResponse, основанные на Reactor, имеют префикс «await» или суффикс «AndAwait» для формирования методов приостановки, при использовании корутин. Чтобы увидеть, что внутри это тот же механизм, давайте рассмотрим метод bodyValueAndAwait:

suspend fun ServerResponse.BodyBuilder.bodyValueAndAwait(body: Any): ServerResponse =
        bodyValue(body).awaitSingle()

Как мы видим, метод Reactor bodyValue вызывается в цепочке awaitSingle из пакета kotlinx.coroutines, который ожидает единственное значение от Publisher и возвращает результирующее значение для создания корутины типа bodyValueAndAwait.

Repository

Последняя "остановка" перед базой данных - это Repository (репозиторий) на уровне инфраструктуры персистентности. Как и в случае с другими слоями, здесь нам нужно быть неблокирующими. Поэтому мы не можем использовать блокирующий JDBC и должны использовать реактивную альтернативу, называемую R2DBC

Spring Data Reactive, к счастью, предлагает интерфейсы для неблокирующих репозиториев, которые очень похожи на их блокирующие аналоги JpaRepository или CrudRepository.

Если мы решим реализовать тип R2DBC, называемый ReactiveCrudRepository, указанные выше методы будут возвращать типы данных Reactor Mono и Flux. К счастью для нас, как и в случае с другими слоями, WebFlux предоставляет расширения для Kotlin и корутины типа CoroutineCrudRepository, которая возвращают только сущности:

interface CatRepository : CoroutineCrudRepository<Cat, Long> {
    override fun findAll(): Flow<Cat>
    override suspend fun findById(id: Long): Cat?
    override suspend fun existsById(id: Long): Boolean
    override suspend fun <S : Cat> save(entity: S): Cat
    override suspend fun deleteById(id: Long)
}

Тесты

Как хорошо обученные практики программной инженерии, мы хотим тщательно протестировать наши приложения. Ниже приведены два набора тестов, один из которых имитирует, что репозиторий не зависит от реальной базы данных, а другой использует базу данных H2 в памяти. Оба они предоставляют простой тест для каждого HTTP статуса, которым может отвечать каждый маршрут.

Оба теста включают две вспомогательных функций aCat() и anotherCat(), которые создают новый Cat с некоторыми значениями по умолчанию и дают возможность передавать другие значения. Такой подход к созданию объектов для наших тестов скрывает все детали реализации объекта Cat, кроме тех, которые имеют отношение к тесту, и в этом случае вы должны определить пользовательское значение для этих соответствующих полей.

1. Тесты с mocking

Этот подход я чаще всего вижу в других статьях: мы имитируем CatRepository, чтобы протестировать Router и Handler без зависимости от базы данных. Мы используем комбинацию Spring WebFlux аннотаций @WebFluxTest вместе с @Import и добавляем ее в классы Router и Handler. Мы могли бы использовать @SpringBootTest и @AutoConfigureWebTestClient для достижения того же без необходимости вручную импортировать классы с помощью @Import. Тем не менее, в этом случае наши тесты проходят быстрее, и мне также нравится если конкретный тестируемый Router и Handler явно упоминаются вверху.

Мы имитируем репозиторий, используя Mockk, фреймворк для имитации, созданный специально для Kotlin. Он хорошо подходит для приложения, которое мы создали, поскольку Mockk имеет встроенную поддержку корутин. Подробное описание Mockk выходит за рамки этой статьи. Что вы должны знать, так это то, что мы можем использовать комбинацию coEvery и coAnswers для установки возвращаемых значений для методов CatRepository, которые мы хотим имитировать. Кроме того, Spring (пока) не предлагает поддержку имитации beans с Mockk, как будто он дает нам аннотацию @MockBean для Mockito. Поэтому мы используем SpringMockk и его аннотацию @MockkBean для достижения того же.

@WebFluxTest
@Import(CatRouterConfiguration::class, CatHandler::class)
class MockedRepositoryIntegrationTest(
    @Autowired private val client: WebTestClient
) {
    @MockkBean
    private lateinit var repository: CatRepository

    private fun aCat(
        name: String = "Obi",
        type: String = "Dutch Ringtail",
        age: Int = 3
    ) =
        Cat(
            name = name,
            type = type,
            age = age
        )

    private fun anotherCat(
        name: String = "Wan",
        type: String = "Japanese Bobtail",
        age: Int = 5
    ) =
        aCat(
            name = name,
            type = type,
            age = age
        )

    @Test
    fun `Retrieve all cats`() {
        every {
            repository.findAll()
        } returns flow {
            emit(aCat())
            emit(anotherCat())
        }

        client
            .get()
            .uri("/api/cats")
            .exchange()
            .expectStatus()
            .isOk
            .expectBodyList<CatDto>()
            .hasSize(2)
            .contains(aCat().toDto(), anotherCat().toDto())
    }

    @Test
    fun `Retrieve cat by existing id`() {
        coEvery {
            repository.findById(any())
        } coAnswers {
            aCat()
        }

        client
            .get()
            .uri("/api/cats/2")
            .exchange()
            .expectStatus()
            .isOk
            .expectBody<CatDto>()
            .isEqualTo(aCat().toDto())
    }

    @Test
    fun `Retrieve cat by non-existing id`() {
        coEvery {
            repository.findById(any())
        } returns null

        client
            .get()
            .uri("/api/cats/2")
            .exchange()
            .expectStatus()
            .isNotFound
    }

    @Test
    fun `Add a new cat`() {
        val savedCat = slot<Cat>()
        coEvery {
            repository.save(capture(savedCat))
        } coAnswers {
            savedCat.captured
        }

        client
            .post()
            .uri("/api/cats/")
            .accept(MediaType.APPLICATION_JSON)
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(aCat().toDto())
            .exchange()
            .expectStatus()
            .isOk
            .expectBody<CatDto>()
            .isEqualTo(savedCat.captured.toDto())
    }

    @Test
    fun `Add a new cat with empty request body`() {
        val savedCat = slot<Cat>()
        coEvery {
            repository.save(capture(savedCat))
        } coAnswers {
            savedCat.captured
        }

        client
            .post()
            .uri("/api/cats/")
            .accept(MediaType.APPLICATION_JSON)
            .contentType(MediaType.APPLICATION_JSON)
            .body(fromValue("{}"))
            .exchange()
            .expectStatus()
            .isBadRequest
    }

    @Test
    fun `Update a cat`() {
        coEvery {
            repository.findById(any())
        } coAnswers {
            aCat()
        }

        val savedCat = slot<Cat>()
        coEvery {
            repository.save(capture(savedCat))
        } coAnswers {
            savedCat.captured
        }

        val updatedCat = aCat(name = "New fancy name").toDto()

        client
            .put()
            .uri("/api/cats/2")
            .accept(MediaType.APPLICATION_JSON)
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(updatedCat)
            .exchange()
            .expectStatus()
            .isOk
            .expectBody<CatDto>()
            .isEqualTo(updatedCat)
    }

    @Test
    fun `Update cat with non-existing id`() {
        val requestedId = slot<Long>()
        coEvery {
            repository.findById(capture(requestedId))
        } coAnswers {
            nothing
        }

        val updatedCat = aCat(name = "New fancy name").toDto()

        client
            .put()
            .uri("/api/cats/2")
            .accept(MediaType.APPLICATION_JSON)
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(updatedCat)
            .exchange()
            .expectStatus()
            .isNotFound
    }

    @Test
    fun `Update cat with empty request body id`() {
        coEvery {
            repository.findById(any())
        } coAnswers {
            aCat()
        }

        client
            .put()
            .uri("/api/cats/2")
            .accept(MediaType.APPLICATION_JSON)
            .contentType(MediaType.APPLICATION_JSON)
            .body(fromValue("{}"))
            .exchange()
            .expectStatus()
            .isBadRequest
    }

    @Test
    fun `Delete cat with existing id`() {
        coEvery {
            repository.existsById(any())
        } coAnswers {
            true
        }

        coEvery {
            repository.deleteById(any())
        } coAnswers {
            nothing
        }

        client
            .delete()
            .uri("/api/cats/2")
            .exchange()
            .expectStatus()
            .isNoContent

        coVerify { repository.deleteById(any()) }
    }

    @Test
    fun `Delete cat by non-existing id`() {
        coEvery {
            repository.existsById(any())
        } coAnswers {
            false
        }

        client
            .delete()
            .uri("/api/cats/2")
            .exchange()
            .expectStatus()
            .isNotFound

        coVerify(exactly = 0) { repository.deleteById(any()) }
    }
}

2. Тесты без mocking

Мы также можем выполнить интеграционный тест с реальной базой данных. Нет ничего лучше настоящего, правда? Существуют различные варианты для тестирования, такие как testcontainers, реальные базы данных и базы данных в памяти. Мы выберем третий вариант и воспользуемся @DirtiesContext для воссоздания контекста приложения, включая базу данных, после каждого теста.

Важное отличие от предыдущего набора тестов заключается в том, что мы используем @AutoConfigureWebTestClient вместо @WebFluxTest. Последний отключает полную автоконфигурацию и настраивает только подмножество, которое считает актуальным, но теперь нам также нужно настроить наши репозитории.

Кроме того, поскольку мы больше не можем имитировать ответы репозитория, нам нужен другой способ установить его состояние в начале теста. 

Я определил метод расширения CatRepository.seed для сохранения Cat в базе данных и @AfterEach метод очистки после теста. Оба используют runBlocking для создания coroutine, которая блокирует прерываемый текущий поток до его завершения. Если мы не заблокируем поток, JUnit продолжит собственно тест до того, как Cat будут фактически вставлены в базу данных или удалены из нее.

@SpringBootTest
@AutoConfigureWebTestClient
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
class InMemoryDatabaseIntegrationTest(
    @Autowired val client: WebTestClient,
    @Autowired val repository: CatRepository
) {
    private fun aCat(
        name: String = "Obi",
        type: String = "Dutch Ringtail",
        age: Int = 3
    ) =
        Cat(
            name = name,
            type = type,
            age = age
        )

    private fun anotherCat(
        name: String = "Wan",
        type: String = "Japanese Bobtail",
        age: Int = 5
    ) =
        aCat(
            name = name,
            type = type,
            age = age
        )

    private fun CatRepository.seed(vararg cats: Cat) =
        runBlocking {
            repository.saveAll(cats.toList()).toList()
        }

    @AfterEach
    fun afterEach() {
        runBlocking {
            repository.deleteAll()
        }
    }

    @Test
    fun `Retrieve all cats`() {
        repository.seed(aCat(), anotherCat())

        client
            .get()
            .uri("/api/cats")
            .exchange()
            .expectStatus()
            .isOk
            .expectBodyList<CatDto>()
            .hasSize(2)
            .contains(aCat().toDto(), anotherCat().toDto())
    }

    @Test
    fun `Retrieve cat by existing id`() {
        repository.seed(aCat(), anotherCat())

        client
            .get()
            .uri("/api/cats/2")
            .exchange()
            .expectStatus()
            .isOk
            .expectBody<CatDto>()
            .isEqualTo(anotherCat().toDto())
    }

    @Test
    fun `Retrieve cat by non-existing id`() {
        client
            .get()
            .uri("/api/cats/2")
            .exchange()
            .expectStatus()
            .isNotFound
    }

    @Test
    fun `Add a new cat`() {
        client
            .post()
            .uri("/api/cats")
            .accept(MediaType.APPLICATION_JSON)
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(aCat().toDto())
            .exchange()
            .expectStatus()
            .isOk
            .expectBody<CatDto>()
            .isEqualTo(aCat().toDto())
    }

    @Test
    fun `Add a new cat with empty request body`() {
        client
            .post()
            .uri("/api/cats")
            .accept(MediaType.APPLICATION_JSON)
            .contentType(MediaType.APPLICATION_JSON)
            .body(fromValue("{}"))
            .exchange()
            .expectStatus()
            .isBadRequest
    }

    @Test
    fun `Update a cat`() {
        repository.seed(aCat(), anotherCat())

        val updatedCat = aCat(name = "New fancy name").toDto()
        client
            .put()
            .uri("/api/cats/2")
            .accept(MediaType.APPLICATION_JSON)
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(updatedCat)
            .exchange()
            .expectStatus()
            .isOk
            .expectBody<CatDto>()
            .isEqualTo(updatedCat)
    }

    @Test
    fun `Update cat with non-existing id`() {
        val updatedCat = aCat(name = "New fancy name").toDto()

        client
            .put()
            .uri("/api/cats/2")
            .accept(MediaType.APPLICATION_JSON)
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(updatedCat)
            .exchange()
            .expectStatus()
            .isNotFound
    }

    @Test
    fun `Update cat with empty request body id`() {
        client
            .put()
            .uri("/api/cats/2")
            .accept(MediaType.APPLICATION_JSON)
            .contentType(MediaType.APPLICATION_JSON)
            .body(fromValue("{}"))
            .exchange()
            .expectStatus()
            .isBadRequest
    }

    @Test
    fun `Delete cat with existing id`() {
        repository.seed(aCat(), anotherCat())

        client
            .delete()
            .uri("/api/cats/2")
            .exchange()
            .expectStatus()
            .isNoContent
    }

    @Test
    fun `Delete cat by non-existing id`() {
        client
            .delete()
            .uri("/api/cats/2")
            .exchange()
            .expectStatus()
            .isNotFound
    }
}

Вывод

В этой статье мы рассмотрели, как создать неблокирующее веб-приложение с помощью Spring WebFlux, используя расширения для Kotlin.

В результате создается кодовая баз, не замусоренная Mono и Flux (хотя мы взамен получаем несколько suspend и Flow).

Описанный выше подход позволяет нам продолжать писать наш код в императивном стиле, как мы привыкли.

Мы также рассмотрели два способа тестирования нашего приложения: один с использованием mocking, а второй - без него.

Комментарии (1)


  1. Sigest
    14.12.2021 12:41

    Спасибо за перевод. Давно искал туториал как WebFlux подружить с корутинами. Пусть он и не подробный, но для старта мне пока хватит. Правда подход с написанием рутера для контролера мне не нравится. В самом спринге это сделано через аннотации, а здесь только строк писать надо, и я так понимаю на все контролеры надо не забыть написать свой рутер