Rust-Atomic

一、编译器、硬件重排

rust参照了C++20内存模型的原子操作部分。
[重排部分]https://nomicon.purewhite.io/atomics.html#atomics

二、数据访问

什么是原子操作

假设要实现一个计数功能,每次对变量执行+1的操作。CPU执行的时候就需要顺序执行三个操作

  1. 从内存中读取变量的值
  2. 加1
  3. 写回到内存中

如果有两个线程同时操作了这个值,这三个操作可能是交叉的,导致结果不正确。 原子操作就避免了这个问题,保证读取-更改-写回的操作是一个整体。

原子排序方式

原子访问可以告诉硬件和编译器,我们的程序是多线程的。每一个原子访问都关联一种“排序方式”,以确定它和其他访问之间的关系。归根结底,就是告诉编译器和硬件什么是它们不能做的。对于编译器,主要指的是命令的重排。而对于硬件,指的是写操作的结果如何同步到其他的线程。Rust暴露的排序方式包括:

  • 顺序一致性(SeqCst)
  • 释放(Release)
  • 获取(Acquire)
  • Relaxed

Rust提供了Ordering来指定不同的排序方式:

pub enum Ordering {
Relaxed,
Release,
Acquire,
AcqRel,
SeqCst,
}

使用不同的 Ordering 会让编译器或者 CPU 对某些指令进行重新排序执行。

三、synchronizes-with 和 happens-before

对于个自治的操作序列,或者说两个局部时间域。如何将这两个时间的某些操作强行规定一个先后关系,而其他操作不管,就是synchronize-with所意味的。

那么如何做同步呢?就是引入一个点,可以成为同步点,这个点是两个线程的参照物,一个线程的一些操作必须在这个点之前完成,另一个线程的操作必须在这个点之后完成。在代码中就是某个操作成为这个同步点,如何让它成为同步点?

打标记。而标记的设计也是成体系的。对应上面说的,memory_order_release的标记就代表前面的操作必须在这个点之前完成,对于其他的操作不管:比如后面的是不是跑到前面完成了。而memory_order_acquire就是这个点后面的操作必须在点后面完成,就刚好禁止了后面的是不是跑到前面完成了这种情况。

Synchronizes-with - 简单来说,两个线程 A 和 B,以及一个支持原子操作的变量 x,如果 A 线程对 x 写入了一个新的值(store),而 B 线程在 x 上面读取到了这个新的值(load),我们就可以认为,A 的 store 就是 synchronizes-with B 的 load 的。

Happens-before - 这应该算是一个更基础的概念。如果一个操作 B 能看到之前操作 A 产生的结果,那么 A 就是 happens-before B 的。譬如在单线程里面,如果一个操作 A 的语句在操作 B 的前面执行,通常叫做 sequenced-before,那么 A 就是 happens-before B 的

而对于跨线程(inter-thread) 的情况,要判断 happens-before,就需要借助于前面提到的 synchronizes-with 了。如果操作 A 是 synchronizes-with 另一个线程的操作 B 的,那么 A 就是 happens-before B 的。Happens-before 也具有传递性,如果 B 是 happens-before C 的,那么 A 也是 happens-before C。

如果 A 是 sequenced-before B,而 B 是 inter-thread happens-before C 的,那么 A 也是 inter-thread happens-before C。同理,如果 A inter-thread happens-before B,而 B sequenced-before C,那么 A 也是 inter-thread happens-before C。

四、方法

4.1 load

读取原子变量的值。并需要提供Ordering(不包括Release)的排序状态。

4.2 store

修改原子变量的值。并提供Ordering(不包括Acquire)的排序状态。

4.3 swap

将值存储到原子变量中,并返回原子变量中存储的前一个值。并提供Ordering的排序状态

4.4 compare_and_swap

自 1.50.0 起已弃用:使用compare_exchangeorcompare_exchange_weak代替

4.5 compare_exchange

pub fn compare_exchange(
&self,
current: bool,
new: bool,
success: Ordering,
failure: Ordering
) -> Result<bool, bool>

如果当前值与 current 值相同,则将值存储到原子变量中。
返回值是指示是否写入了新值并包含先前值的结果。 成功后,此值保证等于 current

需要两个 Ordering 参数来描述这个操作的内存顺序。 success 描述了在与 current 的比较成功时发生的读取 - 修改 - 写入操作所需的顺序。 failure 描述了比较失败时发生的加载操作所需的排序。 使用 Acquire 作为成功排序,使存储成为操作 Relaxed 的一部分,而使用 Release,则使装载成功 Relaxed。

失败排序只能是 SeqCst,Acquire 或 Relaxed,并且必须等于或弱于成功排序。

use std::sync::atomic::{AtomicBool, Ordering};

let some_bool = AtomicBool::new(true);

assert_eq!(some_bool.compare_exchange(true,
false,
Ordering::Acquire,
Ordering::Relaxed),
Ok(true));
assert_eq!(some_bool.load(Ordering::Relaxed), false);

assert_eq!(some_bool.compare_exchange(true, true,
Ordering::SeqCst,
Ordering::Acquire),
Err(false));
assert_eq!(some_bool.load(Ordering::Relaxed), false);

4.6 compare_exchange_weak

pub fn compare_exchange_weak(
&self,
current: bool,
new: bool,
success: Ordering,
failure: Ordering
) -> Result<bool, bool>

如果当前值与 current 值相同,则将值存储到 bool 中。

AtomicBool::compare_exchange 不同,即使比较成功,也允许该函数错误地失败,这可能导致某些平台上的代码效率更高。

返回值是指示是否写入了新值并包含先前值的结果。

需要两个 Ordering 参数来描述这个操作的内存顺序。 success 描述了在与 current 的比较成功时发生的读取 - 修改 - 写入操作所需的顺序。 failure 描述了比较失败时发生的加载操作所需的排序。 使用 Acquire 作为成功排序,使存储成为操作 Relaxed 的一部分,而使用 Release,则使装载成功 Relaxed。 失败排序只能是 SeqCst,Acquire 或 Relaxed,并且必须等于或弱于成功排序。

use std::sync::atomic::{AtomicBool, Ordering};

let val = AtomicBool::new(false);

let new = true;
let mut old = val.load(Ordering::Relaxed);
loop {
match val.compare_exchange_weak(old, new, Ordering::SeqCst, Ordering::Relaxed) {
Ok(_) => break,
Err(x) => old = x,
}
}

4.7 fetch_xxx

提供了如fetch_addfetch_sub等原子计算操作。

五、实例

5.1 Relaxed

Relaxed ordering 只能保证原子操作,在同一个线程里面,对同一个变量的操作会满足 happens-before 关系,但对于 inter-thread 来说,它不能提供 synchronizes-with 支持,并不保证任何顺序,它对编译器和 CPU 不做任何限制,可以乱序执行。

static X: AtomicBool = AtomicBool::new(false);
static Y: AtomicBool = AtomicBool::new(false);
static Z: AtomicI32 = AtomicI32::new(0);

fn write_x_then_y() {
X.store(true, Ordering::Relaxed);
Y.store(true, Ordering::Relaxed);
}

fn read_y_then_x() {
while !Y.load(Ordering::Relaxed) {}
if X.load(Ordering::Relaxed) {
Z.fetch_add(1, Ordering::SeqCst);
}
}

fn main() {
let t1 = thread::spawn(move || {
write_x_then_y();
});

let t2 = thread::spawn(move || {
read_y_then_x();
});

t1.join().unwrap();
t2.join().unwrap();

assert_ne!(Z.load(Ordering::SeqCst), 0);
}

上面 assert 可能会失败,也就是 Z 的值在最后可能为 1。在函数 write_x_then_y 里面即使 store X happens-before store Y,即使在 read_y_then_x 里面 load Y 返回了 true,X 的值仍然可能是 false。因为对 X 和 Y 的两个操作都是 relaxed 的,虽然对于不同的线程,两个 load 或者两个 store 都可能满足 happens-before,但在 store 和 load 之间,并没有相关的约束,也就是意味着 load 可能看到乱序的 store。

5.2 Release/Acquire/AcqRel

AcquireRelease 通常都是需要成对使用的,当对 store 使用 Release ordering 之后,后续任何的 Acquire ordering 的 load 操作,都会看到之前 store 的值。也就是说,通过 Acquire-Release,我们能支持 synchronizes-with。

static X: AtomicBool = AtomicBool::new(false);
static Y: AtomicBool = AtomicBool::new(false);
static Z: AtomicI32 = AtomicI32::new(0);

fn write_x_then_y() {
X.store(true, Ordering::Relaxed);
Y.store(true, Ordering::Release);
}

fn read_y_then_x() {
while !Y.load(Ordering::Acquire) {}
if X.load(Ordering::Relaxed) {
Z.fetch_add(1, Ordering::SeqCst);
}
}

fn main() {
let t1 = thread::spawn(move || {
write_x_then_y();
});

let t2 = thread::spawn(move || {
read_y_then_x();
});

t1.join().unwrap();
t2.join().unwrap();

assert_ne!(Z.load(Ordering::SeqCst), 0);
}

不同于之前的 relaxed,这里我们对 Y 使用了 Acquire 和 Release,那么最后 Z 就一定不可能为 0 了。主要是因为 store Y 是 synchronizes-with load Y 的,也就是 store Y happens before load Y,因为 store X 是 sequenced-before store Y,那么 store X 就是 happens-before load X 的。

通常,我们还可以使用 AcqRel ordering,它其实就是组合了 AcquireRelease,对于 load 使用 Acquire,而对于 store 则是使用 Release

5.3 SeqCst

不光提供了 Acquire,Releaseordering 支持,同时也确保所有线程会看到完全一致的原子操作顺序。

static X: AtomicBool = AtomicBool::new(false);
static Y: AtomicBool = AtomicBool::new(false);
static Z: AtomicI32 = AtomicI32::new(0);

fn write_x() {
X.store(true, Ordering::SeqCst); // 1
}

fn write_y() {
Y.store(true, Ordering::SeqCst); // 2
}

fn read_x_then_y() {
while !X.load(Ordering::SeqCst) {}
if Y.load(Ordering::SeqCst) { // 3
Z.fetch_add(1, Ordering::SeqCst);
}
}

fn read_y_then_x() {
while !Y.load(Ordering::SeqCst) {}
if X.load(Ordering::SeqCst) { // 4
Z.fetch_add(1, Ordering::SeqCst);
}
}

fn main() {
let t1 = thread::spawn(move || {
write_x();
});

let t2 = thread::spawn(move || {
write_y();
});

let t3 = thread::spawn(move || {
read_x_then_y();
});

let t4 = thread::spawn(move || {
read_y_then_x();
});

t1.join().unwrap();
t2.join().unwrap();
t3.join().unwrap();
t4.join().unwrap();

assert_ne!(Z.load(Ordering::SeqCst), 0);
}

上面的例子,只有使用 SeqCst ordering,才能保证 Z 最后的值不为 0,任何其他的 ordering,都不能保证,我们来具体分析一下。因为两个 read 函数都是有 while 循环,退出之前一定能确保 write 函数被调用了。因为使用 SeqCst 能保证所有线程看到一致的操作顺序,假设 3 返回了 false,表明 X 为 true,而 Y 为 false,这时候一定能保证 store Y 还没调用,一定能保证 store X 在 store Y 之前发生,4 就一定会返回 true。

如果这里我们对 load 使用 Acquire,而对 store 使用 Releaseread_x_then_yread_y_then_x 可能看到完全相反的对 X 和 Y 的操作顺序。

SeqCst 在有些时候,可能会有性能瓶颈,因为它需要确保操作在所有线程之前全局同步,但是它其实又是最直观的一种使用方式, 所以通常,当我们不知道用什么 ordering 的时候,用 SeqCst 就对了。

5.4 fence

出了使用不同的 ordering,我们还可以使用 fence 来支持 synchronizes-with,如下:

static X: AtomicBool = AtomicBool::new(false);
static Y: AtomicBool = AtomicBool::new(false);
static Z: AtomicI32 = AtomicI32::new(0);

fn write_x_then_y() {
X.store(true, Ordering::Relaxed); // 1
fence(Ordering::Release); // 2
Y.store(true, Ordering::Relaxed); // 3
}

fn read_y_then_x() {
while !Y.load(Ordering::Relaxed) {} // 4
fence(Ordering::Acquire); // 5
if X.load(Ordering::Relaxed) { // 6
Z.fetch_add(1, Ordering::SeqCst);
}
}

fn main() {
let t1 = thread::spawn(move || {
write_x_then_y();
});

let t2 = thread::spawn(move || {
read_y_then_x();
});

t1.join().unwrap();
t2.join().unwrap();

assert_ne!(Z.load(Ordering::SeqCst), 0);
}

在上面的例子中,2 Release fence 是 synchronizes-with 5 Acquire fence 的,而4 load Y 的时候一定会读取到 3 store Y 的值,加上 1 store X 是 sequenced-before 3 的,那么自然能确定 1 是 happens-before 6 的。也就是 Z 一定不会等于 0。