通俗易懂的JUC源码剖析-LinkedBlockingQueue
前言
LinkedBlockingQueue实现了BlockingQueue,它是阻塞队列的一种,可用于线程池中。不同于ConcurrentLinkedQueue的CAS非阻塞算法,它底层是用锁实现的阻塞队列。
实现原理
先来看关键属性:
// 队列容量,最大为Integer.MAX_VALUE
private final int capacity;
// 队列长度
private final AtomicInteger count = new AtomicInteger();
// 头结点
transient Node<E> head;
// 尾结点
private transient Node<E> last;
// 移除操作的锁,take/poll方法用到
private final ReentrantLock takeLock = new ReentrantLock();
// 移除操作需要等待的条件notEmpty,与takeLock绑定
private final Condition notEmpty = takeLock.newCondition();
// 入队操作的锁,put/offer方法用到
private final ReentrantLock putLock = new ReentrantLock();
// 入队操作需要等待的条件notFull,与putLock绑定
private final Condition notFull = putLock.newCondition();
可以看到,LinkedBlockingQueue内部是用单向链表实现的,并且它有两把锁:takeLock和putLock,以及对应的两个等待条件:notEmpty和notFull。takeLock控制同一时刻只有一个线程从队列头部获取/移除元素,putLock控制同一时刻只有一个线程在队列尾部添加元素。
再来看关键方法:
1.无参构造函数
public LinkedBlockingQueue() {
// 调用有参构造函数,初始化容量capacity为int最大值
this(Integer.MAX_VALUE);
}
2.有参构造函数
public LinkedBlockingQueue(int capacity) {
// 容量不能小于0,注意也不能等于0,这点与常规的集合不同
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 初始化头结点和尾结点为哑节点
last = head = new Node<E>(null);
}
3.put()操作
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
4.put()操作
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
5.take()操作
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
6.poll()操作
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
明天更新,先睡啦,晚安全世界!
java
阅读 41发布于 今天 15:05
本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议
小强大人
9 声望
1 粉丝
0 条评论
得票时间
小强大人
9 声望
1 粉丝
宣传栏
目录
前言
LinkedBlockingQueue实现了BlockingQueue,它是阻塞队列的一种,可用于线程池中。不同于ConcurrentLinkedQueue的CAS非阻塞算法,它底层是用锁实现的阻塞队列。
实现原理
先来看关键属性:
// 队列容量,最大为Integer.MAX_VALUE
private final int capacity;
// 队列长度
private final AtomicInteger count = new AtomicInteger();
// 头结点
transient Node<E> head;
// 尾结点
private transient Node<E> last;
// 移除操作的锁,take/poll方法用到
private final ReentrantLock takeLock = new ReentrantLock();
// 移除操作需要等待的条件notEmpty,与takeLock绑定
private final Condition notEmpty = takeLock.newCondition();
// 入队操作的锁,put/offer方法用到
private final ReentrantLock putLock = new ReentrantLock();
// 入队操作需要等待的条件notFull,与putLock绑定
private final Condition notFull = putLock.newCondition();
可以看到,LinkedBlockingQueue内部是用单向链表实现的,并且它有两把锁:takeLock和putLock,以及对应的两个等待条件:notEmpty和notFull。takeLock控制同一时刻只有一个线程从队列头部获取/移除元素,putLock控制同一时刻只有一个线程在队列尾部添加元素。
再来看关键方法:
1.无参构造函数
public LinkedBlockingQueue() {
// 调用有参构造函数,初始化容量capacity为int最大值
this(Integer.MAX_VALUE);
}
2.有参构造函数
public LinkedBlockingQueue(int capacity) {
// 容量不能小于0,注意也不能等于0,这点与常规的集合不同
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 初始化头结点和尾结点为哑节点
last = head = new Node<E>(null);
}
3.put()操作
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
4.put()操作
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
5.take()操作
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
6.poll()操作
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
明天更新,先睡啦,晚安全世界!