【Java】通俗易懂的JUC源码剖析-ThreadPoolExecutor

通俗易懂的JUC源码剖析-ThreadPoolExecutor

小强大人发布于 今天 14:54

前言

ThreadPoolExecutor相信大家都很熟悉:线程池的实现类。今天我们就来看看它内部是怎么实现的。

实现原理

先来看看它的类结构:

public class ThreadPoolExecutor extends AbstractExecutorService {

}

public abstract class AbstractExecutorService implements ExecutorService {

}

public interface ExecutorService extends Executor {

void shutdown();

<T> Future<T> submit(Callable<T> task);

// ...

}

public interface Executor {

void execute(Runnable command);

}

再来看看它的关键属性:

// ctl高3位表示线程池的运行状态,低29位表示线程个数

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 线程个数掩码,Integer位数-3,与具体平台Integer位数有关,大部分是32-3=29

private static final int COUNT_BITS = Integer.SIZE - 3;

// 线程最大个数

private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits

// 线程池状态

private static final int RUNNING = -1 << COUNT_BITS;

private static final int SHUTDOWN = 0 << COUNT_BITS;

private static final int STOP = 1 << COUNT_BITS;

private static final int TIDYING = 2 << COUNT_BITS;

private static final int TERMINATED = 3 << COUNT_BITS;

// 互斥锁

private final ReentrantLock mainLock = new ReentrantLock();

// 工作线程集合

private final HashSet<Worker> workers = new HashSet<Worker>();

// 线程池终止条件

private final Condition termination = mainLock.newCondition();

// 线程池核心参数

// 阻塞队列

private final BlockingQueue<Runnable> workQueue;

// 线程工厂

private volatile ThreadFactory threadFactory;

// 拒绝策略

private volatile RejectedExecutionHandler handler;

// 线程闲置时长

private volatile long keepAliveTime;

// 核心线程数

private volatile int corePoolSize;

// 最大线程数

private volatile int maximumPoolSize;

// 默认拒绝策略:AbortPolicy抛出异常

private static final RejectedExecutionHandler defaultHandler =

new AbortPolicy();

其中Worker是它的内部类,代表工作线程。

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable {

}

它继承了AQS,实现了Runnable接口

再来看关键方法:
execute()

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true))

return;

c = ctl.get();

}

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (!isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

else if (!addWorker(command, false))

reject(command);

}

其中addWorker()方法如下:

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null &&

!workQueue.isEmpty()))

return false;

for (;;) {

int wc = workerCountOf(c);

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

workers.add(w);

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

} finally {

mainLock.unlock();

}

if (workerAdded) {

t.start();

workerStarted = true;

}

}

} finally {

if (! workerStarted)

addWorkerFailed(w);

}

return workerStarted;

}

我滴个乖乖,addWorker()这么复杂,明天再来吧。
晚安全世界!

java

阅读 36发布于 今天 14:54

本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议

avatar

小强大人

9 声望

1 粉丝

0 条评论

得票时间

avatar

小强大人

9 声望

1 粉丝

宣传栏

前言

ThreadPoolExecutor相信大家都很熟悉:线程池的实现类。今天我们就来看看它内部是怎么实现的。

实现原理

先来看看它的类结构:

public class ThreadPoolExecutor extends AbstractExecutorService {

}

public abstract class AbstractExecutorService implements ExecutorService {

}

public interface ExecutorService extends Executor {

void shutdown();

<T> Future<T> submit(Callable<T> task);

// ...

}

public interface Executor {

void execute(Runnable command);

}

再来看看它的关键属性:

// ctl高3位表示线程池的运行状态,低29位表示线程个数

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 线程个数掩码,Integer位数-3,与具体平台Integer位数有关,大部分是32-3=29

private static final int COUNT_BITS = Integer.SIZE - 3;

// 线程最大个数

private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits

// 线程池状态

private static final int RUNNING = -1 << COUNT_BITS;

private static final int SHUTDOWN = 0 << COUNT_BITS;

private static final int STOP = 1 << COUNT_BITS;

private static final int TIDYING = 2 << COUNT_BITS;

private static final int TERMINATED = 3 << COUNT_BITS;

// 互斥锁

private final ReentrantLock mainLock = new ReentrantLock();

// 工作线程集合

private final HashSet<Worker> workers = new HashSet<Worker>();

// 线程池终止条件

private final Condition termination = mainLock.newCondition();

// 线程池核心参数

// 阻塞队列

private final BlockingQueue<Runnable> workQueue;

// 线程工厂

private volatile ThreadFactory threadFactory;

// 拒绝策略

private volatile RejectedExecutionHandler handler;

// 线程闲置时长

private volatile long keepAliveTime;

// 核心线程数

private volatile int corePoolSize;

// 最大线程数

private volatile int maximumPoolSize;

// 默认拒绝策略:AbortPolicy抛出异常

private static final RejectedExecutionHandler defaultHandler =

new AbortPolicy();

其中Worker是它的内部类,代表工作线程。

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable {

}

它继承了AQS,实现了Runnable接口

再来看关键方法:
execute()

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true))

return;

c = ctl.get();

}

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (!isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

else if (!addWorker(command, false))

reject(command);

}

其中addWorker()方法如下:

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null &&

!workQueue.isEmpty()))

return false;

for (;;) {

int wc = workerCountOf(c);

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

workers.add(w);

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

} finally {

mainLock.unlock();

}

if (workerAdded) {

t.start();

workerStarted = true;

}

}

} finally {

if (! workerStarted)

addWorkerFailed(w);

}

return workerStarted;

}

我滴个乖乖,addWorker()这么复杂,明天再来吧。
晚安全世界!

以上是 【Java】通俗易懂的JUC源码剖析-ThreadPoolExecutor 的全部内容, 来源链接: www.h5w3.com/114254.html

回到顶部