В первой части мы узнали, что такое реактивность и как с ней работать на базовом уровне. Если вы хотите продолжить изучение реактивного программирования с новым фреймворком от Spring, то добро пожаловать!

Сейчас мы разберем, что делать с полученными знаниями из прошлой статьи, а также построим небольшой веб-сервис, который будет работать в асинхронном режиме.

Для этого нам понадобятся:

  • База данных MongoDB. Дело в том, что далеко не все СУБД поддерживают асинхронную работу. MongoDB в этом смысле приятно отличается и полностью поддерживает нужный нам функционал.
  • Драйвер для асинхронной работы с базой данных. Обычный драйвер так не умеет.
  • Spring Boot 2. На данный момент Spring Boot находится в стадии разработки, но должен выйти 18 декабря. Он нам сильно облегчит работу и сэкономит много времени.
  • Gradle.Будет использован в качестве системы сборки.
  • Java 8. Да, это не последняя версия на данный момент, но для наших целей вполне подойдет.
  • Lombok. Поможет сократить код.

Все настройки и финальный проект можно посмотреть на github.

Приступим

Если вы еще не подключили все зависимости, то можете сделать это прямо сейчас. В gradle выглядеть они будут так:

	compile('org.springframework.boot:spring-boot-starter-data-mongodb-reactive')
	compile('org.springframework.boot:spring-boot-starter-webflux')
	compileOnly('org.projectlombok:lombok')
	testCompile('org.springframework.boot:spring-boot-starter-test')
	testCompile(‘io.projectreactor:reactor-test')


Итак, у нас все готово. Для начала создадим пользователя нашего приложения:
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document
public class User {

    @Id
    private String id;
    private String firstName;
    private String lastName;
}

Напоминаю, что для успешной компиляции и работы в IntelliJ Idea необходимо выставить галочку в пункте enable annotation processing либо написать самостоятельно все сеттеры, гетеры и конструкторы.

Далее создадим репозиторий с нашими пользователями:

import org.faoxis.habrreactivemongo.domain.User;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

public interface UserRepository extends ReactiveMongoRepository<User, String> {
}

Здесь следует заметить, что мы наследуемся от особого интерфейса для работы в реактивном режиме. Если заглянуть в интерфейс ReactiveMongoRepository, то можно увидеть, что нам возвращаются объекты, обернутые в уже знакомые нам классы Mono и Flux. Это значит, что при каком-либо обращении в БД, мы не получаем сразу же результат. Вместо этого мы получаем поток данных, из которого можно получить данные по мере готовности.

На данный момент многослойная архитектура является наиболее распространенным решением при работе с микросервисной архитектурой. Это действительно очень удобно. Давайте создадим сервисный слой. Для этого сделаем соответствующий интерфейс:

public interface UserService {

    Flux<User> get();
    Mono<User> save(User user);
}

Наш сервис достаточно прост, поэтому сразу создаем несколько полезных методов. И тут же реализуем их:

@Service
@AllArgsConstructor
public class UserServiceImpl implements UserService {

    private UserRepository userRepository;

    @Override
    public Flux<User> get() {
        return userRepository.findAll();
    }

    @Override
    public Mono<User> save(User user) {
        return userRepository.save(user);
    }
}

Здесь стоит заметить, что внедрение зависимости UserRepository происходит через конструктор с помощью аннотации AllArgsConstructor. На всякий случай напомню, что с некоторой версии Spring 4 можно осуществлять автоматическое внедрение зависимостей через конструктор без аннотации Autowire.

И, наконец, сделаем контроллер:

import lombok.AllArgsConstructor;
import org.faoxis.habrreactivemongo.domain.User;
import org.faoxis.habrreactivemongo.service.UserService;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/users")
@AllArgsConstructor
public class UserController {

    private UserService userService;

    @PostMapping
    public Mono<User> post(@RequestBody User user) {
        return userService.save(user);
    }

    @GetMapping
    public Flux<User> get() {
        return userService.get();
    }
}

Запустим наше приложение. Все должно работать. Теперь сделаем POST запрос на localhost:8080/users со следующим содержимым:

{
	"firstName": "Peter",
	"lastName": "Griffin"
}

В ответ мы получим такой же объект, но с присвоенным ему id:
{
    "id": "5a0bf0fdc48fd53478638c9e",
    "firstName": "Peter",
    "lastName": "Griffin"
}

Отлично! Давайте сохраним еще пару пользователей и попробуем посмотреть, что у нас уже есть в БД. У меня такой результат GET запроса на localhost:8080/users:

[
    {
        "id": "5a0bf0fdc48fd53478638c9e",
        "firstName": "Peter",
        "lastName": "Griffin"
    },
    {
        "id": "5a0bf192c48fd53478638c9f",
        "firstName": "Lois",
        "lastName": "Griffin"
    },
    {
        "id": "5a0bf19ac48fd53478638ca0",
        "firstName": "Mag",
        "lastName": "Griffin"
    }
]

Отлично! У нас есть целый сервис, который работает в асинхронном режиме! Но есть еще кое-что, о чем следует помнить. Работа с асинхронным репозиториями может очень сильно поменять вид сервиса, который получает данные из него.

Чтобы продемонстрировать это, создадим еще один обработчик URL метода в нашем контроллере:

    @GetMapping("{lastName}")
    public Flux<User> getByLastName(@PathVariable(name = "lastName") String lastName) {
        return userService.getByLastName(lastName);
    }

Здесь все просто. Мы получаем фамилию пользователя и выводим всех людей с такой фамилией.

Конечно, такую логику можно возложить на базу данных, но здесь я хотел бы обратить внимание, как это будет выглядит в сервисе:

    @Override
    public Flux<User> getByLastName(final String lastName) {
        return userRepository
                .findAll()
                .filter(user -> user.getLastName().equals(lastName));
    }

Обратите внимание, что есть разница в работе с данными и потоками данных. Обычно мы осуществляем какие-то действия над данными непосредственно. Здесь ситуация другая. Мы говорим, что следует сделать в потоке данных.

Попробуем сделать GET запрос по URL localhost:8080/users/Griffin. У меня такой результат:

[
    {
        "id": "5a0bf0fdc48fd53478638c9e",
        "firstName": "Peter",
        "lastName": "Griffin"
    },
    {
        "id": "5a0bf192c48fd53478638c9f",
        "firstName": "Lois",
        "lastName": "Griffin"
    },
    {
        "id": "5a0bf19ac48fd53478638ca0",
        "firstName": "Mag",
        "lastName": "Griffin"
    }
]

В этой статье мы рассмотрели, как построить асинхронный сервис с новым фреймворком WebFlux и сервером Netty (он идет из коробки по умолчанию). Также мы убедились, как легко этого достичь со Spring Boot 2. Если у вас на проекте микросервисная архитектура, то скорее всего вы легко сможете при желании перевести свои приложения на WebFlux с выходом Spring Boot 2(если, конечно, в этом есть потребность).

P.S. Есть несколько идей по продолжению. Если вам интересен материал, и вы не против моей подачи, то в следующих частях мы можем рассмотреть, например, асинхронное взаимодействие между приложениями. Спасибо за внимание!

Комментарии (10)


  1. kilonet
    16.11.2017 12:10

    тема интересная, но из приведенных примеров не видно отличие реактивного программирования от обычного (кроме оборачивания всех данных в Flux и Mono)


    1. faoxy Автор
      16.11.2017 12:44

      Согласен, но ведь за одну статью целую реактивную систему не построишь. Именно поэтому я и решил оставить такой P.S. Здесь я постарался сосредоточится на том как выглядит работа в таком режиме. Например, обратите внимание на действия в сервисном слое. Они отличаются от привычного декларативного стиля.


  1. Apx
    16.11.2017 12:24

    Есть небольшой вопрос касательно getByLastName метода. Для 'Hello World' выглядит неплохо. Но такое же ведь нельзя использовать на очень больших объемах данных? На каждый чих (запрос) мы будем вгружать всех юзеров и только потом фильтровать как я понимаю, а это при миллионе записей думаю будет неплохой удар ниже пояса.
    И еще вопрос как работает такой reactive repository с DB кешем или пока всё напрямую тащим из базы?


    1. faoxy Автор
      16.11.2017 12:40

      Логику выборки данных из базы мы реализуем в сервисе только в качестве показательного примера. В продакшене, конечно, это надо делать через запрос на стороне базы. Здесь хотел показать как выглядит работа с бизнес логикой в слое сервиса, вот и все. Именно поэтому я написал: «Конечно, такую логику можно возложить на базу данных, но здесь я хотел бы обратить внимание как это будет выглядит в сервисе».

      И второй вопрос. Кеш БД, на сколько я понимаю, остается и реализуется на уровне взаимодействия драйвера(обратите внимание, что здесь используется специальный драйвер) и базы.


      1. Apx
        16.11.2017 13:22

        Ну вот было интересно проводилось ли вскрытие этого специального драйвера с целью понять кеш есть или нет?)


      1. Apx
        17.11.2017 12:16

        Сегодня еще вспомнил что хотел спросить, это возвращаясь к теме специального драйвера и репозитория.
        Сможет ли webflux спокойно работать с методами декорированными @Async? Поддержка CompletableFuture и Stream в том-же spring-data есть. Mono класс я вообще как посмотрел — вылитый аналог CompletableFuture.
        Получается что в теории можно намешать синхронку «под асинк соусом» и webflux. Ведь всё равно для некоторых баз нет async драйвера и иди знай когда появится. (И да я понимаю что в таком случае база будет боттлнеком, с её локами и прочим)


  1. m1ld
    16.11.2017 13:12

    Очередной spring-guide, из которого люди будут копипастить repository.findAll() без Pageable.


  1. Hixon10
    16.11.2017 21:25

    Двоякая вещь, на самом деле. Пока базы данных не умеют работать в неблокирующей манере, и всё сводится к отдельному пулу потоков, в которых мы выполняем блокирующие операции (пускай это и скрыто от глаз разработчика в драйвер БД), реактивное программирование будет применимо далеко не везде.


  1. gkislin
    17.11.2017 00:11

    Спасибо:) Жду продолжения. Небольшое замечание- в сервисе и контроллере мне кажется вместо get() более адекватно getAll(). Ну и картинки мне не очень..


  1. vlanko
    17.11.2017 20:13

    А .findAll()
    .filter
    не тянет все из БД?
    Вижу, что тянет. А что реактивного дает драйвер Монго?