Java中常见锁分为两种:一种是Synchronized
修饰的锁;另一种就是java.util.concurrent
包下的各种同步锁,他们基于AbstractQueuedSynchronizer(AQS)框架来构建的
,而AQS
的核心数据结构是CLH锁
的变体。
一、无锁、偏向锁、轻量级锁和重量级锁
Synchronized 锁的底层类别
所状态 | 存储内容 | 存储内容 | |
---|---|---|---|
无锁 | 对象的hashCode、对象分代年龄、是否是偏向锁(0) | 01 | |
偏向锁 | 偏向线程ID、偏向时间戳、对象分代年龄、是否是偏向锁(1) | 01 | |
轻量级锁 | 指向栈中锁记录的指针 | 00 | |
重量级锁 | 指向互斥量(重量级锁)的指针 | 10 |
所以目前锁一共有4种状态,级别从低到高依次是:无锁、偏向锁、轻量级锁和重量级锁。它会随着竞争情况逐渐升级。synchronized 同步锁可以升级但是不可以降级,目的是为了提高获取锁和释放锁的效率。
在jdk1.6之前,阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成,这种状态转换需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长,这种方式就是synchronized最初实现同步的方式(依赖于底层的操作系统的Mutex Lock(互斥锁)来实现的线程同步),synchronized 是重量级锁,效率低下。
在jvm中,对象在内存中除了本身的数据外还会有个对象头,其对象头中有两类信息:mark word
和类型指针
;另外对于数组而言还会有一份记录数组长度的数据。类型指针是指向该对象所属类对象的指针,mark word用于存储对象的HashCode、GC分代年龄、锁状态等信息。在32位系统上mark word长度为32bit,64位系统上长度为64bit。
表格中可以看到存储数据格式会随着锁的不同而不同。
当对象状态为偏向锁时,mark word存储的是偏向的线程ID;当状态为轻量级锁时,mark word存储的是指向线程栈中Lock Record的指针;当状态为重量级锁时,为指向堆中的monitor对象的指针。
1.1 全局安全点(safepoint)
代表了一个状态,在该状态下所有线程都是暂停的。
1.2 偏向锁
偏向锁在获取资源的时候会在资源对象上记录该对象是偏向该线程的,偏向锁并不会主动释放,这样每次偏向锁进入的时候都会判断该资源是否是偏向自己的,如果是偏向自己的则不需要进行额外的操作,直接可以进入同步操作。
1.2.1 偏向锁获取过程
- 访问Mark Word中偏向锁标志位是否设置成1,锁标志位是否为01——确认为可偏向状态。
- 如果为可偏向状态,则测试线程ID是否指向当前线程,如果是,进入步骤(5),否则进入步骤(3)。
- 如果线程ID并未指向当前线程,则通过CAS操作竞争锁。如果竞争成功,则将Mark Word中线程ID设置为当前线程ID,然后执行(5);如果竞争失败,执行(4)。
- 如果CAS获取偏向锁失败,则表示有竞争。当到达全局安全点(safepoint)时获得偏向锁的线程被挂起,偏向锁升级为轻量级锁,然后被阻塞在安全点的线程继续往下执行同步代码。
- 执行同步代码。
1.2.2.偏向锁的释放
偏向锁的撤销在上述第四步骤中有提到。偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动去释放偏向锁。偏向锁的撤销,需要等待全局安全点safepoint,它会首先暂停拥有偏向锁的线程A,然后判断这个线程A,此时有两种情况:
- A线程已经退出了同步代码块,或者是已经不存活了,如果是上面两种情况之一的,此时就会直接撤销偏向锁,变成无锁状态
- A线程还在同步代码块中,此时将A线程的偏向锁升级为轻量级锁。下面说明了怎么升级。
1.2.3 批量重偏向
为什么有批量重偏向
当只有一个线程反复进入同步块时,偏向锁带来的性能开销基本可以忽略,但是当有其他线程尝试获得锁时,就需要等到safe point时将偏向锁撤销为无锁状态或升级为轻量级/重量级锁。这个过程是要消耗一定的成本的,所以如果说运行时的场景本身存在多线程竞争的,那偏向锁的存在不仅不能提高性能,而且会导致性能下降。因此,JVM中增加了一种批量重偏向/撤销的机制。
批量重偏向的原理
- 首先引入一个概念epoch,其本质是一个时间戳,代表了偏向锁的有效性,epoch存储在可偏向对象的MarkWord中。除了对象中的epoch,对象所属的类class信息中,也会保存一个epoch值。
- 每当遇到一个全局安全点时(这里的意思是说批量重偏向没有完全替代了全局安全点,全局安全点是一直存在的),比如要对class C 进行批量再偏向,则首先对 class C中保存的epoch进行增加操作,得到一个新的epoch_new
- 然后扫描所有持有 class C 实例的线程栈,根据线程栈的信息判断出该线程是否锁定了该对象,仅将epoch_new的值赋给被锁定的对象中,也就是现在偏向锁还在被使用的对象才会被赋值epoch_new。
- 退出安全点后,当有线程需要尝试获取偏向锁时,直接检查 class C 中存储的 epoch 值是否与目标对象中存储的 epoch 值相等, 如果不相等,则说明该对象的偏向锁已经无效了(因为(3)步骤里面已经说了只有偏向锁还在被使用的对象才会有epoch_new,这里不相等的原因是class C里面的epoch值是epoch_new,而当前对象的epoch里面的值还是epoch),此时竞争线程可以尝试对此对象重新进行偏向操作。
1.3 轻量级锁
1.3.1 轻量级锁的获取过程
- 在代码进入同步块的时候,如果同步对象锁状态为偏向状态(就是锁标志位为“01”状态,是否为偏向锁标志位为“1”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝。官方称之为 Displaced Mark Word(所以这里我们认为Lock Record和 Displaced Mark Word其实是同一个概念)。这时候线程堆栈与对象头的状态如图所示:
拷贝对象头中的Mark Word复制到锁记录中。
拷贝成功后,虚拟机将使用CAS操作尝试将对象头的Mark Word更新为指向Lock Record的指针,并将Lock record里的owner指针指向对象头的mark word。如果更新成功,则执行步骤(4),否则执行步骤(5)。
如果上面这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位设置为“00”,即表示此对象处于轻量级锁定状态,这时候线程堆栈与对象头的状态如下所示:
如果上面这个更新操作失败了,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧。
- 如果是就说明当前线程已经拥有了这个对象的锁,现在是重入状态,那么设置Lock Record第一部分(Displaced Mark Word)为null,起到了一个重入计数器的作用。重入之后然后结束。接着就可以直接进入同步块继续执行。
- 如果不是说明这个锁对象已经被其他线程抢占了,说明此时有多个线程竞争锁,那么它就会自旋等待锁,一定次数后仍未获得锁对象,说明发生了竞争,需要膨胀为重量级锁。
下图为重入三次时的lock record示意图,左边为锁对象,右边为当前线程的栈帧,重入之后然后结束。接着就可以直接进入同步块继续执行。
自旋原理非常简单,如果持有锁的线程能在很短时间内释放锁资源,那么那些等待竞争锁的线程就不需要做内核态和用户态之间的切换进入阻塞挂起状态,它们只需要等一等(自旋),等持有锁的线程释放锁后即可立即获取锁,这样就避免用户线程和内核的切换的消耗。
但是线程自旋是需要消耗 CPU 的,如果一直获取不到锁,那线程也不能一直占用 CPU 自旋做无用功,所以需要设定一个自旋等待的最大时间。JVM 对于自旋周期的选择,JDK1.6 之后引入了适应性自旋锁,适应性自旋锁意味着自旋的时间不是固定的,而是由前一次在同一个锁上的自旋时间以及锁的拥有者的状态来决定。线程如果自旋成功了,则下次自旋的次数会更多,如果自旋失败了,则自旋的次数就会减少。
如果持有锁的线程执行的时间超过自旋等待的最大时间扔没有释放锁,就会导致其他争用锁的线程在最大等待时间内还是获取不到锁,自旋不会一直持续下去,这时争用线程会停止自旋进入阻塞状态,该锁膨胀为重量级锁。
1.3.2 轻量级锁的解锁过程
- 通过CAS操作尝试把线程中复制的Displaced Mark Word对象替换当前的Mark Word。
- 如果替换成功,整个同步过程就完成了。
- 如果替换失败,说明有其他线程尝试过获取该锁(此时锁已膨胀),那就要在释放锁的同时,唤醒被挂起的线程。
1.4 重量级锁
1.4.1 重量级锁加锁和释放锁机制
调用omAlloc分配一个ObjectMonitor对象,把锁对象头的mark word锁标志位变成 “10 ”,然后在mark word存储指向ObjectMonitor对象的指针
ObjectMonitor中有两个队列,_WaitSet和_EntryList,用来保存ObjectWaiter对象列表(每个等待锁的线程都会被封装成ObjectWaiter对象),_owner指向持有ObjectMonitor对象的线程,当多个线程同时访问一段同步代码时,首先会进入 _EntryList 集合,当线程获取到对象的monitor 后进入 _Owner 区域并把monitor中的owner变量设置为当前线程同时monitor中的计数器count加1,若线程调用wait()方法,将释放当前持有的monitor,owner变量恢复为null,count自减1,同时该线程进入WaitSet集合中等待被唤醒。若当前线程执行完毕也将释放monitor(锁)并复位变量的值,以便其他线程进入获取monitor(锁)。如下图所示
- 当多个线程同时访问一段同步代码时,首先会进入 _EntryList 队列中。
- 当某个线程获取到对象的Monitor后进入临界区域,并把Monitor中的 _owner 变量设置为当前线程,同时Monitor中的计数器 _count 加1。即获得对象锁。
- 若持有Monitor的线程调用 wait() 方法,将释放当前持有的Monitor,*owner变量恢复为null,*count自减1,同时该线程进入 _WaitSet 集合中等待被唤醒。
- 在WaitSet 集合中的线程会被再次放到EntryList 队列中,重新竞争获取锁。
- 若当前线程执行完毕也将释放Monitor并复位变量的值,以便其他线程进入获取锁。
1.4.2 Monitor 对象
本质上是一个管程:管程提供了一种机制,线程可以临时放弃互斥访问,等待某些条件得到满足后,重新获得执行权恢复它的互斥访问。
在HotSpot虚拟机中,Monitor是基于C++的ObjectMonitor类实现的,结构体如下:ObjectMonitor() {
_header = NULL;
_count = 0; // 约为 _WaitSet 和 _EntryList 的节点数之和
_waiters = 0,
_recursions = 0; // 记录重入次数
_object = NULL;
_owner = NULL; // _owner指向持有ObjectMonitor对象的线程
_WaitSet = NULL; // 双向循环链表:存放处于wait状态的线程队列,即调用wait()方法的线程
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ; // 单向链表:多个线程争抢锁,会先存入这个
FreeNext = NULL ;
_EntryList = NULL ; // 双向循环链表:存放处于等待锁block状态的线程队列
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
}
1.4.3 锁竞争机制
1.4.3.1 ObjectMonitor的并发管理逻辑
ObjectMonitor::enter()
和 ObjectMonitor::exit()
分别是ObjectMonitor获取锁和释放锁的方法。线程解锁后还会唤醒之前等待的线程,根据策略选择直接唤醒_cxq队列中的头部线程去竞争,或者将_cxq队列中的线程加入_EntryList,然后再唤醒_EntryList队列中的线程去竞争。
1.4.3.2 ObjectMonitor::enter()
看一下ObjectMonitor::enter()方法竞争锁的流程:
- 首先尝试通过 CAS 把 ObjectMonitor 中的 _owner 设置为当前线程,设置成功就表示获取锁成功。通过 _recursions 的自增来表示重入。
- 如果没有CAS成功,那么就开始启动自适应自旋,自旋还不行的话,就包装成 ObjectWaiter 对象加入到 _cxq 单向链表之中。
- 加入_cxq链表后,再次尝试是否可以CAS拿到锁,再次失败就要阻塞(block),底层调用了pthread_mutex_lock。
1.4.3.3 ObjectMonitor::exit()方法
- 线程执行 Object.wait()方法时,会将当前线程加入到 _waitSet 这个双向链表中,然后再运行ObjectMonitor::exit() 方法来释放锁。
- 可重入锁就是根据 _recursions 来判断的,重入一次就执行 _recursions++,解锁一次就执行 _recursions—,如果 _recursions 减到 0 ,就说明需要释放锁了。
- 线程解锁后还会唤醒之前等待的线程。当线程执行 Object.notify()方法时,从 _waitSet 头部拿线程节点,然后根据策略(QMode指定)决定将线程节点放在哪里,包括_cxq 或 _EntryList 的头部或者尾部,然后唤醒队列中的线程。
1.5 synchronized关键字
代码块
同步代码块的加锁、解锁是通过 Javac 编译器实现的,底层是借助monitorenter和monitorerexit,为了能够保证无论代码块正常执行结束 or 抛出异常结束,都能正确释放锁,Javac 编译器在编译的时候,会对monitorerexit进行特殊处理public class Test {
public static void main(String[] args) {
}
public void test() {
synchronized (this) {
// 同步数据
System.out.println("hehehe");
}
}
}
然后通过javap -c Test.class >> test.txt
查看其编译后的字节码:Compiled from "Test.java"
public class com.Test {
public com.Test();
Code:
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: return
public static void main(java.lang.String[]);
Code:
0: return
public void test();
Code:
0: aload_0
1: dup
2: astore_1
3: monitorenter
4: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream;
7: ldc #3 // String hehehe
9: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
12: aload_1
13: monitorexit
14: goto 22
17: astore_2
18: aload_1
19: monitorexit
20: aload_2
21: athrow
22: return
Exception table:
from to target type
4 14 17 any
17 20 17 any
}
monitorenter指令指向同步代码块的开始位置,monitorexit指令则指明同步代码块的结束位置,当执行monitorenter指令时,当前线程将试图获取mark word里面存储的monitor,当 monitor的进入计数器为 0,那线程可以成功取得monitor,并将计数器值设置为1,取锁成功。
值得注意的是编译器将会确保无论方法通过何种方式完成,方法中调用过的每条 monitorenter 指令都有执行其对应 monitorexit 指令,而无论这个方法是正常结束还是异常结束。为了保证在方法异常完成时 monitorenter 和 monitorexit 指令依然可以正确配对执行,编译器会自动产生一个异常处理器,这个异常处理器声明可处理所有的异常,它的目的就是用来执行 monitorexit 指令。从上面的字节码中也可以看出有两个monitorexit指令,它就是异常结束时被执行的释放monitor 的指令。
同步实例方法
同步方法的加锁、解锁是通过 Javac 编译器实现的,底层是借助ACC_SYNCHRONIZED
访问标识符来实现的,代码如下所示:public class Test {
public static void main(String[] args) {
}
public synchronized void test() {
// 同步数据
System.out.println("hehehe");
}
}
方法级的同步是隐式,即无需通过字节码指令来控制的,它实现在方法调用和返回操作之中。JVM可以从方法常量池中的方法表结构(method_info Structure) 中的 ACC_SYNCHRONIZED 访问标志区分一个方法是否同步方法。当方法调用时,调用指令将会 检查方法的 ACC_SYNCHRONIZED访问标志是否被设置,如果设置了,执行线程将先持有monitor,然后再执行方法,最后在方法完成(无论是正常完成还是非正常完成)时释放monitor。在方法执行期间,执行线程持有了monitor,其他任何线程都无法再获得同一个monitor。如果一个同步方法执行期间抛出了异常,并且在方法内部无法处理此异常,那这个同步方法所持有的monitor将在异常抛到同步方法之外时自动释放。Compiled from "Test.java"
public class com.Test {
public com.Test();
Code:
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: return
public static void main(java.lang.String[]);
Code:
0: return
public synchronized void test();
Code:
0: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream;
3: ldc #3 // String hehehe
5: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
8: return
}
同步静态方法
给当前类加锁,会作⽤于类的所有对象实例,因为静态成员不属于任何⼀ 个实例对象,是类成员( static 表明这是该类的⼀个静态资源,不管new了多少个对象,只有 ⼀份)。所以如果⼀个线程A调⽤⼀个实例对象的⾮静态 synchronized ⽅法,⽽线程B需要调⽤ 这个实例对象所属类的静态 synchronized ⽅法,是允许的,不会发⽣互斥现象,因为访问静态 synchronized ⽅法占⽤的锁是当前类的锁,⽽访问⾮静态 synchronized ⽅法占⽤的锁是当前 实例对象锁。
二、处理器架构
在了解下面锁之前,我们先需要一些处理器架构基础知识。
2.1 SMP架构
即对称多处理器结构,在这种架构中,一台计算机由多个CPU组成,并共享内存和其他资源,所有的CPU都可以平等的访问内存、I/O等。虽然可以同时使用多个CPU,但从外部表现来看,它们就如同一台单CPU机器一样,操作系统将任务队列对称地分布于多个CPU之上,从而极大地提高了整个系统的数据处理能力。
但是随着CPU数量的增加,每个CPU都要访问共享资源,而资源在某些场景下只能单线程访问,在某些场景下的操作又必须通知到其他CPU,那么这就带来了性能损耗、资源浪费,成为了系统瓶颈。
2.2 NUMA架构
即非一致存储访问,这种模型的是为了解决smp扩容性很差而提出的技术方案。它按组将CPU分为多模块,每个CPU模块由多个CPU组成,并且具有独立的本地内存、I/O等,模块之间的访问通过互联模块完成(类似远程通信),访问本地资源的速度会远高于访问外部资源
NUMA架构相当于打包多个SMP架构的CPU,它能较好解决SMP架构存在的扩展问题;但是,在NUMA的单个CPU模块中,虽然控制了CPU数量减少了共享资源的操作时的性能损耗,由于存在互联模块的工作,在CPU模块增加时,并不能线性的增加系统性能。
2.3 MPP架构
MPP 提供了另外一种进行系统扩展的方式,它由多个 SMP 服务器通过一定的节点互联网络进行连接,协同工作,完成相同的任务,从用户的角度来看是一个服务器系统。 其基本特征是由多个 SMP 服务器(每个 SMP 服务器称节点)通过节点互联网络连接而成,每个节点只访问自己的本地资源(内存、存储等),是一种完全无共享(Share Nothing)结构,因而扩展能力最好,理论上其扩展无限制,目前的技术可实现512个节点互联,数千个 CPU。 实验证明, SMP 服务器 CPU 利用率最好的情况是 2 至 4 个 CPU。
可以将MMP理解为刀片服务器,每个刀扇里的都是一台独立SMP架构服务器,并且每个刀扇之间均有高性能的网络设备进行交互,保证smp服务器之间的数据传输性能。MMP架构比较依赖管理系统的处理能力来保障通信。
三、CLH 锁
CLH队列锁是一种基于链表的可扩展、高性能、公平的自旋锁, 申请线程仅仅在本地变量上自旋,不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。能确保无饥饿性,提供先来先服务的公平性。
CLH 锁的核心思想:将众多线程长时间对某资源(比如锁状态)的竞争,通过有序化这些线程将其转化为只需对前驱节点的本地变量检测。而唯一存在竞争的地方就是在入队列之前对尾节点tail的竞争,但此时竞争的线程数量已经少了很多了。比起所有线程直接对某资源竞争的轮询次数也减少了很多,这也大大节省了CPU缓存同步的消耗,从而大大提升系统性能。
当一个线程需要获取锁时:
创建一个QNode, 将其中的 locked 设置为 true,表示需要获取锁,myPred 表示对其前驱节点的引用。
线程A对tail域调用getAndSet方法,使自己成为队列的尾部,同时获取一个指向其前驱结点的引用myPred
线程B需要获得锁,同样的流程再来一遍
线程就在前驱节点的 locked 字段上旋转,直到前驱结点释放锁(即前驱节点的锁值 locked == false)
- 当一个线程需要释放锁时,将当前结点的locked域设置为false,同时回收前驱结点。
由于自旋过程中,监控的是前置节点的变量,因此在SMP架构的共享内存模式,能更好的提供性能。
在NUMA架构下,如果当前节点与前驱节点不在同一CPU模块下,跨CPU模块会带来额外的系统开销,而MCS锁更适用于NUMA架构。
四、MCS 锁
MCS锁中线程只对当前节点的本地变量自旋,而前驱节点则负责通知其结束自旋操作。
正因为如此,它解决了CLH在NUMA系统架构中获取locked域状态内存过远的问题 。
每个节点在解锁时更新后继节点的锁值(locked),在这一刻,该节点的后置节点会结束自旋,并进行加锁。
NUMA架构下MCS锁的性能略优于CLH锁。
4.1 CLH锁在NUMA架构下低于MCS锁的原因
CLH锁和MCS锁为线程节点分配的内存通常都会分配到与对应线程执行cpu核心绑定的NUMA节点的存储器中,而不同线程对应的cpu则可能位于不同的NUMA节点中。CLH锁会无限轮训前驱节点的isLocked状态,这一操作在前驱节点线程与当前线程不位于同一NUMA节点时,会不断的进行远程节点访问,性能较低;而MCS锁中,当前线程则是无限轮训自己线程节点的isLocked,这种情况下都是本地NUMA节点内存的访问。
五、AQS
AQS(AbstractQueuedSynchronizer,队列同步器)
核心思想:如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
AQS 使用一个volatile int state(代表共享资源)成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。
内部队列改造:
- 在结构上引入了头节点和尾节点,分别指向队列的头和尾,尝试获取锁、入队列、释放锁等实现都与头尾节点相关
- 为了可以处理timeout和cancel操作,每个node维护一个指向前驱的指针。如果一个node的前驱被cancel,这个node可以前向移动使用前驱的状态字段
- 在每个node里面使用一个状态字段来控制阻塞/唤醒,而不是自旋
- head节点使用的是虚拟节点
AQS的主要使用方式是继承,子类通过继承 AQS 并实现它的抽象方法来管理同步状态,在 AQS 里由一个int型的state来代表这个状态。将资源封装为Node,通过cas改变state值。
AQS同时提供了互斥模式(exclusive)和共享模式(shared)两种不同的同步逻辑。一般情况下,子类只需要根据需求实现其中一种模式,当然也有同时实现两种模式的同步类,如ReadWriteLock。
volatile
从这里看是基本都是JCU包中的功能,大致使用了volatile来管理变量,完成并发。
5.1 源码部分
5.1.1 节点
节点有AQS的静态内部类Node定义:static final class Node {
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/**
* CANCELLED,值为1,表示当前的线程被取消
* SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
* CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
* PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
* 值为0,表示当前节点在sync队列中,等待着获取锁。
*/
volatile int waitStatus;
// 前驱结点
volatile Node prev;
// 后继结点
volatile Node next;
// 与该结点绑定的线程
volatile Thread thread;
// 存储condition队列中的后继节点
Node nextWaiter;
// 是否为共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前驱结点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
类中有两个常量SHARE和EXCLUSIVE,顾名思义这两个常量用于表示这个节点支持共享模式还是独占模式
- 共享模式指的是允许多个线程获取同一个锁而且可能获取成功
- 独占模式指的是一个锁如果被一个线程持有,其他线程必须等待
多个线程读取一个文件可以采用共享模式,而当有一个线程在写文件时不会允许另一个线程写这个文件,这就是独占模式的应用场景。
5.1.2 CAS操作
AQS有三个重要的变量:// 队头结点
private transient volatile Node head;
// 队尾结点
private transient volatile Node tail;
// 代表共享资源
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}compareAndSetState
方法是以乐观锁的方式更新共享资源。
独占锁是一种悲观锁,就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。而另一个更加有效的锁就是乐观锁。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。乐观锁用到的机制就是CAS,即Compare And Swap。
head、tail、state三个变量都是volatile的。,它在多处理器开发中保证了共享变量的“可见性”。可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。如果一个字段被声明成,Java线程内存模型确保所有线程看到这个变量的值是一致的。volatile保证共享变量的可见性,CAS保证更新操作的原子性。
- 声明共享变量为volatile
- 使用CAS的原子条件更新来实现线程之间的同步
- 配合以volatile的读/写和CAS所具有的volatile读和写的内存语义来实现线程之间的通信
5.1.3 方法
AQS定义两种资源共享方式:
- Exclusive:独占,只有一个线程能执行,如ReentrantLock
- Share:共享,多个线程可同时执行,如CountDownLatch
5.1.3.1 模板方法
1. acquire(int)
|
此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。获取到资源后,线程就可以去执行其临界区代码了。
流程如下:
- 尝试直接去获取资源,如果成功则直接返回;
- addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
下面是每个方法的实现代码
1.1 tryAcquire(int)
|
此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。
AQS只是一个框架,在这里定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过state的get/set/CAS),至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了。当然,自定义同步器在进行资源访问时要考虑线程安全的影响。 这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire,而共享模式下只用实现tryAcquireShared。如果都定义成,那么每个模式也要去实现另一模式下的接口。说到底,还是站在咱们开发者的角度,尽量减少不必要的工作量。
1.2 addWaiter(Node)
|
其中,compareAndSetTail方法也是调用Unsafe类实现CAS操作,更新队尾。
1.3 enq(Node)
|
这段代码CAS自旋volatile变量,也是AtomicInteger、AtomicBoolean等原子量的灵魂。
1.4 acquireQueued(Node, int)
通过tryAcquire()和addWaiter(),如果线程获取资源失败,已经被放入等待队列尾部了。但是,后面还有一项重要的事没干,就是让线程进入阻塞状态,直到其他线程释放资源后唤醒自己。过程跟在银行办理业务时排队拿号有点相似,acquireQueued()就是干这件事:在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回。final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 是否获取到了资源
try {
boolean interrupted = false; // 等待过程中有没有被中断
for (;;) { // 自旋,直到
final Node p = node.predecessor();
// 前驱是head,则有资格去尝试获取资源
if (p == head && tryAcquire(arg)) {
// 获取资源成功,将自己置为队头,并回收其前驱(旧的队头)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 获取资源失败,
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果获取资源失败后,会调用两个函数,shouldParkAfterFailedAcquire和parkAndCheckInterrupt,下面来看看它俩是干什么的。
1.5 shouldParkAfterFailedAcquire(Node, Node)
该函数的作用是“在获取资源失败后是否需要阻塞”:private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 前驱状态
if (ws == Node.SIGNAL)
// Node.SIGNAL,代表前驱释放资源后会通知后继结点
return true;
if (ws > 0) { // 代表前驱已取消任务,相当于退出了等待队列
do { // 一个个往前找,找到最近一个正常等待的前驱,排在它的后面
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 前驱状态正常,则将其状态置为SIGNAL,意为,释放资源后通知后继结点
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
整个流程中,如果前驱节点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。
1.6 parkAndCheckInterrupt()
如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 使线程进入waiting状态
return Thread.interrupted();
}
park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:被unpark()或被interrupt()。
1.7 总结
- 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
- 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
- acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
2. release(int)
release()是acquire()的逆操作,是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0) // 状态不为0,证明需要唤醒后继结点
unparkSuccessor(h);
return true;
}
return false;
}
2.1 tryRelease(int)
|
跟tryAcquire()一样,这个方法是需要自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可,也不需要考虑线程安全的问题。
2.2 unparkSuccessor(Node)
|
一句话概括:用unpark()唤醒等待队列中最前边的那个未放弃线程。
3. acquireShared(int)
此方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected int tryAcquireShared(int arg) { // 留给子类实现
throw new UnsupportedOperationException();
}
这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。
3.1 doAcquireShared(int)
|
该函数的功能类似于独占模式下的acquireQueued()。
跟独占模式比,有一点需要注意的是,这里只有线程是head.next时(“老二”),才会去尝试获取资源,有剩余的话还会唤醒之后的队友。那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。因为老大先唤醒老二,老二一看资源不够自己用继续park(),也更不会去唤醒老三和老四了。独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。
3.2 setHeadAndPropagate(Node, int)
|
此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继节点,毕竟是共享模式。
4. releaseShared(int)
此方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 尝试释放资源
doReleaseShared(); // 释放成功,继续唤醒后继结点
return true;
}
return false;
}
protected boolean tryReleaseShared(int arg) { // 留给子类实现
throw new UnsupportedOperationException();
}
跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于可重入的考量;而共享模式下的releaseShared()则没有这种要求,多线程可并发执行,不适用于可重入。
4.1 doReleaseShared()
|
至此,AQS的独占模式与共享模式下的实现原理剖析的差不多了。
5.1.3.2 可重写的方法
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。
自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,成功则返回true,失败则返回false。
初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
任务分为N个子线程去执行,也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
5.2 为什么AQS框架底层使用CLH队列结构作为基础
由于AQS中需要让大量并发争用锁的线程频繁的被阻塞和唤醒,出于性能的考虑,为避免过多的线程上下文切换,AQS本身没有再利用操作系统底层提供的线程阻塞/唤醒机制通过互斥锁来保证同步队列的并发安全,而是使用基于CAS的乐观重试机制来构造一个无锁,并发安全的同步队列。
AQS作为一个通用的同步器框架,是需要支持超时、中断等取消加锁的,而前面提到基础版的CLH锁和MCS锁都存在一个缺陷,即无法很好的支持超时、中断等取消加锁的场景。
引入了显式前驱节点引用的CLH锁比起MCS锁可以更加简单的实现超时、中断等加锁过程中临时退出加锁的场景。而由于AQS中的线程在征用锁失败时不会占用CPU一直自旋等待,而是被设置为阻塞态让出CPU(LockSupport.park),因此MCS锁在NUMA架构下性能略高的优点也就不是那么重要了。
5.3 一些锁
5.3.1 ReentrantLock
5.3.1.1 公平和非公平锁
Reentrantlock 可以是公平锁,也可以是非公平锁。
ReentrantLock 的构造函数中,默认的无参构造函数将会把Sync对象创建为 NonfairSync 对象,这是一个”非公平锁”; 而另一个构造函数ReentrantLock(boolean fair) 传入参数为 true 时将会把 Sync 对象创建为”公平锁”FairSync。
公平锁:多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。
- 优点:所有的线程都能得到资源,不会饿死在队列中。
- 缺点:吞吐量会下降很多,队列里面除了第一个线程,其他的线程都会阻塞,cpu唤醒阻塞线程的开销会很大。
非公平锁:多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。
- 优点:可以减少CPU唤醒线程的开销,整体的吞吐效率会高点,CPU也不必取唤醒所有线程,会减少唤起线程的数量。
- 缺点:这样可能导致队列中间的线程一直获取不到锁或者长时间获取不到锁,导致饿死。
|
非公平锁中tryAcquire
方法直接调用nonfairTryAcquire(int acquires)
方法,对于非公平锁只要 CAS 设置同步状态成功,则表示当前线程获取了锁。final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 通过自旋获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 重入锁判断
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
而公平锁则不同,tryAcquire
方法进行了重写。唯一不同的位置为判断条件多了 hasQueuedPredecessors()
,即加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 获取锁逻辑
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 重入锁判断
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
而这里的核心hasQueuedPredecessors()
就是公平锁的核心,根据他可以了解为链表方式判断,即AQS或者CLH逻辑、即谁先排队而不是自旋。
5.3.1.2 可重入
重进入是指任意线程在获取到锁之后,能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决两个问题:
- 线程再次获取锁 :锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
- 锁的最终释放 : 线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放。
|
上面代码可以看到这里判断是否为当前线程位置。通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。同步状态表示锁被一个线程重复获取的次数。
如果该锁被获取了n次,那么前(n-1)次tryRelease(int releases)方法必须返回 false,而只有同步状态完全释放了,才能返回true。该方法将同步状态是否为 0 作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
5.3.2 CountDownLatch
可以设置一个计数器,然后通过countDown方法来进行减1的操作,使用await方法等待计数器不大于0,然后继续执行await方法之后的语句。
- CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞
- 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞)
- 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行
|
5.3.3 CyclicBarrier
允许一组线程全部等待彼此达到共同屏障点的同步辅助。循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。public static void main(String[] args) {
int num = 3;
CyclicBarrier cyclicBarrier=new CyclicBarrier(num,()->{
System.out.println(num+"已加载完成");
});
for (int i = 1; i <=7; i++) {
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"完成");
//等待
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
5.3.4 Semaphore
类似一个生产者消费者模型,更像一个占位模型,提供了n个空间给线程抢占,当达到了上限n时,就阻塞,直到空间被释放。public static void main(String[] args) throws InterruptedException {
Semaphore semaphore=new Semaphore(3);
for (int i = 1; i <=6; i++) {
new Thread(()->{
try {
//抢占
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到了");
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 释放
System.out.println(Thread.currentThread().getName()+"离开了");
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
六、乐观锁和悲观锁
Java中,synchronized关键字
和Lock的实现类
都是悲观锁。悲观锁认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改。
乐观锁认为自己在使用数据时不会有别的线程修改数据,所以不会添加锁。// 悲观锁
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
lock.lock();
// 同步数据
lock.unlock();
}
public synchronized void test() {
// 同步数据
}
// 乐观锁
final AtomicInteger atomicInteger = new AtomicInteger(1);
atomicInteger.incrementAndGet();