AQS即AbstractQueuedSynchronizer,中文叫做队列同步器。是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
以上方法不需要全部都实现,根据获取锁的种类可以选择实现不同的方法,支持独占获取锁的同步器应该实现tryAcquire,tryRelease和isHeldExclusively,而支持共享获取的同步器应该实现tryAcquireShared,tryReleaseShared和isHeldExclusively。
//NonfairSync实现
final void lock() {
//安全的设置同步状态,如果成功,则获得锁,如果失败,则去尝试获取独享锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//AQS实现。尝试获取独享锁,如果失败则进入等待队列并循环尝试获取独享锁
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//NonfairSync实现
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
//Sync实现。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果锁没有被占用,则尝试获取锁,成功则返回,和lock方法逻辑类似
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果是当前线程只有锁,则同步状态+1并成功返回,这里实现了可重入锁的逻辑
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//AQS实现。创建线程节点并添加到同步队列的尾节点。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果尾节点为null,说明队列没有初始化,则进行队列初始化并加入到尾节点
enq(node);
return node;
}
//AQS实现。初始化队列并循环添加到尾节点直到成功。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//初始化队列
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//AQS实现。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//当节点先驱是头节点时,则尝试获取锁
if (p == head && tryAcquire(arg)) {
//如果获取到锁,则将当前节点设置为头节点,并回收旧的头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果先驱节点不是头节点或者获取锁失败的话,判断是否可以休息,可以的话进入waiting,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//ReentrantLock实现
public void unlock() {
sync.release(1);
}
//AQS实现。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//如果头节点不为空,为唤醒头节点
return true;
}
return false;
}
//Sync实现。释放锁设置同步状态
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}