【Java】通俗易懂的JUC源码剖析-ScheduledThreadPoolExecutor

前言

ScheduledThreadPoolExecutor是一种带有延迟,定时执行任务的线程池。它在很多延时任务、定时任务的场景中有丰富的应用场景。今天就来分析下它的实现原理吧。

实现原理

先看看类图结构:

public class ScheduledThreadPoolExecutor

extends ThreadPoolExecutor

implements ScheduledExecutorService {

}

它继承了ThreadPoolExecutor类并实现了ScheduledExecutorService接口。其中ScheduledExecutorService继承了ExecutorService,并提供了schedule相关的方法:

// 经过delay延迟时间后,执行command任务

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

// 经过delay延迟时间后,执行callable任务

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

// 经过initialDelay延迟后,以period的周期时长周期性的执行command任务。

// (来自官方注释)注意:

// 1.如果某次任务抛出了异常,后续的周期任务不会被执行。

// 2.如果某次任务执行时长超过了period周期,那么下一个周期到来时,不会执行新的一轮任务,而是往后推迟,

// 等到当前任务执行完后再执行,以此来保证多次任务不会并发执行。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,

long period, TimeUnit unit);

// 经过initialDelay延迟后,执行任务command,然后每次等任务执行完毕后,延迟delay时长再执行新的一轮任务。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,

long delay, TimeUnit unit);

其中scheduleAtFixedRate和scheduleWithFixedDelay容易混淆,这里再说明下它们的区别:

fixed-rate执行时机:initialDelay + n * period (n >= 0的整数)

fixed-delay执行时机:initialDelay, (task1 end time + delay), (task2 end time + delay), ... (taskN end time + delay)

由ScheduledExecutorService接口还可以看到,这几个方法的返回值都为ScheduledFuture。它的结构如下:

public interface ScheduledFuture<V> extends Delayed, Future<V> {

}

它继承了Delayed和Future接口。Delayed中有个getDelay()方法获取剩余延时。Future大家应该都比较熟悉,它用来表示任务的异步执行结果,可以通过Future.get()或Future.isDone()系列方法判断任务的执行情况和结果。

FutureTask是Future的一个常见实现类,它内部有个state变量代表任务的执行状态。

private volatile int state;

private static final int NEW = 0; // 初始状态

private static final int COMPLETING = 1; // 执行中

private static final int NORMAL = 2; // 正常运行结束

private static final int EXCEPTIONAL = 3; // 运行中发生异常

private static final int CANCELLED = 4; // 任务被取消

private static final int INTERRUPTING = 5; // 任务正在被中断

private static final int INTERRUPTED = 6; // 任务已经被中断

根据官方注释,可能的状态转换如下:

// Possible state transitions:

NEW -> COMPLETING -> NORMAL

NEW -> COMPLETING -> EXCEPTIONAL

NEW -> CANCELLED

NEW -> INTERRUPTING -> INTERRUPTED

ScheduledFutureTask继承了FutureTask,也是ScheduledFuture接口的实现类,它内部还用了period来表示周期类型。

根据官方注释,period的值代表含义如下。

period > 0,表示fixed-rate类型的周期任务。

period < 0,表示fixed-delay类型的周期任务。

period = 0,表示非周期性任务,即一次性任务。

(ps:这里个人感觉period设计的扩展性不够好,如果后续JDK版本想再加一种新的周期类型,period的值该如何表示呢?

我觉得可以换成枚举值的形式)

有了上述背景知识,再来看关键方法,就会轻松许多了。

1.构造方法:

public ScheduledThreadPoolExecutor(int corePoolSize) {

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());

}

可以看到,它使用的阻塞队列为DelayedWorkQueue,它是ScheduledThreadPoolExecutor的内部类,功能和DelayedQueue类似,也实现了BlockingQueue接口,因此可以用于线程池中。

2.schedule()方法:

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {

// 任务不能为空

if (command == null || unit == null)

throw new NullPointerException();

// 任务装饰和转换

RunnableScheduledFuture<?> t = decorateTask(command,

// triggerTime是获取任务触发(执行)时间戳

new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));

// 添加任务到执行队列去执行

delayedExecute(t);

return t;

}

其中 new ScheduledFutureTask(...)代码如下:

ScheduledFutureTask(Runnable r, V result, long ns) {

// 调用父类FutureTask构造函数初始化state=NEW

super(r, result);

// 初始化任务触发时间

this.time = ns;

// 一次性任务

this.period = 0;

this.sequenceNumber = sequencer.getAndIncrement();

}

其中decorateTask目前在ScheduledThreadPoolExecutor类实现中只是简单返回了task本身,但它是protected修饰的,允许我们自定义的子类去覆写这个方法,完成任务的装饰和修改逻辑。

protected <V> RunnableScheduledFuture<V> decorateTask(

Runnable runnable, RunnableScheduledFuture<V> task) {

return task;

}

再来看delayedExecute()方法代码:

private void delayedExecute(RunnableScheduledFuture<?> task) {

// 线程池一关闭,执行拒绝策略

if (isShutdown())

reject(task);

else {

// 将任务添加到阻塞队列中,DelayedWorkQueue,队首元素是延迟时间最短的元素

super.getQueue().add(task);

// 再次检查线程池状态,如果不能运行,从队列中移除任务,并取消任务

if (isShutdown() &&

!canRunInCurrentRunState(task.isPeriodic()) &&

remove(task))

task.cancel(false);

// 确保线程池中至少有一个线程在处理任务

else ensurePrestart();

}

}

其中ensurePrestart代码如下:

void ensurePrestart() {

int wc = workerCountOf(ctl.get());

// 增加核心线程数

if (wc < corePoolSize)

addWorker(null, true);

// 当前线程数为0,也添加一个线程

else if (wc == 0)

addWorker(null, false);

}

上面分析了任务如何被放入阻塞队列中,接下来分析下工作线程如何从队列中获取任务并执行。由ThreadPoolExecutor可以知道,Worker负责从工作队列中循环获取任务,并调用它的run()方法,这里的任务Runnable被包装成了ScheduledFutureTask,它重写了run(),所以需要看它的逻辑:

public void run() {

// 是否周期性任务

boolean periodic = isPeriodic();

// 当前线程池状态不能运行任务,则取消任务

if (!canRunInCurrentRunState(periodic))

cancel(false);

// 如果是一次性任务(调用schedule()会走这个分支),调用父类FutureTask的run()

else if (!periodic)

ScheduledFutureTask.super.run();

// 如果是周期性任务(调用scheduleAtFixedRate或scheduleWithFixedDelay会走这个分支)

else if (ScheduledFutureTask.super.runAndReset()) {

// 设置下一次执行时机

setNextRunTime();

// 重新加入队列中,周期执行

reExecutePeriodic(outerTask);

}

}

其中,FutureTask的run()代码如下:

public void run() {

// 如果任务状态不为NEW或者CAS设置执行线程为当前线程失败,则返回

if (state != NEW ||

!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))

return;

try {

Callable<V> c = callable;

// 再次检查任务状态是否为NEW

if (c != null && state == NEW) {

V result;

boolean ran;

try {

// 调用目标任务Callable,获得返回值result

result = c.call();

ran = true;

} catch (Throwable ex) {

result = null;

ran = false;

// 任务执行过程中发生异常,设置异常结果

setException(ex);

}

// 任务正常运行结束,设置正常结果

if (ran)

set(result);

}

} finally {

// runner must be non-null until state is settled to

// prevent concurrent calls to run() runner = null;

// state must be re-read after nulling runner to prevent

// leaked interrupts

int s = state;

if (s >= INTERRUPTING)

handlePossibleCancellationInterrupt(s);

}

}

其中set(result)和setException(ex)代码如下:

protected void set(V v) {

// CAS设置任务状态从NEW -> COMPLETING

if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

// 设置结果

outcome = v;

// 设置任务状态为正常结束,这里使用putOrderedInt效率会比putIntVolatile高些

// 且这里不要求设置的NORMAL状态对其它线程立即可见。

UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state

finishCompletion();

}

}

// 和上面set()类似,不做赘述

protected void setException(Throwable t) {

if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

outcome = t;

UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state

finishCompletion();

}

}

3.scheduleWithFixedDelay()

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,

long initialDelay, long delay, TimeUnit unit) {

if (command == null || unit == null)

throw new NullPointerException();

if (delay <= 0)

throw new IllegalArgumentException();

ScheduledFutureTask<Void> sft =

new ScheduledFutureTask<Void>(command,null, triggerTime(initialDelay, unit), unit.toNanos(-delay));

RunnableScheduledFuture<Void> t = decorateTask(command, sft);

sft.outerTask = t;

delayedExecute(t);

return t;

}

大体框架和schedule类似,但需要注意的是new ScheduledFutureTask(...)的区别,这里多传了个unit.toNanos(-delay)参数。它的period值为-delay,前面提到period < 0时,代表fixed-delay类型的周期任务。

ScheduledFutureTask(Runnable r, V result, long ns, long period) {

super(r, result);

this.time = ns;

this.period = period;

this.sequenceNumber = sequencer.getAndIncrement();

}

这时回到我们上面分析的ScheduledFutureTask的run(),会走这个if分支。

 // 运行并重置

else if (ScheduledFutureTask.super.runAndReset()) {

// 设置下一次执行时机

setNextRunTime();

// 重新加入队列中,周期执行

reExecutePeriodic(outerTask);

}

其中runAndReset()代码如下:

protected boolean runAndReset() {

if (state != NEW ||

!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))

return false;

boolean ran = false;

int s = state;

try {

Callable<V> c = callable;

if (c != null && s == NEW) {

try {

c.call(); // don't set result

ran = true;

} catch (Throwable ex) {

setException(ex);

}

}

} finally {

// runner must be non-null until state is settled to

// prevent concurrent calls to run() runner = null;

// state must be re-read after nulling runner to prevent

// leaked interrupts

s = state;

if (s >= INTERRUPTING)

handlePossibleCancellationInterrupt(s);

}

// 任务是否正常运行且状态为NEW

return ran && s == NEW;

}

它的逻辑和run()类似,只不过它没有获取c.call()运行结果,也没有设置任务状态为NORMAL正常结束,目的是使得任务成为可重复执行的。

再来看setNextRunTime()方法:

private void setNextRunTime() {

long p = period;

// fixed-rate类型

if (p > 0)

time += p;

// fixed-delay类型(注意能进入setNextRunTime()的前提条件是period != 0)

// 这里由于p为负数,所以需要延迟-p时间执行(time = now + delay)

else time = triggerTime(-p);

}

其中reExecutePeriodic()如下

void reExecutePeriodic(RunnableScheduledFuture<?> task) {

if (canRunInCurrentRunState(true)) {

// 重新将任务加入队列中

super.getQueue().add(task);

// 检查线程池状态

if (!canRunInCurrentRunState(true) && remove(task))

task.cancel(false);

// 确保至少一个线程在执行

else ensurePrestart();

}

}

4.scheduleAtFixedRate()

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,

long initialDelay, long period, TimeUnit unit) {

if (command == null || unit == null)

throw new NullPointerException();

if (period <= 0)

throw new IllegalArgumentException();

ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit),

unit.toNanos(period));

RunnableScheduledFuture<Void> t = decorateTask(command, sft);

sft.outerTask = t;

delayedExecute(t);

return t;

}

可以看到,它和scheduleWithFixedDelay()相似,不同点在于传递的period参数是unit.toNanos(period),而不是unit.toNanos(-delay),因为fixed-rate类型的period > 0.

所以,在执行setNextRunTime()方法时,会执行time += p,而不是time = triggerTime(-p).

总结

本文讲述了ScheduledThreadPoolExecutor的原理,内部使用DelayedWorkQueue存放任务,fixed-delay类型保证任务多次执行之间间隔固定时间,fixed-rate类型保证按照固定频率执行任务。

以上是 【Java】通俗易懂的JUC源码剖析-ScheduledThreadPoolExecutor 的全部内容, 来源链接: www.h5w3.com/115100.html

回到顶部