java中的CAS操作

一、什么是CAS

Compare And Swap的缩写,中文翻译成比较并交换。
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。 如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。

二、引出CAS

看看下面代码:

static int count = 0;
static AtomicInteger atomicInteger = new AtomicInteger(0);
final static int COUNT = 10;

public static void main(String[] args) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(COUNT);

final Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
count++;
atomicInteger.getAndIncrement();
}
latch.countDown();
}
};

for (int i = 0; i < COUNT; i++) {
new Thread(runnable).start();
}
latch.await();
System.out.println("理论结果:" + 1000*COUNT);
System.out.println("count:" + count);
System.out.println("atomic:" + atomicInteger.intValue());
}

输出结果:
理论结果:10000
count:9997
atomic:10000

多次允许后发现,count的结果永远不正确,而atomic的数据永远是正确的。

三、Atomic

根据操作的目标数据类型

  • 基本原子类
  • 数组原子类
  • 原子引用类型
  • 字段更新原子类

3.1 基本原子类

基本原子类的功能,是通过原子方式更新 Java 基础类型变量的值。基本原子类主要包括了以下三个:

  • AtomicInteger:整型原子类。
  • AtomicLong:长整型原子类。
  • AtomicBoolean :布尔型原子类。

3.2 数组原子类

数组原子类的功能,是通过原子方式更数组里的某个元素的值。数组原子类主要包括了以下三个:

  • AtomicIntegerArray:整型数组原子类。
  • AtomicLongArray:长整型数组原子类。
  • AtomicReferenceArray :引用类型数组原子类。

3.3 原子引用类型

引用原子类主要包括了以下三个:

  • AtomicReference:引用类型原子类。
  • AtomicMarkableReference :带有更新标记位的原子引用类型。
  • AtomicStampedReference :带有更新版本号的原子引用类型。
    AtomicStampedReference通过引入“版本”的概念,来解决ABA的问题。

3.4 字段更新原子类

字段更新原子类主要包括了以下三个:

  • AtomicIntegerFieldUpdater:原子更新整型字段的更新器。
  • AtomicLongFieldUpdater:原子更新长整型字段的更新器。
  • AtomicReferenceFieldUpdater:原子更新引用类型里的字段。

3.5 基本函数

方法 介绍
public final int get() 获取当前的值
public final int getAndSet(int newValue) 获取当前的值,然后设置新的值
public final int getAndIncrement() 获取当前的值,然后自增
public final int getAndDecrement() 获取当前的值,然后自减
public final int getAndAdd(int delta) 获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) 通过 CAS 方式设置整数值

3.6 Unsafe类

Unsafe 提供了CAS 方法,直接通过native 方式(封装 C++代码)调用了底层的 CPU 指令 cmpxchg。从名字中我们可以看出来这个类对普通程序员来说是“危险”的,一般应用开发者不会用到这个类。

3.6.1 Unsafe 提供的 CAS 方法

主要如下: 定义在 Unsafe 类中的三个 “比较并交换”原子方法

/*
@param o 包含要修改的字段的对象
@param offset 字段在对象内的偏移量
@param expected 期望值(旧的值)
@param update 更新值(新的值)
@return true 更新成功 | false 更新失败
*/
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object update);
public final native boolean compareAndSwapInt( Object o, long offset, int expected,int update);
public final native boolean compareAndSwapLong( Object o, long offset, long expected, long update);

Unsafe 提供的 CAS 方法包含四个入参: 包含要修改的字段对象、字段内存位置、预期原值及新值。
在执行 Unsafe 的 CAS 方法的时候,这些方法首先将内存位置的值与预期值(旧的值)比较,如果相匹配,那么处理器会自动将该内存位置的值更新为新值,并返回 true ;如果不相匹配,处理器不做任何操作,并返回 false 。
github
CAS操作由处理器提供支持,是一种原语。原语是操作系统或计算机网络用语范畴。是由若干条指令组成的,用于完成一定功能的一个过程,具有不可分割性,即原语的执行必须是连续的,在执行过程中不允许被中断。如 Intel 处理器,比较并交换通过指令的 cmpxchg 系列实现。

3.6.2 获取属性偏移量

Unsafe 提供的获取字段(属性)偏移量的相关操作,主要如下:

/**
* @param o 需要操作属性的反射
* @return 属性的偏移量
*/
public native long staticFieldOffset(Field field);
public native long objectFieldOffset(Field field);

  • staticFieldOffset 方法用于获取静态属性 Field 在 Class 对象中的偏移量,在 CAS 操作静态属性时,会用到这个偏移量。
  • objectFieldOffset 方法用于获取非静态 Field (非静态属性)在 Object 实例中的偏移量,在 CAS 操作对象的非静态属性时,会用到这个偏移量。

3.6.3 根据属性的偏移量获取属性的最新值:

/**
* @param o 字段所属于的对象实例
* @param fieldOffset 字段的偏移量
* @return 字段的最新值
*/
public native int getIntVolatile(Object o, long fieldOffset);

四、CAS的缺点

4.1 ABA问题

一个线程先读取共享内存数据值A,随后因某种原因,线程暂时挂起,同时另一个线程临时将共享内存数据值先改为B,随后又改回为A。随后挂起线程恢复,并通过CAS比较,最终比较结果将会无变化。这样会通过检查,这就是ABA问题。
JDK 提供了两个类 AtomicStampedReference、AtomicMarkableReference 来解决 ABA 问题。

4.2 只能保证一个共享变量的原子操作

一个比较简单的规避方法为:把多个共享变量合并成一个共享变量来操作。 JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,可以把多个变量放在一个 AtomicReference 实例后再进行 CAS 操作。比如有两个共享变量 i=1、j=2,可以将二者合并成一个对象,然后用 CAS 来操作该合并对象的 AtomicReference 引用。

4.3 循环时间长开销大

高并发下N多线程同时去操作一个变量,会造成大量线程CAS失败,然后处于自旋状态,导致严重浪费CPU资源,降低了并发性。

解决 CAS 恶性空自旋的较为常见的方案为:

  • 分散操作热点,使用 LongAdder 替代基础原子类 AtomicLong。
  • 使用队列削峰,将发生 CAS 争用的线程加入一个队列中排队,降低 CAS 争用的激烈程度。JUC 中非常重要的基础类 AQS(抽象队列同步器)就是这么做的。

五、以空间换时间:LongAdder

5.1 LongAdder(DoubleAdder) 的原理

LongAdder 的基本思路就是分散热点, 如果有竞争的话,内部维护了多个Cell变量,每个Cell里面有一个初始值为0的long型变量, 不同线程会命中到数组的不同Cell (槽 )中,各个线程只对自己Cell(槽) 中的那个值进行 CAS 操作。这样热点就被分散了,冲突的概率就小很多。
在没有竞争的情况下,要累加的数通过 CAS 累加到 base 上。
如果要获得完整的 LongAdder 存储的值,只要将各个槽中的变量值累加,后的值即可。
github

5.2 longAddr内部结构

|-----------|
| Striped64 | |------------|
|___________| |Serializable|
^ |____________|
| ^
| |
|-----------| |
| LongAdder |--------------|
|___________|

5.2.1 Striped64类的重要成员属性

/**
* cell表,当非空时,大小是2的幂。
*/
transient volatile Cell[] cells;

/**
* 基础值,主要在没有争用时使用
* 在没有争用时使用CAS更新这个值
*/
transient volatile long base;

/**
* 自旋锁(通过CAS锁定) 在调整大小和或创建cell时使用,
* 为 0 表示 cells 数组没有处于创建、扩容阶段,反之为1
*/
transient volatile int cellsBusy;

内部包含一个 base 和一个 Cell[] 类型的 cells 数组 。 在没有竞争的情况下,要累加的数通过 CAS 累加到 base 上;如果有竞争的话,会将要累加的数累加到 Cells 数组中的某个 cell 元素里面。所以 Striped64 的整体值 value 为 base+ ∑ [0~n]cells 。

5.2.2 LongAdder的整体值 value 的获取源码

public long longValue() {
return sum();
}
public long sum() {
Striped64.Cell[] as = cells;
Striped64.Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

triped64 的设计核心思路就是通过内部的分散计算来避免竞争,以空间换时间。 LongAdder
的 base 类似于 AtomicInteger 里面的 value ,在没有竞争的情况,cells 数组为 null ,这时只使用 base 做累加;而一旦发生竞争,cells 数组就上场了。
cells 数组第一次初始化长度为 2 ,以后每次扩容都是变为原来的两倍,一直到 cells 数组的长
度大于等于当前服务器 CPU 的核数。为什么呢?同一时刻,能持有 CPU 时间片而去并发操作同
一个内存地址的最大线程数,最多也就是 CPU 的核数。
在存在线程争用的时候,每个线程被映射到 cells[threadLocalRandomProbe & cells.length] 位置的 Cell 元素,该线程对 value 所做的累加操作,就执行在对应的 Cell 元素的值上,最终相当于将线程绑定到了 cells 中的某个 cell 对象上。

5.3 LongAdder 类的 add 方法

5.3.1 自增

public void increment() {
add(1L);
}

5.3.2 自减

public void decrement() {
add(-1L);
}

5.3.3 add方法

public void add(long x) {
//as: 表示cells引用
//b: base值
//v: 表示当前线程命中的cell的期望值
//m: 表示cells数组长度
//a: 表示当前线程命中的cell
Striped64.Cell[] as;
long b, v;
int m;
Striped64.Cell a;
/*
stop 1:true -> 说明存在竞争,并且cells数组已经初始化了,当前线程需要将数据写入到对应的cell中
false -> 表示cells未初始化,当前所有线程应该将数据写到base中
stop 2:true -> 表示发生竞争了,可能需要重试或者扩容
false -> 表示当前线程cas替换数据成功
*/
if (
(as = cells) != null //stop 1
||
!casBase(b = base, b + x) //stop 2
) {
/*
进入的条件:
1.cells数组已经初始化了,当前线程需要将数据写入到对应的cell中
2.表示发生竞争了,可能需要重试或者扩容
*/

/*
是否有竞争:true -> 没有竞争
false -> 有竞争*/
boolean uncontended = true;
/*
stop 3:as == null || (m = as.length - 1)<0 代表 cells 没有初始化
stop 4:表示当前线程命中的cell为空,意思是还没有其他线程在同一个位置做过累加操作。
stop 5:表示当前线程命中的cell不为空, 然后在该Cell对象上进行CAS设置其值为v+x(x为该 Cell 需要累加的值),如果CAS操作失败,表示存在争用。
*/
if (as == null || (m = as.length - 1) < 0 || //stop 3
(a = as[getProbe() & m]) == null || //stop 4
!(uncontended = a.cas(v = a.value, v + x))) //stop 5
/*
进入的条件:
1.cells 未初始化
2.当前线程对应下标的cell为空
3.当前线程对应的cell有竞争并且cas失败
*/
longAccumulate(x, null, uncontended);
}
}

github

5.3.4 longAccumulate方法

final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
//条件成立: 说明当前线程还未分配hash值
if ((h = getProbe()) == 0) {
//1.给当前线程分配hash值
ThreadLocalRandom.current(); // force initialization
//2.提取当前线程的hash值
h = getProbe();
//3.因为上一步提取了重新分配的新的hash值,所以会重新分配cells数组的位置给当前线程写入,先假设它能找到一个元素不冲突的数组下标。
wasUncontended = true;
}
//扩容意向,collide=true 可以扩容,collide=false 不可扩容
boolean collide = false; // True if last slot nonempty
//自旋,一直到操作成功
for (;;) {
//as: 表示cells引用
//a: 当前线程命中的cell
//n: cells数组长度
//a: 表示当前线程命中的cell的期望值
Striped64.Cell[] as; Striped64.Cell a; int n; long v;
//CASE1: cells数组已经初始化了,当前线程将数据写入到对应的cell中
if ((as = cells) != null && (n = as.length) > 0) {
//CASE1.1: true 表示下标位置的 cell 为 null,需要创建 new Cell
if ((a = as[(n - 1) & h]) == null) {
// cells 数组没有处于创建、扩容阶段
if (cellsBusy == 0) { // Try to attach new Cell
Striped64.Cell r = new Striped64.Cell(x); // Optimistically create
// cas去加锁
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Striped64.Cell[] rs; int m, j;
// 再次判断cells不为空 & 槽位为空
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)//创建、扩容成功,退出自旋
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// CASE1.2:当前线程竞争修改cell失败
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// CASE 1.3:当前线程 rehash 过 hash 值,CAS 更新 Cell
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// CASE1.4:判断是否可以扩容
// CASE1.4.1:n >= NCPU
// true -> cells数组长度已经 >= cpu核数,不可进行扩容,把扩容意向改为false
// false -> 可扩容
// CASE1.4.2:cells != as
// true -> 其它线程已经扩容过了,当前线程rehash之后重试即可
// false -> 未有线程对cells进行修改
else if (n >= NCPU || cells != as)
collide = false; // 把扩容意向改为false
// CASE 1.5:设置扩容意向为 true,但是不一定真的发生扩容
else if (!collide)
collide = true;
//CASE1.6:真正扩容的逻辑
// CASE1.6.1:cellsBusy == 0
// true -> 表示cells没有被其它线程占用,当前线程可以去竞争锁
// false -> 表示有其它线程正在操作cells
// CASE1.6.2:casCellsBusy()
// true -> 表示当前线程获取锁成功,可以进行扩容操作
// false -> 表示当前线程获取锁失败,当前时刻有其它线程在做扩容相关的操作
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//重复判断一下当前线程的临时cells数组是否与原cells数组一致(防止有其它线程提前修改了cells数组,因为cells是volatile的全局变量)
if (cells == as) { // Expand table unless stale
//n << 1 表示数组长度翻一倍
Striped64.Cell[] rs = new Striped64.Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
//扩容后,将扩容意向置为false
collide = false;
continue; // Retry with expanded table
}
//重置当前线程hash值
h = advanceProbe(h);
}
//CASE2:cells 还未初始化(as 为 null),并且 cellsBusy 加锁成功
// CASE2.1:判断锁是否被占用
// true -> 表示当前未加锁
// false -> 表示当前已加锁
// CASE2.2:因为其它线程可能会在当前线程给as赋值之后修改了cells
// true -> cells没有被其它线程修改
// false -> cells已经被其它线程修改
// CASE2.3:获取锁
// true -> 获取锁成功 会把cellsBusy = 1
// false -> 表示其它线程正在持有这把锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try {
//双重检查,防止其它线程已经初始化,当前线程再次初始化,会导致数据丢失
// Initialize table
if (cells == as) {
Striped64.Cell[] rs = new Striped64.Cell[2];
rs[h & 1] = new Striped64.Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//CASE3:当前线程 cellsBusy 加锁失败,表示其他线程正在初始化 cells
//所以当前线程将值累加到 base,注意 add(…)方法调用此方法时 fn 为 null
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

github

5.3.5 LongAdder 类的 casCellsBusy 方法

casCellsBusy 方法的代码很简单,就是将 cellsBusy 成员的值改为 1 ,表示目前的 cells 数组在初始化或扩容中:

final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}

5.3.6 求和

// sum()方法很简单,就是将base值及所有cell值进行累加
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

六、使用 AtomicStampedReference 解决 ABA 问题

JDK 的提供了一个类似 AtomicStampedReference 类来解决 ABA 问题。
AtomicStampReference 在 CAS 的基础上增加了一个 Stamp 整型 印戳(或标记),使用这个印戳可以来觉察数据是否发生变化,给数据带上了一种实效性的检验。
AtomicStampReference 的 compareAndSet 方法首先检查当前的对象引用值是否等于预期引用,
并且当前印戳( Stamp )标志是否等于预期标志,如果全部相等,则以原子方式将引用值和印戳
( Stamp )标志的值更新为给定的更新值。

6.1 AtomicStampReference 的构造器

/**  
* @param initialRef初始引用
* @param initialStamp初始戳记
*/
AtomicStampedReference(V initialRef, int initialStamp)

6.2 AtomicStampReference 的常用的几个方法

方法 介绍
public V getRerference() 引用的当前值
public int getStamp() 返回当前的”戳记”
public boolean weakCompareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp) expectedReference 引用的旧值;newReference 引用的新值;expectedStamp 旧的戳记;newStamp 新的戳记

6.3 例子

public static void main(String[] args) {
boolean success = false;
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<Integer>(1, 0);
int stamp = atomicStampedReference.getStamp();
success = atomicStampedReference.compareAndSet(1, 0, stamp, stamp + 1);
System.out.println("success:" + success + ";reference:" + "" + atomicStampedReference.getReference() + ";stamp:" + atomicStampedReference.getStamp());
//修改印戳,更新失败
stamp = 0;
success = atomicStampedReference.compareAndSet(0, 1, stamp, stamp + 1);
System.out.println("success:" + success + ";reference:" + "" + atomicStampedReference.getReference() + ";stamp:" + atomicStampedReference.getStamp());
}