Цель данного мини-туториала

Ниже будет краткий обзор настройки приложения, написанного на Kotlin + Spring Boot, которое развернуто в кластере в нескольких экземплярах и использует библиотеку Quartz для выполнения запланированных по cron заданий только на одном из инстансов этого сервиса.

Пример: в OpenShift крутится несколько подов микросервиса, один из которых раз в сутки должен осуществлять формирование отчета. Если под упал во время работы по каким-либо причинам, данную задачу должен перехватить и выполнить другой под. Если формирование отчета было неуспешным, нужно попытаться запустить джобу создания отчета еще несколько раз в течениепары ближайших часов. После N неудачных попыток, необходимо восстановить изначальный cron для данного задания. Конфигурация всех подов микросервиса должна быть одинаковой.

Краткое введение

Перед прочтением данной статьи, очень советую ознакомиться с этим замечательным обширным обзором библиотеки Quartz

Альтернативы

Возможно, вам не потребуется весь функционал, который предоставляет Quartz.
В таком случае, если вы используете в проекте Spring, советую посмотреть на библиотеку ShedLock. Ссылка на репозиторий в GitHub.
Если вкратце ShedLock - это простая библиотека, которая гарантирует, что любое задание будет выполнено не более одного раза.
Реализация построена на локах, хранящихся в базе - буквально пару таблиц. Очень удобно, что всю настройку можно произвести только с помощью аннотаций в стиле Spring аннотации @Scheduled.
Однако главный минус данной библиотеки -
ShedLock не отслеживает жизненный цикл задания (нет возможности проконтролировать, что задание было выполнено, перенести задание, если необходимо).

Если вы рассматриваете альтернативные механизмы синхронизации нескольких инстансов вашего приложения, советую почитать эту ветку на StackOverflow
(здесь есть мой пост с содержанием данной статьи)

Реализация

Описание задачи

Ниже приведен пример настройки приложения на Spring Boot, которое одновременно крутится на нескольких серверах и смотрит на одну базу данных. В каждом инстансе приложения есть бин - задание, которое выполняется по cron. Это здание должно быть выполнено только единожды (на каком-то одном из инстансов).
В случае падения пода, который выполнял джобу, задание должно быть перезапущено на любом другом рабочем поде. Если же под не упал во время выполнения задания, но
задание не было выполнено (получили exception во время исполнения), задание необходимо перезапустить еще 2 раза с задержкой в 5 часов * количество попыток.
Если 2-я попытка перезапуска была неуспешной, то необходимо установить дефолтный cron
для нашего задания:
0 0 4 L-1 * ? * - выполнение в 4 утра предпоследнего дня каждого месяца.

Теперь мы определились, что точно будем использовать Quartz и будем использовать его в режиме кластера

Подключаем зависимость:

Gradle
implementation("org.springframework.boot:spring-boot-starter-quartz")

Maven
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

Наполняем базу:

Перед тем как приступить к написанию конфигураций, необходимо наполнить нашу базу таблицами, с которыми будет работать Quartz. Я использовал liquibase и официальные скрипты - вот отсюда.
Под задачи Quartz лучше завести отдельную схему в базе.

Задание, которое будет выполняться по cron:

Имитируем поведение, при котором джоба или один из сервисов, которые использует данное задание могут вернуть ошибку в 50% случаев.

@Component
@Profile("quartz")
class SomeJob(
    private val someService: SomeService
) : QuartzJobBean() {
    private val log: Logger = LoggerFactory.getLogger(SomeJob::class.java)
    
    override fun executeInternal(jobExecutionContext: JobExecutionContext) {
        try {
            log.info("Doing awesome work...")
            someService.work()
            if ((1..10).random() > 5) throw RuntimeException("Something went wrong...")
        } catch (e: Exception) {
            throw JobExecutionException(e)
        }
    }
}

Настройка конфигурации

(больше информации здесь):

@Configuration
@Profile("quartz")
class JobConfig {
    //JobDetail дла задания выше
    @Bean
    fun someJobDetail(): JobDetail {
        return JobBuilder
            .newJob(SomeJob::class.java).withIdentity("SomeJob")
            .withDescription("Some job")
            // Устанавливаем данное значение в true, если хотим, чтобы джоба была перезапущена
            // в случае падения пода
            .requestRecovery(true)
            // не удаляем задание из базы даже в случае, если ни один из триггеров на задание не укаывает
            .storeDurably().build()
    }

    //Trigger
    @Bean
    fun someJobTrigger(someJobDetail: JobDetail): Trigger {
        return TriggerBuilder.newTrigger().forJob(someJobDetail)
            .withIdentity("SomeJobTrigger")
            .withSchedule(CronScheduleBuilder.cronSchedule("0 0 4 L-1 * ? *"))
            .build()

    }

    // Необходимо также при старте пересоздавать уже имеющиеся задания 
    // (нужно на случай, если вы заходите изменить cron выражение для какого-либо из ваших заданий,
    // которые уже были созданы ранее, в противном случае в базе сохранится старое cron выражение)
    @Bean
    fun scheduler(triggers: List<Trigger>, jobDetails: List<JobDetail>, factory: SchedulerFactoryBean): Scheduler {
        factory.setWaitForJobsToCompleteOnShutdown(true)
        val scheduler = factory.scheduler
        factory.setOverwriteExistingJobs(true)
        //https://stackoverflow.com/questions/39673572/spring-quartz-scheduler-race-condition
        factory.setTransactionManager(JdbcTransactionManager())
        rescheduleTriggers(triggers, scheduler)
        scheduler.start()
        return scheduler
    }

    private fun rescheduleTriggers(triggers: List<Trigger>, scheduler: Scheduler) {
        triggers.forEach {
            if (!scheduler.checkExists(it.key)) {
                scheduler.scheduleJob(it)
            } else {
                scheduler.rescheduleJob(it.key, it)
            }
        }
    }
}
    

Создание слушателя, который будет следить за выполнением нашего задания:

Чтобы listener заработал, необходимо его зарегистрировать в шедулере.

@Component
@Profile("quartz")
class JobListenerConfig(
    private val schedulerFactory: SchedulerFactoryBean,
    private val jobListener: JobListener
) {
    @PostConstruct
    fun addListener() {
        schedulerFactory.scheduler.listenerManager.addJobListener(jobListener, KeyMatcher.keyEquals(jobKey("SomeJob")))
    }
}

Основная логика обработки жизненного цикла задания:

За статусом выполнения задания следим с помощью слушателя, которого мы зарегистрировали в шедулере ранее. У листенера есть 2 метода:
jobToBeExecuted(context: JobExecutionContext)
и
jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException?),
которые выполняются до старта и после исполнения задания (вне зависимости от того, было ли выполнено задание успешно или нет)

Вся логика представлена ниже. Добавлю лишь пару комментариев:

  • Шудулер понимает, сколько раз триггер был перезапущен с помощью информации в jobDataMap. Причем эти данные хранятся в бд, поэтому в случае рестарта инстанса, предыдущее значение количества неуспешных запусков триггера будет вычитано.

  • В случае падения приложения во время выполнения задания, в базе может остаться невыполненный триггер, который в свою очередь во время рестарта приложения может быть преобразован в recovery триггер (его название будет начинаться с recovery_ и триггер будет иметь группу RECOVERING_JOBS)

@Profile("quartz")
class JobListener(
    //можно вытащить из контекста выполнения, либо заинжектить напрямую из application контекста
    private val scheduler: Scheduler,
    private val triggers: List<Trigger>
): JobListenerSupport() {

    private lateinit var triggerCronMap: Map<String, String>

    @PostConstruct
    fun post(){
        //В лист триггеров будут помещены только самописные задания, recover триггеры (если 
        //они существуют на момент старта приложения в этот лист внедрены не будут)
        triggerCronMap = triggers.associate {
            it.key.name to (it as CronTrigger).cronExpression
        }
    }

    override fun getName(): String {
        return "myJobListener"
    }


    override fun jobToBeExecuted(context: JobExecutionContext) {
        log.info("Job: ${context.jobDetail.key.name} ready to start by trigger: ${context.trigger.key.name}")
    }


    override fun jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException?) {
        //либо можно использовать context.mergedJobDataMap
        val dataMap = context.trigger.jobDataMap
        val count = if (dataMap["count"] != null) dataMap.getIntValue("count") else {
            dataMap.putAsString("count", 1)
            1
        }
        //В этот блок if можно добавить следующее условие: && !context.trigger.key.name.startsWith("recover_")
        // в этом случае шедулер не будет будет перезапускать recover триггеры, которые могут образоваться
        // в случае падения приложения во время выполнения задания.
        if (jobException != null ){
            if (count < 3) {
                log.warn("Job: ${context.jobDetail.key.name} filed while execution. Restart attempts count: $count ")
                val oldTrigger = context.trigger
                var newTriggerName = context.trigger.key.name + "_retry"
                //на случай, если триггер с таким именем уже существует (остался в бд после падения инстанса)
                context.scheduler.getTriggersOfJob(context.jobDetail.key)
                    .map { it.key.name }
                    .takeIf { it.contains(newTriggerName) }
                    ?.apply { newTriggerName += "_retry" }
                val newTrigger = TriggerBuilder.newTrigger()
                    .forJob(context.jobDetail)
                    .withIdentity(newTriggerName, context.trigger.key.group)
                    //заменяем наш cron триггер simple триггером, который будет запущен 
                    // через 5 часов * количество попыток перезапуска задания
                    .startAt(Date.from(Instant.now().plus((5 * count).toLong(), ChronoUnit.HOURS)))
                    .usingJobData("count", count + 1 )
                    .build()
                val date = scheduler.rescheduleJob(oldTrigger.key, newTrigger)
                log.warn("Rescheduling trigger: ${oldTrigger.key} to trigger: ${newTrigger.key}")
            } else {
                log.warn("The maximum number of restarts has been reached. Restart attempts: $count")
                recheduleWithDefaultTrigger(context)
            }
        } else if (count > 1) {
            recheduleWithDefaultTrigger(context)
        }
        else {
            log.info("Job: ${context.jobDetail.key.name} completed successfully")
        }
        context.scheduler.getTriggersOfJob(context.trigger.jobKey).forEach {
            log.info("Trigger with key: ${it.key} for job: ${context.trigger.jobKey.name} will start at ${it.nextFireTime ?: it.startTime}")
        }
    }

    private fun recheduleWithDefaultTrigger(context: JobExecutionContext) {
        val clone = context.jobDetail.clone() as JobDetail
        val defaultTriggerName = context.trigger.key.name.split("_")[0]
        //Recovery триггеры не должны быть перешедулены
        if (!triggerCronMap.contains(defaultTriggerName)) {
            log.warn("This trigger: ${context.trigger.key.name} for job: ${context.trigger.jobKey.name} is not self-written trigger. It can be recovery trigger or whatever. This trigger must not be recheduled.")
            return
        }
        log.warn("Remove all triggers for job: ${context.trigger.jobKey.name} and schedule default trigger for it: $defaultTriggerName")
        scheduler.deleteJob(clone.key)
        scheduler.addJob(clone, true)
        scheduler.scheduleJob(
            TriggerBuilder.newTrigger()
                .forJob(clone)
                .withIdentity(defaultTriggerName)
                .withSchedule(CronScheduleBuilder.cronSchedule(triggerCronMap[defaultTriggerName]))
                .usingJobData("count", 1)
                .startAt(Date.from(Instant.now().plusSeconds(5)))
                .build()
        )
    }
}

Стоит обратить внимание на пару методов:
jobException.setRefireImmediately(true), который можно использовать совместно с context.refireCount, если у вас нет необходимости в переносе задания после получения ошибки во время выполнения. Задание будет перезапущено немедленно.
В одном из ответов на StackOverflow рекомендовали использовать в джобе
Thread.sleep(N-seconds) вместо переназначения задания в случае падения - это явно не лучшая идея ☺

Файл application-quartz.yaml

И последнее вещь, которую осталось сделать - написать конфигурационный yaml файл для профиля quartz, который мы будем использовать. Комментарии в файле оставлю без перевода:

spring:
  quartz:
    job-store-type: jdbc #Database Mode
    jdbc:
      initialize-schema: never #Do not initialize table structure
    properties:
      org:
        quartz:
          scheduler:
            instanceId: AUTO #Default hostname and timestamp generate instance ID, which can be any string, but must be the only corresponding qrtz_scheduler_state INSTANCE_NAME field for all dispatchers
            #instanceName: clusteredScheduler #quartzScheduler
          jobStore:
#            a few problems with the two properties below: https://github.com/spring-projects/spring-boot/issues/28758#issuecomment-974628989 & https://github.com/quartz-scheduler/quartz/issues/284
#            class: org.springframework.scheduling.quartz.LocalDataSourceJobStore #Persistence Configuration
            driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate #We only make database-specific proxies for databases
#            useProperties: true #Indicates that JDBC JobStore stores all values in JobDataMaps as strings, so more complex objects can be stored as name-value pairs rather than serialized in BLOB columns.In the long run, this is safer because you avoid serializing non-String classes to BLOB class versions.
            tablePrefix: quartz_schema.QRTZ_  #Database Table Prefix
            misfireThreshold: 60000 #The number of milliseconds the dispatcher will "tolerate" a Trigger to pass its next startup time before being considered a "fire".The default value (if you do not enter this property in the configuration) is 60000 (60 seconds).
            clusterCheckinInterval: 5000 #Set the frequency (in milliseconds) of this instance'checkin'* with other instances of the cluster.Affects the speed of detecting failed instances.
            isClustered: true #Turn on Clustering
          threadPool: #Connection Pool
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 3
            threadPriority: 1
            threadsInheritContextClassLoaderOfInitializingThread: true

Локальная отладка проводилась следующим образом: написал пару docker compose файлов, в которых поднял базу и несколько инстансов приложения, которые "натравил" на поднятую базу. Если интересно, могу отдельно это описать.

Дополнительная информация:

Вот еще несколько интересных статей по теме, которые советую к прочтению:
About quartz
Spring boot using quartz in cluster mode
Интересная статья от коллег из OTUS
Cluster effectively quartz

P.S. Буду рад конструктивной критике предложенного выше решения и с удовольствием изучу альтернативы.

Благодарю за уделенное время!

Комментарии (10)


  1. sved
    16.04.2023 01:23
    -1

    Из своего многолетнего опыта в джаве могу заказать, что quartz - это одна из худших библиотек, что я видел. Создавать этот мусор в БД, кучу конфигураций и весь этот вышеупомянутый ад только для того чтобы сделать то, что делает Spring Scheduler из коробки - это за гранью добра и зла. До сих пор помню, сколько я мучался что бы адаптировать это под mongo db.

    Просто не используйте это поделие.


    1. gearbase Автор
      16.04.2023 01:23
      +4

      К сожалению, Spring Scheduler из коробки не поддерживает кластерность, т.е. все написанные в вашем приложении шедулеры будут запущены на каждом инстансе вашего приложения + с помощью коробочного шедулера нет возможности следить за жизненным циклом задания.

      Тот же SchedLock, упомянутый в статье, настройка которого производится в стиле коробочного шедулера с помощью аннотаций, позволяет решить только одну проблему - запуск запланированного задания только на одном инстансе, но без возможности слежения за жизненным циклом таски: если инстанс грохнется во время выполнения задания, другие экземпляры приложения не перехватят это задание. Quartz позволяет решить как первую, так и вторую проблемы, описанные выше.

      К сожалению, quartz работает только с реляционными бд, поэтому представляю каких трудов это стоило, чтобы прикрутить его к mongo. Получилось, кстати?

      Вам в любом случае нужен инструмент синхронизации инстансов между собой - это либо бд, либо брокер (Кафка, например), либо какой-нибудь редис. И это будет уже кастом решение, которое потребует кмк не меньших трудозатрат, скорее всего даже больших. Других интересных библиотек, позволяющих решить данные задачи, я, к сожалению, не нашел. Может подскажете, какие решения применяли вы в кластере для работы с механизмом планировки заданий?


      1. Ksnz
        16.04.2023 01:23

        Можно например завести под задания по расписанию отдельные cron-cli микросервисы и менеджить кластерность, расписание и жизненный цикл через k8s

        Можно даже оставить формирование отчета вашему многокластерному монолиту с точкой входа в виде эндпоинта, а по крону будет запускаться микросервис который вызовет соответствующий эндпоинт.


  1. sved
    16.04.2023 01:23
    +1

    Скедулер и не должен поддерживать синглетон на кластере. Для этого существуют другие инструменты, например zookeeper или что-то типа Apache Ratis.

    Впрочем, если есть общая база, то несложно тупо сделать запрос update Locker set instance = <my instance id> where instance is NULL. Для этого нужна одна таблица (c одной записью), а не 100.

    если инстанс грохнется во время выполнения задания, другие экземпляры приложения не перехватят это задание

    Задачи повтора и эксклюзивного выполнения и запуска по расписанию - это разные вещи.

    Разве запуск отчёта вручную, с точки зрения вашей логики по повторам и пр, чем-то отличается от запуска по расписания?

    Если вы, например, используете какой-нибудь движок для батч-процессинга, то этим должен он заниматься, а вовсе не планировщик

    К сожалению, quartz работает только с реляционными бд, поэтому представляю каких трудов это стоило, чтобы прикрутить его к mongo. Получилось, кстати?

    Только часть функционала


    1. gearbase Автор
      16.04.2023 01:23
      +1

      Спасибо за ответ!

      Впрочем, если есть общая база, то несложно тупо сделать запрос update Locker set instance = <my instance id> where instance is NULL. Для этого нужна одна таблица (c одной записью), а не 100.

      Если при этом ещё и нужно знать, завершено ли задание успешно или нет, если нет, то сколько попыток запуска джобы было? Если не успешно, то как организовать перезапуск со смещением по времени? Как минимум потребуется чуть больше таблиц.

      Задачи повтора и эксклюзивного выполнения и запуска по расписанию - это разные вещи.

      К примеру, кейс формирования ежеквартального отчёта: формирование отчёта запускается в последний день каждого 3-го месяца, логика формирования отчёта громоздка и не должна происходить на нескольких инстансах одновременно, также логика использует запросы к внешним системам, которые время от времени могут не отвечать, но отчёт должен быть сформирован. После нескольких неудачных попыток создания отчёта в течение определенного времени необходимо уведомить администратора о том, что наблюдаются какие-то проблемы и прекратить попытки формирования отчёта в автоматическом режиме, но при этом через 3 месяца по-прежнему необходимо создание нового отчёта.

      Чем не кейс эксклюзивного запуска по расписанию с возможностью повтора задания? ????

      Если вы, например, используете какой-нибудь движок для батч-процессинга, то этим должен он заниматься, а вовсе не планировщик

      Подобного движка в нашем проекте, к сожалению, не используется.

      Про Apache Ratis обязательно почитаю, спасибо большое!


  1. ultrinfaern
    16.04.2023 01:23

    Насколько понимаю, тут следует разделить шедулинг и систему единичной копии. Кстати, решение единичной копии можно же решать и в отрыве от шедулинга. Можно, например вообще запускать отдельную копию которая и будет делать только отчет, тогда и проблемы единичного запуска нет. Да и контролировать потом эту копию легче.


    1. gearbase Автор
      16.04.2023 01:23

      В случае, описанном вами, нужно поднимать отдельный сервис, который будет собирать только отчёт, но опять же нужно поднимать несколько инстансов этого "сборщика" - здесь опять же встаёт проблема синхронизации этих сборщиков между собой (вернулись к началу статьи), потому что если этот единственный инстанс сборщика упадет - будет грустно.


  1. AstarothAst
    16.04.2023 01:23

    Кажется тут смешали воедино две проблемы — исполнение периодических заданий, и сохранение стэйта заданий в распределенной среде. Первая часть замечательно решается средствами Спринга, вторая решается в полном отрыве от первой части, и диапазон возможных решений тут от использования всяких БД, включая Redis, и до IMDG, например Hazelcast. Последний позволит держать распределенный стейт вообще парой строчек.


  1. mirwide
    16.04.2023 01:23

    С производительностью у quartz беда. Там, насколько я помню 2 лока на кластер: instance_name/trigger_access и instance_name/state_access. На сколько-нибудь высоком рейте всё встает колом на конкуренции за получением лока.


    1. gearbase Автор
      16.04.2023 01:23

      Интересная информация, спасибо!

      Да, для высокой нагрузки лучше использовать что-то другое, но в случае высокой нагрузки, как мне кажется, и нет необходимости в отслеживании жизненного цикла задания (т.е. всех приседаний, которые описаны в статье). В моем случае речь идёт о тяжеловесных тасках, которые запускаются крайне редко и должны быть выполнены любой ценой ????