一、编译器、硬件重排
rust参照了C++20内存模型的原子操作部分。
[重排部分]https://nomicon.purewhite.io/atomics.html#atomics
二、数据访问
什么是原子操作
假设要实现一个计数功能,每次对变量执行+1的操作。CPU执行的时候就需要顺序执行三个操作
- 从内存中读取变量的值
- 加1
- 写回到内存中
如果有两个线程同时操作了这个值,这三个操作可能是交叉的,导致结果不正确。 原子操作就避免了这个问题,保证读取-更改-写回的操作是一个整体。
原子排序方式
原子访问可以告诉硬件和编译器,我们的程序是多线程的。每一个原子访问都关联一种“排序方式”,以确定它和其他访问之间的关系。归根结底,就是告诉编译器和硬件什么是它们不能做的。对于编译器,主要指的是命令的重排。而对于硬件,指的是写操作的结果如何同步到其他的线程。Rust暴露的排序方式包括:
- 顺序一致性(SeqCst)
- 释放(Release)
- 获取(Acquire)
- Relaxed
Rust提供了Ordering
来指定不同的排序方式:pub enum Ordering {
Relaxed,
Release,
Acquire,
AcqRel,
SeqCst,
}
使用不同的 Ordering
会让编译器或者 CPU 对某些指令进行重新排序执行。
三、synchronizes-with 和 happens-before
对于个自治的操作序列,或者说两个局部时间域。如何将这两个时间的某些操作强行规定一个先后关系,而其他操作不管,就是synchronize-with所意味的。
那么如何做同步呢?就是引入一个点,可以成为同步点,这个点是两个线程的参照物,一个线程的一些操作必须在这个点之前完成,另一个线程的操作必须在这个点之后完成。在代码中就是某个操作成为这个同步点,如何让它成为同步点?
打标记。而标记的设计也是成体系的。对应上面说的,memory_order_release的标记就代表前面的操作必须在这个点之前完成,对于其他的操作不管:比如后面的是不是跑到前面完成了。而memory_order_acquire就是这个点后面的操作必须在点后面完成,就刚好禁止了后面的是不是跑到前面完成了这种情况。
Synchronizes-with - 简单来说,两个线程 A 和 B,以及一个支持原子操作的变量 x,如果 A 线程对 x 写入了一个新的值(store),而 B 线程在 x 上面读取到了这个新的值(load),我们就可以认为,A 的 store 就是 synchronizes-with B 的 load 的。
Happens-before - 这应该算是一个更基础的概念。如果一个操作 B 能看到之前操作 A 产生的结果,那么 A 就是 happens-before B 的。譬如在单线程里面,如果一个操作 A 的语句在操作 B 的前面执行,通常叫做 sequenced-before,那么 A 就是 happens-before B 的
而对于跨线程(inter-thread) 的情况,要判断 happens-before,就需要借助于前面提到的 synchronizes-with 了。如果操作 A 是 synchronizes-with 另一个线程的操作 B 的,那么 A 就是 happens-before B 的。Happens-before 也具有传递性,如果 B 是 happens-before C 的,那么 A 也是 happens-before C。
如果 A 是 sequenced-before B,而 B 是 inter-thread happens-before C 的,那么 A 也是 inter-thread happens-before C。同理,如果 A inter-thread happens-before B,而 B sequenced-before C,那么 A 也是 inter-thread happens-before C。
四、方法
4.1 load
读取原子变量的值。并需要提供Ordering(不包括Release)
的排序状态。
4.2 store
修改原子变量的值。并提供Ordering(不包括Acquire)
的排序状态。
4.3 swap
将值存储到原子变量中,并返回原子变量中存储的前一个值。并提供Ordering
的排序状态
4.4 compare_and_swap
自 1.50.0 起已弃用:使用compare_exchangeorcompare_exchange_weak代替
4.5 compare_exchange
|
如果当前值与 current
值相同,则将值存储到原子变量中。
返回值是指示是否写入了新值并包含先前值的结果。 成功后,此值保证等于 current
。
需要两个 Ordering 参数来描述这个操作的内存顺序。 success 描述了在与 current 的比较成功时发生的读取 - 修改 - 写入操作所需的顺序。 failure 描述了比较失败时发生的加载操作所需的排序。 使用 Acquire 作为成功排序,使存储成为操作 Relaxed 的一部分,而使用 Release,则使装载成功 Relaxed。
失败排序只能是 SeqCst,Acquire 或 Relaxed,并且必须等于或弱于成功排序。use std::sync::atomic::{AtomicBool, Ordering};
let some_bool = AtomicBool::new(true);
assert_eq!(some_bool.compare_exchange(true,
false,
Ordering::Acquire,
Ordering::Relaxed),
Ok(true));
assert_eq!(some_bool.load(Ordering::Relaxed), false);
assert_eq!(some_bool.compare_exchange(true, true,
Ordering::SeqCst,
Ordering::Acquire),
Err(false));
assert_eq!(some_bool.load(Ordering::Relaxed), false);
4.6 compare_exchange_weak
|
如果当前值与 current 值相同,则将值存储到 bool 中。
与 AtomicBool::compare_exchange
不同,即使比较成功,也允许该函数错误地失败,这可能导致某些平台上的代码效率更高。
返回值是指示是否写入了新值并包含先前值的结果。
需要两个 Ordering 参数来描述这个操作的内存顺序。 success 描述了在与 current 的比较成功时发生的读取 - 修改 - 写入操作所需的顺序。 failure 描述了比较失败时发生的加载操作所需的排序。 使用 Acquire 作为成功排序,使存储成为操作 Relaxed 的一部分,而使用 Release,则使装载成功 Relaxed。 失败排序只能是 SeqCst,Acquire 或 Relaxed,并且必须等于或弱于成功排序。use std::sync::atomic::{AtomicBool, Ordering};
let val = AtomicBool::new(false);
let new = true;
let mut old = val.load(Ordering::Relaxed);
loop {
match val.compare_exchange_weak(old, new, Ordering::SeqCst, Ordering::Relaxed) {
Ok(_) => break,
Err(x) => old = x,
}
}
4.7 fetch_xxx
提供了如fetch_add
、fetch_sub
等原子计算操作。
五、实例
5.1 Relaxed
Relaxed ordering
只能保证原子操作,在同一个线程里面,对同一个变量的操作会满足 happens-before 关系,但对于 inter-thread 来说,它不能提供 synchronizes-with 支持,并不保证任何顺序,它对编译器和 CPU 不做任何限制,可以乱序执行。static X: AtomicBool = AtomicBool::new(false);
static Y: AtomicBool = AtomicBool::new(false);
static Z: AtomicI32 = AtomicI32::new(0);
fn write_x_then_y() {
X.store(true, Ordering::Relaxed);
Y.store(true, Ordering::Relaxed);
}
fn read_y_then_x() {
while !Y.load(Ordering::Relaxed) {}
if X.load(Ordering::Relaxed) {
Z.fetch_add(1, Ordering::SeqCst);
}
}
fn main() {
let t1 = thread::spawn(move || {
write_x_then_y();
});
let t2 = thread::spawn(move || {
read_y_then_x();
});
t1.join().unwrap();
t2.join().unwrap();
assert_ne!(Z.load(Ordering::SeqCst), 0);
}
上面 assert 可能会失败,也就是 Z 的值在最后可能为 1。在函数 write_x_then_y
里面即使 store X happens-before store Y,即使在 read_y_then_x
里面 load Y 返回了 true,X 的值仍然可能是 false。因为对 X 和 Y 的两个操作都是 relaxed
的,虽然对于不同的线程,两个 load 或者两个 store 都可能满足 happens-before,但在 store 和 load 之间,并没有相关的约束,也就是意味着 load 可能看到乱序的 store。
5.2 Release/Acquire/AcqRel
Acquire
和 Release
通常都是需要成对使用的,当对 store 使用 Release ordering
之后,后续任何的 Acquire ordering
的 load 操作,都会看到之前 store 的值。也就是说,通过 Acquire-Release
,我们能支持 synchronizes-with。static X: AtomicBool = AtomicBool::new(false);
static Y: AtomicBool = AtomicBool::new(false);
static Z: AtomicI32 = AtomicI32::new(0);
fn write_x_then_y() {
X.store(true, Ordering::Relaxed);
Y.store(true, Ordering::Release);
}
fn read_y_then_x() {
while !Y.load(Ordering::Acquire) {}
if X.load(Ordering::Relaxed) {
Z.fetch_add(1, Ordering::SeqCst);
}
}
fn main() {
let t1 = thread::spawn(move || {
write_x_then_y();
});
let t2 = thread::spawn(move || {
read_y_then_x();
});
t1.join().unwrap();
t2.join().unwrap();
assert_ne!(Z.load(Ordering::SeqCst), 0);
}
不同于之前的 relaxed
,这里我们对 Y 使用了 Acquire 和 Release
,那么最后 Z 就一定不可能为 0 了。主要是因为 store Y 是 synchronizes-with load Y 的,也就是 store Y happens before load Y,因为 store X 是 sequenced-before store Y,那么 store X 就是 happens-before load X 的。
通常,我们还可以使用 AcqRel ordering
,它其实就是组合了 Acquire
和 Release
,对于 load 使用 Acquire
,而对于 store 则是使用 Release
。
5.3 SeqCst
不光提供了 Acquire,Release
的 ordering
支持,同时也确保所有线程会看到完全一致的原子操作顺序。static X: AtomicBool = AtomicBool::new(false);
static Y: AtomicBool = AtomicBool::new(false);
static Z: AtomicI32 = AtomicI32::new(0);
fn write_x() {
X.store(true, Ordering::SeqCst); // 1
}
fn write_y() {
Y.store(true, Ordering::SeqCst); // 2
}
fn read_x_then_y() {
while !X.load(Ordering::SeqCst) {}
if Y.load(Ordering::SeqCst) { // 3
Z.fetch_add(1, Ordering::SeqCst);
}
}
fn read_y_then_x() {
while !Y.load(Ordering::SeqCst) {}
if X.load(Ordering::SeqCst) { // 4
Z.fetch_add(1, Ordering::SeqCst);
}
}
fn main() {
let t1 = thread::spawn(move || {
write_x();
});
let t2 = thread::spawn(move || {
write_y();
});
let t3 = thread::spawn(move || {
read_x_then_y();
});
let t4 = thread::spawn(move || {
read_y_then_x();
});
t1.join().unwrap();
t2.join().unwrap();
t3.join().unwrap();
t4.join().unwrap();
assert_ne!(Z.load(Ordering::SeqCst), 0);
}
上面的例子,只有使用 SeqCst ordering
,才能保证 Z 最后的值不为 0,任何其他的 ordering,都不能保证,我们来具体分析一下。因为两个 read 函数都是有 while 循环,退出之前一定能确保 write 函数被调用了。因为使用 SeqCst
能保证所有线程看到一致的操作顺序,假设 3 返回了 false,表明 X 为 true,而 Y 为 false,这时候一定能保证 store Y 还没调用,一定能保证 store X 在 store Y 之前发生,4 就一定会返回 true。
如果这里我们对 load 使用 Acquire
,而对 store 使用 Release
,read_x_then_y
和 read_y_then_x
可能看到完全相反的对 X 和 Y 的操作顺序。
SeqCst
在有些时候,可能会有性能瓶颈,因为它需要确保操作在所有线程之前全局同步,但是它其实又是最直观的一种使用方式, 所以通常,当我们不知道用什么 ordering 的时候,用 SeqCst
就对了。
5.4 fence
出了使用不同的 ordering
,我们还可以使用 fence
来支持 synchronizes-with,如下:static X: AtomicBool = AtomicBool::new(false);
static Y: AtomicBool = AtomicBool::new(false);
static Z: AtomicI32 = AtomicI32::new(0);
fn write_x_then_y() {
X.store(true, Ordering::Relaxed); // 1
fence(Ordering::Release); // 2
Y.store(true, Ordering::Relaxed); // 3
}
fn read_y_then_x() {
while !Y.load(Ordering::Relaxed) {} // 4
fence(Ordering::Acquire); // 5
if X.load(Ordering::Relaxed) { // 6
Z.fetch_add(1, Ordering::SeqCst);
}
}
fn main() {
let t1 = thread::spawn(move || {
write_x_then_y();
});
let t2 = thread::spawn(move || {
read_y_then_x();
});
t1.join().unwrap();
t2.join().unwrap();
assert_ne!(Z.load(Ordering::SeqCst), 0);
}
在上面的例子中,2 Release fence 是 synchronizes-with 5 Acquire fence 的,而4 load Y 的时候一定会读取到 3 store Y 的值,加上 1 store X 是 sequenced-before 3 的,那么自然能确定 1 是 happens-before 6 的。也就是 Z 一定不会等于 0。