H5W3
当前位置:H5W3 > java > 正文

【Java】啃碎JDK源码(七):AbstractQueuedSynchronizer(AQS)

前言

在之前我们已经对部分JDK源码做了介绍:

啃碎JDK源码(一):String
啃碎JDK源码(二):Integer
啃碎JDK源码(三):ArrayList
啃碎JDK源码(四):HashMap
啃碎JDK源码(五):ConcurrentHashMap
啃碎JDK源码(六):LinkedList

今天我们正式开始介绍juc包下面的类,也就是和多线程打交道的地方,和锁打交道的类用的比较的的无非就是 ReentrantLockReentrantReadWriteLock 等,但是我们今天要介绍的是 AbstractQueuedSynchronizer 这个抽象类,也就是面试中经常被问到的 AQS,因为不管是ReentrantLock 还是ReentrantReadWriteLock 以及其它的一些都是基于它实现的,所以很有必要先来了解一下。

正文

AQS的全称为(AbstractQueuedSynchronizer),我们可以把它看成一个帮助我们实现锁的同步器,它基于FIFO(先进先出)的队列实现的,并且内部维护了一个状态变量 state,通过原子更新这个状态变量即可以实现加锁解锁操作。

来看下 AbstractQueuedSynchronizer 的继承结构

【Java】啃碎JDK源码(七):AbstractQueuedSynchronizer(AQS)

能看到 ReentrantLock等并不是直接继承 AbstractQueuedSynchronizer,而是其内部类 Sync

接着来看看一些重要的属性:

// 队列的头节点
private transient volatile Node head;
// 队列的尾节点
private transient volatile Node tail;
// 控制加锁解锁的状态变量
private volatile int state;

定义了一个状态变量和一个队列,状态变量用来控制加锁解锁,队列用来放置等待的线程

这个 state变量很重要,用来做状态标识。比方说在 ReentrantLock 里面它表示获取锁的线程数,假如等于0表示还没有线程获取锁,1表示有线程获取了锁。大于1表示重入锁的数量

注意,这几个变量都要使用 volatile 关键字来修饰,因为是在多线程环境下操作,要保证它们的值修改之后对其它线程立即可见。

还有我们的 Node内部类

static final class Node {
// 标识一个节点是共享模式
static final Node SHARED = new Node();
// 标识一个节点是互斥模式
static final Node EXCLUSIVE = null;
// 标识线程已取消
static final int CANCELLED =  1;
// 标识后继节点需要唤醒
static final int SIGNAL    = -1;
// 标识线程等待在一个条件上
static final int CONDITION = -2;
// 标识后面的共享锁需要无条件的传播
static final int PROPAGATE = -3;
// 当前节点保存的线程对应的等待状态
volatile int waitStatus;
// 上一个结点
volatile Node prev;
// 下一个结点
volatile Node next;
// 当前结点保存的线程
volatile Thread thread;
// 下一个等待在条件上的节点(Condition锁时使用)
Node nextWaiter;
// 是否是共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前一个节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
// 把共享模式还是互斥模式存储到nextWaiter这个字段里面了
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
// 等待的状态,在Condition中使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}

上面是一个标准的双向链表结构,保存着当前线程、前一个节点、后一个节点以及线程的状态等信息。属性比较多,看不懂没关系,后面用到会重新讲一下。

那么在源码里面是如何修改这些变量的呢?其实就是通过我们之前说的 CAS 来修改,如果不了解的话请参考 一文看懂CAS

比方说 state 状态变量的修改

// 获取Unsafe类的实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 状态变量state的偏移量
private static final long stateOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
.......
} catch (Exception ex) { throw new Error(ex); }
}

核心是 compareAndSetState 方法:

protected final boolean compareAndSetState(int expect, int update) {
// 如果当前值等于except,那么更新成update
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

既然 AbstractQueuedSynchronizer 是一个抽象类,那么子类要实现哪些接口呢?

比如说用来加锁的 tryAcquire 方法

// 互斥模式下尝试获取锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

可以看到只是抛出了异常,并且值得注意的是该方法并没有定义成抽象方法,因为只要实现一部分方法就可以自己手动编写一个锁了,定义成 protect 也是方便子类去实现,除此之外还有

// 互斥模式下尝试释放锁
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式下尝试获取锁
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式下尝试释放锁
protected int tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 当前线程是否持有锁
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

基于AQS手动实现一个锁

现在我们尝试一下基于AQS手动实现一个锁:

/*
* 实现Lock接口
*/
public class MyLock implements Lock {
private final Sync sync;
public MyLock() {
sync = new Sync();
}
// 定义一个内部类Sync继承AbstractQueuedSynchronizer
private static class Sync extends AbstractQueuedSynchronizer {
// 尝试获取独占锁
@Override
protected boolean tryAcquire(int arg) {
// AQS方法:CAS更新state状态变量
if (compareAndSetState(0, 1)) {
// AQS方法:设置当前线程为持有锁的线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 尝试释放独占锁
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
// AQS方法:当前线程已持有锁,可以直接修改state值,不需要通过CAS修改
setState(0);
return true;
}
// 锁是否已被释放
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
Condition newCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}

好了,我们的锁已经写完了,核心就是使用一个静态内部类Sync继承AQS,实现加锁、释放锁等方法,其实我们熟悉的 ReentrantLock也是这样的实现原理,现在我们来测试一下:

public class TestLock {
private final Lock lock = new MyLock();
private volatile int count = 1;
private static class WorkThread extends Thread {
private final TestLock myLock;
public WorkThread(TestLock myLock) {
this.myLock = myLock;
}
@Override
public void run() {
myLock.execute();
}
}
public void execute() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "获取到的count=" + count++);
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
TestLock myLock = new TestLock();
// 启动100个线程
for (int i = 1; i <= 100; i++) {
new WorkThread(myLock).start();
}
}
}

我们启动100个线程对 count 加一,看看打印结果:

【Java】啃碎JDK源码(七):AbstractQueuedSynchronizer(AQS)

可以看到最后的确是加到了100,也就是说我们的锁是可用的。

【Java】啃碎JDK源码(七):AbstractQueuedSynchronizer(AQS)

但是,我们现在的锁是不可重入锁,学过ReentrantLock的同学应该知道它是可重入锁,也就是在线程持有锁的情况下可以重新获得锁,假如我们改一下execute方法:

public void execute() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "获取到的count=" + count++);
if (count == 5) {
execute();
}
} finally {
lock.unlock();
}
}

当 count == 5 时执行调用自身,看下执行结果:

【Java】啃碎JDK源码(七):AbstractQueuedSynchronizer(AQS)

可以看到线程被阻塞了,因为当前持有锁的线程不能重新获取锁,所以我们需要对 tryAcquiretryRelease 方法进行改造:

        // 尝试获取独占锁
@Override
protected boolean tryAcquire(int arg) {
// 获取当前线程
Thread currentThread = Thread.currentThread();
int state = getState();
if (state == 0) {
// AQS方法:CAS更新state状态变量
if (compareAndSetState(0, 1)) {
// AQS方法:设置当前线程为持有锁的线程
setExclusiveOwnerThread(currentThread);
return true;
} else if (currentThread == getExclusiveOwnerThread()) {
// 因为是独占锁,所以同一时刻只能有一个线程能获取到锁,如果当前的锁是被当前线程获取过了,则将状态变量+1
int newState = state + arg;
// 设置新的状态变量
setState(newState);
return true;
}
}
return false;
}
// 尝试释放独占锁
@Override
protected boolean tryRelease(int arg) {
// 判断当前锁释放是当前线程锁独占的,如果判断不成立则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
int newState = getState() - arg;
boolean free = false;
if (newState == 0) {
// 如果状态为0了则说明当前线程可以释放对锁的持有了
setExclusiveOwnerThread(null);
free = true;
}
// AQS方法:当前线程已持有锁,可以直接修改state值,不需要通过CAS修改
setState(newState);
return free;
}

tryAcquire 方法主要加了判断:如果 state 不为0的时候,判断当前线程是否已经和锁绑定,已经绑定的话则将 state+1 同时返回true

tryRelease方法中主要增加了释放锁的时候是对 state 变量逐次减一当减到0的时候才将锁与当前线程绑定的状态去除,释放锁。

重新运行下已经不会阻塞了,如果不懂的地方看下注释就明白了。

AQS源码剖析

那么当线程获取不到锁的时候是如何等待的呢?又是什么时候被唤醒的呢?接下来我们一步步跟随源码看看到底做了什么?

独占模式

AQS独占模式和共享模式,首先来看看独占模式,看下lock加锁方法:

@Override
public void lock() {
sync.acquire(1);
}

这里可以看到并不是调用我们重写的 tryAcquire 方法,而是调用父类 AbstractQueuedSynchronizer 的方法:

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

此方法是独占模式下线程获取共享资源的顶层入口。如果获取锁成功,线程直接返回,否则进入等待队列,直到获取锁为止,且整个过程忽略中断的影响。

首先是调用 tryAcquire方法来获取尝试获取锁,跟过去看一下AQS的实现:

protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

可以看到该方法定义为protect,也就是由我们子类去实现的,如果获取锁失败的话那么会进入等待队列,来看看addWaiter方法:

/*
* 将当前线程添加到等待队列的队尾,并返回当前线程所在的节点
*/
private Node addWaiter(Node mode) {
// 以独占模式把当前线程封装成一个Node节点
Node node = new Node(Thread.currentThread(), mode);
// 尝试将结点放到队列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 使用CAS把node作为尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾节点为空或者利用CAS把node设为尾节点失败时通过enq方法进行入队
enq(node);
return node;
}
/*
* 采用for循环自旋的方式把node插入到队列中
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 队列为空,需要初始化
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
  • addWaiter方法用于将当前线程添加到等待队列的队尾,并返回当前线程所在的节点。
  • enq 方法中采用了非常经典的自旋操作,只有通过CAS把node设为尾节点后,当前线程才能退出该方法,否则的话,当前线程不断的尝试,直到能把节点添加到队列中为止。

继续看一下acquireQueued方法:

  /*
* 通过自旋获取锁
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 默认线程没有被中断过
boolean interrupted = false;
for (;;) {
// 获取该节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是头节点并且当前线程获取到锁
if (p == head && tryAcquire(arg)) {
// 设置当前节点为头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 移除当前节点
cancelAcquire(node);
}
}

该方法比较复杂,我们来仔细分析一下:

  • 首先获取前一个节点,如果前驱节点是头节点的话则尝试获取锁,如果获取锁成功的话设置当前节点为头节点
  • 否则调用 shouldParkAfterFailedAcquire 方法判断是否需要挂起当前线程

shouldParkAfterFailedAcquire方法从名字也能看出来是当获取锁失败后用来判断是否需要挂起当前线程,实现功能简单的讲就是把当前node节点的有效前驱(有效是指waitStatus不是CANCELLED的)找到,并且将有效前驱的状态设置为SIGNAL,之后便返回true代表马上可以阻塞了。来看看实现代码:

    /*
* 判断是否需要挂起当前线程
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前驱节点已经设置了SIGNAL,如果前驱变成了head,并且head的代表线程
* exclusiveOwnerThread释放了锁,就会来根据这个SIGNAL来唤醒自己
*/
return true;
if (ws > 0) {
/*
* 发现传入的前驱的状态大于0,即CANCELLED。说明前驱节点已经因为超时或响应了中断,而取消了自己。
* 所以需要向前遍历直到找到一个<=0的节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 如果是其他情况,那么CAS尝试设置前驱节点为SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

shouldParkAfterFailedAcquire返回true的情况下,继续看parkAndCheckInterrupted方法

  private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

调用LockSupport的 park 方法挂起当前线程,返回该线程是否被中断过,如果被中断过,直接设置 interrupted = true
(LockSupport类是Java6引入的一个类,提供了基本的线程同步原语,有兴趣的小伙伴可以去了解一下)

最后来看下 cancelAcquire方法:

  /**
* 取消当前节点
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
// 把当前节点的线程设为 null
node.thread = null;
// 和前面一样向前遍历找到有效的前驱节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
// 把node节点的ws设为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果node是尾节点,利用CAS把前驱节点设为尾节点,后继节点为null方便GC
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 前驱节点不是头结点 && 线程不为空 && waitStatus为singal
// 利用CAS把node的next设为pred的next节点
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// node是头结点,唤起它的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}

最后如果acquireQueued方法返回true,需要进行自我中断。因为parkAndCheckInterrupted方法不响应中断,并且内部调用了Thread.interrupted方法清除中断标记位。所以当该方法返回true(被中断)时,需要手动补偿中断标记位。

static void selfInterrupt() {
Thread.currentThread().interrupt();
}

流程总结

  • tryAcquire()尝试直接去获取锁,如果成功则直接返回;
  • addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  • acquireQueued()使线程在队列中等待直到获取锁。如果在整个等待过程中被中断过,则返回true,否则返回false。
  • 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
独占式锁获取流程

调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,即线程获取同步状态失败后进入同步队列,后续对线程进行中断操作时,线程不会从同步队列中移除。获取流程:

  1. 当前线程通过tryAcquire()方法尝试获取锁,成功则直接返回,失败则进入队列排队等待,通过CAS获取同步状态。
  2. 如果尝试获取锁失败的话,构造同步节点(独占式的Node.EXCLUSIVE),通过addWaiter(Node node,int args)方法,将节点加入到同步队列的队列尾部。
  3. 最后调用acquireQueued(final Node node, int args)方法,使该节点以死循环的方式获取同步状态,如果获取不到,则阻塞节点中的线程。acquireQueued方法当前线程在死循环中获取同步状态,而只有前驱节点是头节点的时候才能尝试获取锁(同步状态)( p == head && tryAcquire(arg))。

上面看完了加锁的流程,接下来看看是如何释放锁的?

   public final boolean release(int arg) {
// tryRelease由子类实现
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}

代码比较简单,tryRelease 和上面一样也是有事子类去实现,如果释放锁成功的话那么我们需要调用 unparkSuccessor 方法去唤醒后继节点,看下具体实现:

  private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 将当前节点的状态修改为0
compareAndSetWaitStatus(node, ws, 0);
// 如果直接后继为空或者它的waitStatus大于0(已经放弃获取锁了),我们就遍历整个队列,获取第一个需要唤醒的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒节点
LockSupport.unpark(s.thread);
}

代码比较简单,这里就不展开细说了。

共享模式

上面讲完了独占模式,现在来讲下共享模式,所谓共享模式就是同一个时刻允许多个线程持有锁,比方说 ReentrantReadWriteLock 就是实现了共享模式的AQS,直接上代码:

  public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/*
* 共享模式获取锁
*/
private void doAcquireShared(int arg) {
// 加入队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// head是已经拿到锁的节点
if (p == head) {
// 尝试获取锁,返回的r是剩余的资源数量,如果大于0那么需要唤醒后续节点
int r = tryAcquireShared(arg);
if (r >= 0) {
// 将head指向自己,还有剩余资源可以再唤醒之后的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

这个方法大致上看起来和独占模式是很相像的。区别只在于独占模式下,在本方法中获取到资源后,只是将本节点设置为head节点。而共享模式下,设置完head节点后,还需要判断是否需要唤醒多个线程,看一下如何唤醒线程:

private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* 什么情况下要唤醒后继结点?
* 1.资源剩余数大于0,有剩余资源肯定是要唤醒后继结点的
* 2.头结点不存在。
* 3.头结点状态小于0,意味着后继节点要求node(也就是当前head)唤醒后继结点
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

doReleaseShared方法里面才是真正来唤醒线程:

private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 由于该方法可能被并发调用,为了避免不必要的唤醒浪费,因为通过cas来抢占唤醒权利。
// 抢占成功者执行真正的后继结点唤醒任务。如果失败则进行重试
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;            // loop to recheck cases
unparkSuccessor(h);
}
// 如果ws==0,可能是head结点已经没有后继结点,也有可能是因为head节点的后继结点唤醒权被其他线程刚抢占成功。
// 如果没有后继结点,显然不需要做任何事情
// 如果是唤醒权被其他线程抢占,则不需要做任何事情。因为唤醒是在队列上进行传播的。所以这里就cas将head节点的状态值修改为 PROPAGATE。用来表达该线程唤醒操作意图已经传达。但是会由别的线程真正的执行后续的唤醒动作。同样,如果失败了,则重试。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;                // loop on failed CAS
}
if (h == head)                   // loop if head changed
break;
}
}

加锁流程

tryAcquireShared方法也是由子类去实现,但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:

  1. tryAcquireShared()尝试获取资源,成功则直接返回;
  2. 失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。
  3. doAcquireShared(int)此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放锁唤醒自己,自己成功拿到相应的资源后才返回。

看完加锁的方法,现在来看共享模式下的释放锁方法:

  public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

可以看到解锁也是用的doReleaseShared方法,代码比较简单这里不再展开细说。

总结

今天有关AQS的知识点就介绍到这里,有什么不对的地方请多多指教!

【Java】啃碎JDK源码(七):AbstractQueuedSynchronizer(AQS)

本文地址:H5W3 » 【Java】啃碎JDK源码(七):AbstractQueuedSynchronizer(AQS)

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址