【Java】通俗易懂的JUC源码剖析-PriorityBlockingQueue

通俗易懂的JUC源码剖析-PriorityBlockingQueue

小强大人发布于 今天 14:38

前言

PriorityBlockingQueue是BlockingQueue接口的实现类,它是一种优先级阻塞队列,每次出队都返回优先级最高或最低的元素,其内部是用平衡二叉树堆实现的。这里的优先级指的是元素类必须实现Comparable接口,然后用compareTo()方法比较元素的优先级大小,当然也可指定自定义的比较器comparator。

实现原理

先来看看它的重要属性:

// 队列默认容量为11

private static final int DEFAULT_INITIAL_CAPACITY = 11;

// 队列最大容量

private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// 存放元素的数组

private transient Object[] queue;

// 队列长度

private transient int size;

// 自定义比较器

private transient Comparator<? super E> comparator;

// 操作元素数组的互斥锁

private final ReentrantLock lock;

// 数组非空条件

private final Condition notEmpty;

// 数组扩容操作的自璇锁,1表示正在扩容,0表示没有在扩容

private transient volatile int allocationSpinLock;

// 优先级队列

private PriorityQueue<E> q;

再来看它的几个构造函数:

public PriorityBlockingQueue() {

this(DEFAULT_INITIAL_CAPACITY, null);

}

public PriorityBlockingQueue(int initialCapacity) {

this(initialCapacity, null);

}

public PriorityBlockingQueue(int initialCapacity,

Comparator<? super E> comparator) {

if (initialCapacity < 1)

throw new IllegalArgumentException();

this.lock = new ReentrantLock();

this.notEmpty = lock.newCondition();

this.comparator = comparator;

this.queue = new Object[initialCapacity];

}

再来看重要方法:
put():

public void put(E e) {

offer(e); // never need to block

}

offer():

public boolean offer(E e) {

if (e == null)

throw new NullPointerException();

final ReentrantLock lock = this.lock;

lock.lock();

int n, cap;

Object[] array;

// 队列当前长度>=队列容量时,进行扩容

while ((n = size) >= (cap = (array = queue).length))

tryGrow(array, cap);

try {

Comparator<? super E> cmp = comparator;

// 未指定比较器时,则使用默认的compareTo()来计算插入元素的位置

if (cmp == null)

siftUpComparable(n, e, array);

else

// 指定了时,则使用指定的比较器计算位置

siftUpUsingComparator(n, e, array, cmp);

size = n + 1;

// 唤醒某个等待在notEmpty条件的线程

notEmpty.signal();

} finally {

lock.unlock();

}

return true;

}

其中tryGrow()方法如下:

private void tryGrow(Object[] array, int oldCap) {

// 先释放操作数组的互斥锁,去尝试获取扩容的自璇锁

lock.unlock(); // must release and then re-acquire main lock

Object[] newArray = null;

// 尝试获取扩容的锁

if (allocationSpinLock == 0 &&

UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,

0, 1)) {

try {

// 计算扩容后的新容量

int newCap = oldCap + ((oldCap < 64) ?

(oldCap + 2) : // grow faster if small

(oldCap >> 1));

// 新容量超出最大容量时,则取最大容量

if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow

// 旧容量加1仍然溢出时,抛内存溢出异常

int minCap = oldCap + 1;

if (minCap < 0 || minCap > MAX_ARRAY_SIZE)

throw new OutOfMemoryError();

newCap = MAX_ARRAY_SIZE;

}

if (newCap > oldCap && queue == array)

newArray = new Object[newCap];

} finally {

allocationSpinLock = 0;

}

}

// 其他线程抢到了扩容锁并正在扩容时,当前线程则让出CPU调度权

if (newArray == null) // back off if another thread is allocating

Thread.yield();

// 获取操作数组的互斥锁

lock.lock();

// 扩容操作成功时,将旧数组元素拷贝到扩容后的新数组

if (newArray != null && queue == array) {

queue = newArray;

System.arraycopy(array, 0, newArray, 0, oldCap);

}

}

siftUpComparable()方法如下:

private static <T> void siftUpComparable(int k, T x, Object[] array) {

Comparable<? super T> key = (Comparable<? super T>) x;

while (k > 0) {

int parent = (k - 1) >>> 1;

Object e = array[parent];

if (key.compareTo((T) e) >= 0)

break;

array[k] = e;

k = parent;

}

array[k] = key;

}

siftUpUsingComparator()方法如下:

private static <T> void siftUpUsingComparator(int k, T x, Object[] array,

Comparator<? super T> cmp) {

while (k > 0) {

int parent = (k - 1) >>> 1;

Object e = array[parent];

if (cmp.compare(x, (T) e) >= 0)

break;

array[k] = e;

k = parent;

}

array[k] = x;

}

take():

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

E result;

try {

// 队列元素为空时,阻塞等待

while ( (result = dequeue()) == null)

notEmpty.await();

} finally {

lock.unlock();

}

return result;

}

poll():

public E poll() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

// 非阻塞,队列为空时返回null

return dequeue();

} finally {

lock.unlock();

}

}

其中dequeue()方法如下:

private E dequeue() {

int n = size - 1;

if (n < 0)

return null;

else {

Object[] array = queue;

E result = (E) array[0];

E x = (E) array[n];

array[n] = null;

Comparator<? super E> cmp = comparator;

if (cmp == null)

siftDownComparable(0, x, array, n);

else

siftDownUsingComparator(0, x, array, n, cmp);

size = n;

return result;

}

}

其中siftDownComparable()方法如下:

private static <T> void siftDownComparable(int k, T x, Object[] array,

int n) {

if (n > 0) {

Comparable<? super T> key = (Comparable<? super T>)x;

int half = n >>> 1; // loop while a non-leaf

while (k < half) {

int child = (k << 1) + 1; // assume left child is least

Object c = array[child];

int right = child + 1;

if (right < n &&

((Comparable<? super T>) c).compareTo((T) array[right]) > 0)

c = array[child = right];

if (key.compareTo((T) c) <= 0)

break;

array[k] = c;

k = child;

}

array[k] = key;

}

}

siftDownUsingComparator()方法如下:

private static <T> void siftDownUsingComparator(int k, T x, Object[] array,

int n,

Comparator<? super T> cmp) {

if (n > 0) {

int half = n >>> 1;

while (k < half) {

int child = (k << 1) + 1;

Object c = array[child];

int right = child + 1;

if (right < n && cmp.compare((T) c, (T) array[right]) > 0)

c = array[child = right];

if (cmp.compare(x, (T) c) <= 0)

break;

array[k] = c;

k = child;

}

array[k] = x;

}

}

先睡了,明天再分析优先级的具体代码。晚安全世界!

java

阅读 34发布于 今天 14:38

本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议

avatar

小强大人

9 声望

1 粉丝

0 条评论

得票时间

avatar

小强大人

9 声望

1 粉丝

宣传栏

前言

PriorityBlockingQueue是BlockingQueue接口的实现类,它是一种优先级阻塞队列,每次出队都返回优先级最高或最低的元素,其内部是用平衡二叉树堆实现的。这里的优先级指的是元素类必须实现Comparable接口,然后用compareTo()方法比较元素的优先级大小,当然也可指定自定义的比较器comparator。

实现原理

先来看看它的重要属性:

// 队列默认容量为11

private static final int DEFAULT_INITIAL_CAPACITY = 11;

// 队列最大容量

private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// 存放元素的数组

private transient Object[] queue;

// 队列长度

private transient int size;

// 自定义比较器

private transient Comparator<? super E> comparator;

// 操作元素数组的互斥锁

private final ReentrantLock lock;

// 数组非空条件

private final Condition notEmpty;

// 数组扩容操作的自璇锁,1表示正在扩容,0表示没有在扩容

private transient volatile int allocationSpinLock;

// 优先级队列

private PriorityQueue<E> q;

再来看它的几个构造函数:

public PriorityBlockingQueue() {

this(DEFAULT_INITIAL_CAPACITY, null);

}

public PriorityBlockingQueue(int initialCapacity) {

this(initialCapacity, null);

}

public PriorityBlockingQueue(int initialCapacity,

Comparator<? super E> comparator) {

if (initialCapacity < 1)

throw new IllegalArgumentException();

this.lock = new ReentrantLock();

this.notEmpty = lock.newCondition();

this.comparator = comparator;

this.queue = new Object[initialCapacity];

}

再来看重要方法:
put():

public void put(E e) {

offer(e); // never need to block

}

offer():

public boolean offer(E e) {

if (e == null)

throw new NullPointerException();

final ReentrantLock lock = this.lock;

lock.lock();

int n, cap;

Object[] array;

// 队列当前长度>=队列容量时,进行扩容

while ((n = size) >= (cap = (array = queue).length))

tryGrow(array, cap);

try {

Comparator<? super E> cmp = comparator;

// 未指定比较器时,则使用默认的compareTo()来计算插入元素的位置

if (cmp == null)

siftUpComparable(n, e, array);

else

// 指定了时,则使用指定的比较器计算位置

siftUpUsingComparator(n, e, array, cmp);

size = n + 1;

// 唤醒某个等待在notEmpty条件的线程

notEmpty.signal();

} finally {

lock.unlock();

}

return true;

}

其中tryGrow()方法如下:

private void tryGrow(Object[] array, int oldCap) {

// 先释放操作数组的互斥锁,去尝试获取扩容的自璇锁

lock.unlock(); // must release and then re-acquire main lock

Object[] newArray = null;

// 尝试获取扩容的锁

if (allocationSpinLock == 0 &&

UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,

0, 1)) {

try {

// 计算扩容后的新容量

int newCap = oldCap + ((oldCap < 64) ?

(oldCap + 2) : // grow faster if small

(oldCap >> 1));

// 新容量超出最大容量时,则取最大容量

if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow

// 旧容量加1仍然溢出时,抛内存溢出异常

int minCap = oldCap + 1;

if (minCap < 0 || minCap > MAX_ARRAY_SIZE)

throw new OutOfMemoryError();

newCap = MAX_ARRAY_SIZE;

}

if (newCap > oldCap && queue == array)

newArray = new Object[newCap];

} finally {

allocationSpinLock = 0;

}

}

// 其他线程抢到了扩容锁并正在扩容时,当前线程则让出CPU调度权

if (newArray == null) // back off if another thread is allocating

Thread.yield();

// 获取操作数组的互斥锁

lock.lock();

// 扩容操作成功时,将旧数组元素拷贝到扩容后的新数组

if (newArray != null && queue == array) {

queue = newArray;

System.arraycopy(array, 0, newArray, 0, oldCap);

}

}

siftUpComparable()方法如下:

private static <T> void siftUpComparable(int k, T x, Object[] array) {

Comparable<? super T> key = (Comparable<? super T>) x;

while (k > 0) {

int parent = (k - 1) >>> 1;

Object e = array[parent];

if (key.compareTo((T) e) >= 0)

break;

array[k] = e;

k = parent;

}

array[k] = key;

}

siftUpUsingComparator()方法如下:

private static <T> void siftUpUsingComparator(int k, T x, Object[] array,

Comparator<? super T> cmp) {

while (k > 0) {

int parent = (k - 1) >>> 1;

Object e = array[parent];

if (cmp.compare(x, (T) e) >= 0)

break;

array[k] = e;

k = parent;

}

array[k] = x;

}

take():

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

E result;

try {

// 队列元素为空时,阻塞等待

while ( (result = dequeue()) == null)

notEmpty.await();

} finally {

lock.unlock();

}

return result;

}

poll():

public E poll() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

// 非阻塞,队列为空时返回null

return dequeue();

} finally {

lock.unlock();

}

}

其中dequeue()方法如下:

private E dequeue() {

int n = size - 1;

if (n < 0)

return null;

else {

Object[] array = queue;

E result = (E) array[0];

E x = (E) array[n];

array[n] = null;

Comparator<? super E> cmp = comparator;

if (cmp == null)

siftDownComparable(0, x, array, n);

else

siftDownUsingComparator(0, x, array, n, cmp);

size = n;

return result;

}

}

其中siftDownComparable()方法如下:

private static <T> void siftDownComparable(int k, T x, Object[] array,

int n) {

if (n > 0) {

Comparable<? super T> key = (Comparable<? super T>)x;

int half = n >>> 1; // loop while a non-leaf

while (k < half) {

int child = (k << 1) + 1; // assume left child is least

Object c = array[child];

int right = child + 1;

if (right < n &&

((Comparable<? super T>) c).compareTo((T) array[right]) > 0)

c = array[child = right];

if (key.compareTo((T) c) <= 0)

break;

array[k] = c;

k = child;

}

array[k] = key;

}

}

siftDownUsingComparator()方法如下:

private static <T> void siftDownUsingComparator(int k, T x, Object[] array,

int n,

Comparator<? super T> cmp) {

if (n > 0) {

int half = n >>> 1;

while (k < half) {

int child = (k << 1) + 1;

Object c = array[child];

int right = child + 1;

if (right < n && cmp.compare((T) c, (T) array[right]) > 0)

c = array[child = right];

if (cmp.compare(x, (T) c) <= 0)

break;

array[k] = c;

k = child;

}

array[k] = x;

}

}

先睡了,明天再分析优先级的具体代码。晚安全世界!

以上是 【Java】通俗易懂的JUC源码剖析-PriorityBlockingQueue 的全部内容, 来源链接: www.h5w3.com/113636.html

回到顶部