【Java】通俗易懂的JUC源码剖析-LinkedBlockingQueue

通俗易懂的JUC源码剖析-LinkedBlockingQueue

小强大人发布于 今天 15:05

前言

LinkedBlockingQueue实现了BlockingQueue,它是阻塞队列的一种,可用于线程池中。不同于ConcurrentLinkedQueue的CAS非阻塞算法,它底层是用锁实现的阻塞队列。

实现原理

先来看关键属性:

// 队列容量,最大为Integer.MAX_VALUE

private final int capacity;

// 队列长度

private final AtomicInteger count = new AtomicInteger();

// 头结点

transient Node<E> head;

// 尾结点

private transient Node<E> last;

// 移除操作的锁,take/poll方法用到

private final ReentrantLock takeLock = new ReentrantLock();

// 移除操作需要等待的条件notEmpty,与takeLock绑定

private final Condition notEmpty = takeLock.newCondition();

// 入队操作的锁,put/offer方法用到

private final ReentrantLock putLock = new ReentrantLock();

// 入队操作需要等待的条件notFull,与putLock绑定

private final Condition notFull = putLock.newCondition();

可以看到,LinkedBlockingQueue内部是用单向链表实现的,并且它有两把锁:takeLock和putLock,以及对应的两个等待条件:notEmpty和notFull。takeLock控制同一时刻只有一个线程从队列头部获取/移除元素,putLock控制同一时刻只有一个线程在队列尾部添加元素。

再来看关键方法:

1.无参构造函数

public LinkedBlockingQueue() {

// 调用有参构造函数,初始化容量capacity为int最大值

this(Integer.MAX_VALUE);

}

2.有参构造函数

public LinkedBlockingQueue(int capacity) {

// 容量不能小于0,注意也不能等于0,这点与常规的集合不同

if (capacity <= 0) throw new IllegalArgumentException();

this.capacity = capacity;

// 初始化头结点和尾结点为哑节点

last = head = new Node<E>(null);

}

3.put()操作

public void put(E e) throws InterruptedException {

if (e == null) throw new NullPointerException();

int c = -1;

Node<E> node = new Node<E>(e);

final ReentrantLock putLock = this.putLock;

final AtomicInteger count = this.count;

putLock.lockInterruptibly();

try {

while (count.get() == capacity) {

notFull.await();

}

enqueue(node);

c = count.getAndIncrement();

if (c + 1 < capacity)

notFull.signal();

} finally {

putLock.unlock();

}

if (c == 0)

signalNotEmpty();

}

4.put()操作

public boolean offer(E e) {

if (e == null) throw new NullPointerException();

final AtomicInteger count = this.count;

if (count.get() == capacity)

return false;

int c = -1;

Node<E> node = new Node<E>(e);

final ReentrantLock putLock = this.putLock;

putLock.lock();

try {

if (count.get() < capacity) {

enqueue(node);

c = count.getAndIncrement();

if (c + 1 < capacity)

notFull.signal();

}

} finally {

putLock.unlock();

}

if (c == 0)

signalNotEmpty();

return c >= 0;

}

5.take()操作

public E take() throws InterruptedException {

E x;

int c = -1;

final AtomicInteger count = this.count;

final ReentrantLock takeLock = this.takeLock;

takeLock.lockInterruptibly();

try {

while (count.get() == 0) {

notEmpty.await();

}

x = dequeue();

c = count.getAndDecrement();

if (c > 1)

notEmpty.signal();

} finally {

takeLock.unlock();

}

if (c == capacity)

signalNotFull();

return x;

}

6.poll()操作

public E poll() {

final AtomicInteger count = this.count;

if (count.get() == 0)

return null;

E x = null;

int c = -1;

final ReentrantLock takeLock = this.takeLock;

takeLock.lock();

try {

if (count.get() > 0) {

x = dequeue();

c = count.getAndDecrement();

if (c > 1)

notEmpty.signal();

}

} finally {

takeLock.unlock();

}

if (c == capacity)

signalNotFull();

return x;

}

明天更新,先睡啦,晚安全世界!

java

阅读 41发布于 今天 15:05

本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议

avatar

小强大人

9 声望

1 粉丝

0 条评论

得票时间

avatar

小强大人

9 声望

1 粉丝

宣传栏

前言

LinkedBlockingQueue实现了BlockingQueue,它是阻塞队列的一种,可用于线程池中。不同于ConcurrentLinkedQueue的CAS非阻塞算法,它底层是用锁实现的阻塞队列。

实现原理

先来看关键属性:

// 队列容量,最大为Integer.MAX_VALUE

private final int capacity;

// 队列长度

private final AtomicInteger count = new AtomicInteger();

// 头结点

transient Node<E> head;

// 尾结点

private transient Node<E> last;

// 移除操作的锁,take/poll方法用到

private final ReentrantLock takeLock = new ReentrantLock();

// 移除操作需要等待的条件notEmpty,与takeLock绑定

private final Condition notEmpty = takeLock.newCondition();

// 入队操作的锁,put/offer方法用到

private final ReentrantLock putLock = new ReentrantLock();

// 入队操作需要等待的条件notFull,与putLock绑定

private final Condition notFull = putLock.newCondition();

可以看到,LinkedBlockingQueue内部是用单向链表实现的,并且它有两把锁:takeLock和putLock,以及对应的两个等待条件:notEmpty和notFull。takeLock控制同一时刻只有一个线程从队列头部获取/移除元素,putLock控制同一时刻只有一个线程在队列尾部添加元素。

再来看关键方法:

1.无参构造函数

public LinkedBlockingQueue() {

// 调用有参构造函数,初始化容量capacity为int最大值

this(Integer.MAX_VALUE);

}

2.有参构造函数

public LinkedBlockingQueue(int capacity) {

// 容量不能小于0,注意也不能等于0,这点与常规的集合不同

if (capacity <= 0) throw new IllegalArgumentException();

this.capacity = capacity;

// 初始化头结点和尾结点为哑节点

last = head = new Node<E>(null);

}

3.put()操作

public void put(E e) throws InterruptedException {

if (e == null) throw new NullPointerException();

int c = -1;

Node<E> node = new Node<E>(e);

final ReentrantLock putLock = this.putLock;

final AtomicInteger count = this.count;

putLock.lockInterruptibly();

try {

while (count.get() == capacity) {

notFull.await();

}

enqueue(node);

c = count.getAndIncrement();

if (c + 1 < capacity)

notFull.signal();

} finally {

putLock.unlock();

}

if (c == 0)

signalNotEmpty();

}

4.put()操作

public boolean offer(E e) {

if (e == null) throw new NullPointerException();

final AtomicInteger count = this.count;

if (count.get() == capacity)

return false;

int c = -1;

Node<E> node = new Node<E>(e);

final ReentrantLock putLock = this.putLock;

putLock.lock();

try {

if (count.get() < capacity) {

enqueue(node);

c = count.getAndIncrement();

if (c + 1 < capacity)

notFull.signal();

}

} finally {

putLock.unlock();

}

if (c == 0)

signalNotEmpty();

return c >= 0;

}

5.take()操作

public E take() throws InterruptedException {

E x;

int c = -1;

final AtomicInteger count = this.count;

final ReentrantLock takeLock = this.takeLock;

takeLock.lockInterruptibly();

try {

while (count.get() == 0) {

notEmpty.await();

}

x = dequeue();

c = count.getAndDecrement();

if (c > 1)

notEmpty.signal();

} finally {

takeLock.unlock();

}

if (c == capacity)

signalNotFull();

return x;

}

6.poll()操作

public E poll() {

final AtomicInteger count = this.count;

if (count.get() == 0)

return null;

E x = null;

int c = -1;

final ReentrantLock takeLock = this.takeLock;

takeLock.lock();

try {

if (count.get() > 0) {

x = dequeue();

c = count.getAndDecrement();

if (c > 1)

notEmpty.signal();

}

} finally {

takeLock.unlock();

}

if (c == capacity)

signalNotFull();

return x;

}

明天更新,先睡啦,晚安全世界!

以上是 【Java】通俗易懂的JUC源码剖析-LinkedBlockingQueue 的全部内容, 来源链接: www.h5w3.com/113104.html

回到顶部