八、并发
8.1 竞争
安全Rust保证了不存在数据竞争。数据竞争指的是:
- 两个或两个以上的线程并发地访问同一块内存
- 其中一个线程做写操作
- 其中一个线程是非同步(unsynchronized)的
数据竞争导致未定义行为,所以不可能在安全Rust中存在。大多数情况下,Rust的所有权系统就可以避免数据竞争:不可能有可变引用的别名,因此也就不可能有数据竞争。但是内部可变性把这件事弄得复杂了,这也是为什么我们要有Send和Sync(见下)。
但是Rust并不会避免一般竞争条件。
因为要做到这一点其实是不可能的,而且好像也是不必要的。你的硬件是竞争的,操作系统是竞争的,计算机上其他的程序是竞争的,整个世界都是竞争的。任何一个声称可以避免所有竞争条件的系统,即使没有错误,也一定及其难用。
所以,安全Rust出现死锁,或者因为不正确的同步而做出一些奇怪的行为,这些都是可以接受的。显然这样的程序并不是最理想的程序,但Rust也只能帮你到这了。而且,竞争条件自己不能违反Rust的内存安全性。只有配合上其他的非安全代码,竞争条件才有可能破坏内存安全。比如:use std::thread;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
let data = vec![1, 2, 3, 4];
// 使用Arc,这样即使程序已经执行完毕了,存储AtomicUsize的内存依然存在,
// 其他的线程可以增加它的值。否则Rust不能编译这段代码,因为thread:spawn
// 对生命周期有限制。
let idx = Arc::new(AtomicUsize::new(0));
let other_idx = idx.clone();
// move获得other_idx的所有权,将它移入线程
thread::spawn(move || {
// 可以改变idx,因为它的值是一个原子,不会引起数据竞争
other_idx.fetch_add(10, Ordering::SeqCst);
});
// 用原子中的值做索引。这么做是安全的,因为我们只读取了一次原子的内存,
// 然后将读出的值的拷贝传递给Vec做索引。索引过程可以做正确的边界检查,
// 在执行索引期间这个值也不会发生改变。
// 但是,如果上面的线程在执行这句代码之前增加了这个值,这段代码会panic。
// 这符合竞争条件,因为程序执行得正确与否(panic几乎不可能是正确的)
// 依赖于线程的执行顺序
println!("{}", data[idx.load(Ordering::SeqCst)]);use std::thread;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
let data = vec![1, 2, 3, 4];
let idx = Arc::new(AtomicUsize::new(0));
let other_idx = idx.clone();
// move获得other_idx的所有权,将它移入线程
thread::spawn(move || {
// 可以改变idx,因为它的值是一个原子,不会引起数据竞争
other_idx.fetch_add(10, Ordering::SeqCst);
});
if idx.load(Ordering::SeqCst) < data.len() {
unsafe {
// 在边界检查之后读取idx的值是不正确的,因为它有可能已经改变了。
// 这是一个竞争条件,而且十分危险,因为我们要使用的get_unchecked是非安全的。
println!("{}", data.get_unchecked(idx.load(Ordering::SeqCst)));
}
}
8.2 Send 和 Sync
不是所有人都遵守可变性的原则。有一些类型允许你拥有一块内存的多个别名,同时还改变内存的值。除非这些类型使用同步来控制访问,否则它们就不是线程安全的。Rust根据Send
和Sync
这两个trait获取相关信息。
- 如果一个类型可以安全地传递给另一个线程,这个类型是
Send
- 如果一个类型可以安全地被多个线程共享(也就是
&T
是Send
),这个类型是Sync
Send
和Sync
是Rust并发机制的基础。因此,Rust赋予它们许多的特性,以保证它们能正确工作。首当其冲的,它们都是非安全trait。这表明它们的实现也是非安全的,而其他的非安全代码则可以假设这些实现是正确的。由于它们是标志trait(它们没有任何关联的方法),“正确地实现”仅仅意味着实现满足它所需要的内部特征。不正确地实现Send
和Sync
会导致未定义行为。
Send
和Sync
还是自动推导的trait。和其他的trait不同,如果一个类型完全由Send
或Sync
组成,那么这个类型本身也是Send
或Sync
。几乎所有的基本类型都是Send
和Sync
,因此你能见到的很多类型也就都是Send
和Sync
。
主要的例外情况有:
- 裸指针不是
Send
也不是Sync
(因为它们没有安全性保证) UnsafeCell
不是Sync
(所以Cell
和RefCell
也不是)Rc
不是Send
或Sync
(因为引用计数是共享且非同步的)
Rc
和UnsafeCell
是典型的非线程安全的:它们允许非同步地共享可变状态。可是,裸指针严格来说并不一定非得是非线程安全不可。通过裸指针做任何有意义的事情都需要先对它解引用,这一步就已经是非安全的了。从这个角度来说,有人可能会认为把它标为线程安全的也未尝不可。
可是,它们被标为非线程安全的主要目的是避免包含它们的类型自动成为线程安全的。这些类型都有着重要的不可追踪的所有权,保证它们线程安全需要花费大量的精力,而他们的作者不太可能做到这一点。Rc
就是一个很好的例子,一个包含*mut
的类型绝对不能是线程安全的。
不是自动推导的类型也可以很容易地实现Send
和Sync
:struct MyBox(*mut u8);
unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}
还有一个很少见的场景,一个类型被自动推导为Send
或Sync
,但是它其实不满足二者的要求。这时我们可以去掉Send
和Sync
的实现:#![feature(option_builtin_traits)]
// 对于同步的基础类型有着神奇的语义
struct SpecialThreadToken(u8);
impl !Send for SpecialThreadToken {}
impl !Sync for SpecialThreadToken {}
注意,一个类型自己不可能被不正确地推导为Send
和Sync
。只有当类型和其他的非安全代码一起实现了一些特殊行为时,它才可能成为一个不正确的Send
或Sync
。
大部分使用裸指针的类型都应该把裸指针用一种抽象隐藏起来,以保证类型可以被推导为Send
和Sync
。比如,所有Rust的标准集合类型都是Send
和Sync
(在他们包含Send
和Sync
类型的情况下),虽然它们都大量使用了裸指针处理内存分配和复杂的所有权。类似的,大部分这些集合的迭代器也是Send
和Sync
,因为它们的行为很像这些集合的&
或者&mut
。
8.3 原子操作
C11的内存模型试图同时满足开发者对语义的要求、编译器对优化的要求、还有硬件对千奇百怪混乱状态的要求。而我们只希望能写一段程序做我们想让它做的事情,并且要做得快。是不是很不错?
8.3.1 编译器重排
编译器努力地通过各种复杂的变换,尽可能减少数据依赖和消除死代码。特别是,它可能会彻底改变事件的顺序,或者干脆让某些事件永远不会发生!如果我们写了这样的代码x = 1;
y = 3;
x = 2;
编译器会发现这段程序最好能变成x = 2;
y = 3;
事件的顺序变了,还有一个事件完全消失了。在单线程的情况下,我们不会察觉有什么区别:毕竟代码执行后可以得到和我们期望的完全相同的状态。但如果程序是多线程的,我们可能确实需要在y
被赋值前将x
赋值为1。我们希望编译器能做出这一类优化,因为这可以提升程序的性能。可另一方面,我们还希望我们写的程序能完全按照我们的指令行事。
8.3.2 硬件重排
即使编译器完全明白了我们的意图并且按照我们的期望去工作,硬件还是有可能来找麻烦的。麻烦来自于在内存分层模式下的CPU。你的硬件系统里确实有一些全局共享的内存空间,但是在各个CPU核心看来,这些内存都离得太远,速度也太慢。CPU希望能在它的本地cache里操作数据,只有在cache里没有需要的内存时才委屈地和共享内存打交道。
毕竟,这不就是cache存在的全部意义吗?如果每一次读取cache都要再去检查共享内存看看数据有没有变化,那么cache还有什么价值呢?最终的结果就是,硬件不能保证相同的事件在两个不同的线程里一定有相同的执行顺序。如果要保证这点,我们必须有一些特殊的方法告诉CPU稍微变笨一点。
比如,我们已经成功地让编译器保证下面的逻辑:初始状态: x = 0, y = 1
线程1 线程2
y = 3; if x == 1 {
x = 1; y *= 2;
}
这段程序实际上有两种可能的结果:
y = 3
:线程2在线程1完成之前检查了x的值y = 6
:线程2在线程1完成之后检查了x的值
但是硬件还会创造出第三种状态:
y = 2
:线程2看到了x = 1
,但是没看到y = 3
,接下来用计算结果覆盖了y = 3
不同的CPU提供了不同的保证机制,但是详细区分它们没什么意义。一般来说只需要把硬件分为两类:强顺序的和弱顺序的。最明显的,x86/64平台提供了强顺序保证,而ARM提供弱顺序保证。对于并发编程来说,它们也会导致不同的结果:
- 在强顺序硬件上要求强顺序保证的开销很小,甚至可能为零,因为硬件本身已经无条件提供了强保证。而弱保证可能只能在弱顺序硬件上获得性能优势。
- 在强顺序硬件上要求过于弱的顺序保证有可能也会碰巧成功,即使你的程序是错误的。如果可能的话,在弱保证硬件上测试并发算法。
8.3.3 数据访问
C11的内存模型允许我们接触到程序的因果关系,希望以此满足多个方面的要求。一般来说,就是要确定程序的各个部分以及运行它们的多个线程之前的时间先后关系。在严格的先后关系没有确定的时候,硬件和编译器有足够的空间做一些激进的优化。而关系确定之后,它们的优化就必须很小心了。我们通过“数据访问”和“原子访问”来控制这种关系。
数据访问是程序设计世界的基础。它们都是非同步的,而且编译器可以做出一些激进的优化。尤其是,编译器认定数据访问都是单线程的,所以可以对它随意地重排。硬件也可以把数据访问的重排的结果移植到其他的线程上去,无论结果多么的滞后和不一致都可以。数据访问最严重的问题是,它会导致数据竞争。数据访问对硬件和编译器很友好,但是我们已经看到了编写和它相关的同步程序是十分可怕的。事实上,它的同步语义太弱了。
只依靠数据访问是不可能写出正确的同步代码的。
原子访问可以告诉硬件和编译器,我们的程序是多线程的。每一个原子访问都关联一种“排序方式”,以确定它和其他访问之间的关系。归根结底,就是告诉编译器和硬件什么是它们不能做的。对于编译器,主要指的是命令的重排。而对于硬件,指的是写操作的结果如何同步到其他的线程。Rust暴露的排序方式包括:
- 顺序一致性(SeqCst)
- 释放(Release)
- 获取(Acquire)
- Relaxed
8.3.4 顺序一致性
顺序一致性是所有排序方式中最强大的,包含了其他所有排序方式的约束条件。直观上看,顺序一致性操作不能被重排:在同一个线程中,SeqCst之前的访问永远在它之前,之后的访问永远在它之后。只使用顺序一致性原子操作和数据访问就可以构建一个无数据竞争的程序,这种程序的好处是它的命令在所有线程上都有着唯一的执行流程。而且这个执行流程又很容易推导:它就是每个线程各自执行流程的交叉。如果你使用更弱的原子排序方式的话,这一点并不一定继续有效。
顺序一致性给开发者的便利并不是免费的。即使是在强顺序平台上,顺序一致性也会产生内存屏障(memory fence)。
事实上,顺序一致性很少是程序正确性的必要条件。但是,如果你对其他内存排序方式模棱两可的话,顺序一致性绝对是你正确的选择。程序执行得稍微慢一点总比执行出错要好!将它变为具有更弱一致性的原子操作也很容易,只要把SeqCst变成Relaxed就完工了!当然,证明这种变化的正确性就是另外一个问题了。
8.3.5 获取-释放
获取和释放经常成对出现。它们的名字就提示了它们的应用场景:它们适用于获取和释放锁,确保临界区不会重叠。
直观看起来,acquire保证在它之后的访问永远在它之后。可在它之前的操作却有可能被重排到它后面、类似的,release保证它之前的操作永远在它之前。但是它后面的操作可能被重排到它前面。
当线程A释放了一块内存空间,紧接着线程B获取了同一块内存,这时因果关系就确定了。在A释放之前的所有写操作的结果,B在获取之后都能看到。但是,它们和其他线程之间没有确定因果关系。同理,如果A和B访问的是不同的内存,它们也没有因果关系。
所以,释放-获取的基本用法很简单:你获取一块内存并进入临界区,然后释放内存并离开临界区。比如,一个简单的自旋锁可能是这样的:use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
fn main() {
let lock = Arc::new(AtomicBool::new(false)); // 我上锁了吗?
// ...用某种方式把锁分发到各个线程...
// 设置值为true,以尝试获取锁
while lock.compare_and_swap(false, true, Ordering::Acquire) {}
// 跳出循环,表明我们获取到了锁!
// ...恐怖的数据访问...
// 工作完成了,释放锁
lock.store(false, Ordering::Release);
}
在强顺序平台上,大多数的访问都有释放和获取的语义,释放和获取通常是无开销的。不过在弱顺序平台上不是这样。
8.3.6 Relaxed
Relaxed访问是最弱的。它们可以被随意重排,也没有先后关系。但是Relaxed操作依然是原子的。也就是说,它并不算是数据访问,所有对它的读-修改-写操作都是原子的。Relaxed操作适用于那些你希望发生但又并不特别在意的事情。比如,多线程可以使用Relaxed的fetch_add来增加计数器,如果你不使用计数器的值去同步其他的访问,这个操作就是安全的。
在强顺序平台上使用Relaxed没什么好处,因为它们通常都有释放-获取语义。不过,在弱顺序平台上,Relaxed可以获取更小的开销。
九、实现Vec
我们要把所有的内容汇总起来,从头开始写一个std::Vec
。
9.1 布局
我们先来看看结构体的布局。Vec由三部分组成:一个指向分配空间的指针、空间的大小、以及已经初始化的元素的数量。
简单来说,我们的设计只要这样:pub struct Vec<T> {
ptr: *mut T,
cap: usize,
len: usize,
}
这段代码可以通过编译。可不幸的是,它是不正确的。首先,编译器产生的变性过于严格。所以&Vev<&'static str>
不能当做&Vev<&'a str>
使用。更主要的是,它会给drop检查器传递错误的所有权信息,因为编译器会保守地假设我们不拥有任何的值。
我们在所有权一章见到的,当裸指针指向一块我们拥有所有权的位置,我们应该使用Unique<T>
代替*mut T
。尽管Unique
是不稳定的,我们尽可能不去使用它。
Unique
封装了一个裸指针,并且声明它自己:
- 对
T
协变 - 拥有类型
T
的值(用于drop检查) - 如果
T
是Send/Sync
,那就也是Send/Sync
- 指针永远不为
null
(所以Option
可以做空指针优化)
除了最后一点,其余的我们都可以用稳定的Rust实现:use std::marker::PhantomData;
use std::ops::Deref;
use std::mem;
struct Unique<T> {
ptr: *const T, // 使用*const保证变性
_marker: PhantomData<T>, // 用于drop检查
}
// 设置Send和Sync是安全地,因为我们是Unique中的数据的所有者
// Unique<t>好像就是T一样
unsafe impl<T: Send> Send for Unique<T> {}
unsafe impl<T: Sync> Sync for Unique<T> {}
impl<T> Unique<T> {
pub fn new(ptr: *mut T) -> Self {
Unique { ptr: ptr, _marker: PhantomData }
}
pub fn as_ptr(&self) -> *mut T {
self.ptr as *mut T
}
}
可是,声明数据不为0的方法是不稳定的,而且短期内都不太可能会稳定下来。所以我们还是接受现实,使用比标准库的Unique:
为此,我们将使用Unique<T>
的另一个包装器NonNull<T>
,而不是直接使用他,它为我们提供了上述两个属性,即它是协变的,T
并且被声明为永远不会为空。通过添加PhantomData<T>
(用于丢弃检查)并实现 Send/Sync
,我们得到与使用相同的结果 Unique<T>
:use std::ptr::NonNull;
use std::marker::PhantomData;
pub struct Vec<T> {
ptr: NonNull<T>,
cap: usize,
len: usize,
_marker: PhantomData<T>,
}
unsafe impl<T: Send> Send for Vec<T> {}
unsafe impl<T: Sync> Sync for Vec<T> {}
fn main() {}
9.2 内存分配
使用NonNull
给Vec
(以及所有的标准库集合)造成了一个问题:空的Vec
不会分配内存。这与分配零大小的内存块不同,全局分配器不允许这样做(它会导致未定义的行为!)。如果既不能分配内存,又不能给ptr
传递一个空指针,那我们在Vec::new
中能做什么呢?好吧,我们就胡乱往Vec里塞点东西。
这么做没什么问题,因为我们用cap == 0
来表示没有分配空间。我们也不用做什么特殊的处理,因为我们通常都会去检查cap > len
或者len > 0
。Rust推荐的放进去的值是mem::align_of::<T>()
。NonNull
则提供了一个更方便的方式NonNull::dangling()
。我们会在很多的地方用到dangling
,因为有时候我们没有实际分配的内存,而null
会降低编译器的效率。
所以:use std::mem;
impl<T> Vec<T> {
fn new() -> Self {
assert!(mem::size_of::<T>() != 0, "We're not ready to handle ZSTs");
Vec {
ptr: NonNull::dangling(),
len: 0,
cap: 0,
_marker: PhantomData,
}
}
}
fn main() {}
我们插入了一个assert语句,因为零尺寸类型需要做很多特殊的处理,我们希望以后再讨论这个问题。如果没有assert的话,我们之前的代码会出现很多严重的问题。
接下来我们要讨论在需要内存空间的时候,我们要做些什么。这里我们需要使用全局分配函数alloc
, realloc
和dealloc
。
我们还需要能够处理内存不足(OOM)的方法。标准库会调用alloc::headle_alloc_error
,它将以特定于平台的方式中止程序。
当然,这么做显得有一点傻乎乎,因为大多数平台正常情况下都不会真的没有内存。如果你的程序正常地耗尽了内存,操作系统可能会用其他的方式kill掉它。真的遇到OOM,最有可能的原因是我们一次性的请求严重过量的内存(比如,理论地址空间的一半)。这种情况下其实可以panic而不用担心有什么问题。不过,我们希望尽量模仿标准库的行为,所以我们还是中止整个程序。
好了,现在我们可以编写扩容的代码了。简单粗暴一点,我们需要这样的逻辑:if cap == 0:
allocate()
cap = 1
else:
reallocate()
cap *= 2
但是Rust支持的分配器API过于底层了,我们不得不做一些其他的工作。我们还需要应对过大的或者空的内存分配等特殊的场景。
特别是ptr::offset
会给我们造成很多麻烦。因为它的语义是LLVM
的GEP
入栈指令。如果你很幸运,以前没有处理过这个语义,这里就简单介绍一下GEP
的作用:别名分析,别名分析,别名分析。推导数据依赖和别名对于一个成熟的编译器来说至关重要。
一个简单的例子,看一下下面这段代码:*x *= 7;
*y *= 3;
如果编译器可以证明x
和y
指向内存的不同区域,那么这两个操作理论上可以并行执行(比如,把它们加载到不同的寄存器并各自独立地处理)。但一般编译器不能这么做,因为如果x
和y
指向相同的区域,两个操作是在同一个值上做的,最后的结果不能合并到一起。
如果你使用了GEP
入栈,你其实是在告诉LLVM
你的offset
操作是在一个分配实体里面做的。LLVM
可以认为,当已知两个指针指向不同的对象时,他们所有的offset
也都不是重名的(因为它们只能指向某个确定范围内的位置)。LLVM
针对GEP
偏移做了很多的优化,而入栈偏移是效果最好的,所以我们也要尽可能地利用它。
这就是GEP做的事情,那么它怎么会给我们制造麻烦呢?
第一个问题,我们索引数组时使用的是无符号整数,但GEP
(其实也就是ptr::offset
)接受的是有符号整数。这表明有一半合法的索引值是超出了GEP
的范围的,会指向错误的方向。所以我们必须限制所有的分配空间最多有isize::Max
个元素。这实际意味着我们只需要关心一个字节大小的对象,因为数量> isize::MAX
个u16
会耗尽系统的内存。不过,为了避免一些奇怪的边界场景,比如有人将少于isize::MAX
个对象的数组重解析为字节数组,标准库还限制了分配空间最大为isize::MAX
个字节。
Rust目前支持的各种64位目标平台,都被人为限制了内存地址空间明显小于64位(现代x86平台只暴露了48位的寻址空间),所以我们可以依赖于OOM实现上面的要求。但是对于32位目标平台,特别是那些借助扩展可以使用多于寻址空间的内存的平台(PAE x86或x32),理论上可能成功分配到多于isize::MAX
字节的内存。
不过因为本书只是一个教程,我们也不必做得绝对完美。这里就使用无条件检查,而不用更智能的平台相关的cfg
。
另一个需要关注的边界场景是空分配。而空分配又分为两种:cap = 0
,以及cap > 0
但是类型大小为0。
这些场景的特殊性在于,它们都做了特殊的处理以适配LLVM
的“已分配”的概念。LLVM
的分配的概念比我们通常的理解要更加抽象。因为LLVM
要适配多种语言的语义以及分配器,它其实并不知道什么叫做分配。它所谓的分配的实际含义是“不要和其他的东西重叠”。也就是说,堆分配、栈分配已经全局变量都不能有重合的区域。是的,这就是别名分析。如果Rust和这一概念保持一致的话,理论上可以做到更快更灵活。
回到空分配的场景,代码中许多的地方都可能需要偏移 0。现在的问题是:这么做会导致冲突吗?对于零尺寸类型,我们知道它可以做到任意数量的GEP
入栈偏移而不会引起任何问题。这实际上是一个运行期的no-op,因为所有的元素都不占用空间,可以假设有无数个零尺寸类型位于0x01
。当然,没有哪个分配器真的会分配那个地址,因为它们不会分配0x00
,而最小的对齐(alignment)通常要大于一个字节。同时,内存的第一页通常处于受保护状态,不会在上面分配空间(对于大多数平台,一页是4k的
空间)。
如果是尺寸大于0的类型呢?这种情况就更复杂一些。原则上,你可以认为偏移0不会给LLVM提供任何的信息:地址的前面或后面可能存在一些元素,可不需要知道它们确切是什么。但是,我们还是谨慎一些,假设这么做有可能导致不好的情况。所以我们会显式地避免这种场景。
终于要结束了。
不要再说这些废话了,我们实际写一段内存分配的代码:use std::alloc::{self, Layout};
impl<T> Vec<T> {
fn grow(&mut self) {
let (new_cap, new_layout) = if self.cap == 0 {
(1, Layout::array::<T>(1).unwrap())
} else {
// This can't overflow since self.cap <= isize::MAX.
let new_cap = 2 * self.cap;
// `Layout::array` checks that the number of bytes is <= usize::MAX,
// but this is redundant since old_layout.size() <= isize::MAX,
// so the `unwrap` should never fail.
let new_layout = Layout::array::<T>(new_cap).unwrap();
(new_cap, new_layout)
};
// Ensure that the new allocation doesn't exceed `isize::MAX` bytes.
assert!(new_layout.size() <= isize::MAX as usize, "Allocation too large");
let new_ptr = if self.cap == 0 {
unsafe { alloc::alloc(new_layout) }
} else {
let old_layout = Layout::array::<T>(self.cap).unwrap();
let old_ptr = self.ptr.as_ptr() as *mut u8;
unsafe { alloc::realloc(old_ptr, old_layout, new_layout.size()) }
};
// If allocation fails, `new_ptr` will be null, in which case we abort.
self.ptr = match NonNull::new(new_ptr as *mut T) {
Some(p) => p,
None => alloc::handle_alloc_error(new_layout),
};
self.cap = new_cap;
}
}
fn main() {}
9.3 push和pop
我们可以初始化,我们也可以分配内存。现在我们开始实现一些真正的功能!我们就从push
开始吧。它要做的事情就是检查空间是否已满,满了就扩容,然后写数据到下一个索引位置,最后增加长度。
写数据时,我们一定要小心,不要计算我们要写入的内存位置的值。最坏的情况,那块内存是一块未初始化的内存。最好的情况是那里存着我们已经pop
出去的值。不管哪种情况,我们都不能直接索引这块内存然后解引用它,因为这样其实是把内存中的值当做了一个合法的T的实例。更糟糕的是,foo[idx] = x
会调用foo[idx]
处旧有值的drop
方法!
正确的方法是使用ptr::write
,它直接用值的二进制覆盖目标地址,不会计算任何的值。
对于push
,如果原有的长度(调用push
之前的长度)为0,那么我们就要写到第0个索引位置。所以我们应该用原有的长度做偏移。pub fn push(&mut self, elem: T) {
if self.len == self.cap { self.grow(); }
unsafe {
ptr::write(self.ptr.as_ptr().add(self.len), elem);
}
// Can't fail, we'll OOM first.
self.len += 1;
}
那么pop
是什么样的呢?尽管现在我们要访问的索引位置已经初始化了,Rust不允许我们用解引用的方式将值移出,因为那样的话整个内存都会回到未初始化状态!这时我们需要用ptr:read
,它从目标位置拷贝出二进制值,然后解析成类型T
的值。这时原有位置处的内存逻辑上是未初始化的,可实际上那里还是存在这一个正常的T
的实例。
对于pop
,如果原有长度是1,我们要读的是第0个索引位置。所以我们应该是按新的长度做偏移。pub fn pop(&mut self) -> Option<T> {
if self.len == 0 {
None
} else {
self.len -= 1;
unsafe {
Some(ptr::read(self.ptr.as_ptr().add(self.len)))
}
}
}
9.4 回收资源
接下来我们应该实现Drop
,否则就要造成大量的资源泄露了。最简单的方法是循环调用pop
直到产生None
为止,然后再回收我们的缓存。注意,当T: !Drop
的时候,调用pop
不是必须的。理论上我们可以问一问RustT
是不是need_drop
然后再省略一些pop
调用。可实际上LLVM
很擅长移除像这样的无副作用的代码,所以我们不需要再做多余的事,除非你发现LLVM
不能成功移除(在这里它能)。
在self.cap == 0
的时候,我们一定不要调用alloc::dealloc
,因为这时我们还没有实际分配过任何内存。impl<T> Drop for Vec<T> {
fn drop(&mut self) {
if self.cap != 0 {
while let Some(_) = self.pop() { }
let layout = Layout::array::<T>(self.cap).unwrap();
unsafe {
alloc::dealloc(self.ptr.as_ptr() as *mut u8, layout);
}
}
}
}
```
### 9.5 DeRef
不错!我们实现了一个成熟的小的栈。我们可以`push`、可以`pop`、也可以自动清理。但是还是有一堆的功能是我们需要的。特别是,我们已经有了一个很好的数组,但是还没有`slice`相关的功能。这非常容易解决:我们可以实现`Deref<Target=[T]>`。这样我们的`Vec`就神奇地变成了`slice`。
我们只需要使用`slice::from_raw_parts`。它能够为我们正确处理空`slice`。等到后面我们完成了零尺寸类型的支持,它们依然可以完美配合。
use std::ops::Deref;
impl
type Target = [T];
fn deref(&self) -> &[T] {
unsafe {
std::slice::from_raw_parts(self.ptr.as_ptr(), self.len)
}
}
}把`DefMut`也实现了吧:
use std::ops::DerefMut;
impl
fn deref_mut(&mut self) -> &mut [T] {
unsafe {
std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len)
}
}
}现在我们有了`len`、`first`、`last`、`索引`、`分片`、`排序`、`iter`、`iter_mut`,以及其他所有的`slice`提供的功能。完美!
### 9.6 插入和删除
slice并没有提供插入和删除功能,接下来我们就实现它们。
插入需要把目标位置后的所有元素都向右移动1。这里我们需要用到`ptr::copy`,它就是C中的`memmove`的Rust版。它把一块内存从一个地方拷贝到另一个地方,而且可以正确处理源和目标内存区域有重叠的情况(也正是我们这里遇到的情况)。
如果我们在`i`的位置插入,我们需要把`[i .. len]`移动到`[i+1 .. len+1]`,`len`指的是插入前的值。
pub fn insert(&mut self, index: usize, elem: T) {
// 注意:<=是因为我们可以把值插到所有元素的后面
// 这种情况等同于push
assert!(index <= self.len, “index out of bounds”);
if self.cap == self.len { self.grow(); }
unsafe {
if index < self.len {
// ptr::copy(src, dest, len): “从src拷贝len个元素到dest”
ptr::copy(self.ptr.offset(index as isize),
self.ptr.offset(index as isize + 1),
self.len - index);
}
ptr::write(self.ptr.offset(index as isize), elem);
self.len += 1;
}
}
### 9.7 IntoIter
还有两个有意思的迭代器是`Vec`提供的而`slice`没有的:`into_iter`和`drain`。
`IntoIter`以值而不是引用的形式访问`Vec`,同时也是以值的形式返回元素。为了实现这一点,`IntoIter`需要获取`Vec`的分配空间的所有权。
`IntoIter`也需要`DoubleEnd`,即从两个方向读数据。从尾部读数据可以通过调用`pop`实现,但是从头读数据就困难了。我们可以调用`remove(0)`,但是它的开销太大了。我们选择直接使用`ptr::read`从`Vec`的两端拷贝数据,而完全不去改变缓存。
我们要用一个典型的C访问数组的方式来实现这一点。我们先创建两个指针,一个指向数组的开头,另一个指向结尾后面的那个元素。如果我们需要一端的元素,我们就从那一端指针指向的位置处读出值,然后把指针移动一位。当两个指针相等时,就说明迭代完成了。
注意,`next`和`next_back`中的读和偏移的顺序是相反的。对于`next_back`,指针总是指向它下一次要读的元素的后面,而`next`的指针总是指向它下一次要读的元素。为什么要这样呢?考虑一下只剩一个元素还未被读取的情况。
这时的数组像这样:
S E
[X, X, X, O, X, X, X]如果E直接指向它下一次要读的元素,我们就无法把上面的情况和所有元素都读过了的情况区分开了。
我们还需要保存`Vec`的分配空间的信息,虽然在迭代过程中我们并不关心它,但我们在`IntoIter`被`drop`的时候需要这些信息来释放空间。
所以我们要用下面这个结构体:
pub struct IntoIter
buf: NonNull
cap: usize,
start: const T,
end: const T,
_marker: PhantomData
}这是初始化的代码:
impl
pub fn into_iter(self) -> IntoIter
// Can’t destructure Vec since it’s Drop
let ptr = self.ptr;
let cap = self.cap;
let len = self.len;
// Make sure not to drop Vec since that will free the buffer
mem::forget(self);
unsafe {
IntoIter {
buf: ptr,
cap: cap,
start: ptr.as_ptr(),
end: if cap == 0 {
// can't offset off this pointer, it's not allocated!
ptr.as_ptr()
} else {
ptr.as_ptr().add(len)
},
_marker: PhantomData,
}
}
}
}这是向前迭代:
impl
type Item = T;
fn next(&mut self) -> Option
if self.start == self.end {
None
} else {
unsafe {
let result = ptr::read(self.start);
self.start = self.start.offset(1);
Some(result)
}
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let len = (self.end as usize - self.start as usize)
/ mem::size_of::<T>();
(len, Some(len))
}
}这里是向后迭代。
impl
fn nextback(&mut self) -> Option
if self.start == self.end {
None
} else {
unsafe {
self.end = self.end.offset(-1);
Some(ptr::read(self.end))
}
}
}
}因为 IntoIter 拥有其分配的所有权,所以它需要实现 Drop 来释放它。但是,它还希望实现 Drop 以删除它包含的任何未生成的元素。
impl
fn drop(&mut self) {
if self.cap != 0 {
// drop any remaining elements
for
let layout = Layout::array::
unsafe {
alloc::dealloc(self.buf.as_ptr() as
}
}
}
}
### 9.8 RawVec
我们遇到了一个很有意思的情况:我们把初始化缓存和释放内存的逻辑在Vec和IntoIter里面一模一样地写了两次。现在我们已经实现了功能,而且发现了逻辑的重复,是时候对代码做一些压缩了。
我们要抽象出(ptr, cap),并赋予它们分配、扩容和释放的逻辑:
struct RawVec
ptr: NonNull
cap: usize,
_marker: PhantomData
}
unsafe impl
unsafe impl
impl
fn new() -> Self {
assert!(mem::size_of::
RawVec {
ptr: NonNull::dangling(),
cap: 0,
_marker: PhantomData,
}
}
fn grow(&mut self) {
let (new_cap, new_layout) = if self.cap == 0 {
(1, Layout::array::<T>(1).unwrap())
} else {
// This can't overflow because we ensure self.cap <= isize::MAX.
let new_cap = 2 * self.cap;
// Layout::array checks that the number of bytes is <= usize::MAX,
// but this is redundant since old_layout.size() <= isize::MAX,
// so the `unwrap` should never fail.
let new_layout = Layout::array::<T>(new_cap).unwrap();
(new_cap, new_layout)
};
// Ensure that the new allocation doesn't exceed `isize::MAX` bytes.
assert!(new_layout.size() <= isize::MAX as usize, "Allocation too large");
let new_ptr = if self.cap == 0 {
unsafe { alloc::alloc(new_layout) }
} else {
let old_layout = Layout::array::<T>(self.cap).unwrap();
let old_ptr = self.ptr.as_ptr() as *mut u8;
unsafe { alloc::realloc(old_ptr, old_layout, new_layout.size()) }
};
// If allocation fails, `new_ptr` will be null, in which case we abort.
self.ptr = match NonNull::new(new_ptr as *mut T) {
Some(p) => p,
None => alloc::handle_alloc_error(new_layout),
};
self.cap = new_cap;
}
}
impl
fn drop(&mut self) {
if self.cap != 0 {
let layout = Layout::array::
unsafe {
alloc::dealloc(self.ptr.as_ptr() as *mut u8, layout);
}
}
}
}并将 Vec 更改如下:
pub struct Vec
buf: RawVec
len: usize,
}
impl
fn ptr(&self) -> *mut T {
self.buf.ptr.as_ptr()
}
fn cap(&self) -> usize {
self.buf.cap
}
pub fn new() -> Self {
Vec {
buf: RawVec::new(),
len: 0,
}
}
// push/pop/insert/remove largely unchanged:
// * `self.ptr.as_ptr() -> self.ptr()`
// * `self.cap -> self.cap()`
// * `self.grow() -> self.buf.grow()`
}
impl
fn drop(&mut self) {
while let Some(_) = self.pop() {}
// deallocation is handled by RawVec
}
}最后我们可以简化IntoIter:
pub struct IntoIter
_buf: RawVec
start: const T,
end: const T,
}
// next and next_back literally unchanged since they never referred to the buf
impl
fn drop(&mut self) {
// only need to ensure all our elements are read;
// buffer will clean itself up afterwards.
for _ in &mut *self {}
}
}
impl
pub fn into_iter(self) -> IntoIter
unsafe {
// need to use ptr::read to unsafely move the buf out since it’s
// not Copy, and Vec implements Drop (so we can’t destructure it).
let buf = ptr::read(&self.buf);
let len = self.len;
mem::forget(self);
IntoIter {
start: buf.ptr.as_ptr(),
end: if buf.cap == 0 {
// can't offset off of a pointer unless it's part of an allocation
buf.ptr.as_ptr()
} else {
buf.ptr.as_ptr().add(len)
},
_buf: buf,
}
}
}
}现在看起来好多了。
* `mem::forget`: 在不允许对象的`drop`析构函数的前提下,获取其所有权。
### 9.9 Drain
我们接着看看`Drain`。`Drain`和`IntoIter`基本相同,只不过它并不获取`Vec`的值,而是借用`Vec`并且不改变它的分配空间。现在我们只是先最“基本”的全范围(full-range)的版本。
use std::marker::PhantomData;
struct Drain<’a, T: ‘a> {
// 这里需要限制生命周期。我们使用&’a mut Vec
// 我们只调用pop()和remove(0)
vec: PhantomData<&’a mut Vec
start: const T,
end: const T,
}
impl<’a, T> Iterator for Drain<’a, T> {
type Item = T;
fn next(&mut self) -> Option
if self.start == self.end {
None——等一下,这个看着有点眼熟。我们需要做进一步的压缩。`IntoIter`和`Drain`有着完全一样的结构,我们把它提取出来。
struct RawValIter
start: const T,
end: const T,
}
impl
// unsafe to construct because it has no associated lifetimes.
// This is necessary to store a RawValIter in the same struct as
// its actual allocation. OK since it’s a private implementation
// detail.
unsafe fn new(slice: &[T]) -> Self {
RawValIter {
start: slice.as_ptr(),
end: if slice.len() == 0 {
// if len = 0
, then this is not actually allocated memory.
// Need to avoid offsetting because that will give wrong
// information to LLVM via GEP.
slice.as_ptr()
} else {
slice.as_ptr().add(slice.len())
}
}
}
}
// Iterator and DoubleEndedIterator impls identical to IntoIter.`IntoIter`变成了这样:
pub struct IntoIter
_buf: RawVec
iter: RawValIter
}
impl
type Item = T;
fn next(&mut self) -> Option
fn size_hint(&self) -> (usize, Option
}
impl
fn next_back(&mut self) -> Option
}
impl
fn drop(&mut self) {
for _ in &mut *self {}
}
}
impl
pub fn into_iter(self) -> IntoIter
unsafe {
let iter = RawValIter::new(&self);
let buf = ptr::read(&self.buf);
mem::forget(self);
IntoIter {
iter: iter,
_buf: buf,
}
}
}
}注意,我在设计中留下了一些小后门,以便更简单地将`Drain`升级为可访问任意子范围的版本。特别是,我们可以在`drop`中让`RawValIter`遍历它自己。但是这种设计不适用于更复杂的`Drain`。我们还使用一个`slice`简化`Drain`的初始化。
好了,现在Drain变得很简单:
use std::marker::PhantomData;
pub struct Drain<’a, T: ‘a> {
vec: PhantomData<&’a mut Vec
iter: RawValIter
}
impl<’a, T> Iterator for Drain<’a, T> {
type Item = T;
fn next(&mut self) -> Option
fn size_hint(&self) -> (usize, Option
}
impl<’a, T> DoubleEndedIterator for Drain<’a, T> {
fn next_back(&mut self) -> Option
}
impl<’a, T> Drop for Drain<’a, T> {
fn drop(&mut self) {
for _ in &mut *self {}
}
}
impl
pub fn drain(&mut self) -> Drain
unsafe {
let iter = RawValIter::new(&self);
// this is a mem::forget safety thing. If Drain is forgotten, we just
// leak the whole Vec's contents. Also we need to do this *eventually*
// anyway, so why not do it now?
self.len = 0;
Drain {
iter: iter,
vec: PhantomData,
}
}
}
}
### 9.10 处理零尺寸类型
是时候和零尺寸类型开战了。安全Rust并不需要关心这个,但是`Vec`大量的依赖裸指针和内存分配,这些都需要零尺寸类型。我们要小心两件事情:
* 当给分配器API传递分配尺寸为0时,会导致未定义行为
* 对零尺寸类型的裸指针做`offset`是一个no-op,这会破坏我们的C-style指针迭代器。
幸好我们把指针迭代器和内存分配逻辑抽象出来放在RawValIter和RawVec中了。真是太方便了。
###### 9.10.1 为零尺寸类型分配空间
如果分配器API不支持分配大小为0的空间,那么我们究竟储存了些什么呢?当然是`NonNull::dangling()`了!基本上所有关于`ZST`的操作都是no-op,因为`ZST`只有一个值,不需要储存或加载任何的状态。这也同样适用于`ptr::read`和`ptr::write`:它们根本不会看那个指针一眼。所以我们并不需要修改指针。
注意,我们之前的分配代码依赖于`OOM`会先于数值溢出出现的假设,对于零尺寸类型不再有效了。我们必须显式地保证cap的值在`ZST`的情况下不会溢出。
基于现在的架构,我们需要写3处保护代码,`RawVec`的三个方法每个都有一处。
impl
fn new() -> Self {
// !0就是usize::MAX。这段分支代码在编译期就可以计算出结果。
let cap = if mem::size_of::
// `NonNull::dangling()` doubles as "unallocated" and "zero-sized allocation"
RawVec {
ptr: NonNull::dangling(),
cap: cap,
_marker: PhantomData,
}
}
fn grow(&mut self) {
// since we set the capacity to usize::MAX when T has size 0,
// getting to here necessarily means the Vec is overfull.
assert!(mem::size_of::<T>() != 0, "capacity overflow");
let (new_cap, new_layout) = if self.cap == 0 {
(1, Layout::array::<T>(1).unwrap())
} else {
// This can't overflow because we ensure self.cap <= isize::MAX.
let new_cap = 2 * self.cap;
// `Layout::array` checks that the number of bytes is <= usize::MAX,
// but this is redundant since old_layout.size() <= isize::MAX,
// so the `unwrap` should never fail.
let new_layout = Layout::array::<T>(new_cap).unwrap();
(new_cap, new_layout)
};
// Ensure that the new allocation doesn't exceed `isize::MAX` bytes.
assert!(new_layout.size() <= isize::MAX as usize, "Allocation too large");
let new_ptr = if self.cap == 0 {
unsafe { alloc::alloc(new_layout) }
} else {
let old_layout = Layout::array::<T>(self.cap).unwrap();
let old_ptr = self.ptr.as_ptr() as *mut u8;
unsafe { alloc::realloc(old_ptr, old_layout, new_layout.size()) }
};
// If allocation fails, `new_ptr` will be null, in which case we abort.
self.ptr = match NonNull::new(new_ptr as *mut T) {
Some(p) => p,
None => alloc::handle_alloc_error(new_layout),
};
self.cap = new_cap;
}
}
impl
fn drop(&mut self) {
let elem_size = mem::size_of::
if self.cap != 0 && elem_size != 0 {
unsafe {
alloc::dealloc(
self.ptr.as_ptr() as *mut u8,
Layout::array::<T>(self.cap).unwrap(),
);
}
}
}
}就是这样。我们现在已经支持`push`和`pop`零尺寸类型了。但是迭代器(slice未提供的)还不能工作。
###### 9.10.2 迭代零尺寸类型
offset 0是一个no-op。这意味着我们的start和end总是会被初始化为相同的值,我们的迭代器也无法产生任何的东西。当前的解决方案是把指针转换为整数,增加他们的值,然后再转换回来:
impl
unsafe fn new(slice: &[T]) -> Self {
RawValIter {
start: slice.asptr(),
end: if mem::size_of::
((slice.as_ptr() as usize) + slice.len()) as *const
} else if slice.len() == 0 {
slice.asptr()
} else {
slice.as_ptr().add(slice.len())
},
}
}
}现在我们有了一个新的bug。我们成功地让迭代器从完全不运行,变成了永远不停地运行。我们需要在迭代器的实现中玩同样的把戏。同时,size_hint在ZST的情况下会出现除数为0的问题。因为我们假设这两个指针都指向某个字节,我们在除数为0的情况下直接将除数变为1。
impl
type Item = T;
fn next(&mut self) -> Option
if self.start == self.end {
None
} else {
unsafe {
let result = ptr::read(self.start);
self.start = if mem::size_of::
(self.start as usize + 1) as *const
} else {
self.start.offset(1)
};
Some(result)
}
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let elem_size = mem::size_of::<T>();
let len = (self.end as usize - self.start as usize)
/ if elem_size == 0 { 1 } else { elem_size };
(len, Some(len))
}
}
impl
fn nextback(&mut self) -> Option
if self.start == self.end {
None
} else {
unsafe {
self.end = if mem::size_of::
(self.end as usize - 1) as *const
} else {
self.end.offset(-1)
};
Some(ptr::read(self.end))
}
}
}
}很好,迭代器也可以工作了。
### 9.11 最终代码
use std::alloc::{self, Layout};
use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::ptr::{self, NonNull};
struct RawVec
ptr: NonNull
cap: usize,
_marker: PhantomData
}
unsafe impl
unsafe impl
impl
fn new() -> Self {
// !0 is usize::MAX. This branch should be stripped at compile time.
let cap = if mem::size_of::
// `NonNull::dangling()` doubles as "unallocated" and "zero-sized allocation"
RawVec {
ptr: NonNull::dangling(),
cap: cap,
_marker: PhantomData,
}
}
fn grow(&mut self) {
// since we set the capacity to usize::MAX when T has size 0,
// getting to here necessarily means the Vec is overfull.
assert!(mem::size_of::<T>() != 0, "capacity overflow");
let (new_cap, new_layout) = if self.cap == 0 {
(1, Layout::array::<T>(1).unwrap())
} else {
// This can't overflow because we ensure self.cap <= isize::MAX.
let new_cap = 2 * self.cap;
// `Layout::array` checks that the number of bytes is <= usize::MAX,
// but this is redundant since old_layout.size() <= isize::MAX,
// so the `unwrap` should never fail.
let new_layout = Layout::array::<T>(new_cap).unwrap();
(new_cap, new_layout)
};
// Ensure that the new allocation doesn't exceed `isize::MAX` bytes.
assert!(
new_layout.size() <= isize::MAX as usize,
"Allocation too large"
);
let new_ptr = if self.cap == 0 {
unsafe { alloc::alloc(new_layout) }
} else {
let old_layout = Layout::array::<T>(self.cap).unwrap();
let old_ptr = self.ptr.as_ptr() as *mut u8;
unsafe { alloc::realloc(old_ptr, old_layout, new_layout.size()) }
};
// If allocation fails, `new_ptr` will be null, in which case we abort.
self.ptr = match NonNull::new(new_ptr as *mut T) {
Some(p) => p,
None => alloc::handle_alloc_error(new_layout),
};
self.cap = new_cap;
}
}
impl
fn drop(&mut self) {
let elem_size = mem::size_of::
if self.cap != 0 && elem_size != 0 {
unsafe {
alloc::dealloc(
self.ptr.as_ptr() as *mut u8,
Layout::array::<T>(self.cap).unwrap(),
);
}
}
}
}
pub struct Vec
buf: RawVec
len: usize,
}
impl
fn ptr(&self) -> *mut T {
self.buf.ptr.as_ptr()
}
fn cap(&self) -> usize {
self.buf.cap
}
pub fn new() -> Self {
Vec {
buf: RawVec::new(),
len: 0,
}
}
pub fn push(&mut self, elem: T) {
if self.len == self.cap() {
self.buf.grow();
}
unsafe {
ptr::write(self.ptr().add(self.len), elem);
}
// Can't overflow, we'll OOM first.
self.len += 1;
}
pub fn pop(&mut self) -> Option<T> {
if self.len == 0 {
None
} else {
self.len -= 1;
unsafe { Some(ptr::read(self.ptr().add(self.len))) }
}
}
pub fn insert(&mut self, index: usize, elem: T) {
assert!(index <= self.len, "index out of bounds");
if self.cap() == self.len {
self.buf.grow();
}
unsafe {
ptr::copy(
self.ptr().add(index),
self.ptr().add(index + 1),
self.len - index,
);
ptr::write(self.ptr().add(index), elem);
self.len += 1;
}
}
pub fn remove(&mut self, index: usize) -> T {
assert!(index < self.len, "index out of bounds");
unsafe {
self.len -= 1;
let result = ptr::read(self.ptr().add(index));
ptr::copy(
self.ptr().add(index + 1),
self.ptr().add(index),
self.len - index,
);
result
}
}
pub fn into_iter(self) -> IntoIter<T> {
unsafe {
let iter = RawValIter::new(&self);
let buf = ptr::read(&self.buf);
mem::forget(self);
IntoIter {
iter: iter,
_buf: buf,
}
}
}
pub fn drain(&mut self) -> Drain<T> {
unsafe {
let iter = RawValIter::new(&self);
// this is a mem::forget safety thing. If Drain is forgotten, we just
// leak the whole Vec's contents. Also we need to do this *eventually*
// anyway, so why not do it now?
self.len = 0;
Drain {
iter: iter,
vec: PhantomData,
}
}
}
}
impl
fn drop(&mut self) {
while let Some(_) = self.pop() {}
// deallocation is handled by RawVec
}
}
impl
type Target = [T];
fn deref(&self) -> &[T] {
unsafe { std::slice::from_raw_parts(self.ptr(), self.len) }
}
}
impl
fn deref_mut(&mut self) -> &mut [T] {
unsafe { std::slice::from_raw_parts_mut(self.ptr(), self.len) }
}
}
struct RawValIter
start: const T,
end: const T,
}
impl
unsafe fn new(slice: &[T]) -> Self {
RawValIter {
start: slice.asptr(),
end: if mem::size_of::
((slice.as_ptr() as usize) + slice.len()) as *const
} else if slice.len() == 0 {
slice.as_ptr()
} else {
slice.as_ptr().add(slice.len())
},
}
}
}
impl
type Item = T;
fn next(&mut self) -> Option
if self.start == self.end {
None
} else {
unsafe {
let result = ptr::read(self.start);
self.start = if mem::sizeof::
(self.start as usize + 1) as *const
} else {
self.start.offset(1)
};
Some(result)
}
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let elem_size = mem::size_of::<T>();
let len = (self.end as usize - self.start as usize) /
if elem_size == 0 { 1 } else { elem_size };
(len, Some(len))
}
}
impl
fn nextback(&mut self) -> Option
if self.start == self.end {
None
} else {
unsafe {
self.end = if mem::size_of::
(self.end as usize - 1) as *const
} else {
self.end.offset(-1)
};
Some(ptr::read(self.end))
}
}
}
}
pub struct IntoIter
_buf: RawVec
iter: RawValIter
}
impl
type Item = T;
fn next(&mut self) -> Option
self.iter.next()
}
fn size_hint(&self) -> (usize, Option
self.iter.size_hint()
}
}
impl
fn next_back(&mut self) -> Option
self.iter.next_back()
}
}
impl
fn drop(&mut self) {
for _ in &mut *self {}
}
}
pub struct Drain<’a, T: ‘a> {
vec: PhantomData<&’a mut Vec
iter: RawValIter
}
impl<’a, T> Iterator for Drain<’a, T> {
type Item = T;
fn next(&mut self) -> Option
self.iter.next()
}
fn size_hint(&self) -> (usize, Option
self.iter.size_hint()
}
}
impl<’a, T> DoubleEndedIterator for Drain<’a, T> {
fn next_back(&mut self) -> Option
self.iter.next_back()
}
}
impl<’a, T> Drop for Drain<’a, T> {
fn drop(&mut self) {
// pre-drain the iter
for _ in &mut *self {}
}
}
fn main() {
tests::create_push_pop();
tests::iter_test();
tests::test_drain();
tests::test_zst();
println!(“All tests finished OK”);
}
mod tests {
use super::*;
pub fn create_push_pop() {
let mut v = Vec::new();
v.push(1);
assert_eq!(1, v.len());
assert_eq!(1, v[0]);
for i in v.iter_mut() {
*i += 1;
}
v.insert(0, 5);
let x = v.pop();
assert_eq!(Some(2), x);
assert_eq!(1, v.len());
v.push(10);
let x = v.remove(0);
assert_eq!(5, x);
assert_eq!(1, v.len());
}
pub fn iter_test() {
let mut v = Vec::new();
for i in 0..10 {
v.push(Box::new(i))
}
let mut iter = v.into_iter();
let first = iter.next().unwrap();
let last = iter.next_back().unwrap();
drop(iter);
assert_eq!(0, *first);
assert_eq!(9, *last);
}
pub fn test_drain() {
let mut v = Vec::new();
for i in 0..10 {
v.push(Box::new(i))
}
{
let mut drain = v.drain();
let first = drain.next().unwrap();
let last = drain.next_back().unwrap();
assert_eq!(0, *first);
assert_eq!(9, *last);
}
assert_eq!(0, v.len());
v.push(Box::new(1));
assert_eq!(1, *v.pop().unwrap());
}
pub fn test_zst() {
let mut v = Vec::new();
for _i in 0..10 {
v.push(())
}
let mut count = 0;
for _ in v.into_iter() {
count += 1
}
assert_eq!(10, count);
}
}
十、实现Arc和Mutex
---
### 10.1 Arc
我们将实现一个更简单的`std::sync::Arc`
###### 10.1.1 布局
先进行布局Arc。
一个`Arc<T>`为`T`类型的值提供了线程安全的共享所有权,并在堆中分配。在 Rust 中,**共享意味着不变性**,所以我们不需要设计任何东西来管理对该值的访问,对吧?虽然像 `Mutex` 这样的内部可变性类型允许 `Arc` 的用户创建共享可变性,但 `Arc` 本身并不需要关注这些问题。
然而,有一个地方 `Arc` 需要关注可变:**销毁**。当 `Arc` 的所有所有者都销毁时,我们需要能够`drop`其内容并释放其分配。所以我们需要一种方法让所有者知道它是否是最后一个所有者,而最简单的方法就是对所有者进行计数——**引用计数**。
不幸的是,**这种引用计数本质上是共享的可变状态**,所以 `Arc` 需要考虑同步问题。我们可以为此使用 `Mutex`,但那太过于杀鸡用牛刀了。相反,我们将使用 `atomics`。既然每个人都需要一个指向 `T` 的分配的指针,我们也可以把引用计数放在同一个分配中。
直观地说,它看起来就像这样:
fn main() {
use std::sync::atomic;
pub struct Arc
ptr: *mut ArcInner
}
pub struct ArcInner
rc: atomic::AtomicUsize,
data: T,
}这可以编译通过,然而它是不正确的。首先,编译器会给我们太严格的**可变性**。例如,在期望使用`Arc<&'a str>`的地方不能使用`Arc<&'static str>`。更重要的是,它将给 `drop checker` 提供不正确的所有权信息,因为它将假定我们不拥有任何`T`类型的值。由于这是一个提供值的共享所有权的结构,在某些时候会有一个完全拥有其数据的结构实例。了解关于变异和 `drop checker` 的所有细节。
为了解决第一个问题,我们可以使用`NonNull<T>`。请注意,`NonNull<T>`是一个围绕原始指针的包装,并声明以下内容:
* 我们是`T`的协变
* 我们的指针从不为空
为了解决第二个问题,我们可以包含一个包含`ArcInner<T>`的`PhantomData`标记。这将告诉 `drop checker`,我们对`ArcInner<T>`(它本身包含`T`)的值有一些所有权的概念。
通过这些改变,我们得到了最终的结构:
use std::marker::PhantomData;
use std::ptr::NonNull;
use std::sync::atomic::AtomicUsize;
pub struct Arc
ptr: NonNull
phantom: PhantomData
}
pub struct ArcInner
rc: AtomicUsize,
data: T,
}
###### 10.1.2 基本代码
现在我们已经确定了实现`Arc`的布局,让我们开始写一些基本代码。
**构建 Arc**
我们首先需要一种方法来构造一个`Arc<T>`。
这很简单,因为我们只需要把`ArcInner<T>`扔到一个 `Box` 里并得到一个`NonNull<T>`的指针。
impl
pub fn new(data: T) -> Arc
// 当前的指针就是第一个引用,因此初始时设置 count 为 1
let boxed = Box::new(ArcInner {
rc: AtomicUsize::new(1),
data,
});
Arc {
// 我们从 Box::into_raw 得到该指针,因此使用 .unwrap()
是完全可行的
ptr: NonNull::new(Box::into_raw(boxed)).unwrap(),
phantom: PhantomData,
}
}
}
**Send 和 Sync**
由于我们正在构建并发原语,因此我们需要能够跨线程发送它。因此,我们可以实现`Send`和`Sync`标记特性。
这是没问题的,因为:
* 当且仅当你拥有唯一的 `Arc` 引用时,你才能获得其引用数据的可变引用(这仅发生在`Drop`中)
* 我们使用原子操作进行共享可变引用计数
unsafe impl
unsafe impl我们需要约束`T: Sync + Send`,因为如果我们不提供这些约束,就有可能通过`Arc`跨越线程边界共享不安全的值,这有可能导致数据竞争或不可靠。
例如,如果没有这些约束,`Arc<Rc<u32>>`将是`Sync + Send`,这意味着你可以从`Arc`中克隆出`Rc`来跨线程发送(不需要创建一个全新的`Rc`),这将产生数据竞争,因为`Rc`不是线程安全的.
**获取ArcInner**
为了将`NonNull<T>`指针解引用为`T`,我们可以调用`NonNull::as_ref`。这是不安全的,与普通的`as_ref`函数不同,所以我们必须这样调用它。
unsafe { self.ptr.as_ref() }在这段代码中,我们将多次使用这个片段(通常与相关的`let`绑定)。
这种不安全是没问题的,因为当这个`Arc`存活的时候,我们可以保证内部指针是有效的。
**Deref**
好了。现在我们可以制作`Arc`了(很快就能正确地克隆和销毁它们),但是我们怎样才能获得里面的数据呢?
我们现在需要的是一个`Deref`的实现。
我们需要导入该 Trait:
use std::ops::Deref;这里是实现:
impl
type Target = T;
fn deref(&self) -> &T {
let inner = unsafe { self.ptr.as_ref() };
&inner.data
}
}看着很简单,对不?这只是解除了对ArcInner<T>的NonNull指针的引用,然后得到了对里面数据的引用。
**代码**
下面是本节的所有代码。
use std::ops::Deref;
impl
pub fn new(data: T) -> Arc
// 当前的指针就是第一个引用,因此初始时设置 count 为 1
let boxed = Box::new(ArcInner {
rc: AtomicUsize::new(1),
data,
});
Arc {
// 我们从 Box::into_raw 得到该指针,因此使用 .unwrap()
是完全可行的
ptr: NonNull::new(Box::into_raw(boxed)).unwrap(),
phantom: PhantomData,
}
}
}
unsafe impl
unsafe impl
impl
type Target = T;
fn deref(&self) -> &T {
let inner = unsafe { self.ptr.as_ref() };
&inner.data
}
}
###### 10.1.3 克隆
现在我们已经有了一些基本的代码,我们需要一种方法来克隆`Arc`。
我们大致需要:
1. 递增原子引用计数
2. 从内部指针构建一个新的Arc实例
首先,我们需要获得对ArcInner的访问。
let inner = unsafe { self.ptr.as_ref() };我们可以通过以下方式更新原子引用计数:
let old_rc = inner.rc.fetch_add(1, Ordering::???);但是我们在这里应该使用什么顺序?我们实际上没有任何代码在克隆时需要原子同步,因为我们在克隆时不修改内部值。因此,我们可以在这里使用 `Relaxed` 顺序,这意味着没有 `happen-before` 的关系,但却是原子性的。然而,当`Drop Arc` 时,我们需要在递减引用计数时进行原子同步。这在关于`Arc`的`Drop`实现部分中有更多描述。
因此,代码变成了这样:
let old_rc = inner.rc.fetch_add(1, Ordering::Relaxed);我们需要增加一个导入来使用Ordering。
use std::sync::atomic::Ordering;然而,我们现在的这个实现有一个问题:如果有人决定`mem::forget`一堆 `Arc` 怎么办?到目前为止,我们所写的代码(以及将要写的代码)假设引用计数准确地描绘了内存中的 `Arc` 的数量,但在`mem::forget`的情况下,这是错误的。因此,当越来越多的 `Arc` 从这个 `Arc` 中克隆出来,而它们又没有被Drop和参考计数被递减时,我们就会溢出!这将导致释放后使用`(use-after-free)`。这是非常糟糕的事情!
为了处理这个问题,我们需要检查引用计数是否超过某个任意值(低于`usize::MAX`,因为我们把引用计数存储为`AtomicUsize`),并做一些防御。
标准库的实现决定,如果任何线程上的引用计数达到`isize::MAX`(大约是`usize::MAX`的一半),就直接中止程序(因为在正常代码中这是非常不可能的情况,如果它发生,程序可能是非常有问题的)。基于的假设是,不应该有大约 20 亿个线程(或者在一些 64 位机器上大约9万亿个)在同时增加引用计数。这就是我们要做的。
实现这种行为是非常简单的。
if old_rc >= isize::MAX as usize {
std::process::abort();
}然后,我们需要返回一个新的`Arc`的实例。
Self {
ptr: self.ptr,
phantom: PhantomData
}现在,让我们把这一切包在`Clone`的实现中。
use std::sync::atomic::Ordering;
impl
fn clone(&self) -> Arc
let inner = unsafe { self.ptr.as_ref() };
// 我们没有修改 Arc 中的数据,因此在这里不需要任何原子的同步操作,
// 使用 relax 这种排序方式也就完全可行了
let old_rc = inner.rc.fetch_add(1, Ordering::Relaxed);
if old_rc >= isize::MAX as usize {
std::process::abort();
}
Self {
ptr: self.ptr,
phantom: PhantomData,
}
}
}
###### 10.1.4 销毁
我们现在需要一种方法来减少引用计数,并在计数足够低时丢弃数据,否则数据将永远存在于堆中。
为了做到这一点,我们可以实现`Drop`。
我们大致需要:
1. 递减引用计数
2. 如果数据只剩下一个引用,那么:
3. 原子化地对数据进行屏障,以防止对数据的使用和删除进行重新排序
4. 丢弃内部数据
首先,我们需要获得对`ArcInner`的访问:
let inner = unsafe { self.ptr.as_ref() };现在,我们需要递减引用计数。为了简化我们的代码,如果从`fetch_sub`返回的值(递减引用计数之前的值)不等于1,我们可以直接返回(我们不是数据的最后一个引用)。
if inner.rc.fetch_sub(1, Ordering::Release) != 1 {
return;
}然后我们需要创建一个原子屏障来防止重新排序使用数据和删除数据。正如标准库对Arc的实现中所述。
> 需要这个内存屏障来防止数据使用的重新排序和数据的删除。因为它被标记为“Release”,引用计数的减少与“Acquire”屏障同步。这意味着数据的使用发生在减少引用计数之前,而减少引用计数发生在这个屏障之前,而屏障发生在数据的删除之前。(译者注:use < decrease < 屏障 < delete)
> 强制要求一个线程中对该对象的任何可能的访问(通过现有的引用)发生在不同线程中删除该对象之前是很重要的。这可以通过在丢弃一个引用后的“Release”操作来实现(任何通过该引用对对象的访问显然必须在之前发生),以及在删除对象前的“Acquire”操作。
> 特别是,虽然 Arc 的内容通常是不可改变的,但有可能对类似`Mutex<T>`的东西进行内部可变。由于`Mutex`在被删除时不会被获取,我们不能依靠它的同步逻辑来使线程 A 的写操作对线程 B 的析构器可见。
> 还要注意的是,这里的 `Acquire fence` 可能可以用 `Acquire load` 来代替,这可以在高度竞争的情况下提高性能。
为了做到这一点,我们可以这么做:
use std::sync::atomic;
atomic::fence(Ordering::Acquire);最后,我们可以 `drop` 数据本身。我们使用`Box::from_raw`来丢弃 `Box` 中的`ArcInner<T>`和它的数据。这需要一个`*mut T`而不是`NonNull<T>`,所以我们必须使用`NonNull::as_ptr`进行转换。
unsafe { Box::from_raw(self.ptr.as_ptr()); }这是安全的,因为我们知道我们拥有的是最后一个指向`ArcInner`的指针,而且其指针是有效的。
现在,让我们在`Drop`的实现中把这一切整合起来。
impl
fn drop(&mut self) {
let inner = unsafe { self.ptr.as_ref() };
if inner.rc.fetch_sub(1, Ordering::Release) != 1 {
return;
}
// 我们需要防止针对 inner 的使用和删除的重排序,
// 因此使用 fence 来进行保护是非常有必要的
atomic::fence(Ordering::Acquire);
// 安全保证:我们知道这是最后一个对 ArcInner 的引用,并且这个指针是有效的
unsafe { Box::from_raw(self.ptr.as_ptr()); }
}
}
###### 10.1.5 最终代码
这就是我们的最终代码,我在这里加了一些额外的注释并排序了一下 `imports`:
use std::marker::PhantomData;
use std::ops::Deref;
use std::ptr::NonNull;
use std::sync::atomic::{self, AtomicUsize, Ordering};
pub struct Arc
ptr: NonNull
phantom: PhantomData
}
pub struct ArcInner
rc: AtomicUsize,
data: T,
}
impl
pub fn new(data: T) -> Arc
// 当前的指针就是第一个引用,因此初始时设置 count 为 1
let boxed = Box::new(ArcInner {
rc: AtomicUsize::new(1),
data,
});
Arc {
// 我们从 Box::into_raw 得到该指针,因此使用 .unwrap()
是完全可行的
ptr: NonNull::new(Box::into_raw(boxed)).unwrap(),
phantom: PhantomData,
}
}
}
unsafe impl
unsafe impl
impl
type Target = T;
fn deref(&self) -> &T {
let inner = unsafe { self.ptr.as_ref() };
&inner.data
}
}
impl
fn clone(&self) -> Arc
let inner = unsafe { self.ptr.as_ref() };
// 我们没有修改 Arc 中的数据,因此在这里不需要任何原子的同步操作,
// 使用 relax 这种排序方式也就完全可行
let old_rc = inner.rc.fetch_add(1, Ordering::Relaxed);
if old_rc >= isize::MAX as usize {
std::process::abort();
}
Self {
ptr: self.ptr,
phantom: PhantomData,
}
}
}
impl
fn drop(&mut self) {
let inner = unsafe { self.ptr.as_ref() };
if inner.rc.fetch_sub(1, Ordering::Release) != 1 {
return;
}
// 我们需要防止针对 inner 的使用和删除的重排序
// 因此使用 fence 来进行保护是非常有必要
atomic::fence(Ordering::Acquire);
// 安全保证:我们知道这是最后一个对 ArcInner 的引用,并且这个指针是有效的
unsafe { Box::from_raw(self.ptr.as_ptr()); }
}
}
```