ReadWriteLock的源码分析
ReadWriteLock是JUC包下的定义的读写锁的接口,定义两个接口readLock()、writeLock()分别是返回读锁和返回一个写锁。
ReadWriteLock默认有两个实现分别是ReadWriteLockView、ReentrantReadWriteLock。
ReentrantReadWriteLock是默认的读写锁的实现
ReadWriteLockView是StampedLock的内部类,StampedLock是JDK 1.8中对ReentrantReadWriteLock的一个增强的实现
下面会先分析ReentrantReadWriteLock,在对StampedLock进行分析
ReentrantReadWriteLock
ReentrantReadWriteLock是实现ReadWriteLock接口,对外提供read()和wirte()方法。特点主要是支持公平锁选择、可重入、锁降级的分类
用例
这个用例是对ReentrantReadWriteLock提供的用例CachedData的简化版本
- CachedData
class CachedData {
private lateinit var data: String
private var cacheValid: Boolean = false
private var lock: ReentrantReadWriteLock = ReentrantReadWriteLock()
fun processCacheData() {
//获取读锁
lock.readLock().lock()
if (!cacheValid) {
//获取写锁之前先要释放读锁
lock.readLock().unlock()
//获取写锁
lock.writeLock().lock()
try {
//再次检查标记,因为上一次检查标记是在获取写锁之前
if (!cacheValid) {
TimeUnit.SECONDS.sleep(2)
println(Thread.currentThread().name + " set data")
data = Thread.currentThread().name + " " + LocalDateTime.now().toString()
cacheValid = true
}
//重新获取读锁来完成锁降级
lock.readLock().lock()
} finally {
lock.writeLock().unlock()
}
}
try {
println(Thread.currentThread().name + "打印data:" + data)
} finally {
lock.readLock().unlock()
}
}
}
- main
fun main() {
//创建线程池
val poolExecutor = ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, SynchronousQueue())
//创建任务
var cache:CachedData = CachedData()
var barrier:CyclicBarrier = CyclicBarrier(2)
IntStream.range(0, 2).forEach{
poolExecutor.submit{
barrier.await()
cache.processCacheData()
}
}
}
- 执行结果
可以看到两个线程对data的写锁进行竞争,但是只有一个线程成功执行set方法,另外的一个线程只能执行读锁的操作
从现象上来看一个ReentrantReadWriteLock对外提供了读锁和写锁两个功能,下面就开始对代码进行详细的分析
构造函数
/**
* Creates a new {@code ReentrantReadWriteLock} with
* default (nonfair) ordering properties.
*/
public ReentrantReadWriteLock() {
this(false);
}
/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
可以看到无参的构造函数是非公平锁的策略,在构造方法中主要是创建了三个成员变量
ReentrantReadWriteLock{
//公平锁类型
final Sync sync;
//内部读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
//内部写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
}
源码解析
构造方法
- 获取锁的方法
获取读锁/写锁就是直接返回内部的读锁/写锁变量
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
可以看到锁的类型分别是ReadLock和WriteLock,继续分析ReadLock/WriteLock
-
ReadLock与WriteLock对比
-
ReadLock
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean tryLock() {
return sync.tryReadLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.releaseShared(1);
}
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}
- WriteLock
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
//ReentrantReadWriteLock内部的AQS公平/非公平抽象类
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquire(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock() {
return sync.tryWriteLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
//查询当前线程是否持有写锁
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
//当前写锁的锁定次数
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}
通过对比可知WriteLock只是比ReadLock多两个方法
- isHeldByCurrentThread()
查询当前线程是否持有写锁 - getHoldCount()
当前写锁的锁定次数
同时可以看到不管是WriteLock、ReadLock都是使用Sync来实现的功能,下面详细的分析一下Sync类的实现
- Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
private transient ThreadLocalHoldCounter readHolds;
private volatile int state;
//构造函数
Sync() {
//创建ThreadLocalHoldCounter(记录线程持有的锁数量)
readHolds = new ThreadLocalHoldCounter();
//设置状态,调用AQS的setState()
setState(getState()); // ensures visibility of readHolds
}
}
从上面代码可以看出Sync是继承自AbstractQueuedSynchronizer,在构造方法中创建ThreadLocalHoldCounter和调用AQS的setState()方法
有关AbstractQueuedSynchronizer的内容可以参考之前写的
AbstractQueuedSynchronizer的源码分析
-
ThreadLocalHoldCounter
ThreadLocalHoldCounter的代码如下所示 -
ThreadLocalHoldCounter
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
- HoldCounter
static final class HoldCounter {
//初始为0
int count;
//使用id来标识Thread而不是引用,避免引用逃逸
final long tid = LockSupport.getThreadId(Thread.currentThread());
}
ThreadLocalHoldCounter继承于ThreadLocal然后在初始化时用initialValue()方法返回一个HoldCounter引用
HoldCounter是用来记录线程中加锁的统计与线程id相关联,这里关联线程引用是比较优秀的,通过LockSupport.getThreadId获取一个long类型的线程标识
- LockSupport.getThreadId
static final long getThreadId(Thread thread) {
//U是Unsafe类型的对象
return U.getLong(thread, TID);
}
Sync除了通过构造函数初始化上述的ThreadLocalHoldCounter对象以外,还有一些静态成员变量来完成一些例如控制允许线程重复持有锁的次数等
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//省略...
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
这里是对同一个线程持有的锁超过阈值的演示
readLock()
- lock
public void lock() {
sync.acquireShared(1);
}
直接使用AbstractQueuedSynchronizer的acquireShared()获取一把共享锁,失败就阻塞
- AbstractQueuedSynchronizer.acquireShared
public final void acquireShared(int arg) {
//调用子类的tryAcquireShared()实现
if (tryAcquireShared(arg) < 0)
acquire(null, arg, true, false, false, 0L);
}
- ReentrantReadWriteLock.tryAcquireShared()
@ReservedStackAccess
protected final int tryAcquireShared(int unused) {
/*
* 例子:
* 1. 如果写锁被其他线程持有就返回失败
* 2. 因此该线程获取到写入资格,根据线程队列判断是否要进行阻塞,不需要进行阻塞时就通过CAS的方式来操作锁以及计数
* 3. 对cas失败的场景进行重试
*/
Thread current = Thread.currentThread();
//获取同步状态
int c = getState();
//如果有独占线程并且独占线程不是当前线程时直接返回-1(失败)
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
//共享计数
int r = sharedCount(c);
//1. 通过readerShouldBlock()判断当前是否可以操作,readerShouldBlock()主要是对公平锁和非公平锁的一个判断
//2. 判断当前是否操作了最大的加锁量
//3. 通过CAS进行操作
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
//如果是首次加锁,设置首次加锁线程和次数
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//如果是首次加锁线程进行继续加锁那么次数++
firstReaderHoldCount++;
} else {
//处理holdCounter对象
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//重复处理
return fullTryAcquireShared(current);
}
tryAcquireShared是会执行锁的计数、初始线程的绑定等工作,并且会对执行失败进行自旋重试
- unlock
public void unlock() {
sync.releaseShared(1);
}
使用releaseShared来完成释放锁的操作,AQS又是通过tryReleaseShared来完成的,下面可以看一下tryReleaseShared的代码
//尝试释放锁
@ReservedStackAccess
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//如果是首次加锁线程对首次的标记进行修改
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
//对HoldCounter进行修改
//如果不是临时holdCounter,那么获取holdCounter后在进行处理
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//自旋
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
//CAS的方式对锁进行释放
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
writeLock()
- lock
public void lock() {
sync.acquire(1);
}
直接使用AbstractQueuedSynchronizer的acquire()获取一把独占锁,失败就阻塞
- unlock
public void unlock() {
sync.release(1);
}
使用release来完成释放锁的操作
通过上面的代码可以看到所有的操作都是通过Sync这个类委托给AbstractQueuedSynchronizer来进行的,主要是三个功能
- 加锁(共享/独占)
- 解锁(共享/独占)
- 计数(共享/独占)
然后AQS作为入口也会通过调用ReentrantReadWriteLock的具体的加锁/解锁/计数的进行操作。
读锁不进行排队,写锁会进行排队阻塞
ReentrantReadWriteLock中有趣的操作是把state的高16位作为读锁标识,低16位作为写锁标识,因此也是只能加锁2^16的原因
StampedLock
StampedLock是对ReentrantReadWriteLock的迭代,在对StampedLock中优化了写锁饥饿的问题
用例
- StampedLockDemo
class StampedLockDemo {
private val stampedLock = StampedLock()
private var data = 0
fun writeData() {
val stamp = stampedLock.writeLock()
data += try {
1
} finally {
stampedLock.unlockWrite(stamp)
}
}
fun readData(): Int {
//1.使用乐观锁
var stamp = stampedLock.tryOptimisticRead()
var curData = data
//2. 写法1 双重锁保证读取到的是最新数据
if (!stampedLock.validate(stamp)) {
try {
stamp = stampedLock.readLock()
curData = data
} finally {
stampedLock.unlockRead(stamp)
}
}
//3. 写法2 循环
// while(!stampedLock.validate(stamp)) {
// stamp = stampedLock.tryOptimisticRead();
// curData = this.data;
// }
return curData
}
}
StampedLock在加锁时会返回一个戳(stamp),可以把它理解为版本号/时间戳,在后续解锁时会用到.
-
提供了乐观锁和悲观锁的实现
- tryOptimisticRead()
- tryReadLock()
-
锁降/升级
- tryConvertToWriteLock()
- tryConvertToReadLock()
总结
ReadWriteLock是读写锁的接口,默认有两个实现分别是ReentrantReadWriteLock、StampedLock,分别是针对读多写少的场景和需要使用乐观锁的场景。
底层都采用一个标志位来进行区分读锁/写锁标识,一个是Int(32位),一个是Long(64位),并且都继承与AbstractQueuedSynchronizer来进行实现的。