Java中守时使命完成方法及源码分析
概述
在企业级运用开发场景中,守时使命占有着至关重要的位置。比方以下这些场景:
- 用户4个小时以内没有进行任何操作,就主动铲除用户会话。
- 每天晚上清晨主动拉取另一个事务系统的某部分数据。
- 每隔15分钟,主动履行一段逻辑,更新某部分数据。
相似的场景会频频出现在咱们的日常开发中。在Java开发系统中,也有许多完结方案来满意这些场景。可是每个完结方案都有各自的长处和缺陷。本文将要点剖析不同完结方案的技能原理以及它们之间的运用场景和差异。
在开端之前,需求先想下守时使命场景的中心技能点是什么?我是这样了解的:
抵达未来某一个时刻点,履行对应的逻辑。在抵达这个时刻点之前,需求一向等候。
中心的技能点在于,程序怎么知道达到了指定的时刻点,然后触发履行对应的逻辑。只要能完结这点,就能够完结守时使命了。本文罗列以下六种方案:
- 循环判守时刻
- Sleep
- Timer
- ScheduledExecutorService
- Spring Scheduling
本文这五种方案的解说次序是有讲究的,从简略到杂乱,从底层到上层。循环判守时刻、Sleep旨在脱节一切组件或许结构带来的杂乱度搅扰,从最实质上了解守时使命的完结思路。Timer是JDK新近对守时使命的完结,相对来说是比较简略的。ScheduledExecutorService是对Timer的优化、而Spring Scheduling则是依据ScheduledExecutorService完结的。
循环判守时刻
咱们在线程中,直接运用死循环来不断的判守时刻,看是否到了预期的时刻点,假如到了就履行逻辑,不然就持续循环。这个办法应该根本不会运用到,可是最能阐明守时使命的中心实质。举一个生活化场景的比如:咱们请家人帮助,30分钟后叫自己起床,假如家人不定闹钟的话,他就得不断的去看时刻,到了30分钟叫咱们起床。
完结
public class LoopScheduler {
public static void main(String[] args) {
long nowTime = System.currentTimeMillis();
long nextTime = nowTime + 15000;
while (true) {
if (System.currentTimeMillis() >= nextTime) {
nowTime = System.currentTimeMillis();
nextTime = nowTime + 15000;
System.out.println(nowTime + ":触发一次");
service();
}
}
}
public static void service() {
System.out.println("自界说逻辑履行");
}
}
以上代码就能够完结每隔15s履行service办法一次。以下是履行状况:
能够看到的确十分严厉的15s履行一次。完结了守时使命的作用。
剖析
这种完结办法之所以根本不会实际运用,是由于这个while循环的空转会占用十分多名贵的cpu资源。可是能够借此看到守时使命的完结结构。
Sleep
借助于Thread.sleep(),咱们能够完结让线程等候指守时刻后再履行。Thread.sleep()办法是一个JNI办法,其底层是与操作系统内核进行交互。调用该办法,能够将线程进入睡觉状况,在这种状况中,该线程不会获取到cpu资源。直到指定的睡觉时刻完毕,操作系统会依据调度战略将线程唤醒。
完结
public class SleepScheduler {
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(15000);
System.out.println(System.currentTimeMillis() + ":触发一次");
service();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void service() {
System.out.println("自界说逻辑履行");
}
});
thread.start();
}
}
以上代码,先界说了一个线程,然后发动。线程的履行逻辑是,不断循环,每次循环里边先sleep 15s。然后再履行指定的逻辑。根本上能够完结跟上面相同的作用,每15s履行一次service逻辑。
咱们调查下履行的状况就会发现,每次履行的距离并不是严厉的15s。一般都会比15s要多一点。这是由于sleep的机制导致的。sleep完毕之后,线程并不会立马取得履行,线程仅仅会被从头放入调度行列参加下一次调度。
剖析
运用Thread.sleep()跟咱们自己用循环去判别时刻比较,最大的优势在于咱们节省了CPU资源。运用操作系统的线程调度才能去完结对时刻的操控和判别。
Timer
Timer是JDK自带的调度东西类,针对守时使命的场景场景现已做了笼统和封装。Timer的中心进口是Timer类的schedule办法。用于提交一个使命,而且能够指定推迟时刻和重复距离。
/**
* @param task task to be scheduled.
* @param delay delay in milliseconds before task is to be executed.
* @param period time in milliseconds between successive task executions.
*/
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}
怎么运用
public class TimerScheduler {
public static void main(String[] args) {
// 创立SimpleDateFormat目标,界说格式化款式
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
// 创立Timer目标
Timer timer = new Timer();
// 单次履行使命示例
TimerTask singleTask = new TimerTask() {
@Override
public void run() {
// 将当时时刻的毫秒数转换为格式化后的时刻字符串
String formattedTime = sdf.format(System.currentTimeMillis());
System.out.println(formattedTime + "单次履行使命:守时使命履行了");
}
};
// 推迟3000毫秒(3秒)后履行单次使命
timer.scheduleAtFixedRate(singleTask, 3000, 15000);
String formattedTime = sdf.format(System.currentTimeMillis());
System.out.println(formattedTime + "单次履行使命:守时使命已发动");
}
}
以上代码首要创立了一个TimerTask的实例,然后重写run办法封装咱们的事务逻辑,然后调用Timer的scheduleAtFixedRate办法将使命进行提交。提交之后3s之后开端履行一次,然后以15s履行一次的固定频率不断履行下去。
Timer总共供给了6个api来提交使命,能够分为三大类:
- 下图1和2:需求传入period的schedule(),提交的是周期使命,period是周期。不过这儿的周期是上一次使命成功完毕和下一次使命开端履行的距离。能够认为是依照固定的推迟去重复。
- 下图3和4:不需求传入period的schedule(),提交的是单次使命。
- 下图5和6:scheduleAtFixedRate(),提交的是周期使命,period是履行频率,每次使命的方案履行时刻在提交的那一刻就能够确认,跟上一次使命什么时分完毕没有关系。
原了解析
Timer现已是JDK针对守时使命场景做了笼统和封装后的作用,其中心完结类除了Timer、TimerTask。还有Timer的内部类TaskQueue、TimerThread。
TimerTask
public abstract class TimerTask implements Runnable {
/**
* This object is used to control access to the TimerTask internals.
*/
final Object lock = new Object();/**
* The state of this task, chosen from the constants below.
*/
int state = VIRGIN;
/**This task has not yet been scheduled.
*/
static final int VIRGIN = 0;/**This task is scheduled for execution. If it is a non-repeating task,it has not yet been executed.
*/
static final int SCHEDULED = 1;/**This non-repeating task has already executed (or is currentlyexecuting) and has not been cancelled.
*/
static final int EXECUTED = 2;/**This task has been cancelled (with a call to TimerTask.cancel).
*/
static final int CANCELLED = 3;/**Next execution time for this task in the format returned bySystem.currentTimeMillis, assuming this task is scheduled for execution.For repeating tasks, this field is updated prior to each task execution.
*/
long nextExecutionTime;/**Period in milliseconds for repeating tasks. A positive value indicatesfixed-rate execution. A negative value indicates fixed-delay execution.A value of 0 indicates a non-repeating task.
*/
long period = 0;
}
望文生义,TimerTask是对使命的笼统,现已界说好了使命状况、是否可重复(period)、下次履行时刻等特点。除此之外,TimerTask完结了Runnable,咱们能够经过重写run办法来封装咱们的事务逻辑。
Timer
Timer是对调度器的笼统。其暴露了schedule中心api用来让咱们进行使命提交。它有两个比较重要的内部类,是了解Timer完结原理的要害。
public class Timer {
/**
* The timer task queue. This data structure is shared with the timer
* thread. The timer produces tasks, via its various schedule calls,
* and the timer thread consumes, executing timer tasks as appropriate,
* and removing them from the queue when they’re obsolete.
*/
private final TaskQueue queue = new TaskQueue();/**
* The timer thread.
*/
private final TimerThread thread = new TimerThread(queue);
public Timer(String name) {
thread.setName(name);
thread.start();
}
}
在Timer的结构办法中,会直接将TimerThread发动。
TaskQueue
TaskQueue内部持有一个TimerTask数组。用来存储经过schedule办法提交的TimerTask实例。实际上TimerTask是一个小顶堆,每一次增加元素的时分,都会依据使命的下次履行时刻(nextExecutionTime)来保护小顶堆,将下次履行时刻最近的使命放在堆顶。
class TaskQueue {
/**
* Priority queue represented as a balanced binary heap: the two children
* of queue[n] are queue[2n] and queue[2n+1]. The priority queue is
* ordered on the nextExecutionTime field: The TimerTask with the lowest
* nextExecutionTime is in queue[1] (assuming the queue is nonempty). For
* each node n in the heap, and each descendant of n, d,
* n.nextExecutionTime <= d.nextExecutionTime.
*/
private TimerTask[] queue = new TimerTask[128];/**
* The number of tasks in the priority queue. (The tasks are stored in
* queue[1] up to queue[size]).
*/
private int size = 0;
}
TimerThread
TimerThread承继了Thread。会在Timer类初始化的时分直接发动。所以中心要重视的是TimerThread的run办法,run办法的主体是调用mainLoop()办法。
class TimerThread extends Thread {
/**
* Our Timer's queue. We store this reference in preference to
* a reference to the Timer so the reference graph remains acyclic.
* Otherwise, the Timer would never be garbage-collected and this
* thread would never go away.
*/
private TaskQueue queue;
TimerThread(TaskQueue queue) {
this.queue = queue;
}
public void run() {
try {
mainLoop();
} finally {
// Someone killed this Thread, behave as if Timer cancelled
synchronized(queue) {
newTasksMayBeScheduled = false;
queue.clear(); // Eliminate obsolete references
}
}
}
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
// Wait for queue to become non-empty
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
if (queue.isEmpty())
break; // Queue is empty and will forever remain; die
// Queue nonempty; look at first event and do the right thing
long currentTime, executionTime;
task = queue.getMin();
synchronized(task.lock) {
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue; // No action required, poll queue again
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
if (taskFired = (executionTime<=currentTime)) {
if (task.period == 0) { // Non-repeating, remove
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
if (!taskFired) // Task hasn't yet fired; wait
queue.wait(executionTime - currentTime);
}
if (taskFired) // Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
}
}
}
}
mainLoop主体逻辑如下:
- 主体是一个while循环,进入循环后,会先看使命行列是否有使命,假如没有使命则履行quere.wait()。等候其他线程(一般是主线程在增加使命后)履行queue.notify()办法后,TimerThread会再次醒来。
- 假如行列不为空,成功获取到堆顶元素后,会判别使命是否现已被撤销,假如撤销的话直接移除,然后进入下一次循环。
- 假如使命没有被撤销,则看使命的履行时刻是否现已到了,也便是该使命是否应该被触发了。假如现已触发了,那么应该履行该使命,一起假如该使命是周期使命,还应该核算下次使命的履行时刻,然后触发堆的下滤操作(从头找一个下次履行时刻最近的使命到堆顶)。
- 假如使命还没有到履行时刻,那么就核算还剩多少时刻应该履行,然后等候这个时刻。从头进入下一次循环。
仔细的同学,应该能够注意到,在处理重复使命的时分,会判别period是负数仍是正数。
负数的话,使命的下次履行时刻是:currentTime - task.period(当时使命完毕的时刻 - 周期)
正数的话,使命的下次履行时刻是:executionTime + task.period(当时使命的履行时刻 + 周期)
可是咱们传入的period不是都是正数吗?能够看下面代码,schedule在调用sched办法的时分对咱们的period取负了。能够这样了解,period一起也是一个使命类型的标识,period为0表明的是单次使命。period为负数就表明的是依照固定推迟去重复的使命,period为正数表明的是依照固定频率去重复的使命。
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}
public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, period);
}
归纳剖析
- Timer是依据Object.wait()办法来完结时刻等候的。在实质上跟咱们用Thread.sleep()完结没有太大的差异,底层都是经过操作系统内核让线程进入睡觉状况。
- Timer是单线程形式,经过上面的剖析能够看到,Timer类持有一个TimerThread实例,它便是一个线程实例。这种形式的问题在于,当咱们在同一个Timer中提交了许多调度使命之后,而且有的使命时刻过长的话,就可能会导致其他使命的调度拖延。
注意事项
- 经过上面的源码剖析能够看到,TimerThread履行task.run()的时分,没有进行反常处理,所以当运用Timer的时分,事务逻辑一定要进行反常处理,不然假如一旦抛出反常将导致TimerThread线程挂掉。一切调度使命就失效了。
ScheduledExecutorService
ScheduledExecutorService也是JDK自带的守时使命东西,是在JDK1.5引进的。能够了解为是对Timer的晋级,经过上文咱们知道:Timer是依据单线程模型的,必定跟不上Java范畴开发的开展脚步。而ScheduledExecutorService是依据线程池来完结的。这也是对Timer最首要的优化,当咱们了解了Timer之后,再看ScheduledExecutorService就会简略许多。
怎么运用
public class ScheduldExecutorServiceScheduler {
public static void main(String[] args) {
// 创立SimpleDateFormat目标,界说格式化款式
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
// 创立ScheduledExecutorService目标
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
Runnable runnable = new Runnable() {
@Override
public void run() {
// 将当时时刻的毫秒数转换为格式化后的时刻字符串
String formattedTime = sdf.format(System.currentTimeMillis());
System.out.println(formattedTime + "单次履行使命:守时使命履行了");
}
};
// 推迟3000毫秒(3秒)后履行单次使命,每15s开端履行一次使命
executor.scheduleAtFixedRate(runnable, 3000, 15000, java.util.concurrent.TimeUnit.MILLISECONDS);
// 推迟3000毫秒(3秒)后履行单次使命,每次使命完毕后15s履行下一次使命
//executor.scheduleWithFixedDelay(runnable, 3000, 15000, java.util.concurrent.TimeUnit.MILLISECONDS);
// 推迟3000毫秒(3秒)后履行单次使命
//executor.schedule(runnable, 3000, java.util.concurrent.TimeUnit.MILLISECONDS);
String formattedTime = sdf.format(System.currentTimeMillis());
System.out.println(formattedTime + "单次履行使命:守时使命已发动");
}
}
上述代码经过Executors创立了一个单线程的调度履行器。然后经过调用履行器的scheduleAtFixedRate()办法提交封装好的Runnable使命,跟前文的比如共同,也是推迟3s后履行,然后依照15s的固定频率去重复履行。
下图是履行作用。
除此之外,ScheduledExecutorService也暴露了scheduleWithFixedDelay(固定推迟)、schedule办法(单次履行)。与上文咱们剖析的Timer的API是共同的。只不过scheduleWithFixedDelay的api愈加语义化了。
假如需求多线程的履行器,那么运用Executors.newScheduledThreadPool()办法创立指定线程数量的线程池履行器。
public static void main(String[] args) {
// 创立SimpleDateFormat目标,界说格式化款式
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
// 创立ScheduledExecutorService目标
//ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
Runnable runnable = new Runnable() {
@Override
public void run() {
// 将当时时刻的毫秒数转换为格式化后的时刻字符串
String formattedTime = sdf.format(System.currentTimeMillis());
System.out.println(formattedTime + "单次履行使命:守时使命履行了");
}
};
// 推迟3000毫秒(3秒)后履行单次使命,每15s开端履行一次使命
executor.scheduleAtFixedRate(runnable, 3000, 15000, java.util.concurrent.TimeUnit.MILLISECONDS);
// 推迟3000毫秒(3秒)后履行单次使命,每次使命完毕后15s履行下一次使命
//executor.scheduleWithFixedDelay(runnable, 3000, 15000, java.util.concurrent.TimeUnit.MILLISECONDS);
// 推迟3000毫秒(3秒)后履行单次使命
//executor.schedule(runnable, 3000, java.util.concurrent.TimeUnit.MILLISECONDS);
String formattedTime = sdf.format(System.currentTimeMillis());
System.out.println(formattedTime + "单次履行使命:守时使命已发动");
}
原了解析
在解读Timer的源码的时分,咱们知道有这几个中心人物:
TimerTask:使命,对具体使命逻辑的笼统和封装。
Timer:调度器,担任一切使命的提交和注册。
TaskQueue:使命行列,Timer目标持有,经过Timer提交的使命都存在这个行列中
TimerThread:单线程履行器,Timer目标持有,TimerThread会不断地从TaskQueue中获取使命来履行。
在ScheduledExecutorService中,这几个人物相同存在,只不过换了姓名。而且在技能层面做了优化。
ScheduledFutureTask:对使命的封装。
ScheduledExecutorService:调度器,担任一切使命的提交和注册。
BlockingQueue:堵塞行列,用来寄存使命,与一般行列不同的是,当测验从堵塞行列中获取元素时,假如行列为空,那么线程会堵塞。
ThreadPoolExecutor:线程池履行器。每个线程都会不断地从使命行列中测验获取使命并履行。
ScheduledFutureTask
依据ScheduledFutureTask的UML图能够看到,ScheduledFutureTask首要承继了FutureTask类。FutureTask是java异步编程的一个中心类,这儿暂不打开。先简略了解为他对异步使命的履行生命周期、回来成果做了封装。咱们能够经过FutureTask的api去判别一个使命是否履行完结、撤销使命、获取履行成果等操作。
下面是ScheduledFutureTask的中心部分代码,为了理清首要头绪,对代码做了删减。
能够看到的是,ScheduledFutureTask跟前文的Timer相同,有使命的周期、下次履行时刻等特点。不同的是ScheduledFutureTask有自己的run办法。 在Timer中,TimerTask的run办法是咱们自己重写的,便是事务逻辑代码。在ScheduledExecutorService中,咱们把Runnable提交给ScheduledExecutorService之后,则是先调用ScheduledFutureTask的结构办法,在ScheduledFutureTask的结构办法中,又调用父类(FutureTask)的结构办法,并传入Runnable实例。
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
/** The time the task is enabled to execute in nanoTime units */
private long time;
/**
* Period in nanoseconds for repeating tasks. A positive
* value indicates fixed-rate execution. A negative value
* indicates fixed-delay execution. A value of 0 indicates a
* non-repeating task.
*/
private final long period;
/**
* Creates a one-shot action with given nanoTime-based trigger time.
*/
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
}
这儿run办法的逻辑是:
- 判别使命是否能履行,由于存在现已将调度器shutdown了的状况。
- 假如对错周期使命,那么履行即可
- 假如是周期使命,除了要履行之外,还需求从头安排下一次调度。