AbstractQueuedSynchronizer

本篇主要继续盘点java.util.concurrent中的工具包

AbstractQueuedSynchronizer

曾在先前的文章里介绍过, ReentrantLock是基于API实现的重入锁的数据结构. 在API层面的具体实现而是依靠AbstractQueuedSynchronizer实现的. AbstractQueuedSynchronizer本质是一个修改后 CLH(Craig, Landin and Hagersten) 队列.

CLH队列是由FIFO的队列实现的, 简单的 CLH 队列节点存在一个 locked 域, 每一个节点存放一个竞争资源的线程. 当一个节点尝试获取锁时, 会在队列尾端插入一个节点, 对队列的 tail 域进行 CAS 操作, 反复试图使将节点插入到队列尾端. 并获取其前驱节点的 locked 的域, 该线程会在其前驱节点的 locked 域旋转, 直到前驱节点释放锁将 locked 置为 false.

Java AbstractQueuedSynchronizer 是一个Abstract类, ReentrantLock 中的 Sync 类继承了该类, Sync 又被 NonfairSync 和 FairSync 所继承, 分别用于公平锁与非公平锁. ReentrantLock 默认是非公平锁, 即加锁时无需考虑排队问题, 可以直接尝试获取锁, 获取失败会假如排队队列. 公平锁需要在获取锁时加入排队的队列.

ReentrantLock 主要以修改后版本的CLH队列与sun..misc.Unsafe包(Sun Hotspot JVM专属)的CAS操作, 以非公平锁为例, 在加锁时:

final void lock() {

if (compareAndSetState(0, 1))

setExclusiveOwnerThread(Thread.currentThread());

else

acquire(1);

}

首先直接尝试用 CAS 方法修改锁状态, 如果成功则将当前线程设置为独享线程, 否则执行 acquire(1), 公平锁直接执行 acquire(1), 参数说明锁是可以重入的.

在 acquire() 方法中

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

首先说结论, 如果目前锁已上锁或者与其他线程竞争失败, tryAcquire()会返回false, 接下来会在acquireQueued()方法中执行LockSupport.park()方法时产生阻塞, 直到与lock()对应的unlock()/release()方法中执行LockSupport.unpark()解除阻塞.

如果线程发生中断, (例如被其他线程中断), acquireQueued()会返回interrupted()的结果True(如上一篇所描述, 同时还清理了标志位), 然后会响应这一中断, 接下来自行在selfInterrupt()中自行使用 Thread.currentThread.interrupt() 产生中断.

tryAcquire(args) 是尝试获取(获取的东西可以理解为进一步的执行许可)

final boolean nonfairTryAcquire(int acquires) {

final Thread current = Thread.currentThread();

int c = getState();

if (c == 0) {

if (compareAndSetState(0, acquires)) {

setExclusiveOwnerThread(current);

return true;

}

}

else if (current == getExclusiveOwnerThread()) {

int nextc = c + acquires;

if (nextc < 0) // overflow

throw new Error("Maximum lock count exceeded");

setState(nextc);

return true;

}

return false;

}

c 表示的是当前锁的状态(此状态每加锁1次会加上1), c为0表示目前锁是空闲状态, 于是尝试CAS方法修改状态, 修改成功则设置独占线程, 返回True, 在acqure()中短路"AND"直接跳出判断, 加锁成功. (竞争)修改失败则返回false.

当然也有可能当前锁处于已上锁的状态, 如果当前线程是上锁的独占线程, 则直接重入, 直接修改状态, 加锁成功.

如果 c 表示处于已上锁, 而且当前线程并不是上锁的独占线程, tryAcquire()失败, 会接着执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)).

在AddWaiter()方法中:

private Node addWaiter(Node mode) {

Node node = new Node(Thread.currentThread(), mode);

// Try the fast path of enq; backup to full enq on failure

Node pred = tail;

if (pred != null) {

node.prev = pred;

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

enq(node);

return node;

}

与CLH队列中相似, 试图将含有当前线程信息的节点加入队列. 需要强调的是, Node有如下两个个状态:

SHARED(Node, default new Node()), EXCLUSIVE(Node, default null)

ReentrantLock使用的是 EXCLUSIVE 模式, 即区别于SHARED, 线程对锁是独占的.

在这里 addWaiter() 与 enq() 的操作大致是相同的, 都是尝试以CAS操作插入队列尾部. 如果队列不为空, 而且此刻不存在竞争, (这是大部分的情况) 那么就直接更新 tail 节点, 将此节点的前驱节点置为原 tail 节点

如果队列为空或者竞争失败, 将执行enq()

private Node enq(final Node node) {

for (;;) {

Node t = tail;

if (t == null) { // Must initialize

if (compareAndSetHead(new Node()))

tail = head;

} else {

node.prev = t;

if (compareAndSetTail(t, node)) {

t.next = node;

return t;

}

}

}

}

方法体是一个自旋循环(死循环), 在 return t的位置会跌落, 其逻辑与addWaiter()类似. 分队列为空和队列非空两种情况, 队列为空则需要尝试CAS设置一次 head 节点, 再去CAS设置 tail 节点. 队列非空一直尝试 CAS 设置 tail 节点, 直到插入成功. 假设一共有 N 个线程竞争, 最多需要 N+1 次循环即可从死循环中跌落.

在插入队列成功之后, 会返回该节点, 执行aquireQueued()方法.

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;

try {

boolean interrupted = false;

for (;;) {

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

方法体同样是一个自旋循环.如果循环没走完第一个分支就被外来因素强行结束了, 则会将结点等待状态置为CANCEL(见下介绍). 而循环内是两个if分支, 第一个if分支代表着如果当前节点的前驱节点为 head, (可能经过了漫长的等待), 当前节点已经在队列的队头了. 由于存在非公平的实现, 新来的线程可能插队, 所以这时仍然需要尝试获取(tryAcquire())一次.

如果竞争成功, 则将该节点设置为头节点, 将原头节点主动清空方便回收, 返回变量interrupted.(如果发生中断, 在第二个if分支内会被修改为true, 否则false)

第二个if分支内, 需要了解结点的四种等待状态, 它们分别是:

WaitStatus: CANCELLED(int 1), SIGNAL(int -1), CONDITION(int -2), PROPAGATE(int -3).

CANCELLED 表示这个节点退出排队, 不再等待锁, SIGNAL是比较正常的状态, 表示当前节点的后继节点已经或者很快会通过 park 阻塞, 当前节点释放或者取消必须通知其后继节点可以尝试acquire了, acquire() 方法必须指示需要signal信号, 尝试失败依旧会阻塞

CONDITION 表示这个节点的状态在设置为0之前都不会当同步队列的一个节点使用

PROPAGATE 用于共享模式, 目的用于将 releaseShared 传播出去.

值得注意的是, 非负的状态是不需要进行 signal 的.

shouldParkAfterFailedAcquire(Node, Node)是判定 node 结点在 acquire 失败之后是否应该 park

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

int ws = pred.waitStatus;

if (ws == Node.SIGNAL)

/*

* This node has already set status asking a release

* to signal it, so it can safely park.

*/

return true;

if (ws > 0) {

/*

* Predecessor was cancelled. Skip over predecessors and

* indicate retry.

*/

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

} else {

/*

* waitStatus must be 0 or PROPAGATE. Indicate that we

* need a signal, but don't park yet. Caller will need to

* retry to make sure it cannot acquire before parking.

*/

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

}

return false;

}

这个方法在调用时会将一个节点的前置节点 pred 与节点本身 node 同时传入(为什么?), 如果前置节点的状态为 signal, 则说明当前节点应当 park 产生阻塞. 如果为正, 则说明不需要signal, 那么会从这个前置节点一直向前搜索并更新, 直到找到负状态的结点. 而且非负的结点都被抛弃.

如果现在前置节点小于0, 试图将等待状态修改为 signal, 返回 false (should not park), 下一次再调用时(也就是acquireQueued()的下一次循环) 即可返回 should park 为 true 的标志.

回到acquireQueued() 中, shouldParkAfterFailedAcquire()为true, 则会接着执行 park 操作产生阻塞, 具体是通过UNSAFE的Native方法park()操作的.

完成了上锁的过程后, 通过ReentrantLockunlock(), 也就是 Sync 中的 release() 进行释放

public final boolean release(int arg) {

if (tryRelease(arg)) {

Node h = head;

if (h != null && h.waitStatus != 0)

unparkSuccessor(h);

return true;

}

return false;

}

protected final boolean tryRelease(int releases) {

int c = getState() - releases;

if (Thread.currentThread() != getExclusiveOwnerThread())

throw new IllegalMonitorStateException();

boolean free = false;

if (c == 0) {

free = true;

setExclusiveOwnerThread(null);

}

setState(c);

return free;

}

锁真正的 release 需要有几个条件

i. release 线程应当与独占线程一致, 否则会throw Exception, 例如锁已释放时, Exclusive Owner Thread 已被重置为 null, 此时再尝试释放则会throw

ii. 由于锁可以重入多次, release的参量小于 state (lock的次数), 则会 releas 失败, 但是 state 会被修改.

iii. 如果 release 参数大于 state 会返回false, 但是 state 会变成负数, 此时会怎样? 会有着一种情况否? 不得而知.

如果确实真正 release 了锁, 将会通知自己的下个节点 unpark

private void unparkSuccessor(Node node) {

/*

* If status is negative (i.e., possibly needing signal) try

* to clear in anticipation of signalling. It is OK if this

* fails or if status is changed by waiting thread.

*/

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

/*

* Thread to unpark is held in successor, which is normally

* just the next node. But if cancelled or apparently null,

* traverse backwards from tail to find the actual

* non-cancelled successor.

*/

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

LockSupport.unpark(s.thread);

}

要 unpark 自己的后继节点, 首先将自己的等待状态清除, 然后一般而言, 要 unpark 的结点为自己的后继节点. 如果后继节点的状态是 CANCELLED 或者 null, 那么就从队列尾部开始遍历, 找到最接近自己的 waitStatus 为负的节点 进行unpark.

(不明白什么时候队列可能出现 null 节点, 那么队列岂不是断掉了?)

unpark 会唤醒正在 park 中的阻塞线程, 于是阻塞线程被激活, 又开始 tryAcquire()

整体而言, ReentrantLock 只利用了一部分 AbstractQueuedSynchronizer 的特征, 当然是因为 java.util.concurrent 包中还有其他各种工具会利用到 AQS 的其他方法, 例如 ReadWriteLock/Semaphore 会使用 AQS 队列的共享模式. 在这里仅以 ReentrantLock 为例, 简单一窥 AQS 的机制.

1 thought on “AbstractQueuedSynchronizer”

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.