AbstractQueuedSynchronizer的源码分析


AbstractQueuedSynchronizer的源码分析

AQS的全称是AbstractQueuedSynchronizer,AQS提供了一个以FIFO的队列,通过定义一系列阻塞线程的操作为ReentrantLockReentrantReadWriteLockSemaphoreCountDownLatch提供了底层框架支持。

AbstractQueuedSynchronizer的实现类图

AbstractQueuedSynchronizer的实现类

AbstractQueuedSynchronizer的实现类图关系

AQS是一个用于构建锁、同步器等线程协作工具类的底层框架,下面详细的对源码进行分析;

AQS的源码分析

AQS最核心的三大部分就是状态队列期望协作工具类去实现的获取/释放等重要方法。我们就从这三个部分出发,分别展开讲解;

state状态

//同步状态
private volatile int state;

在AQS中,同步状态state被定义为一个volatile的int类型变量,它表示锁的状态,初始值为0;在不同的实现类中它代表的含义各不相同,例如:

  • semaphore
//tryReleaseShared方法进行分析
  protected final boolean tryReleaseShared(int releases) {
      for (;;) {
          int current = getState();
          int next = current + releases;
          if (next < current) // overflow
              throw new Error("Maximum permit count exceeded");
          if (compareAndSetState(current, next))
              return true;
      }
  }

以tryReleaseShared进行分析,getState获取到当前信号量后,在将下一个state通过CAS的方式重新进行赋值;


protected final void setState(int newState) {
    state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
    return STATE.compareAndSet(this, expect, update);
}

VarHandle是JDK中用于替代unsafe的实现类,这里需要注意的是setState方法,它在这里采用的是直接赋值的方式,因为state被定义为一个用volatile修饰的int类型变量,所以直接赋值的方式是线程安全的;

FIFO队列

FIFO队列指的是AQS中的线程等待队列,代码如下所示,用来存储等待线程的队列,后续包括线程的等待和唤醒都是依赖该队列进行实现;

/** CLH Nodes */
abstract static class Node {
    volatile Node prev;       // 前驱节点
    volatile Node next;       // 后驱节点
    Thread waiter;            // 等待线程
    volatile int status;      // 后驱节点状态
}
  • CLH队列图示
    CLH队列图示

其中需要注意的是头节点Head并不在等待队列中,因为当线程没有竞争时,是不会创建等待队列,只有当线程发生竞争时才会创建等待队列;

CLH (Craig, Landin, and Hagersten) 队列是一种无锁、可扩展的自旋锁队列结构,用于实现高并发环境下的线程同步。CLH 队列锁是基于链表的等待队列(FIFO 队列),每个线程在尝试获取锁时会自旋等待,但不会相互干扰。这种方式减少了线程之间的竞争,适合用于多核系统中减少锁争用,提升并发性能。

在 Java 中,AbstractQueuedSynchronizer(AQS)中的同步队列就是基于 CLH 队列思想实现的。
CLH队列的基本思想是,每个线程在等待获取锁时,都会生成一个节点,并在其节点中记录它的状态(例如是否在等待)。队列中的每个节点都表示一个线程的状态,并且这些节点按顺序排列。线程会通过检查前驱节点的状态来判断自己是否可以获取锁。在 AQS中,CLH 队列通过Node的waitStatus字段来跟踪每个节点的状态。每个节点指向前一个节点,而当前线程通过前驱节点的状态来判断自己是否可以获取锁。这一机制可以有效实现多线程环境下的高效排队和锁的公平性。

  • acquireQueued方法

// 尝试获取资源,如果成功则直接返回
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

//自旋的方式获取锁
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

// 阻塞当前线程
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

先通过tryAcquire方法尝试获取锁,如果成功则直接返回;如果失败则进入acquireQueued方法,该方法中通过自旋的方式获取锁,其中阻塞线程是通过LockSupport.park方法来进行实现的;

总结:

参考资料

一行一行源码分析清楚AbstractQueuedSynchronizer
自旋锁、排队自旋锁、MCS锁、CLH锁


  TOC