一分钟掌握Java Quartz定时任务

目录
  • 前言
  • 角色介绍
  • 官方例子
  • Quartz如何分布式?
  • 跟着官方例子看源码
  • Trigger的处理
  • 结合起来
  • 总结

前言

前几篇介绍了单体架构的定时任务解决方式,但是现代软件架构由于业务复杂度高,业务的耦合性太强,已经由单体架构拆分成了分布式架构。因此,定时任务的架构也随之修改。而Quartz是分布式定时任务解决方案中使用简单,结构清晰,且不依赖第三方分布式调度中间件的。上车,mars酱带你车里细说~

角色介绍

Quartz入门使用的角色不多,三个角色足够,分别是:

Scheduler:调度器。用来负责任务的调度;

Job:任务。这是一个接口,业务代码继承Job接口并实现它的execute方法,是业务执行的主体部分;

Trigger: 触发器。也是个接口,有两个触发器比较关键,一个是SimpleTrigger,另一个是CronTrigger。前者支持简单的定时,比如:按时、按秒等;后者直接支持cron表达式。下面我们从官方的源代码入手,看看Quartz如何做到分布式的。

官方例子

官方源代码down下来之后,有个examples文件夹:

example1是入门级中最简单的。就两个java文件,一个HelloJob:

package org.quartz.examples.example1;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
/**
 * <p>
 * This is just a simple job that says "Hello" to the world.
 * </p>
 *
 * @author Bill Kratzer
 */
public class HelloJob implements Job {
    private static Logger _log = LoggerFactory.getLogger(HelloJob.class);
    /**
     * <p>
     * Empty constructor for job initilization
     * </p>
     * <p>
     * Quartz requires a public empty constructor so that the
     * scheduler can instantiate the class whenever it needs.
     * </p>
     */
    public HelloJob() {
    }
    /**
     * <p>
     * Called by the <code>{@link org.quartz.Scheduler}</code> when a
     * <code>{@link org.quartz.Trigger}</code> fires that is associated with
     * the <code>Job</code>.
     * </p>
     *
     * @throws JobExecutionException
     *             if there is an exception while executing the job.
     */
    public void execute(JobExecutionContext context)
        throws JobExecutionException {
        // Say Hello to the World and display the date/time
        _log.info("Hello World! - " + new Date());
    }
}

另一个SimpleExample:

 package org.quartz.examples.example1;
 import org.quartz.JobDetail;
 import org.quartz.Scheduler;
 import org.quartz.SchedulerFactory;
 import org.quartz.Trigger;
 import org.quartz.impl.StdSchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Date;
 import static org.quartz.DateBuilder.evenMinuteDate;
 import static org.quartz.JobBuilder.newJob;
 import static org.quartz.TriggerBuilder.newTrigger;
 /**
  * This Example will demonstrate how to start and shutdown the Quartz scheduler and how to schedule a job to run in
  * Quartz.
  *
  * @author Bill Kratzer
  */
 public class SimpleExample {
     public void run() throws Exception {
         Logger log = LoggerFactory.getLogger(SimpleExample.class);
         log.info("------- Initializing ----------------------");
         // 1. 创建一个scheduler
         SchedulerFactory sf = new StdSchedulerFactory();
         Scheduler sched = sf.getScheduler();
         log.info("------- Initialization Complete -----------");
         // computer a time that is on the next round minute
         Date runTime = evenMinuteDate(new Date());
         log.info("------- Scheduling Job  -------------------");
         // 2. 指定一个job
         JobDetail job = newJob(HelloJob.class).withIdentity("job1", "group1").build();
         // 3. 指定一个trigger
         Trigger trigger = newTrigger().withIdentity("trigger1", "group1").startAt(runTime).build();
         // 4. 绑定job和trigger
         sched.scheduleJob(job, trigger);
         log.info(job.getKey() + " will run at: " + runTime);
         // 5. 执行
         sched.start();
         log.info("------- Started Scheduler -----------------");
         // wait long enough so that the scheduler as an opportunity to
         // run the job!
         log.info("------- Waiting 65 seconds... -------------");
         try {
             // wait 65 seconds to show job
             Thread.sleep(65L * 1000L);
             // executing...
         } catch (Exception e) {
             //
         }
         // shut down the scheduler
         log.info("------- Shutting Down ---------------------");
         sched.shutdown(true);
         log.info("------- Shutdown Complete -----------------");
     }
     public static void main(String[] args) throws Exception {
         SimpleExample example = new SimpleExample();
         example.run();
     }
 }

整个SimpleExample只有五个步骤:

  • 创建Scheduler,这是一个调度器,例子中使用调度器工厂来创建一个调度器;
  • 创建一个Job。实际上Job就是那个HelloJob,但是这里把HelloJob丢给了JobDetail对象,Job接口本身只有一个execute函数,没有其他的属性了,如果需要附加其他属性,JobDetail就支持,比如我们需要往Job中传递参数,JobDetail中提供了一个JobDataMap。当Job在运行的时候,execute函数里面的就能获取到JobDetail对象,并将设置的数据传递给Job接口的实现;
  • 创建一个Trigger。Trigger对象主责是任务的执行时间,比如官方例子中的startAt函数,就指定了具体的运行时间,还有startNow(立即执行);
  • 用scheduler绑定Job和Trigger;
  • 执行scheduler。

Quartz的使用是不是简单又清晰?Job是任务,单一职责,不做任何其他事情。Trigger负责执行的频率等等属性。Scheduler负责按照Trigger的规则去执行Job的内容。各自部分的功能符合单一原则。

但是,到这里都不是分布式的方式,依然是单体架构的。那么,Quartz如何做到分布式的呢?

Quartz如何分布式?

Quartz的分布式实现方式并不依赖其他分布式协调管理中间件完成,而是使用数据锁来实现。使用数据做协调管理中间件的唯一的前提是:需要把集群的每台机器时间校对一致。

Quartz数据库核心表如下:

表名 功能描述
QRTZ_CALENDARS 存储Quartz的Calendar信息
QRTZ_CRON_TRIGGERS 存储CronTrigger,包括Cron表达式和时区信息
QRTZ_FIRED_TRIGGERS 存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息
QRTZ_PAUSED_TRIGGER_GRPS 存储已暂停的Trigger组的信息
QRTZ_SCHEDULER_STATE 存储少量的有关Scheduler的状态信息,和别的Scheduler实例
QRTZ_LOCKS 存储程序的悲观锁的信息
QRTZ_JOB_DETAILS 存储每一个已配置的Job的详细信息
QRTZ_JOB_LISTENERS 存储有关已配置的JobListener的信息
QRTZ_SIMPLE_TRIGGERS 存储简单的Trigger,包括重复次数、间隔、以及已触的次数
QRTZ_BLOG_TRIGGERS Trigger作为Blob类型存储
QRTZ_TRIGGER_LISTENERS 存储已配置的TriggerListener的信息
QRTZ_TRIGGERS 存储已配置的Trigger的信息

字体加粗的QRTZ_LOCKS表是一个悲观锁的存储实现,Quartz认为每条记录都可能会产生并发冲突。以上表的SQL可以在quartz目录中找到:

找到自己喜欢的数据库品牌,并创建好表即可。

跟着官方例子看源码

我们从Hello的execute方法开始,反着找,继续看看分布式的方式如何实现。为什么反着找呢?因为这里是我们业务实现的主体内容,Quartz框架最终必须要调用到这个execute的具体实现的。我们找到调用execute的地方有很多处:

从类名来分析,DirectoryScanJob看着是目录扫描任务,FileScanJob直译是文件扫描任务,SendMailJob是发送邮件任务,最后只剩那个JobRunShell,毕竟翻译过来叫“任务运行の核心”啊。进入JobRunShell,找到调用execute函数的部分,execute函数被包裹在一个一百三十多行长又长的run函数中:

public void run() {
    qs.addInternalSchedulerListener(this);
    try {
        // ...省略很多源代码
        do {
            // ...省略很多源代码
            try {
                begin();
            } catch (SchedulerException se) {
                // ... 省略源代码
            }
            // ... 省略源代码
            try {
                log.debug("Calling execute on job " + jobDetail.getKey());
                // 这里负责执行job的execute函数
                job.execute(jec);
                endTime = System.currentTimeMillis();
            } catch (JobExecutionException jee) {
                // ... 省略源代码
            } catch (Throwable e) {
                // ... 省略源代码
            }
            // ...省略很多源代码
            try {
                complete(true);
            } catch (SchedulerException se) {
                // ... 省略源代码
            }
            // ...省略很多源代码
        } while (true);
    } finally {
        qs.removeInternalSchedulerListener(this);
    }
}

可以看到run中间的execute被夹在一个begin函数和comlete函数中,而begin和complete的实现是一个基于JTA事务的JTAJobRunShell的实现来完成的。JobRunShell是一个Runnable接口的实现,那么刚刚的run方法,必定在某处启用了线程(池)的start方法。

mars酱继续跟踪查找源代码,在QuartzSchedulerThread中的run函数中,找到JobRunShell的调用部分:

@Override
public void run() {
    int acquiresFailed = 0;
    while (!halted.get()) {
        // ...省略很多源代码
        // 源代码279行
        int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
        // ...省略很多源代码
        if(availThreadCount > 0) {
            // ...省略很多源代码
            // 取下一个trigger,周期是30秒取一次
            triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
            // ...省略很多源代码
            // 触发trigger
            List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
            // ...省略很多源代码
            // 释放trigger,当bndle的结果是null就释放trigger
            if (bndle == null) {
                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                continue;
            }
            // ...省略很多源代码
            JobRunShell shell = null;
            try {
                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                shell.initialize(qs);
            } catch (SchedulerException se) {
                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                continue;
            }
            // 这里调用JobRunShell
            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                // ...省略很多源代码
            }
        }
    }
}

QuartzSchedulerThread的run函数就是核心处理流程了,qsRsrcs.getThreadPool().runInThread(shell)内部就根据具体的SimpleThreadPool或者ZeroSizeThreadPool来执行run函数,while循环基本就是不停的在轮询不断的去拿trigger,然后判断trigger的时间是不是到了,再按照时间trigger的时间规则执行Job,最后再标记为完成、释放trigger。

Trigger的处理

上面的逻辑都是通过qsRsrcs.getJobStore()得到的对象去处理Trigger的,返回对象是JobStore。任意查看qsRsrcs.getJobStore()调用的函数,比如:releaseAcquiredTriggerJobStore,它的实现有两个是比较重要的:一个是RAMJobStore,一个是JobStoreSupport。前者是RAM作为存储介质,作者还特意写上了这样一段注释:

This class implements a JobStore that utilizes RAM as its storage device.

As you should know, the ramification of this is that access is extrememly fast, but the data is completely volatile - therefore this JobStore should not be used if true persistence between program shutdowns is required.

这段英文的央视翻译:

这个类实现了一个使用RAM作为存储设备的JobStore。

您应该知道,这样做的后果是访问速度非常快,但是数据是完全不稳定的——因此,如果需要在程序关闭之间实现真正的持久性,则不应该使用这个JobStore。

而且内存存储也无法分布式处理吧?所以,mars酱选择了观看JobStoreSupport:

从import可以知道,这个玩意是连接了数据库的,所以呢,acquireNextTriggers、triggersFired、releaseAcquiredTrigger这些方法负责具体trigger的相关操作,都最终会执行到JobStoreSupport的第3844行的executeInNonManagedTXLock函数:

    /**
     * Execute the given callback having optionally acquired the given lock.
     * This uses the non-managed transaction connection.
     *
     * @param lockName The name of the lock to acquire, for example
     * "TRIGGER_ACCESS".  If null, then no lock is acquired, but the
     * lockCallback is still executed in a non-managed transaction.
     */
    protected <T> T executeInNonManagedTXLock(
            String lockName,
            TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
        boolean transOwner = false;
        Connection conn = null;
        try {
            if (lockName != null) {
                // If we aren't using db locks, then delay getting DB connection
                // until after acquiring the lock since it isn't needed.
                if (getLockHandler().requiresConnection()) {
                    conn = getNonManagedTXConnection();
                }
                transOwner = getLockHandler().obtainLock(conn, lockName);
            }
            if (conn == null) {
                conn = getNonManagedTXConnection();
            }
            final T result = txCallback.execute(conn);
            try {
                commitConnection(conn);
            } catch (JobPersistenceException e) {
                rollbackConnection(conn);
                if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
                    @Override
                    public Boolean execute(Connection conn) throws JobPersistenceException {
                        return txValidator.validate(conn, result);
                    }
                })) {
                    throw e;
                }
            }
            Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
            if(sigTime != null && sigTime >= 0) {
                signalSchedulingChangeImmediately(sigTime);
            }
            return result;
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            throw e;
        } catch (RuntimeException e) {
            rollbackConnection(conn);
            throw new JobPersistenceException("Unexpected runtime exception: "
                    + e.getMessage(), e);
        } finally {
            try {
                releaseLock(lockName, transOwner);
            } finally {
                cleanupConnection(conn);
            }
        }
    }

整体的过程简要说明就是:获取数据库连接,给需要执行的trigger加锁,处理完之后再释放锁。

结合起来

结合前面的流程来看,一个调度器在执行前如果涉及到分布式的情况,流程如下:

  • 首先要获取QUARTZ_LOCKS表中对应的锁(在executeInNonManagedTXLock函数的getLockHandler().obtainLock(conn, lockName)中);
  • 获取锁后执行QuartzSchedulerThread中的JobRunShell,完成任务的执行;
  • 最后QuartzSchedulerThread中调用triggeredJobComplete函数,锁被释放,在executeInNonManagedTXLock函数的releaseLock(lockName, transOwner)中执行;

集群中的每一个调度器实例都遵循这样的操作流程。

总结

Quartz 是一款用于分布式系统的高性能调度框架,它采用了数据库作为分布式锁机制来保证同一时刻只有一个 Scheduler 实例访问数据库中的 Trigger。

在 Quartz 中,调度器线程会争抢访问数据库中的 Trigger,以确保在同一时刻只有一个调度器线程执行某个 Trigger 的操作。如果有多个调度器线程同时尝试访问同一个 Trigger,它们会相互等待对方释放锁。当一个调度器线程获得了锁,它就可以访问数据库并执行相应的操作。

另外,Quartz 还采用了悲观锁的策略来避免死锁的发生。当一个调度器线程尝试获取锁时,如果锁已经被其他线程占用,那么这个线程会等待,直到有线程释放了锁。如果在等待过程中没有其他线程释放锁,那么这个线程就会一直等待下去,直到调度器重新分配了锁。

总之,Quartz 的分布式调度原理是通过数据库锁和悲观锁来实现的,以保证同一时刻只有一个调度器线程访问数据库中的 Trigger,从而提高系统的性能和可靠性。

以上就是一分钟掌握Java Quartz定时任务的详细内容,更多关于Java Quartz定时任务的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java中Quartz高可用定时任务快速入门

    目录 定时任务使用指南 1.引入依赖 2.快速上手 3.手动触发定时任务 4.带参数任务 5.任务并发 6.持久化 定时任务使用指南 如果你想做定时任务,有高可用方面的需求,或者仅仅想入门快,上手简单,那么选用它准没错. 定时任务模块是对Quartz框架进一步封装,使用更加简洁. 1.引入依赖 <dependency> <groupId>xin.altitude.cms</groupId> <artifactId>ucode-cms-quartz</a

  • Java 任务调度框架 Quartz实操

    目录 1.Quartz 1.1 引入依赖 1.2 入门案例 任务类,需要实现 Job 接口 定时器类 1.3 Job 与 JobDetail 1.4 JobExecutionContext 1.5 JobDataMap 手动获取 JDM 参数案例 2.Job 类实现 JDM 参数的 Setter 方法,实例化时自动绑定参数 1.6 Job 状态 有无状态 Job 区别案例 1.7 Trigger Trigger 获取参数案例 1.8 SimpleTripper  Quartz相较于Timer,

  • 详解Java中Quartz的简单使用

    目录 简单示例 usingJobData 非并发执行 Scheduler 每次执行,都会根据JobDetail创建一个新的Job实例,这样就可以规避并发访问的问题(jobDetail的实例也是新的) Quzrtz 定时任务默认都是并发执行,不会等待上一次任务执行完毕,只要间隔时间到就会执行,如果定时任务执行太长,会长时间占用资源,导致其它任务堵塞 @DisallowConcurrentExecution: job类上,禁止并发地执行同一个job定义 (JobDetail定义的)的多个实例. sc

  • JAVA使用quartz添加定时任务,并依赖注入对象操作

    最近在写定时任务,以前没接触过.查了些相关资料说使用quartz定时框架. 需要配置文件:config-quartz.xml 相关配置如下(红色部分是之后添加的,在后面步骤会说明): <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www

  • Java使用quartz实现定时任务示例详解

    目录 正文 配置文件 pom 定时任务和触发器 定时任务的业务代码 正文 最近新到了一个项目,用到定时任务的地方是真滴多. 就稍微研究了一下,来做个demo. 其实定时任务使用很广泛也很方便,之前做的人事管理项目,就会定期执行定时任务计算工资,对于一个saas服务来说,即时的计算所有员工的工资有点奢侈,所以在每周末计算一次就ok了. 国外有的公司是一周发一次工资,所以当时的逻辑是一周算一次.在国内就一月一次很ok了.在当时的report服务中,也是定时任务同步数据到Birt服务,然后展现数据.

  • Java Quartz触发器CronTriggerBean配置用法详解

    CronTrigger表达式分为七项子表达式,其中每一项以空格隔开,从左到右分别是:秒,分,时,月的某天,月,星期的某天,年:其中年不是必须的,也就是说任何一个表达式最少需要六项!  例:0 0 12 ? * WED 表示每个星期三的12点执行,这里没有"年"这项!  字段名(项)  必须  值范围  特殊字符  秒 是 0-59  , - * /   分 是 0-59  , - * /   时 是 0-23  , - * /   月的某天  是 1-31  , - * ? / L W

  • Java中简单实用Quartz概述

    目录 1.Quartz是什么? 2. Quartz谁维护? 3. 简单的应用 3.1 需求描述 3.1.1. Spring中自带的 3.1.2. Quartz的简单使用 4. Quartz相较Spring的@Scheduled有什么优势 1.Quartz是什么? Quartz是一个开源的Java调度框架,可以用来实现在指定的时间或时间间隔触发任务执行的功能.它支持多种方式的作业调度,如基于日期.时间间隔和Cron表达式的调度.Quartz允许开发人员定义并执行大量的作业,并且在大规模部署时可以实

  • 5分钟快速掌握Python定时任务框架的实现

    APScheduler 简介 在实际开发中我们经常会碰上一些重复性或周期性的任务,比如像每天定时爬取某个网站的数据.一定周期定时运行代码训练模型等,类似这类的任务通常需要我们手动来进行设定或调度,以便其能够在我们设定好的时间内运行. 在 Windows 上我们可以通过计划任务来手动实现,而在 Linux 系统上往往我们会用到更多关于 crontab 的相关操作.但手动管理并不是一个很好的选择,如果我们需要有十几个不同的定时任务需要管理,那么每次通过人工来进行干预未免有些笨拙,那这时候就真的是「人

  • Java中定时任务的6种实现方式

    目录 1.线程等待实现 2.JDK自带Timer实现 2.1 核心方法 2.2使用示例 2.2.1指定延迟执行一次 2.2.2固定间隔执行 2.2.3固定速率执行 2.3 schedule与scheduleAtFixedRate区别 2.3.1schedule侧重保持间隔时间的稳定 2.3.2scheduleAtFixedRate保持执行频率的稳定 2.4 Timer的缺陷 3.JDK自带ScheduledExecutorService 3.1 scheduleAtFixedRate方法 3.2

  • Java实现定时任务的方法详解

    目录 前言 定时任务是什么 定时任务的有哪些是实现方式 纯手写单线程循环 Timer 和它的小伙伴 ScheduledExecutorService Spring 提供的定时任务 总结 前言 学过定时任务,但是我忘了,忘得一干二净,害怕,一直听别人说: 你写一个定时任务就好了. 写个定时任务让他去爬取就行了. 我不会,所以现在得补回来了,欠下的终究要还的,/(ㄒoㄒ)/~~ 定时任务是什么 大家都用过闹钟,闹钟可以说是一种定时任务. 比如我们设定了周一到周五早上7点半的时间响铃,那么闹钟就会在周

  • springboot整合quartz定时任务框架的完整步骤

    目录 Spring整合Quartz pom文件 对应的properties 文件 配置类 自定义任务类:ScheduledTask 获取spring中bean的工具类:SpringContextUtil 定时任务服务接口:QuartzService QuartzService实现类:QuartzServiceImpl ScheduledTaskRunner类 任务实体类:QuartzTask 任务service层 service实现类 任务controller 数据表 具体使用 具体效果 总结

  • Spring Boot 配置 Quartz 定时任务的方法

    Quartz有四个核心概念: Job:是一个接口,只定义一个方法 execute(JobExecutionContext context),在实现接口的 execute 方法中编写所需要定时执行的 Job(任务) Double slongitude = Double.valueOf(jobExecutionContext.getJobDetail().getJobDataMap().get("slongitude").toString()); JobDetail:Quartz 每次调度

  • SpringBoot 整合 Quartz 定时任务框架详解

    目录 前言 一.简单聊一聊 Quartz 1.1.Quartz 概念 二.SpringBoot 使用 Quartz 2.1.基本步骤 2.2.执行 Quartz 需要的SQL文件 2.3.Controller 2.4.Service 划重点 2.5.实体类 2.6.简单的 Job 案例 2.7.那么该如何使用呢? 前言 在选择技术栈之前,一定要先明确一件事情,你真的需要用它吗?还有其他方式可以使用吗? 相比其他技术技术,优点在哪里呢?使用了之后的利与弊等等. 写这个主要是因为一直想写一下定时任务

  • Java实现定时任务

    本文实例为大家分享了Java实现定时任务的具体代码,供大家参考,具体内容如下 1 使用java.util.Timer 这种方式的定时任务主要用到两个类,Timer 和 TimerTask,使用起来比较简单.其中 Timer 负责设定 TimerTask 的起始与间隔执行时间. TimerTask是一个抽象类,new的时候实现自己的 run 方法,然后将其丢给 Timer 去执行即可. 代码示例: import java.time.LocalDateTime; import java.util.T

  • 使用java执行定时任务示例

    这是一个演示如何使用java执行定时任务的实例,本实例开始运行后不会自动结束,请在运行本实例后手动结束程序. 复制代码 代码如下: package com.hongyuan.test; import java.awt.Desktop;import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import j

  • 一分钟了解Java中List集合与set集合的多种遍历方式

    List集合与set集合的多种遍历方式 方法有: 1. for循环遍历 2. foreach增强循环遍历 3. 迭代器遍历 4. lambda表达式遍历 一.List集合的遍历 1.创建一个集合,添加一些元素放在集合当中 public static void main(String[] args) { //int[] ins = new int[8];//int List<String> list = new ArrayList<String>();//List,ArrayList

随机推荐