В этой статье мы рассмотрим, как использовать Spring Boot 2.x и Redis для выполнения асинхронных задач, а полный код продемонстрирует шаги, описанные в этом посте.

Spring/Spring Boot

Spring — самый популярный фреймворк для разработки Java приложений. Таким образом, Spring имеет одно из крупнейших сообществ с открытым исходным кодом. Кроме того, Spring предоставляет обширную и актуальную документацию, которая охватывает внутреннюю работу фреймворка и примеры проектов в своем блоге, а на StackOverflow более 100 тысяч вопросов и ответов. 

Вначале Spring поддерживал только конфигурацию на основе XML, и из-за этого был подвержен множеству критических замечаний. Позже Spring представила конфигурацию на основе аннотаций, которая изменила все. Spring 3.0 была первой версией, которая поддерживала конфигурацию на основе аннотаций. В 2014 году была выпущена Spring Boot 1.0, полностью изменившая наш взгляд на экосистему фреймворка Spring. Более подробное описание истории Spring можно найти здесь

Redis

Redis — одна из самых популярных NoSQL баз данных в памяти. Redis поддерживает разные типы структур данных. Redis поддерживает различные типы структур данных, например Set, Hash table, List, простую пару ключ-значение — это лишь некоторые из них. Задержка вызова Redis составляет менее миллисекунд, поддержка набора реплик и т. д. Задержка операции Redis составляет менее миллисекунд, что делает ее еще более привлекательной для сообщества разработчиков.

Почему асинхронное выполнение задачи

Типичный вызов API состоит из пяти этапов:

  1. Выполние одного или нескольких запросов к базе данных (RDBMS / NoSQL)

  2. Одна или несколько операций системы кэширования (In-Memory, Distributed и т. д.)

  3. Некоторые вычисления (это может быть обработка данных при выполнении некоторых математических операций)

  4. Вызов других служб (внутренних / внешних)

  5. Планирование выполнения одной или нескольких задач на более позднее время или немедленно, но в фоновом режиме 

Задачу можно запланировать на более позднее время по многим причинам. Например, счет-фактура должен быть создан через 7 дней после создания или отгрузки заказа. Точно так же уведомления по электронной почте не нужно отправлять немедленно, поэтому мы можем отложить их. 

Имея в виду эти реальные примеры, иногда нам нужно выполнять задачи асинхронно, чтобы сократить время ответа API. Например, если мы удалим более 1K записей за один раз, и если мы удалим все эти записи в одном вызове API, то время ответа API наверняка увеличится. Чтобы сократить время ответа API, мы можем запустить задачу в фоновом режиме, которая удалит эти записи. 

Отложенная очередь

Каждый раз, когда мы планируем запуск задачи в определенное время или через определенный интервал, мы используем задания cron, которые запланированы на определенное время или интервал. Мы можем запускать задачи по расписанию, используя различные инструменты, такие как crontab в стиле UNIX, Chronos, если мы используем фреймворки Spring, тогда речь идет об аннотации Scheduled ??. 

Большинство заданий cron просматривают записи о том, когда должно быть предпринято определенное действие, например, поиск всех поставок по истечении семи дней, по которым не были созданы счета. Большинство таких механизмов планирования страдают проблемами масштабирования, когда мы сканируем базы данных, чтобы найти соответствующие строки/записи. Во многих случаях это приводит к полному сканированию таблицы, которое работает очень медленно. Представьте себе случай, когда одна и та же база данных используется приложением реального времени и этой системой пакетной обработки. Поскольку она не является масштабируемый, нам понадобится какая-то масштабируемая система, которая может выполнять задачи в заданное время или интервал без каких-либо проблем с производительностью. Есть много способов масштабирования таким образом, например, запускать задачи в пакетном режиме или управлять задачами для определенного подмножества пользователей/регионов. Другой способ — запустить конкретную задачу в определенное время без зависимости от других задач, например безсерверной функции. Отложенная очередь может использоваться в тех случаях, как только таймер достигнет запланированного времени работа будет вызвана. Имеется множество систем/программного обеспечения для организации очередей, но очень немногие из них, например SQS, предоставляют функцию, которая обеспечивает задержку на 15 минут, а не произвольную задержку, такую ??как 7 часов или 7 дней и т. д.

Rqueue

Rqueue — это брокер сообщений, созданный для платформы Spring, который хранит данные в Redis и предоставляет механизм для выполнения задачи с любой указанной задержкой. Rqueue поддерживается Redis, поскольку Redis имеет некоторые преимущества перед широко используемыми системами очередей, такими как Kafka, SQS. В большинстве серверных приложений веб-приложений Redis используется для хранения данных кеша или для других целей. В настоящее время 8,4% веб-приложений используют базу данных Redis.

Как правило, для очереди мы используем либо Kafka/SQS, либо некоторые другие системы, эти системы приносят дополнительные накладные расходы в разных измерениях, например, финансовые затраты, которые можно уменьшить до нуля с помощью Rqueue и Redis.

Помимо затрат, если мы используем Kafka, нам необходимо выполнить настройку инфраструктуры, обслуживание, то есть больше операций, так как большинство приложений уже используют Redis, поэтому у нас не будет накладных расходов на операции, на самом деле можно использовать тот же сервер/кластер Redis с Rqueue. Rqueue поддерживает произвольную задержку

Доставка сообщений

Rqueue гарантирует доставку сообщения хотя бы раз, так как длинные данные не теряются в базе данных. Подробнее об этом читайте на странице Введение в Rqueue.

Инструменты, которые нам понадобятся:

  1. Любая IDE

  2. Gradle 

  3. Java

  4. Redis 

Мы собираемся использовать Spring Boot для простоты. Мы создадим проект Gradle с помощью инициализатора Spring Boot по адресу https://start.spring.io/.

Из зависимостей нам понадобятся:  

  1. Spring Data Redis

  2. Spring Web

  3. Lombok и некоторые другие

Структура каталогов/папок показана ниже:

Мы собираемся использовать библиотеку Rqueue для выполнения любых задач с произвольной задержкой. Rqueue — это основанный на Spring исполнитель асинхронных задач, который может выполнять задачи с любой задержкой, он построен на библиотеке обмена сообщениями Spring и поддерживается Redis.

Мы добавим зависимость spring boot starter для Rqueue com.github.sonus21:rqueue-spring-boot-starter:2.0.0-RELEASE с помощью кода:

dependencies {  
  implementation 'org.springframework.boot:spring-boot-starter-data-redis'
  implementation 'org.springframework.boot:spring-boot-starter-web'
  implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.0.0-RELEASE'
  compileOnly 'org.projectlombok:lombok'   
  annotationProcessor 'org.projectlombok:lombok'
  providedRuntime 'org.springframework.boot:spring-boot-starter-tomcat'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'  
  }
}

Нам нужно включить функции Redis Spring Boot. В целях тестирования мы также включим WEB MVC.

Обновите файл application как:

@SpringBootApplication
@EnableRedisRepositories
@EnableWebMvc
public class AsynchronousTaskExecutorApplication { 
  public static void main(String[] args) { 
    SpringApplication.run(AsynchronousTaskExecutorApplication.class, args);
  }
}

Добавлять задачи с помощью Rqueue очень просто. Нам нужно аннотировать метод с помощью RqueueListener. В RqueuListenerаннотации есть несколько полей, которые можно настроить в зависимости от варианта использования. Установите deadLetterQueueдля отправки задач в другую очередь. В противном случае задача будет отброшена в случае неудачи. Мы также можем установить, сколько раз задача должна быть повторена, используя поле. numRetries

Создайте файл Java с именем MessageListenerи добавьте несколько методов для выполнения задач:

@Component
@Slf4j
public class MessageListener {

  @RqueueListener(value = "${email.queue.name}") (1)
  public void sendEmail(Email email) {
    log.info("Email {}", email);
  }

  @RqueueListener(value = "${invoice.queue.name}") (2)
  public void generateInvoice(Invoice invoice) {
    log.info("Invoice {}", invoice);
  }
}

Нам понадобится классы Email и Invoiceдля хранения данных электронной почты и счетов-фактур соответственно. Для простоты у классов будет только небольшое количество полей.

Invoice.java:

import lombok.Data;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Invoice {
  private String id;
  private String type;
}

Email.java:

import lombok.Data;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Email {
  private String email;
  private String subject;
  private String content;
}

Отправка задач в очередь

Задачу можно отправить в очередь с помощью  RqueueMessageSender bean-компонента. У которого есть несколько методов для постановки задачи в очередь в зависимости от сценария использования, используйте один из доступных методов. Для простых задач используйте enqueue, для отложенных задач используйте enqueueIn.

Нам нужно автоматически подключить RqueueMessageSenderили использовать внедрение на основе конструктора для внедрения этих bean-компонентов.

Вот как создать контроллер для тестирования. 

Мы планируем создать счет-фактуру, который нужно будет выполнить через 30 секунд. Для этого мы отправим задачу с задержкой 30000 (миллисекунд) в очереди счетов. Кроме того, мы постараемся отправить электронное письмо, которое может выполняться в фоновом режиме. Для этого мы добавим два метода GET, sendEmail и generateInvoice, мы также можем использовать POST. 

@RestController
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class Controller {
  private @NonNull RqueueMessageSender rqueueMessageSender;

  @Value("${email.queue.name}")
  private String emailQueueName;

  @Value("${invoice.queue.name}")
  private String invoiceQueueName;

  @Value("${invoice.queue.delay}")
  private Long invoiceDelay;

  @GetMapping("email")
  public String sendEmail(
      @RequestParam String email, @RequestParam String subject, @RequestParam String content) {
    log.info("Sending email");
    rqueueMessageSender.enqueu(emailQueueName, new Email(email, subject, content));
    return "Please check your inbox!";
  }

  @GetMapping("invoice")
  public String generateInvoice(@RequestParam String id, @RequestParam String type) {
    log.info("Generate invoice");
    rqueueMessageSender.enqueueIn(invoiceQueueName, new Invoice(id, type), invoiceDelay);
    return "Invoice would be generated in " + invoiceDelay + " milliseconds";
  }
}

Добавим в файл application.properties следующие строки:

email.queue.name=email-queue
invoice.queue.name=invoice-queue
# 30 seconds delay for invoice
invoice.queue.delay=300000

Теперь мы можем запустить приложение. После успешного запуска приложения вы можете просмотреть результат по этой ссылке.

В журнале мы видим, что задача электронной почты выполняется в фоновом режиме:

Ниже приведено расписание выставления счетов через 30 секунд:

http://localhost:8080/invoice?id=INV-1234&type=PROFORMA

Заключение

Теперь мы можем планировать задачи с помощью Rqueue без большого объёма вспомогательного кода! Были приведены основные соображения по настройке и использованию библиотеки Rqueue. Следует иметь в виду одну важную вещь: независимо от того, является ли задача отложенной задачей или нет, по умолчанию предполагается, что задачи необходимо выполнить как можно скорее.

Полный код этого поста можно найти в репозитории на GitHub

Дополнительное чтение

Spring Boot: Creating Asynchronous Methods Using @Async Annotation

Spring and Threads: Async

Distributed Tasks Execution and Scheduling in Java, Powered by Redis