【Java】通俗易懂的JUC源码剖析-CyclicBarrier

通俗易懂的JUC源码剖析-CyclicBarrier

小强大人发布于 34 分钟前

前言

我们知道,CountDownLatch的计数器是一次性的,它不能重置。也就是说,当count值变为0时,再调用await()方法会立即返回,不会阻塞。
本文要说的CyclicBarrier就是一种可以重置计数器的线程同步工具类。CyclicBarrier字面意思是“回环屏障”,它可以让一组线程全部到达一个状态后再全部同时往下执行。之所以叫回环是因为当所有线程执行完毕,并重置CyclicBarrier的状态后它可以被重用。而之所以叫屏障是因为当某个线程调用await方法后就会被阻塞,这个阻塞点就称为屏障,等其他所有线程都调用了await方法后,这组线程就会一起冲破屏障,并往下执行。

使用场景

两个子任务分别执行自己的工作,等它们都执行完后,主任务汇总子任务的结果,并做一些处理,处理完成后两个子任务又继续做其他事情。示例代码:

import java.util.concurrent.BrokenBarrierException;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {

try {

System.out.println("main task merge subtask result begin");

// simulate merge work

Thread.sleep(5000);

System.out.println("main task merge subtask result finished");

} catch (InterruptedException e) {

// ignore

}

});

public static void main(String[] args) {

Thread thread1 = new Thread(() -> {

try {

Thread.sleep(4000);

System.out.println("thread1 finished its work");

cyclicBarrier.await();

System.out.println("thread1 continue work");

} catch (InterruptedException | BrokenBarrierException e) {

// ignore

}

});

Thread thread2 = new Thread(() -> {

try {

Thread.sleep(5000);

System.out.println("thread2 finished its work");

cyclicBarrier.await();

System.out.println("thread2 continue work");

} catch (InterruptedException | BrokenBarrierException e) {

// ignore

}

});

thread1.start();

thread2.start();

}

}

输出结果:
【Java】通俗易懂的JUC源码剖析-CyclicBarrier

可以看到,线程1和线程2调用await()时,会被阻塞,等主线程任务完成后,线程1和线程2就会冲破屏障,继续往下执行。这里的主线程合并工作是可选的,也就是说可以直接new CyclicBarric(int parties),这种情况下就没有到达屏障后的合并工作,会直接在全部线程到达屏障后同时冲破屏障往下执行。可以比喻成举办同学聚会的场景。有20个人参加聚会,第1个人到达集合地点后要等其他人,第2个,第3个,...第19个人也需要等,当最后一个人到的时候,全部的20个人就可以出发去嗨皮了。

上面介绍的是“屏障”的应用场景,再来看个“回环”的应用场景。

假设一个任务由阶段1,阶段2,阶段3这三个阶段组成,每个线程都串行的依次执行阶段1,2,3。当多个线程执行任务时,必须保证等所有线程都执行完阶段1后,才能执行阶段2,同样地,也必须保证所有线程都执行完阶段2后,才能执行阶段3。示例代码:

import java.util.concurrent.BrokenBarrierException;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo2 {

private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

public static void main(String[] args) {

Thread thread1 = new Thread(() -> {

try {

System.out.println("thread1 step 1");

cyclicBarrier.await();

System.out.println("thread1 step 2");

cyclicBarrier.await();

System.out.println("thread1 step 3");

} catch (InterruptedException | BrokenBarrierException e) {

// ignore

}

});

Thread thread2 = new Thread(() -> {

try {

System.out.println("thread2 step 1");

cyclicBarrier.await();

System.out.println("thread2 step 2");

cyclicBarrier.await();

System.out.println("thread2 step 3");

} catch (InterruptedException | BrokenBarrierException e) {

// ignore

}

});

thread1.start();

thread2.start();

}

}

输出结果如下:
【Java】通俗易懂的JUC源码剖析-CyclicBarrier
可以看到,实现了这种同阶段等待的效果。

实现原理

先来看看重要属性:

private static class Generation {

// 屏障是否被打破

boolean broken = false;

}

/** The lock for guarding barrier entry */

private final ReentrantLock lock = new ReentrantLock();

/** Condition to wait on until tripped */

private final Condition trip = lock.newCondition();

/** The number of parties */

private final int parties;

/* The command to run when tripped */

private final Runnable barrierCommand;

/** The current generation */

private Generation generation = new Generation();

/**

* Number of parties still waiting. Counts down from parties to 0 on each generation.

* It is reset to parties on each new generation or when broken.

*/

private int count;

可以看到,CyclicBarrier里用了独占锁ReentrantLock实现多线程间的计数器同步,parties表示当多少个线程到达屏障后,冲破屏障往下执行,而count表示当前还剩余多少个线程还未到达屏障,当所有线程都冲破屏障后,它又会在新一轮(new generation)被重置为parties的值。也就是说,count和Generation是用来实现重置效果的。

再看看构造方法的属性赋值:

public CyclicBarrier(int parties, Runnable barrierAction) {

if (parties <= 0) throw new IllegalArgumentException();

this.parties = parties;

this.count = parties;

this.barrierCommand = barrierAction;

}

再来看看关键方法:
await()

public int await() throws InterruptedException, BrokenBarrierException {

try {

// false表示不设置超时

return dowait(false, 0L);

} catch (TimeoutException toe) {

throw new Error(toe); // cannot happen

}

}

dowait()方法代码如下:

// timed:是否超时等待, nanos:超时时间

private int dowait(boolean timed, long nanos)

throws InterruptedException, BrokenBarrierException,

TimeoutException {

final ReentrantLock lock = this.lock;

lock.lock();

try {

final Generation g = generation;

if (g.broken)

throw new BrokenBarrierException();

if (Thread.interrupted()) {

breakBarrier();

throw new InterruptedException();

}

int index = --count;

// 如果index为0,表示所有线程都已到达了屏障,此时去执行初始化时设定的barrierCommand(如果有的话)

if (index == 0) { // tripped

boolean ranAction = false;

try {

final Runnable command = barrierCommand;

if (command != null)

command.run();

ranAction = true;

// 唤醒其他线程,并重置进行下一轮

nextGeneration();

// 返回

return 0;

} finally {

if (!ranAction)

breakBarrier();

}

}

// 否则需要等其他线程都达到屏障

// loop until tripped, broken, interrupted, or timed out

for (;;) {

try {

// 区分超时等待与不超时等待

if (!timed)

trip.await();

else if (nanos > 0L)

nanos = trip.awaitNanos(nanos);

} catch (InterruptedException ie) {

if (g == generation && ! g.broken) {

breakBarrier();

throw ie;

} else {

// We're about to finish waiting even if we had not

// been interrupted, so this interrupt is deemed to // "belong" to subsequent execution.

Thread.currentThread().interrupt();

}

}

if (g.broken)

throw new BrokenBarrierException();

// g != generation 说明被唤醒后已重置了轮次,说明所有线程均已到达线程屏障,可以返回了。

if (g != generation)

return index;

// 等待超时,抛出超时异常

if (timed && nanos <= 0L) {

breakBarrier();

throw new TimeoutException();

}

}

} finally {

lock.unlock();

}

}

其中,nextGeneration()方法如下:

private void nextGeneration() {

// signal completion of last generation

// 唤醒等待在trip条件(即屏障)上的其他所有线程

trip.signalAll();

// set up next generation

// 重置count的值为初始值parties

count = parties;

// 重置当前轮次

generation = new Generation();

}

参考资料:
《Java并发编程之美》

java

阅读 26发布于 34 分钟前

本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议

avatar

小强大人

9 声望

1 粉丝

0 条评论

得票时间

avatar

小强大人

9 声望

1 粉丝

宣传栏

前言

我们知道,CountDownLatch的计数器是一次性的,它不能重置。也就是说,当count值变为0时,再调用await()方法会立即返回,不会阻塞。
本文要说的CyclicBarrier就是一种可以重置计数器的线程同步工具类。CyclicBarrier字面意思是“回环屏障”,它可以让一组线程全部到达一个状态后再全部同时往下执行。之所以叫回环是因为当所有线程执行完毕,并重置CyclicBarrier的状态后它可以被重用。而之所以叫屏障是因为当某个线程调用await方法后就会被阻塞,这个阻塞点就称为屏障,等其他所有线程都调用了await方法后,这组线程就会一起冲破屏障,并往下执行。

使用场景

两个子任务分别执行自己的工作,等它们都执行完后,主任务汇总子任务的结果,并做一些处理,处理完成后两个子任务又继续做其他事情。示例代码:

import java.util.concurrent.BrokenBarrierException;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {

try {

System.out.println("main task merge subtask result begin");

// simulate merge work

Thread.sleep(5000);

System.out.println("main task merge subtask result finished");

} catch (InterruptedException e) {

// ignore

}

});

public static void main(String[] args) {

Thread thread1 = new Thread(() -> {

try {

Thread.sleep(4000);

System.out.println("thread1 finished its work");

cyclicBarrier.await();

System.out.println("thread1 continue work");

} catch (InterruptedException | BrokenBarrierException e) {

// ignore

}

});

Thread thread2 = new Thread(() -> {

try {

Thread.sleep(5000);

System.out.println("thread2 finished its work");

cyclicBarrier.await();

System.out.println("thread2 continue work");

} catch (InterruptedException | BrokenBarrierException e) {

// ignore

}

});

thread1.start();

thread2.start();

}

}

输出结果:
【Java】通俗易懂的JUC源码剖析-CyclicBarrier

可以看到,线程1和线程2调用await()时,会被阻塞,等主线程任务完成后,线程1和线程2就会冲破屏障,继续往下执行。这里的主线程合并工作是可选的,也就是说可以直接new CyclicBarric(int parties),这种情况下就没有到达屏障后的合并工作,会直接在全部线程到达屏障后同时冲破屏障往下执行。可以比喻成举办同学聚会的场景。有20个人参加聚会,第1个人到达集合地点后要等其他人,第2个,第3个,...第19个人也需要等,当最后一个人到的时候,全部的20个人就可以出发去嗨皮了。

上面介绍的是“屏障”的应用场景,再来看个“回环”的应用场景。

假设一个任务由阶段1,阶段2,阶段3这三个阶段组成,每个线程都串行的依次执行阶段1,2,3。当多个线程执行任务时,必须保证等所有线程都执行完阶段1后,才能执行阶段2,同样地,也必须保证所有线程都执行完阶段2后,才能执行阶段3。示例代码:

import java.util.concurrent.BrokenBarrierException;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo2 {

private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

public static void main(String[] args) {

Thread thread1 = new Thread(() -> {

try {

System.out.println("thread1 step 1");

cyclicBarrier.await();

System.out.println("thread1 step 2");

cyclicBarrier.await();

System.out.println("thread1 step 3");

} catch (InterruptedException | BrokenBarrierException e) {

// ignore

}

});

Thread thread2 = new Thread(() -> {

try {

System.out.println("thread2 step 1");

cyclicBarrier.await();

System.out.println("thread2 step 2");

cyclicBarrier.await();

System.out.println("thread2 step 3");

} catch (InterruptedException | BrokenBarrierException e) {

// ignore

}

});

thread1.start();

thread2.start();

}

}

输出结果如下:
【Java】通俗易懂的JUC源码剖析-CyclicBarrier
可以看到,实现了这种同阶段等待的效果。

实现原理

先来看看重要属性:

private static class Generation {

// 屏障是否被打破

boolean broken = false;

}

/** The lock for guarding barrier entry */

private final ReentrantLock lock = new ReentrantLock();

/** Condition to wait on until tripped */

private final Condition trip = lock.newCondition();

/** The number of parties */

private final int parties;

/* The command to run when tripped */

private final Runnable barrierCommand;

/** The current generation */

private Generation generation = new Generation();

/**

* Number of parties still waiting. Counts down from parties to 0 on each generation.

* It is reset to parties on each new generation or when broken.

*/

private int count;

可以看到,CyclicBarrier里用了独占锁ReentrantLock实现多线程间的计数器同步,parties表示当多少个线程到达屏障后,冲破屏障往下执行,而count表示当前还剩余多少个线程还未到达屏障,当所有线程都冲破屏障后,它又会在新一轮(new generation)被重置为parties的值。也就是说,count和Generation是用来实现重置效果的。

再看看构造方法的属性赋值:

public CyclicBarrier(int parties, Runnable barrierAction) {

if (parties <= 0) throw new IllegalArgumentException();

this.parties = parties;

this.count = parties;

this.barrierCommand = barrierAction;

}

再来看看关键方法:
await()

public int await() throws InterruptedException, BrokenBarrierException {

try {

// false表示不设置超时

return dowait(false, 0L);

} catch (TimeoutException toe) {

throw new Error(toe); // cannot happen

}

}

dowait()方法代码如下:

// timed:是否超时等待, nanos:超时时间

private int dowait(boolean timed, long nanos)

throws InterruptedException, BrokenBarrierException,

TimeoutException {

final ReentrantLock lock = this.lock;

lock.lock();

try {

final Generation g = generation;

if (g.broken)

throw new BrokenBarrierException();

if (Thread.interrupted()) {

breakBarrier();

throw new InterruptedException();

}

int index = --count;

// 如果index为0,表示所有线程都已到达了屏障,此时去执行初始化时设定的barrierCommand(如果有的话)

if (index == 0) { // tripped

boolean ranAction = false;

try {

final Runnable command = barrierCommand;

if (command != null)

command.run();

ranAction = true;

// 唤醒其他线程,并重置进行下一轮

nextGeneration();

// 返回

return 0;

} finally {

if (!ranAction)

breakBarrier();

}

}

// 否则需要等其他线程都达到屏障

// loop until tripped, broken, interrupted, or timed out

for (;;) {

try {

// 区分超时等待与不超时等待

if (!timed)

trip.await();

else if (nanos > 0L)

nanos = trip.awaitNanos(nanos);

} catch (InterruptedException ie) {

if (g == generation && ! g.broken) {

breakBarrier();

throw ie;

} else {

// We're about to finish waiting even if we had not

// been interrupted, so this interrupt is deemed to // "belong" to subsequent execution.

Thread.currentThread().interrupt();

}

}

if (g.broken)

throw new BrokenBarrierException();

// g != generation 说明被唤醒后已重置了轮次,说明所有线程均已到达线程屏障,可以返回了。

if (g != generation)

return index;

// 等待超时,抛出超时异常

if (timed && nanos <= 0L) {

breakBarrier();

throw new TimeoutException();

}

}

} finally {

lock.unlock();

}

}

其中,nextGeneration()方法如下:

private void nextGeneration() {

// signal completion of last generation

// 唤醒等待在trip条件(即屏障)上的其他所有线程

trip.signalAll();

// set up next generation

// 重置count的值为初始值parties

count = parties;

// 重置当前轮次

generation = new Generation();

}

参考资料:
《Java并发编程之美》

以上是 【Java】通俗易懂的JUC源码剖析-CyclicBarrier 的全部内容, 来源链接: www.h5w3.com/115004.html

回到顶部