H5W3
当前位置:H5W3 > java > 正文

【Java】通俗易懂的JUC源码剖析-ConcurrentLinkedQueue

通俗易懂的JUC源码剖析-ConcurrentLinkedQueue

小强大人发布于 今天 14:42

简介

ConcurrentLinkedQueue是JUC包下的线程安全的无界非阻塞队列,它与BlockingQueue接口实现类最大的不同就是,BlockingQueue是阻塞队列,而ConcurrentLinkedQueue是非阻塞队列。这里的阻塞非阻塞,指的是队列满了或为空的时候,线程移除或放入元素的时候,是否需要阻塞挂起。BlockingQueue底层是用锁实现的,而ConcurrentLinkedQueue底层使用CAS实现的。

实现原理

先来看重要的属性和数据结构:

// 头结点
private transient volatile Node<E> head;
// 尾结点
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}

其中,Node是它的内部类:

private static class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
// CAS方式修改当前结点的元素
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 延迟设置当前结点的后继结点
void lazySetNext(Node<E> val) {
// 有序、延迟版本的putObjectVolatile方法,不保证值的改变被其他线程立即可见。只有在field被volatile修饰时有效
UNSAFE.putOrderedObject(this, nextOffset, val);
}
// CAS方式设置当前结点的后继结点
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}

不难看出,ConcurrentLinkedQueue是用单向链表实现的。

再来看它的重要方法实现:
offer操作

public boolean offer(E e) {
// e为空则抛空指针
checkNotNull(e);
// 构造待插入的元素结点
final Node<E> newNode = new Node<E>(e);
// 多线程环境下,从尾结点处,循环尝试插入
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// q为null说明p是尾结点,尝试CAS插入
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list.  If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable.  Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}

我滴妈,逻辑太复杂了,后续再分析,今天先睡了。

java
阅读 34更新于 今天 14:43
本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议
avatar

小强大人
9 声望
1 粉丝

0 条评论
得票时间

avatar

小强大人
9 声望
1 粉丝

宣传栏

简介

ConcurrentLinkedQueue是JUC包下的线程安全的无界非阻塞队列,它与BlockingQueue接口实现类最大的不同就是,BlockingQueue是阻塞队列,而ConcurrentLinkedQueue是非阻塞队列。这里的阻塞非阻塞,指的是队列满了或为空的时候,线程移除或放入元素的时候,是否需要阻塞挂起。BlockingQueue底层是用锁实现的,而ConcurrentLinkedQueue底层使用CAS实现的。

实现原理

先来看重要的属性和数据结构:

// 头结点
private transient volatile Node<E> head;
// 尾结点
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}

其中,Node是它的内部类:

private static class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
// CAS方式修改当前结点的元素
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 延迟设置当前结点的后继结点
void lazySetNext(Node<E> val) {
// 有序、延迟版本的putObjectVolatile方法,不保证值的改变被其他线程立即可见。只有在field被volatile修饰时有效
UNSAFE.putOrderedObject(this, nextOffset, val);
}
// CAS方式设置当前结点的后继结点
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}

不难看出,ConcurrentLinkedQueue是用单向链表实现的。

再来看它的重要方法实现:
offer操作

public boolean offer(E e) {
// e为空则抛空指针
checkNotNull(e);
// 构造待插入的元素结点
final Node<E> newNode = new Node<E>(e);
// 多线程环境下,从尾结点处,循环尝试插入
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// q为null说明p是尾结点,尝试CAS插入
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list.  If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable.  Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}

我滴妈,逻辑太复杂了,后续再分析,今天先睡了。

本文地址:H5W3 » 【Java】通俗易懂的JUC源码剖析-ConcurrentLinkedQueue

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址