H5W3
当前位置:H5W3 > java > 正文

【Java】Java高并发BlockingQueue重要的实现类

Java高并发BlockingQueue重要的实现类

入门小站发布于 21 分钟前

ArrayBlockingQueue

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 队列元素 */
final Object[] items;
/** 下一次读取操作的位置, poll, peek or remove */
int takeIndex;
/** 下一次写入操作的位置, offer, or add */
int putIndex;
/** 元素数量 */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
* 它采用一个 ReentrantLock 和相应的两个 Condition 来实现。
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
/** 指定大小 */
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* 指定容量大小与指定访问策略
* @param fair 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;
*/
public ArrayBlockingQueue(int capacity, boolean fair) {}
/**
* 指定容量大小、指定访问策略与最初包含给定集合中的元素
* @param c 将此集合中的元素在构造方法期间就先添加到队列中
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {}
}
  • ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用一个锁对象,由此也意味着两者无法真正并行运行。按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。然而事实上并没有如此,因为ArrayBlockingQueue的数据写入已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。
  • 通过构造函数得知,参数fair控制对象内部是否采用公平锁,默认采用非公平锁。
  • items、takeIndex、putIndex、count等属性并没有使用volatile修饰,这是因为访问这些变量(通过方法获取)使用都在锁内,并不存在可见性问题,如size()
  • 另外有个独占锁lock用来对出入对操作加锁,这导致同时只有一个线程可以访问入队出队。

Put源码分析

/** 进行入队操作 */
public void put(E e) throws InterruptedException {
//e为null,则抛出NullPointerException异常
checkNotNull(e);
//获取独占锁
final ReentrantLock lock = this.lock;
/**
* lockInterruptibly()
* 获取锁定,除非当前线程为interrupted
* 如果锁没有被另一个线程占用并且立即返回,则将锁定计数设置为1。
* 如果当前线程已经保存此锁,则保持计数将递增1,该方法立即返回。
* 如果锁被另一个线程保持,则当前线程将被禁用以进行线程调度,并且处于休眠状态
*
*/
lock.lockInterruptibly();
try {
//空队列
while (count == items.length)
//进行条件等待处理
notFull.await();
//入队操作
enqueue(e);
} finally {
//释放锁
lock.unlock();
}
}
/** 真正的入队 */
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
//获取当前元素
final Object[] items = this.items;
//按下一个插入索引进行元素添加
items[putIndex] = x;
// 计算下一个元素应该存放的下标,可以理解为循环队列
if (++putIndex == items.length)
putIndex = 0;
count++;
//唤起消费者
notEmpty.signal();
}

Take源码分析

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
//这里有些特殊
if (itrs != null)
//保持队列中的元素和迭代器的元素一致
itrs.elementDequeued();
notFull.signal();
return x;
}
//该类的迭代器,所有的迭代器共享数据,队列改变会影响所有的迭代器
transient Itrs itrs = null; //其存放了目前所创建的所有迭代器。
/**
* 迭代器和它们的队列之间的共享数据,允许队列元素被删除时更新迭代器的修改。
*/
class Itrs {
void elementDequeued() {
// assert lock.getHoldCount() == 1;
if (count == 0)
//队列中数量为0的时候,队列就是空的,会将所有迭代器进行清理并移除
queueIsEmpty();
//takeIndex的下标是0,意味着队列从尾中取完了,又回到头部获取
else if (takeIndex == 0)
takeIndexWrapped();
}
/**
* 当队列为空的时候做的事情
* 1. 通知所有迭代器队列已经为空
* 2. 清空所有的弱引用,并且将迭代器置空
*/
void queueIsEmpty() {}
/**
* 将takeIndex包装成0
* 并且通知所有的迭代器,并且删除已经过期的任何对象(个人理解是置空对象)
* 也直接的说就是在Blocking队列进行出队的时候,进行迭代器中的数据同步,保持队列中的元素和迭代器的元素是一致的。
*/
void takeIndexWrapped() {}
}

Itrs迭代器创建的时机

//从这里知道,在ArrayBlockingQueue对象中调用此方法,才会生成这个对象
//那么就可以理解为,只要并未调用此方法,则ArrayBlockingQueue对象中的Itrs对象则为空
public Iterator<E> iterator() {
return new Itr();
}
private class Itr implements Iterator<E> {
Itr() {
//这里就是生产它的地方
//count等于0的时候,创建的这个迭代器是个无用的迭代器,可以直接移除,进入detach模式。
//否则就把当前队列的读取位置给迭代器当做下一个元素,cursor存储下个元素的位置。
if (count == 0) {
// assert itrs == null;
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex);
cursor = incCursor(takeIndex);
if (itrs == null) {
itrs = new Itrs(this);
} else {
itrs.register(this); // in this order
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
// assert takeIndex >= 0;
// assert prevTakeIndex == takeIndex;
// assert nextIndex >= 0;
// assert nextItem != null;
}
}
}

代码演示

package com.rumenz.task;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @className: BlockingQuqueExample
* @description: TODO 类描述
* @author: mac
* @date: 2021/1/20
**/
public class BlockingQueueExample {
private static volatile   Boolean flag=false;
public static void main(String[] args) {
BlockingQueue blockingQueue=new ArrayBlockingQueue(1024);
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(()->{
try{
blockingQueue.put(1);
Thread.sleep(2000);
blockingQueue.put(3);
flag=true;
}catch (Exception e){
e.printStackTrace();
}
});
executorService.execute(()->{
try {
while (!flag){
Integer i = (Integer) blockingQueue.take();
System.out.println(i);
}
}catch (Exception e){
e.printStackTrace();
}
});
executorService.shutdown();
}
}

LinkedBlockingQueue

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//队列的容量,指定大小或为默认值Integer.MAX_VALUE
private final int capacity;
//元素的数量
private final AtomicInteger count = new AtomicInteger();
//队列头节点,始终满足head.item==null
transient Node<E> head;
//队列的尾节点,始终满足last.next==null
private transient Node<E> last;
/** Lock held by take, poll, etc */
//出队的锁:take, poll, peek 等读操作的方法需要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
//当队列为空时,保存执行出队的线程:如果读操作的时候队列是空的,那么等待 notEmpty 条件
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
//入队的锁:put, offer 等写操作的方法需要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
//当队列满时,保存执行入队的线程:如果写操作的时候队列是满的,那么等待 notFull 条件
private final Condition notFull = putLock.newCondition();
//传说中的无界队列
public LinkedBlockingQueue() {}
//传说中的有界队列
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
//传说中的无界队列
public LinkedBlockingQueue(Collection<? extends E> c){}
/**
* 链表节点类
*/
static class Node<E> {
E item;
/**
* One of:
* - 真正的继任者节点
* - 这个节点,意味着继任者是head.next
* - 空,意味着没有后继者(这是最后一个节点)
*/
Node<E> next;
Node(E x) { item = x; }
}
}
  • 如果需要获取(take)一个元素,需要获取takeLock锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。
  • 如果要插入(put)一个元素,需要获取putLock锁,但是获取了锁还不够,如果队列此时已满,还是需要队列不满(notFull)的这个条件(Condition)。

Put源码分析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 如果你纠结这里为什么是 -1,可以看看 offer 方法。这就是个标识成功、失败的标志而已。
int c = -1;
//包装成node节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//获取锁定
putLock.lockInterruptibly();
try {
/** 如果队列满,等待 notFull 的条件满足。 */
while (count.get() == capacity) {
notFull.await();
}
//入队
enqueue(node);
//原子性自增
c = count.getAndIncrement();
// 如果这个元素入队后,还有至少一个槽可以使用,调用 notFull.signal() 唤醒等待线程。
// 哪些线程会等待在 notFull 这个 Condition 上呢?
if (c + 1 < capacity)
notFull.signal();
} finally {
//解锁
putLock.unlock();
}
// 如果 c == 0,那么代表队列在这个元素入队前是空的(不包括head空节点),
// 那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作
if (c == 0)
signalNotEmpty();
}
/** 链接节点在队列末尾 */
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
// 入队的代码非常简单,就是将 last 属性指向这个新元素,并且让原队尾的 next 指向这个元素
//last.next = node;
//last = node;
// 这里入队没有并发问题,因为只有获取到 putLock 独占锁以后,才可以进行此操作
last = last.next = node;
}
/**
* 等待PUT信号
* 仅在 take/poll 中调用
* 也就是说:元素入队后,如果需要,则会调用这个方法唤醒读线程来读
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();//唤醒
} finally {
putLock.unlock();
}
}
}

Take源码分析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//首先,需要获取到 takeLock 才能进行出队操作
takeLock.lockInterruptibly();
try {
// 如果队列为空,等待 notEmpty 这个条件满足再继续执行
while (count.get() == 0) {
notEmpty.await();
}
//// 出队
x = dequeue();
//count 进行原子减 1
c = count.getAndDecrement();
// 如果这次出队后,队列中至少还有一个元素,那么调用 notEmpty.signal() 唤醒其他的读线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
/**
* 出队
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
/**
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
}

ArrayBlockingQueue对比

LinkedBlockingQueue实现一个线程添加文件对象,四个线程读取文件对象

package concurrent;
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class TestBlockingQueue {
static long randomTime() {
return (long) (Math.random() * 1000);
}
public static void main(String[] args) {
// 能容纳100个文件
final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);
// 线程池
final ExecutorService exec = Executors.newFixedThreadPool(5);
final File root = new File("F:\\JavaLib");
// 完成标志
final File exitFile = new File("");
// 读个数
final AtomicInteger rc = new AtomicInteger();
// 写个数
final AtomicInteger wc = new AtomicInteger();
// 读线程
Runnable read = new Runnable() {
public void run() {
scanFile(root);
scanFile(exitFile);
}
public void scanFile(File file) {
if (file.isDirectory()) {
File[] files = file.listFiles(new FileFilter() {
public boolean accept(File pathname) {
return pathname.isDirectory()
|| pathname.getPath().endsWith(".java");
}
});
for (File one : files)
scanFile(one);
} else {
try {
int index = rc.incrementAndGet();
System.out.println("Read0: " + index + " "
+ file.getPath());
queue.put(file);
} catch (InterruptedException e) {
}
}
}
};
exec.submit(read);
// 四个写线程
for (int index = 0; index < 4; index++) {
// write thread
final int NO = index;
Runnable write = new Runnable() {
String threadName = "Write" + NO;
public void run() {
while (true) {
try {
Thread.sleep(randomTime());
int index = wc.incrementAndGet();
File file = queue.take();
// 队列已经无对象
if (file == exitFile) {
// 再次添加"标志",以让其他线程正常退出
queue.put(exitFile);
break;
}
System.out.println(threadName + ": " + index + " "
+ file.getPath());
} catch (InterruptedException e) {
}
}
}
};
exec.submit(write);
}
exec.shutdown();
}
}

关注微信公众号:【入门小站】,解锁更多知识点。

【Java】Java高并发BlockingQueue重要的实现类

java多线程
阅读 10发布于 21 分钟前
本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议
avatar

入门小站

rumenz.com

41 声望
2 粉丝

0 条评论
得票时间

avatar

入门小站

rumenz.com

41 声望
2 粉丝

宣传栏

ArrayBlockingQueue

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 队列元素 */
final Object[] items;
/** 下一次读取操作的位置, poll, peek or remove */
int takeIndex;
/** 下一次写入操作的位置, offer, or add */
int putIndex;
/** 元素数量 */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
* 它采用一个 ReentrantLock 和相应的两个 Condition 来实现。
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
/** 指定大小 */
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* 指定容量大小与指定访问策略
* @param fair 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;
*/
public ArrayBlockingQueue(int capacity, boolean fair) {}
/**
* 指定容量大小、指定访问策略与最初包含给定集合中的元素
* @param c 将此集合中的元素在构造方法期间就先添加到队列中
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {}
}
  • ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用一个锁对象,由此也意味着两者无法真正并行运行。按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。然而事实上并没有如此,因为ArrayBlockingQueue的数据写入已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。
  • 通过构造函数得知,参数fair控制对象内部是否采用公平锁,默认采用非公平锁。
  • items、takeIndex、putIndex、count等属性并没有使用volatile修饰,这是因为访问这些变量(通过方法获取)使用都在锁内,并不存在可见性问题,如size()
  • 另外有个独占锁lock用来对出入对操作加锁,这导致同时只有一个线程可以访问入队出队。

Put源码分析

/** 进行入队操作 */
public void put(E e) throws InterruptedException {
//e为null,则抛出NullPointerException异常
checkNotNull(e);
//获取独占锁
final ReentrantLock lock = this.lock;
/**
* lockInterruptibly()
* 获取锁定,除非当前线程为interrupted
* 如果锁没有被另一个线程占用并且立即返回,则将锁定计数设置为1。
* 如果当前线程已经保存此锁,则保持计数将递增1,该方法立即返回。
* 如果锁被另一个线程保持,则当前线程将被禁用以进行线程调度,并且处于休眠状态
*
*/
lock.lockInterruptibly();
try {
//空队列
while (count == items.length)
//进行条件等待处理
notFull.await();
//入队操作
enqueue(e);
} finally {
//释放锁
lock.unlock();
}
}
/** 真正的入队 */
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
//获取当前元素
final Object[] items = this.items;
//按下一个插入索引进行元素添加
items[putIndex] = x;
// 计算下一个元素应该存放的下标,可以理解为循环队列
if (++putIndex == items.length)
putIndex = 0;
count++;
//唤起消费者
notEmpty.signal();
}

Take源码分析

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
//这里有些特殊
if (itrs != null)
//保持队列中的元素和迭代器的元素一致
itrs.elementDequeued();
notFull.signal();
return x;
}
//该类的迭代器,所有的迭代器共享数据,队列改变会影响所有的迭代器
transient Itrs itrs = null; //其存放了目前所创建的所有迭代器。
/**
* 迭代器和它们的队列之间的共享数据,允许队列元素被删除时更新迭代器的修改。
*/
class Itrs {
void elementDequeued() {
// assert lock.getHoldCount() == 1;
if (count == 0)
//队列中数量为0的时候,队列就是空的,会将所有迭代器进行清理并移除
queueIsEmpty();
//takeIndex的下标是0,意味着队列从尾中取完了,又回到头部获取
else if (takeIndex == 0)
takeIndexWrapped();
}
/**
* 当队列为空的时候做的事情
* 1. 通知所有迭代器队列已经为空
* 2. 清空所有的弱引用,并且将迭代器置空
*/
void queueIsEmpty() {}
/**
* 将takeIndex包装成0
* 并且通知所有的迭代器,并且删除已经过期的任何对象(个人理解是置空对象)
* 也直接的说就是在Blocking队列进行出队的时候,进行迭代器中的数据同步,保持队列中的元素和迭代器的元素是一致的。
*/
void takeIndexWrapped() {}
}

Itrs迭代器创建的时机

//从这里知道,在ArrayBlockingQueue对象中调用此方法,才会生成这个对象
//那么就可以理解为,只要并未调用此方法,则ArrayBlockingQueue对象中的Itrs对象则为空
public Iterator<E> iterator() {
return new Itr();
}
private class Itr implements Iterator<E> {
Itr() {
//这里就是生产它的地方
//count等于0的时候,创建的这个迭代器是个无用的迭代器,可以直接移除,进入detach模式。
//否则就把当前队列的读取位置给迭代器当做下一个元素,cursor存储下个元素的位置。
if (count == 0) {
// assert itrs == null;
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex);
cursor = incCursor(takeIndex);
if (itrs == null) {
itrs = new Itrs(this);
} else {
itrs.register(this); // in this order
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
// assert takeIndex >= 0;
// assert prevTakeIndex == takeIndex;
// assert nextIndex >= 0;
// assert nextItem != null;
}
}
}

代码演示

package com.rumenz.task;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @className: BlockingQuqueExample
* @description: TODO 类描述
* @author: mac
* @date: 2021/1/20
**/
public class BlockingQueueExample {
private static volatile   Boolean flag=false;
public static void main(String[] args) {
BlockingQueue blockingQueue=new ArrayBlockingQueue(1024);
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(()->{
try{
blockingQueue.put(1);
Thread.sleep(2000);
blockingQueue.put(3);
flag=true;
}catch (Exception e){
e.printStackTrace();
}
});
executorService.execute(()->{
try {
while (!flag){
Integer i = (Integer) blockingQueue.take();
System.out.println(i);
}
}catch (Exception e){
e.printStackTrace();
}
});
executorService.shutdown();
}
}

LinkedBlockingQueue

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//队列的容量,指定大小或为默认值Integer.MAX_VALUE
private final int capacity;
//元素的数量
private final AtomicInteger count = new AtomicInteger();
//队列头节点,始终满足head.item==null
transient Node<E> head;
//队列的尾节点,始终满足last.next==null
private transient Node<E> last;
/** Lock held by take, poll, etc */
//出队的锁:take, poll, peek 等读操作的方法需要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
//当队列为空时,保存执行出队的线程:如果读操作的时候队列是空的,那么等待 notEmpty 条件
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
//入队的锁:put, offer 等写操作的方法需要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
//当队列满时,保存执行入队的线程:如果写操作的时候队列是满的,那么等待 notFull 条件
private final Condition notFull = putLock.newCondition();
//传说中的无界队列
public LinkedBlockingQueue() {}
//传说中的有界队列
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
//传说中的无界队列
public LinkedBlockingQueue(Collection<? extends E> c){}
/**
* 链表节点类
*/
static class Node<E> {
E item;
/**
* One of:
* - 真正的继任者节点
* - 这个节点,意味着继任者是head.next
* - 空,意味着没有后继者(这是最后一个节点)
*/
Node<E> next;
Node(E x) { item = x; }
}
}
  • 如果需要获取(take)一个元素,需要获取takeLock锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。
  • 如果要插入(put)一个元素,需要获取putLock锁,但是获取了锁还不够,如果队列此时已满,还是需要队列不满(notFull)的这个条件(Condition)。

Put源码分析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 如果你纠结这里为什么是 -1,可以看看 offer 方法。这就是个标识成功、失败的标志而已。
int c = -1;
//包装成node节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//获取锁定
putLock.lockInterruptibly();
try {
/** 如果队列满,等待 notFull 的条件满足。 */
while (count.get() == capacity) {
notFull.await();
}
//入队
enqueue(node);
//原子性自增
c = count.getAndIncrement();
// 如果这个元素入队后,还有至少一个槽可以使用,调用 notFull.signal() 唤醒等待线程。
// 哪些线程会等待在 notFull 这个 Condition 上呢?
if (c + 1 < capacity)
notFull.signal();
} finally {
//解锁
putLock.unlock();
}
// 如果 c == 0,那么代表队列在这个元素入队前是空的(不包括head空节点),
// 那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作
if (c == 0)
signalNotEmpty();
}
/** 链接节点在队列末尾 */
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
// 入队的代码非常简单,就是将 last 属性指向这个新元素,并且让原队尾的 next 指向这个元素
//last.next = node;
//last = node;
// 这里入队没有并发问题,因为只有获取到 putLock 独占锁以后,才可以进行此操作
last = last.next = node;
}
/**
* 等待PUT信号
* 仅在 take/poll 中调用
* 也就是说:元素入队后,如果需要,则会调用这个方法唤醒读线程来读
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();//唤醒
} finally {
putLock.unlock();
}
}
}

Take源码分析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//首先,需要获取到 takeLock 才能进行出队操作
takeLock.lockInterruptibly();
try {
// 如果队列为空,等待 notEmpty 这个条件满足再继续执行
while (count.get() == 0) {
notEmpty.await();
}
//// 出队
x = dequeue();
//count 进行原子减 1
c = count.getAndDecrement();
// 如果这次出队后,队列中至少还有一个元素,那么调用 notEmpty.signal() 唤醒其他的读线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
/**
* 出队
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
/**
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
}

ArrayBlockingQueue对比

LinkedBlockingQueue实现一个线程添加文件对象,四个线程读取文件对象

package concurrent;
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class TestBlockingQueue {
static long randomTime() {
return (long) (Math.random() * 1000);
}
public static void main(String[] args) {
// 能容纳100个文件
final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);
// 线程池
final ExecutorService exec = Executors.newFixedThreadPool(5);
final File root = new File("F:\\JavaLib");
// 完成标志
final File exitFile = new File("");
// 读个数
final AtomicInteger rc = new AtomicInteger();
// 写个数
final AtomicInteger wc = new AtomicInteger();
// 读线程
Runnable read = new Runnable() {
public void run() {
scanFile(root);
scanFile(exitFile);
}
public void scanFile(File file) {
if (file.isDirectory()) {
File[] files = file.listFiles(new FileFilter() {
public boolean accept(File pathname) {
return pathname.isDirectory()
|| pathname.getPath().endsWith(".java");
}
});
for (File one : files)
scanFile(one);
} else {
try {
int index = rc.incrementAndGet();
System.out.println("Read0: " + index + " "
+ file.getPath());
queue.put(file);
} catch (InterruptedException e) {
}
}
}
};
exec.submit(read);
// 四个写线程
for (int index = 0; index < 4; index++) {
// write thread
final int NO = index;
Runnable write = new Runnable() {
String threadName = "Write" + NO;
public void run() {
while (true) {
try {
Thread.sleep(randomTime());
int index = wc.incrementAndGet();
File file = queue.take();
// 队列已经无对象
if (file == exitFile) {
// 再次添加"标志",以让其他线程正常退出
queue.put(exitFile);
break;
}
System.out.println(threadName + ": " + index + " "
+ file.getPath());
} catch (InterruptedException e) {
}
}
}
};
exec.submit(write);
}
exec.shutdown();
}
}

关注微信公众号:【入门小站】,解锁更多知识点。

【Java】Java高并发BlockingQueue重要的实现类

本文地址:H5W3 » 【Java】Java高并发BlockingQueue重要的实现类

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址