【并发编程系列5】JUC必知ReentrantLock和AQS同步队列实现原理分析

微信扫一扫,分享到朋友圈

【并发编程系列5】JUC必知ReentrantLock和AQS同步队列实现原理分析

前言

锁是一种用来控制多线程访问共享资源的工具。通常,锁可以独占共享资源:同一时间只有一个线程可以获得锁,并且所有访问共享资源的线程都必须首先获得锁。前面我们介绍过了synchronized,使用synchronized的方法和代码块作用域机制使得使用监视器锁更加简单,并且帮助避免了许多关于锁的常见编程错误,比如锁未及时释放等问题。但是有时候我们需要更灵活的使用锁资源,例如,一些遍历并发访问的数据结构的算法需要使用“手动”方法,或者“锁链”:你先获得节点A的锁,然后是节点B,然后释放A获得C,再释放B获得D,以此类推。这种方式如果要使用synchronized就不是很好实现,但是有了Lock就不一样了,Lock接口允许以不同的范围去获取和释放锁,并且允许同时获得多把锁,也可以以任意的顺序释放。

Lock

Lock在J.U.C 中是最核心的组件,Lock是一个接口,它定义了释放锁和获得锁的抽象方法,今天我们要分析的ReentrantLock就实现了Lock接口,Lock接口中主要定义了5个方法:

方法 描述
void lock() 获取锁,该方法不会响应中断。如果获取锁失败,当前线程将被阻塞直到成功获得锁
void lockInterruptibly() throws InterruptedException 获取锁,会响应中断。其他和lock()方法一样
boolean tryLock() 非阻塞获取锁,该方法需要立即返回获取锁结果,成功返回true,失败返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException 指定时间内获取锁,会响应中断: 1.在指定的超时时间内获得锁成功则返回true; 2.在超时时间内被中断则立刻返回获得锁结果; 3.在指定超时时间结束后立刻返回获得锁结果
void unlock() 释放锁。如果没有获得锁则会抛异常
Condition newCondition()

初识ReentrantLock

ReentrantLock,重入锁。表示支持重新进入的锁,也就是说,如果当前线程 t1 通过调用 lock方法获取了锁之后,再次调用 lock,是不会再阻塞去获取锁的,直接增加重入次数就行了(synchronized也是支持重入的)。

ReentrantLock基本使用

ReentrantLock的使用非常简单,下面就是一个使用的例子:

package com.zwx.concurrent;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockDemo {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
lock.lock();
try {
System.out.println(1);
}finally {
lock.unlock();
}
}
}
复制代码

使用Lock的时候尤其要注意,当加锁和解锁发生在不同地方时,必须小心确保所有的持有锁的代码都受到try-finally或try-catch代码块的保护,用以确保在必要时锁得到释放,这也是灵活使用所带来的代价,因为使用synchronized时,不需要考虑这个问题。

ReentrantLock原理

假如我们是Lock的设计者,那我们想一下,如果多个线程都来并发抢占同一把锁资源时,而同一时间肯定只有一个线程能抢占成功,那么抢占失败的线程怎么办?答案很简单,肯定是找个地方存起来,但是具体要怎么存,这个就是个值得思考的问题。ReentrantLock中采用的是一个同步队列的数据结构来存储阻塞等待的线程。

同步队列(AQS)

同步队列,全称为:AbstractQueuedSynchronizer,又可以简称为AQS。在Lock中,这是一个非常重要的核心组件,J.U.C工具包中很多地方都使用到了AQS,所以,如果理解了AQS,那么再去理解ReentrantLock,Condition,CountDownLatch等工具的实现原理,就会非常轻松。

AQS的两种功能

从使用层面来说,AQS 的功能分为两种:独占和共享。

  • 独占锁:每次只有一个线程持有锁,如:ReentrantLock 就是以独占方式实现的互斥锁
  • 共享锁:允许多个线程同时获取锁 ,并发访问共享资源 , 如:ReentrantReadWriteLock

AQS 的内部实现

AQS依赖内部的一个FIFO双向队列来完成同步状态的管理,当前线程获取锁失败时,AQS会将当前线程以及等待状态等信息构造成为一个节点(Node对象)并将其加入AQS中,同时会阻塞当前线程,当锁被释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。AQS中有一个头(head)节点和一个尾(tail)节点,中间每个节点(Node)都有一个prev和next指针指向前一个节点和后一个节点,如下图:


Node对象组成

AQS中每一个节点就时一个Node对象,并且通过节点中的状态等信息来控制队列,Node对象是AbstractQueuedSynchronizer对象中的一个静态内部类,下面就是Node对象的源码:

static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED =  1;//表示当前线程状态是取消的
static final int SIGNAL    = -1;//表示当前线程正在等待锁
static final int CONDITION = -2;//Condition队列有使用到,暂时用不到
static final int PROPAGATE = -3;//CountDownLatch等工具中使用到,暂时用不到
volatile int waitStatus;//节点中线程的状态,默认为0
volatile Node prev;//当前节点的前一个节点
volatile Node next;//当前节点的后一个节点
volatile Thread thread;//当前节点封装的线程信息
Node nextWaiter;//Condition队列中的关系,暂时用不到
final boolean isShared() {//暂时用不到
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {//获取当前节点的上一个节点
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {//构造一个节点:addWaiter方法中会使用,此时waitStatus默认等于0
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { //构造一个节点:Condition中会使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}
复制代码

上面代码中加上了注释,总来来说应该还是比较好理解,注意,Node对象并不是AQS才会使用,Condition队列以及其他工具中也会使用,所以有些状态和方法在这里是暂时用不上的本文就不会过多关注。

lock.lock()源码解读

上面花了一点篇幅介绍了ReentrantLock内部的实现机制,相信大家的脑海里有了一个初步的轮廓,接下来就让我们一步步从加锁到释放锁进行源码解读吧

ReentrantLock#lock()

上文的示例中,当我们调用lock.lock()时,我们进入Lock接口的实现类ReentrantLock中的加锁入口:


然后发现这里调用了sync中的一个lock()方法,sync是ReentrantLock类当中的一个组合类,我们知道,AQS是一个同步队列,但是因为AQS只是一个同步队列,并不具备一些业务执行能力,所以通过了另一个Sync来继承AbstractQueuedSynchronizer,并根据不同的业务场景来实现不同的业务逻辑:


进入Sync类,我们发现,Sync的lock()方法是一个抽象方法,也就是说我们还需要去执行Sync中的实现类,Sync有两个实现类:FairSync和NonfairSync,也就是公平锁和非公平锁。


公平锁和非公平锁其实大致逻辑都是差不多,唯一的区别是非公平锁在抢占锁之前会先判断一下当前AQS队列中是不是有线程正在等待,如果没有才会通过CAS操作去抢占锁,相信看完这篇文章,大家自己去看也会很容易理解。这里我们就以非公平锁为例,进入非公平锁中的lock()方法继续我们的源码之旅:

NonfairSync#lock()


这个方法很简短,首先通过一个CAS操作,如果CAS成功则表示当前线程获得锁成功,获取锁成功之后,需要在AQS当中记录一下当前获得锁的线程信息:


CAS操作

CAS操作是多线程当中的一个原子操作。


上面代码的意思是,如果当前内存中的stateOffset的值和预期值expect相等,则替换为 update值。更新成功返回true,否则返回false。这个操作是原子的,不会出现线程安全问题。

stateOffset是AQS中的一个属性,它在不同的实现中所表达的含义不一样,对于重入锁的实现来说,表示一个同步状态。它有两个含义:

  1. 当 state=0 时,表示无锁状态
  2. 当 state>0 时,表示已经有线程获得了锁,也就是state=1,但是因为ReentrantLock允许重入,所以同一个线程多次获得同步锁的时候,state会递增,比如重入 5 次,那么state=5。而在释放锁的时候,同样需要释放5次直到state=0其他线程才有资格获得锁。
    至于Unsafe类中都是一些本地(native)方法,就不过多叙述了。

CAS操作的ABA问题

CAS操作是一个原子操作,但是CAS操作同样会存在问题,这就是经典的ABA问题。假如一个值一开始是A,然后被修改成B,最后又被改回了A,那么这时候有其他线程过来,会发现预期值还是是A,就会以为没变(实际上变了:A-B-A),这时候就会CAS操作成功,解决这个问题的办法就是可以加入一个版本号,比如原始值设置为A1(1表示版本号),改成B的时候版本号也同时递增,修改成B2,这时候再次改回A,就变成A3,那么A1和A3就不相同了,可以避免了ABA问题的产生。

AQS#acquire(arg)

上面的操作中,我们假如线程A是第一次进来,那么肯定获得锁成功,这时候我们可以得到这样的一个AQS:


这时候AQS队列还未构造,仅仅只是设置了一些独占线程相关的属性。上图中表示当前对象已经有线程ThreadA获得锁了,这时又来了个线程B,会去执行CAS操作,因为线程A已经获得锁了,状态已经等于1了,所以CAS失败,这时候线程B就会调用AQS当中的acquire方法(参数1是固定死的,表示加锁次数):


这里也是一个if判断,我们先看第一个条件,tryAcquire(arg)方法,这里我们同样进入非公平锁实现NonfairSync类中,然后会发现最终其还是会调用父类Sync中的方法nonfairTryAcquire(arg)

Sync#nonfairTryAcquire(arg)


这个方法就是为了尝试去获得锁,但是这里有一个问题,因为这个方法之所以会被执行,就是因为前面的CAS操作失败了,也就是获得锁失败了,state就肯定不会为0了,可是这个方法的131行为什么还要再次判断state是否等于0呢?

我们想一想,如果一个线程争抢锁失败,我们应该怎么做,无外乎两个办法: 一个就是多试几次 ,之前介绍synchronized的时候曾经说过,大部分锁被持有之后都会很快被释放,所以再试试总没有错,万一刚好锁被释放了呢。 另一个办法就是阻塞等待 ,这个后面会介绍,所以这里的131行代码判断也是这个逻辑,就是再试一次,如果成功,就可以直接获得锁,而不需要加入AQS队列挂起线程了。

线程A没有释放锁的时候,线程B会抢占锁失败,则返回false,我们回到之前的逻辑,会继续执行acquireQueued方法,这个方法里面有一个参数是addWaiter返回的,所以我们先去看addWaiter这个方法

AQS#addWaiter(Node)

走到这里就是说明当前线程至少2次尝试获取锁都失败了,所以当前线程会被初始化成为一个Node节点,加入到AQS队列中。我们前面提到了,AQS有两种模式,一种独占,一种共享,而重入锁ReentrantLock是独占的,所以这里固定传入了参数Node.EXCLUSIVE表示当前锁是独占的。而由前面Node对象的源码可以知道,Node.EXCLUSIVE其实是一个null值的Node对象。


因为我们到这里的时候是第一次进来,AQS队列还没有被初始化,head和tail都是为空的,所以if判断肯定不成立,也就是说,如果是第一次调用addWaiter方法时,会先执行下面的enq(node)方法。

AQS#enq()


先不要看else逻辑,线程B第一次进来肯定是走的if逻辑,初始化之后,得到这样的一个AQS:


头节点为什么放空线程

注意了,这里的头节点中thread没有赋值(thread=null),其实这里的第一个节点只是起了一个哨兵的作用,这样就可以免去了后续在查找过程中每次比较是否越界的操作,后面会陆续提到这个哨兵的作用。

回到源码逻辑来,因为上面是一个死循环,初始化之后,紧接着会立刻进行第二次for循环,第二次循环的时候tail节点不为空了,所以会走else逻辑,走完else逻辑之后会得到下面这样一个AQS:


这时候假如又来了线程C,那么线程C就会走到AQS#addWaiter(Node)方法中上面的if逻辑了,因为这时候tail节点已经不为空了,这里的if逻辑其实和enq(Node)方法中for循环中的else分支逻辑是一样的,只是把线程C添加到AQS的尾部,最终会得到下面这个AQS:


接下来我们回到前面的方法,继续执行AQS中的acquireQueued(Node,arg)方法。

AQS#acquireQueued(Node,arg)

上面经过addWaiter(Node)之后,阻塞的线程已经被加入到了AQS队列当中,但是注意,这时候仅仅只是把线程加入进去了,而线程并没有被挂起,也就是说,线程还是处于运行状态,那么接下来要做的事就是需要把加入AQS队列中的线程挂起,当然在挂起之前,还是我们前面说的,就是线程还是不死心,所以还需要最后搏一搏,万一抢到锁了,就不需要挂起了,所以这就是acquireQueued(Node,arg)方法中会做的两件事:

1、看看前一个节点是不是头节点,如果是的话,就再试一次

2、再试一次如果还是失败了,那么线程正式挂起


有几个属性这里可以先不管,关注for循环里面逻辑,首先获取到前一个节点,如果前一个节点是head节点,那就再调用tryAcquire(arg)方法去抢一次锁。

我们这里假设争抢锁还是失败了,这时候就会走到882行的if判断,if判断中第一个逻辑看名字shouldParkAfterFailedAcquire能猜到大致意思,就是争抢锁失败后看一下当前线程是不是应该挂起,我们进入shouldParkAfterFailedAcquire方法看看:


上面这段代码值得说的就是811-815行,我们先来演示下这个流程,因为移除cancel状态节点后面逻辑中还会出现。

1、假设ThreadB被取消了,那么这时候AQS中ThreadB节点状态为-:


2、执行813行代码,相当于:prev=prev.prev;node.prev=prev;得到如下AQS:


3、这时候while循环的条件肯定不成立,因为此时的pred已经指向了头节点,状态为-1,

所以循环结束,继续执行815行代码,得到如下AQS:


最终的结果我们可以看到,虽然ThreadB还有指向其他线程,但是我们通过其他任何节点,都没办法找到ThreadB,已经重新构建了一个关联关系,相当于ThreadB被移出了队列。

因为head节点是一个哨兵,不可能会被取消,所以这里的while循环是不需要担心pred会变为null的。

暂时忘掉上面移除cancel节点的流程,我们假设是线程B进来,那么前一个节点就是head节点,肯定会走到最后一个else,这也是一个CAS操作,把头节点状态改为-1,如果是线程C进来,就会把B节点设置为-1,这时候就会得到下面这样一个AQS:


这个AQS队列和上面的唯一区别就是前面两个节点的waitStatus状态从0改成了-1。

这里注意了,只有前一个节点waitStatus=-1才会返回true,所以这里第一次循环进来肯定返回false,也就是还会再一次进行循环,循环的时候还会再次执行上面的争抢锁方法(看起来真的是贼心不死哈)。判断失败后,就会二次进入shouldParkAfterFailedAcquire方法,这时候因为第一次循环已经把前一个节点状态改为-1了,所以就会返回true了。

返回true之后,就会执行if判断的第二个逻辑了,这里面才是真的把线程正式挂起来。要挂起一个线程着实有点不容易哈哈。调用parkAndCheckInterrupt()方法正式挂起:


为什么要使用interrupted()返回中断标记

要解释这个原因我们需要先解释下park()方法:

LockSupport.park()方法是中断一个线程,但是遇上下面三种情况,就会立即返回:

  • 其他线程对当前线程发起了unpark()操作时
  • 其他线程中断了当前线程时
  • 不合逻辑的调用(也就是没有理由)时
    第三点没想明白场景,有知道的欢迎留言,感谢!

这里我们要说的是第2点,其他线程中断了当前线程会有什么影响,我们先来演示一个例子再来得出结论:

当park()遇上了interrupt()

前面讲线程基本知识的时候,我们讲到了sleep()遇到了interrupt()会怎么样,感兴趣的可以 点击这里 详细了解。

这里我们来看个例子:

package com.zwx.concurrent.lock;
import java.util.concurrent.locks.LockSupport;
public class LockParkInterrputDemo {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(()->{
int i = 0;
while (true){
if(i == 0){
LockSupport.park();
//获取中断标记,但是不复位
System.out.println(Thread.currentThread().isInterrupted());
LockSupport.park();
LockSupport.park();
System.out.println("如果走到这里就说明park不生效了");
}
i++;
if(i == Integer.MAX_VALUE){
break;
}
}
});
t1.start();
Thread.sleep(1000);//确保t1被park()之后再中断
t1.interrupt();
System.out.println("end");
}
}
复制代码

输出结果:


所以其实park()方法至少有以下两个个特点:

  • 当一个线程park()时收到中断信号,会立刻恢复,且中断标记为true,而且不会抛出InterruptedException
  • 当一个线程中断标记为true时候,park()对其无效

有这两个结论,上面就很好理解了,我们想一想,假设上面的线程挂起之后,并不是被线程A释放锁之后调用unpark()唤醒的,而是被其他线程中断了,那么就会立刻恢复继续后面的操作,这时候如果不对线程进行复位,那么他会回到前面的死循环,park()也无效了,就会一直死循环抢占锁,会一直占用CPU资源,如果线程多了可能直接把CPU耗尽。

分析到这里,线程被挂起,告一段落。挂起之后需要等待线程A释放锁之后唤醒再继续执行。所以接下来我们看看unlock()是如何释放锁以及唤醒后续线程的。

lock.unlock()源码解读

ReentrantLock#unlock()

上文的示例中,当我们调用lock.unlock()时,我们进入Lock接口的实现类ReentrantLock中的释放锁入口:


这里和上文的加锁不一样,加锁会区分公平锁和非公平锁,这里直接就是调用了sync父类AQS中的release(arg)方法:


我们可以看到,这里首先会调用tryRelease(arg)方法,最终会回到ReentrantLock类中的tryRelease(arg)方法:

ReentrantLock#tryRelease()


这个方法看起来就比较简单了,释放一次就把state-1,所以我们的lock()和unlock()是需要配对的,否则无法完全释放锁,这里因为我们没有重入,所以c=0,那么这时候的AQS队列就变成了这样:


当前方法返回true,那么就会继续执行上面AQS#release(arg)方法中if里面的逻辑了:


这个方法就没什么好说的,比较简单了,我们直接进入到unparkSuccessor(h)方法中一窥究竟。

AQS#unparkSuccessor(Node)

private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling.  It is OK if this
* fails or if status is changed by waiting thread.
* 如果状态是负的,尝试去清除这个信号,当然,如果清除失败或者说被其他
* 等待获取锁的线程修改了,也没关系。
* 这里为什么要去把状态修改为0呢?其实这个线程是要被唤醒的,修不修改都无所谓。
* 回忆一下上面的acquireQueued方法中调用了shouldParkAfterFailedAcquire
* 去把前一个节点状态改为-1,而在改之前会抢占一次锁,所以说这里的操作
* 其实并没有太大用处,可能可以为争抢锁的线程再多一次抢锁机会,故而成功失败均不影响
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node.  But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
* 唤醒后继节点,通常是next节点,但是如果next节点被取消了或者为空,那么
* 就需要从尾部开始遍历,将无效节点先剔除
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {//如果下一个节点为空或者被取消了
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)//一直遍历,直到找到状态小于等于0的有效节点
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码

这段代码中值得说明的是为什么要从tail节点开始循环遍历。不知道大家对enq()方法中的构造AQS队列的步骤还有没有印象,为了不让大家翻上去找代码,我把代码重新贴下来:


我们看到,不管是if分支还是else分支,cas操作成功之后都只是把tail节点的关系构造出来了,第一个if分支CAS操作后得到下面这样的情况:


执行else分支的CAS操作之后,可能得到下面这样的情况:


我们可以发现,上面两种情况next节点都还没来得及构造,那么假如这时候从前面还是遍历就会出现找不到节点的情况,但是从tail往前就不会有这个问题。

看到这里忍不住感叹下,大佬的思维真是达到了一定的高度,写的代码完全都是精华。

到这里释放锁完成,下一个线程(ThreadB)也被唤醒了,那么下一个线程被唤醒后在哪里呢?还是把上面线程最终挂起的代码贴出来:


也就是说线程被唤醒后,会继续执行return语句,返回中断标记。然后会回到AQS类中的

acquireQueued(Node,arg)方法

回到AQS#acquireQueued(Node,arg)


也就是说会回到上面代码中的882行的if判断,不管interrupted是等于true(想成挂起期间被中断过)还是等于false,都不会跳出当前的for循环,那么就继续循环。

因为被唤醒的线程是ThreadB,所以这时候if判断成立,而且因为此时state=0,处于无锁状态,tryAcquire(arg)获取锁也会成功,这时候AQS又变成了有锁状态,只不过独占线程由A变成了B:


这时候线程B获取锁成功了,所以必然要从AQS队列中移除,我们进入setHead(node)方法:


我们还是来演示一下这三行代码:

1、head=node,于是得到如下AQS队列:


2、node.Thread=null;node.prev=null;得到如下AQS队列:


3、回到前一个方法,执行setHead(Node)下一行代码,p.next = null,得到如下最新的AQS:


经过这三步,我们看到,原先的头节点已经没有任何关联关系了,其实在第二步的时候,原先头节点已经不在队列中了,执行第三步只是为了消除其持有的引用,方便被垃圾回收。

到这里,最终会执行return interrupted;跳出循环,继续回到前一个方法。

回到AQS#acquire(arg)


这时候假如前面的interrupted返回true的话会执行selfInterrupt()方法:


这里自己中断自己的原因就是上面介绍过的,上面捕获到线程中断之后只是记录下了中断状态,然后对线程进行了复位,所以这时候这里需要再次中断自己,对外界做出响应。

到这里,整个lock()和unlock()分析就结束了,但是上面acquireQueued方法我们这里需要再进去看一下,里面的finally中有一个cancelAcquire(Node)方法。

AQS#cancelAcquire(Node)

private void cancelAcquire(Node node) {
if (node == null)//1
return;//2
node.thread = null;//3-将当前节点的线程设置为null
// Skip cancelled predecessors 跳过已经被取消的前置节点
Node pred = node.prev;//4
while (pred.waitStatus > 0)//5
node.prev = pred = pred.prev;//6
//predNext是很明显需要解除关系的,如果不解除下面的cas操作将会失败
Node predNext = pred.next;//7-如果上一个节点没有不合法的,那么这个就是自己,否则就是当前节点前面的某一个节点
node.waitStatus = Node.CANCELLED;//8
//1.如果当前线程是tail节点,直接移除掉,并且把上一个节点设置为tail节点
if (node == tail && compareAndSetTail(node, pred)) {//9
compareAndSetNext(pred, predNext, null);//10-这里要和上面Node predNext = pred.next结合起来理解
} else {//11
/**
* 如果下一个节点需要唤醒信号(即需要状态设置为-1),尝试把上一个节点的next节点设置
* 为当前节点的下一个节点,这样他就可以得到一个唤醒的信号,如果设置信号失败,那就直接唤醒
* 当前节点的下一个节点,并以此往后传递
*/
int ws;//12
//2.如果当前线程前置节点是head节点,且状态为-1(不为-1但是设置为-1成功)
if (pred != head &&//13
((ws = pred.waitStatus) == Node.SIGNAL ||//14
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&//15
pred.thread != null) {//16
Node next = node.next;//17
if (next != null && next.waitStatus <= 0)//18
compareAndSetNext(pred, predNext, next);//19
} else {//20-当前节点的前置节点是head节点,那就直接把下一个节点唤醒
unparkSuccessor(node);//21-//这里面会去除状态为cancel的节点,而此时状态已经为-1了
}
node.next = node; //22-help GC
}
}
复制代码

这个代码逻辑上是有点绕的,所以还是要结合图形来会比较好理解,而且这里有两种情况,一种就是当前队列中没有无效节点被清除,一种是有无效节点被清除,我们假设当前有如下两个队列:


上图中的AQS同步队列中假设没有无效节点需要被清除,这种场景的5和6行是可以忽略的,这时候第7行的predNext其实就是当前节点自己。

假如这时候就是ThreadD进来,而ThreadC是无效节点,那么第5行和第6行就会执行了,这时候predNext就是ThreadC所在的节点了,而不是ThreadD本身了,所以predNext在这种场景的时候就不会是自己了。

然后下面分了三种情况来进行移除节点(
为了便于理解,下图中没有将状态改为-3页也没有将thread设置为null显示出来

):

  • 当前节点为tail节点(即ThreadD)
    这种情况可以直接移除,所以第9行通过一个CAS直接把tail节点替换成当前节点的prev节点,得到如下AQS:

    紧接着第10行,就是把前一个节点的下一个节点设置为空,也就是把ThreadC的next设置为空:


    这样其实就相当于把ThreadD移除了,这里个人认为可以加上node.prev=null帮助GC。

  • 当前节点不是tail节点,且不是head节点的下一个节点
    假如当前节点是ThreadC,这里的if中的13-16行的判断都是为了确定前一个节点状态是-1且thread不为null,如果后一个节点也是有效的,那么就通过CAS将ThreadB的next节点设置为ThreadD:

    这里到这一步其实就可以了,因为每次唤醒的时候都会执行无效节点的清除,而且唤醒是根据next往后移动的,这里根据next找不到ThreadC节点了。
    然后22行就是把当前节点的下一个节点设置为自己:


  • 当前节点不是tail节点,是head节点的下一个节点
    这里其实就相当于是说当前节点为第一个可以被唤醒的节点,因为我们的head节点是哨兵节点。
    这种就直接执行unparkSuccessor(Node)方法就行了,这个方法上面讲过了,里面会有清除无效节点的操作。

总结

分析到这里ReentrantLock和AQS就分析完了,上面的公平锁和非公平锁本来也想放在后面介绍的,因为篇幅有限就不准备再去分析公平锁了,如果有确实想知道的,可以给我评论留言,谢谢大家。

下一篇,将分析Condition队列,感兴趣的

**请关注我,一起学习进步**

微信扫一扫,分享到朋友圈

【并发编程系列5】JUC必知ReentrantLock和AQS同步队列实现原理分析

开局就亲,这剧好狠!一口气刷8集,甜到哭泣

上一篇

[图]科学团队开发新型可穿戴设备 可在用户癫痫发作1小时前发出警告

下一篇

你也可能喜欢

【并发编程系列5】JUC必知ReentrantLock和AQS同步队列实现原理分析

长按储存图像,分享给朋友