AbstractQueuedSynchronizer的源码分析
AQS的全称是AbstractQueuedSynchronizer,AQS提供了一个以FIFO的队列,通过定义一系列阻塞线程的操作为ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch提供了底层框架支持。
AbstractQueuedSynchronizer的实现类图
AQS是一个用于构建锁、同步器等线程协作工具类的底层框架,下面详细的对源码进行分析;
AQS的源码分析
AQS最核心的三大部分就是状态、队列和期望协作工具类去实现的获取/释放等重要方法。我们就从这三个部分出发,分别展开讲解;
state状态
//同步状态
private volatile int state;
在AQS中,同步状态state被定义为一个volatile的int类型变量,它表示锁的状态,初始值为0;在不同的实现类中它代表的含义各不相同,例如:
- semaphore
//tryReleaseShared方法进行分析
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
以tryReleaseShared进行分析,getState获取到当前信号量后,在将下一个state通过CAS的方式重新进行赋值;
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}
VarHandle是JDK中用于替代unsafe的实现类,这里需要注意的是setState方法,它在这里采用的是直接赋值的方式,因为state被定义为一个用volatile修饰的int类型变量,所以直接赋值的方式是线程安全的;
FIFO队列
FIFO队列指的是AQS中的线程等待队列,代码如下所示,用来存储等待线程的队列,后续包括线程的等待和唤醒都是依赖该队列进行实现;
/** CLH Nodes */
abstract static class Node {
volatile Node prev; // 前驱节点
volatile Node next; // 后驱节点
Thread waiter; // 等待线程
volatile int status; // 后驱节点状态
}
- CLH队列图示
其中需要注意的是头节点Head并不在等待队列中,因为当线程没有竞争时,是不会创建等待队列,只有当线程发生竞争时才会创建等待队列;
CLH (Craig, Landin, and Hagersten) 队列是一种无锁、可扩展的自旋锁队列结构,用于实现高并发环境下的线程同步。CLH 队列锁是基于链表的等待队列(FIFO 队列),每个线程在尝试获取锁时会自旋等待,但不会相互干扰。这种方式减少了线程之间的竞争,适合用于多核系统中减少锁争用,提升并发性能。
在 Java 中,AbstractQueuedSynchronizer(AQS)中的同步队列就是基于 CLH 队列思想实现的。
CLH队列的基本思想是,每个线程在等待获取锁时,都会生成一个节点,并在其节点中记录它的状态(例如是否在等待)。队列中的每个节点都表示一个线程的状态,并且这些节点按顺序排列。线程会通过检查前驱节点的状态来判断自己是否可以获取锁。在 AQS中,CLH 队列通过Node的waitStatus字段来跟踪每个节点的状态。每个节点指向前一个节点,而当前线程通过前驱节点的状态来判断自己是否可以获取锁。这一机制可以有效实现多线程环境下的高效排队和锁的公平性。
- acquireQueued方法
// 尝试获取资源,如果成功则直接返回
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//自旋的方式获取锁
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
// 阻塞当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
先通过tryAcquire方法尝试获取锁,如果成功则直接返回;如果失败则进入acquireQueued方法,该方法中通过自旋的方式获取锁,其中阻塞线程是通过LockSupport.park方法来进行实现的;