【Java】通俗易懂的JUC源码剖析-ArrayBlockingQueue

通俗易懂的JUC源码剖析-ArrayBlockingQueue

小强大人发布于 今天 14:06

前言

ArrayBlockingQueue也是BlockingQueue接口的实现类,从它的命名就能猜出来,它底层是用数组实现的,不同于LinkedBlockingQueue的链表结构。

实现原理

首先来看它的关键属性:

// 存放元素的数组

final Object[] items;

// 记录下次take操作的数组位置

int takeIndex;

// 记录下次put操作的数组位置

int putIndex;

// 数组长度

int count;

// 操作数组的锁

final ReentrantLock lock;

// 数组非空条件

private final Condition notEmpty;

// 数组未满条件

private final Condition notFull;

再来看它的重要方法:
offer()

public boolean offer(E e) {

checkNotNull(e);

final ReentrantLock lock = this.lock;

lock.lock();

try {

// 队列满了,则返回false

if (count == items.length)

return false;

else {

// 队列没满,则入队

enqueue(e);

return true;

}

} finally {

lock.unlock();

}

}

put()

public void put(E e) throws InterruptedException {

checkNotNull(e);

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

// 数组满了则阻塞等待,用while不用if,是为了避免虚假唤醒

while (count == items.length)

notFull.await();

// 队列未满时入队

enqueue(e);

} finally {

lock.unlock();

}

}

poll()

public E poll() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

// 数组为空返回null,否则返回移除的元素

return (count == 0) ? null : dequeue();

} finally {

lock.unlock();

}

}

take()

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

// 数组为空则阻塞等待

while (count == 0)

notEmpty.await();

// 数组不为空时返回移除的元素

return dequeue();

} finally {

lock.unlock();

}

}

与LinkedBlockingQueue类似,take()、put()是阻塞的,poll()、offer()是非阻塞的。
其中,enqueue()、dequeue()方法如下:

private void enqueue(E x) {

// assert lock.getHoldCount() == 1;

// assert items[putIndex] == null;

final Object[] items = this.items;

items[putIndex] = x;

if (++putIndex == items.length)

putIndex = 0;

count++;

notEmpty.signal();

}

private E dequeue() {

// assert lock.getHoldCount() == 1;

// assert items[takeIndex] != null;

final Object[] items = this.items;

@SuppressWarnings("unchecked")

E x = (E) items[takeIndex];

// 移除元素时,要手动将takeIndex位置处的元素置为null,让它被垃圾回收掉,否则会导致内存泄漏

items[takeIndex] = null; // help GC

if (++takeIndex == items.length)

takeIndex = 0;

count--;

if (itrs != null)

itrs.elementDequeued();

notFull.signal();

return x;

}

由于数组的特性,ArrayBlockingQueue并没有像LinkedBlockingQueue那样使用2把锁,而是使用takeIndex和putIndex记录下一次入队/出队操作的数组位置。并且takeIndex,putIndex,count三个变量都没有用volatile修饰或者Atomic原子类,因为他们的修改都在加锁后的环境中进行的,已经保证了线程安全。

参考资料:
《Java并发编程之美》

java

阅读 33发布于 今天 14:06

本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议

avatar

小强大人

9 声望

1 粉丝

0 条评论

得票时间

avatar

小强大人

9 声望

1 粉丝

宣传栏

前言

ArrayBlockingQueue也是BlockingQueue接口的实现类,从它的命名就能猜出来,它底层是用数组实现的,不同于LinkedBlockingQueue的链表结构。

实现原理

首先来看它的关键属性:

// 存放元素的数组

final Object[] items;

// 记录下次take操作的数组位置

int takeIndex;

// 记录下次put操作的数组位置

int putIndex;

// 数组长度

int count;

// 操作数组的锁

final ReentrantLock lock;

// 数组非空条件

private final Condition notEmpty;

// 数组未满条件

private final Condition notFull;

再来看它的重要方法:
offer()

public boolean offer(E e) {

checkNotNull(e);

final ReentrantLock lock = this.lock;

lock.lock();

try {

// 队列满了,则返回false

if (count == items.length)

return false;

else {

// 队列没满,则入队

enqueue(e);

return true;

}

} finally {

lock.unlock();

}

}

put()

public void put(E e) throws InterruptedException {

checkNotNull(e);

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

// 数组满了则阻塞等待,用while不用if,是为了避免虚假唤醒

while (count == items.length)

notFull.await();

// 队列未满时入队

enqueue(e);

} finally {

lock.unlock();

}

}

poll()

public E poll() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

// 数组为空返回null,否则返回移除的元素

return (count == 0) ? null : dequeue();

} finally {

lock.unlock();

}

}

take()

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

// 数组为空则阻塞等待

while (count == 0)

notEmpty.await();

// 数组不为空时返回移除的元素

return dequeue();

} finally {

lock.unlock();

}

}

与LinkedBlockingQueue类似,take()、put()是阻塞的,poll()、offer()是非阻塞的。
其中,enqueue()、dequeue()方法如下:

private void enqueue(E x) {

// assert lock.getHoldCount() == 1;

// assert items[putIndex] == null;

final Object[] items = this.items;

items[putIndex] = x;

if (++putIndex == items.length)

putIndex = 0;

count++;

notEmpty.signal();

}

private E dequeue() {

// assert lock.getHoldCount() == 1;

// assert items[takeIndex] != null;

final Object[] items = this.items;

@SuppressWarnings("unchecked")

E x = (E) items[takeIndex];

// 移除元素时,要手动将takeIndex位置处的元素置为null,让它被垃圾回收掉,否则会导致内存泄漏

items[takeIndex] = null; // help GC

if (++takeIndex == items.length)

takeIndex = 0;

count--;

if (itrs != null)

itrs.elementDequeued();

notFull.signal();

return x;

}

由于数组的特性,ArrayBlockingQueue并没有像LinkedBlockingQueue那样使用2把锁,而是使用takeIndex和putIndex记录下一次入队/出队操作的数组位置。并且takeIndex,putIndex,count三个变量都没有用volatile修饰或者Atomic原子类,因为他们的修改都在加锁后的环境中进行的,已经保证了线程安全。

参考资料:
《Java并发编程之美》

以上是 【Java】通俗易懂的JUC源码剖析-ArrayBlockingQueue 的全部内容, 来源链接: www.h5w3.com/113499.html

回到顶部