Rust-高级-异步之futures

一、异步

面对异步编程,线程是其核心,但是线程的问题也很明显:线程切换开销大,且资源浪费(主要集中在内存上)

Rust 线程

这里使用一个需求来了解Rust 线程:

use std::thread::{sleep};
use std::time::Duration;

fn sleepus() {
for i in 1..=10 {
println!("Sleepus {}", i);
sleep(Duration::from_millis(500));
}
}

fn interruptus() {
for i in 1..=5 {
println!("Interruptus {}", i);
sleep(Duration::from_millis(1000));
}
}

fn main() {
sleepus();
interruptus();
}

上面的代码只能从上到下的过程运行,先打印Sleep,后打印Interruptus

需要实现交织显示,则需要加入线程:

use std::thread::{sleep, spawn};

fn main() {
let sleepus = spawn(sleepus);
let interruptus = spawn(interruptus);

sleepus.join().unwrap();
interruptus.join().unwrap();
}

使用spawn(sleepus)而不是spawn(sleepus())来创建线程。后者将 立即执行sleepus()然后将其执行结果传给spawn,这不是我们期望的- 我在主函数种使用join()来等待子线程结束,并使用unwrap()来处理可能发生的故障。

不过这两种方法都不是异步解决方案!我们使用两个由操作系统管理的线程来并发执行两个同步任务!

基本要素

编写异步的应用,至少需要俩个crate:

  • futures:这个是Rust官方团队提供维护的crate.
  • 异步代码运行时crate: 可以自己选择,比如:Tokio, async_std, smol等等.

下面依赖是面向异步编程的。

futures = { version = "0.3.*" }

二、Future

Future 字面的意思就是未来发生的事情,在程序中则代表了一系列暂时没有结果的运算子,Future需要程序主动去poll(轮询)才能获取到最终的结果,每一次轮询的结果可能是Ready或者Pending。

当Ready的时候,证明当前Future已完成,代码逻辑可以向下执行;当Pending的时候,代表当前Future并未执行完成,代码不能向下执行,看到这里就要问了,那什么时候才能向下执行呢,这里的关键在于Runtime中的Executor需要不停的去执行Future的poll操作,直至Future返回Ready可以向下执行为止。这里和Epoll非常相像。

通常实现future有两种模式,一种基于推模式,也被称为基于完成的模式,一种基于拉模式,也被称为基于就绪的模式。Rust的future库实现了基于拉模式的future。

简单的例子

use futures::executor::block_on;
use std::thread::{sleep, spawn};
use std::time::Duration;

async fn sleepus() {
for i in 1..=10 {
println!("Sleepus {}", i);
sleep(Duration::from_millis(500));
}
}

async fn interruptus() {
for i in 1..=5 {
println!("Interruptus {}", i);
sleep(Duration::from_millis(1000));
}
}

fn main() {
block_on(sleepus());
block_on(interruptus());
}

三、Future结构

先看看Future的结构:

pub enum Poll<T> {
Ready(T),
Pending,
}

pub trait Future {
type Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

Output代表了Future返回的值的类型,poll方法可以返回一个Poll类型,Poll类型是一个Enum,包装了Ready和Pending两种状态。

Context

Context提供了对Future进行调度的功能。目前Context作为一个结构体,有一个核心成员Waker,用来唤醒绑定的Future. 未来不排除在Context添加新的字段。

pub struct Context<'a> {
waker: &'a Waker,
_marker: PhantomData<fn(&'a ()) -> &'a ()>,
}

pub struct Waker {
waker: RawWaker,
}

impl Waker {
pub fn wake(self) {}
pub fn wake_by_ref(&self) {}
pub fn will_wake(&self, other: &Waker) -> bool {}

pub unsafe fn from_raw(waker: RawWaker) -> Waker {}
}

pub struct RawWaker {
data: *const (),
vtable: &'static RawWakerVTable,
}

pub struct RawWakerVTable {
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
wake_by_ref: unsafe fn(*const ()),
drop: unsafe fn(*const ()),
}

Pin

修饰指针,保证指针指向的值将不被移动。解决了生命周期遇到await的情况。
如:Pin>
Pin是对指针P的限制,无法通过Safe的方式获取到& mut T。但是如果在std::marker中,如果Self满足Unpin,就可以通过Safe的方式获取到& mut T。

Runtime

Runtime 由两部分组成,Executor和Reactor。

  • Executor为执行器,没有任何阻塞的等待,循环执行一系列就绪的Future,当Future返回pending的时候,会将Future转移到Reactor上等待进一步的唤醒。
  • Reactor为反应器(唤醒器),轮询并唤醒挂载的事件,并执行对应的wake方法,通常来说,wake会将Future的状态变更为就绪,同时将Future放到Executor的队列中等待执行。

执行流程

github
这里的流程相对简单。但是大致描述了执行器Executor与反应器Reactor的沟通。

四、Futures crate

futures包主要提供四种抽象概念用于异步编程:

  • Futures: 是由异步计算产生的单个最终值(一些语言称之为promise),目前其一部分已合入标准库
  • Streams: 标识异步生成的一系列值
  • Sinks: 为异步写入数据提供支持
  • Executors: 程序负责运行异步任务

Streams

Future是异步开发中最基础的概念了,如果说Future代表了一次性的异步值,那么Stream则代表了一系列的异步值。

pub trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

Stream对应了同步原语中的Iterator的概念
pub trait Iterator {
type Item;
fn next(&mut self) -> Option<Self::Item>;
}

stream用来处理数据链,也可以自行关闭(None)。

可以看到Stream中的poll_next返回的参数并不是Self::Output了,而是Option修饰过的Item。这样就有了链表的抽象。

Sinks

有了代表一次性的异步值Future, 也有了代表可重复的异步值的Stream, 因此,需要有一个代表一次或多次的异步值的通道,也就是接下来的Sink。通常来说, Sink可以来抽象网络连接的写入端,消息队列中的 Producer。

基本的sinks在写入端包含:

  • Channels
  • Sockets
  • Pipes
pub trait Sink<Item> {
type Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}

在Sink的上层,我们可以封装 send 以及 send_all 等方法,用来抽象对应的 Future 与 Stream。

Executors

Future代表一次性的异步值,Stream类似迭代器可以迭代多个异步值,Sink用于发送这些异步值,future是任务,而真正执行任务需要executor来承载(比较常用的block_on方法,当然也可以使用await)。

Executor的实现可以是单线程与线程池两个版本,两种实现间各有优劣,单线程少了数据的竞争,但是吞吐量却容易达到瓶颈,线程池的实现可以提高吞吐量,但是却要处理数据的竞争冲突。

task

他是一个执行单元,利用状态机完成各个状态的转换(pending、ready)。

组合子

上面定义了实现异步的最基本概念,Future, Stream以及Sink。
但是他们只能帮助我们处理单个Future,那如何面对复杂的项目逻辑中的多Future逻辑处理,就需要通过组合子。

五、构建一个Future

我们模拟一个futures工作模式,里面需要一个runner、future trait以及poll类型。

Runner

下面的实现相对简单:

use std::cell::RefCell;

thread_local!(static NOTIFY: RefCell<bool> = RefCell::new(true));

struct Context<'a> {
waker: &'a Waker,
}

impl<'a> Context<'a> {
// 初始化
fn from_waker(waker: &'a Waker) -> Self {
Context {waker}
}

// 获取waker参数
fn waker(&self) -> &'a Waker {
&self.waker
}
}

struct Waker;

impl Waker {
// 线程中修改字段
fn wake(&self) {
NOTIFY.with(|f| *f.borrow_mut() = true)
}
}

fn run<F>(mut f: F) -> F::Output
where F: Future
{
NOTIFY.with(|n| loop {
// 循环查看当前字段是否为true,true表示已准备好被轮询
if *n.borrow() {
// 修改当前字段为false
*n.borrow_mut() = false;

// 创建Context
let ctx = Context::from_waker(&Waker);
if let Poll::Ready(var) = f.poll(&ctx) {
return var;
}
}
})
}

run为泛型函数,F就是Future类型,返回一个内部的类型Output。

函数体为一直循环直到future返回为Ready类型。当future准备好后,就会返回Ready中的带来的数据var。

看看Poll内的定义:

enum Poll<T> {
Ready(T),
Pending,
}

Future Trait

接下来就是核心部分Future的定义了:

trait Future {
type Output;
fn poll(&mut self, ctx: &Context) -> Poll<Self::Output>;
}

返回的类型为Output,含有一个方法poll。方法中有可修改的self字段,以及Context结构体,其内部有一个对waker的引用。waker被用于提醒运行时段,future准备好再次被轮询。

实现Future

#[derive(Default)]
struct MyFuture {
count: u32,
}

impl Future for MyFuture {
type Output = u32;

fn poll(&mut self, ctx: &Context) -> Poll<Self::Output> {
match self.count {
5 => Poll::Ready(5),
_ => {
self.count += 1;
ctx.waker().wake();
Poll::Pending
}
}
}
}
  • [derive(Default)] 为这个类型自动创建一个::default()函数。数值类型(即这里的count)默认为 0。

  • struct MyFuture { count: u32 }定义了一个带有一个计数器(count)的简单结构体。这让我们能够模拟异步行为。
  • impl Future for MyFuture 是我们对这个 trait 的实现。
  • 我们把 Output 设置为i32类型,因此我们可以返回内部的计数。
  • 在我们的poll实现中,我们基于内部的 count 字段决定要做什么、
  • 如果它匹配了 33=>,我们返回一个带有值为 3 的Poll::Ready响应。
  • 在其他情况下,我们增加计数器的值并且返回Poll::Pending

运行

fn main() {
let my_future = MyFuture::default();
println!("Output: {}", run(my_future));
}

这里就完成了一个简单的支持单线程的Future

升级!Future链

struct AddOneFuture<T> (T);

impl<T> Future for AddOneFuture<T>
where T: Future, T::Output: std::ops::Add<u32, Output = u32>,
{
type Output = u32;

fn poll(&mut self, ctx: &Context) -> Poll<Self::Output> {
match self.0.poll(ctx) {
// 针对Future,进行二次加工
Poll::Ready(count) => Poll::Ready(count + 1),
// 继续等待
Poll::Pending => Poll::Pending,
}
}
}

完成上面后就可以大概知道运作内容,嵌套一层future,并对其进行二次操作。

fn main() {
let my_future = MyFuture::default();
println!("Output: {}", run(AddOneFuture(my_future)));
}

只需要几个简单步骤,就可以建立为 futures 赋予强大能力的链式函数(combinators)。

简单封装

use std::cell::RefCell;
use crate::task::*;
use crate::future::*;

thread_local!(static NOTIFY: RefCell<bool> = RefCell::new(true));

mod task {
use crate::NOTIFY;
use crate::Poll;

pub struct Context<'a> {
waker: &'a Waker,
}

impl<'a> Context<'a> {
// 初始化
pub fn from_waker(waker: &'a Waker) -> Self {
Context { waker }
}

// 获取waker参数
pub fn waker(&self) -> &'a Waker {
&self.waker
}
}

pub struct Waker;

impl Waker {
// 线程中修改字段
pub fn wake(&self) {
NOTIFY.with(|f| *f.borrow_mut() = true)
}
}
}

mod future {
use crate::task::*;
use crate::Poll;

pub trait Future {
type Output;
fn poll(&mut self, ctx: &Context) -> Poll<Self::Output>;
}

#[derive(Default)]
pub struct MyFuture {
count: u32,
}

impl Future for MyFuture {
type Output = u32;

pub fn poll(&mut self, ctx: &Context) -> Poll<Self::Output> {
match self.count {
5 => Poll::Ready(5),
_ => {
self.count += 1;
ctx.waker().wake();
Poll::Pending
}
}
}
}

pub struct AddOneFuture<T> (T);

impl<T> Future for AddOneFuture<T>
where T: Future, T::Output: std::ops::Add<u32, Output=u32>,
{
type Output = u32;

fn poll(&mut self, ctx: &Context) -> Poll<Self::Output> {
match self.0.poll(ctx) {
// 针对Future,进行二次加工
Poll::Ready(count) => Poll::Ready(count + 1),
// 继续等待
Poll::Pending => Poll::Pending,
}
}
}
}

enum Poll<T> {
Ready(T),
Pending,
}

fn run<F>(mut f: F) -> F::Output
where F: Future
{
NOTIFY.with(|n| loop {
// 循环查看当前字段是否为true,true表示已准备好被轮询
if *n.borrow() {
// 修改当前字段为false
*n.borrow_mut() = false;

// 创建Context
let ctx = Context::from_waker(&Waker);
if let Poll::Ready(var) = f.poll(&ctx) {
return var;
}
}
})
}

Future::Ready

future::ready创建了一个 future,该 future 带有传入值并且是立即就绪(ready)的。

mod future {
....

pub struct Ready<T>(Option<T>);

impl <T> Future for Ready<T> {
type Output = T;

fn poll(&mut self, ctx: &Context) -> Poll<Self::Output> {
Poll::Ready(self.0.take().unwrap())
}
}

pub fn ready<T>(var: T) -> Ready<T> {
Ready(Some(var))
}

我们创建了一个类型为Ready的泛型结构体,该结构体包装了一个Option。这里我们使用Option枚举以保证 poll 函数只被调用一次。这里通过ready函数将数据封装成Future类型。
fn main() {
let my_future = future::ready(1);
println!("Output: {}", run(my_future));
}

BLOCK_ON

把我们的 run 函数重命名为block_on

fn block_on<F>(mut f: F) -> F::Output
where
F: Future,
{
NOTIFY.with(|n| loop {
if *n.borrow() {
*n.borrow_mut() = false;
let ctx = Context::from_waker(&Waker);
if let Poll::Ready(val) = f.poll(&ctx) {
return val;
}
}
})
}

fn main() {
let my_future = future::ready(1);
println!("Output: {}", block_on(my_future));
}

组合子

一个嵌套的 future 可以由一个组合子函数函数创建,它可以有一个复杂的类型Future< Output = Future < Output = i32>>。这可以被称为一个 future,该 future 的输出(Output)是另一个 future,新的 future 的输出是 i32 类型。这样的组合子中,最简单的一个就是map。

Map

mod future {
pub trait Future {
...
fn map<U, F>(self, f: F) -> Map<Self, F>
where F: FnOnce(Self::Output) -> U,
Self: Sized,
{
Map {
future: self,
f: Some(f)
}
}
}

pub struct Map<Fut, F> {
future: Fut,
f: Option<F>,
}

impl <Fut, F, T> Future for Map<Fut, F>
where Fut: Future,
F: FnOnce(Fut::Output) -> T,
{
type Output = T;

fn poll(&mut self, ctx: &Context) -> Poll<Self::Output> {
match self.future.poll(ctx) {
Poll::Ready(var) => {
let f = self.f.take().unwrap();
Poll::Ready(f(var))
}
Poll::Pending => Poll::Pending,
}
}
}
}

上述代码较长。

  • 定义了Map结构体,用于保存当前的future和Option类型,Option类型中保存了当前后续需要运行的闭包。
  • 给Map添加Future的特性,让其具有Future性质及功能,具有三个泛型:Fut(Future类型)、F(运行的闭包)、T(闭包返回的数据类型)
  • Map特性中的poll方法,内部完成了对当前future调用poll方法并获取返回值,如果是Ready,则获取其值并传递给闭包函数处理,且进行二次封装为Ready类型。
  • Future trait中定义的map方法,定义了其接收一个闭包,并和自己的future数据统一封装成Map结构体。
  • FnOnce是函数 trait 中的一个,其他还包括FnMut和Fn。FnOnce是用起来最简单的,因为编译器可以保证这个函数只被调用一次。它使用环境中用到的值并获取其所有权。Fn和FnMut分别以不可变和可变的方式借用环境中值的引用。所有的闭包都实现了FnOncetrait,并且其中一些没有移动值的闭包还实现了FnMut和Fntrait。
  • Self: Sized是一个约束,允许map只能被Sized的 trait 实现者调用。你不必考虑这个问题,但是确实有些类型不是Sized。

大多数组合子都遵循这个模式。

fn main() {
let my_future = future::ready(1).map(|val| val + 1);
println!("Output: {}", block_on(my_future));
}

Then

future,它是通过一个称为then的函数来实现的,该函数看起来很像映射(map),但是这个闭包应该它自己的 future。这里值得注意的是,传递给then的闭包返回的值必须是实现了Future,否则你将会得到一个编译器错误。

mod future {
trait Future {
// ...
fn then<Fut, F>(self, f: F) -> Then<Self, F>
where
F: FnOnce(Self::Output) -> Fut,
Fut: Future,
Self: Sized,
{
Then {
future: self,
f: Some(f),
}
}
}

// ...

pub struct Then<Fut, F> {
future: Fut,
f: Option<F>,
}

impl<Fut, NextFut, F> Future for Then<Fut, F>
where
Fut: Future,
NextFut: Future,
F: FnOnce(Fut::Output) -> NextFut,
{
type Output = NextFut::Output;

fn poll(&mut self, cx: &Context) -> Poll<Self::Output> {
match self.future.poll(cx) {
Poll::Ready(val) => {
let f = self.f.take().unwrap();
f(val).poll(cx)
}
Poll::Pending => Poll::Pending,
}
}
}
}

同上,这里很多地方都和map相似

  • Then类型实现的Future trait中,返回值从不限制的T变为了NextFut,并且这个类型必须为Future类型,因为我们需要保证返回的值必须具有Future特性,能够处理poll。
  • 因此这里的泛型F,返回的数据也就变为了NextFut,因为我们需要调用他并返回新的Future。
  • then方法中的函数泛型F的返回值童Fut一样为Future。
  • 在Then的实现方法poll中,我们首先运行当前传递进来的Future结构体中的poll方法,并直到Ready状态的时候,获取其值,并进行再次调用,调用的函数就是传进来的闭包,因为他的返回值必然为Future,因此,需要再次调用poll方法。因此在调用结束后,获取的值类型同上次传入的Future值类型发生了映射,因此这里的数据类型为下一个Future中定义的Output。即NextFut::Output。
fn main() {
let my_future = future::ready(1)
.map(|val| val + 1)
.then(|val| future::ready(val + 1));
println!("Output: {}", block_on(my_future));
}
Result 组合子

Poll类型里定义了成功状态、失败状态和未就绪状态。这意味着像map这种函数只有当 Poll 是就绪并且不是错误的情况下才能执行。尽管这会产生一些困扰,但是它在链接组合子并且根据成功或失败状态做决定的时候,会产生一些非常好的人体工程学(ergonomics )。

现在 future 要么是就绪或者是未就绪,对于成功或失败语义是不可知的。它们可以包含任何值,包括一个Result。我们需要实现一个新的 trait。

mod future {
//...
pub trait TryFuture {
type Ok;
type Error;

fn try_poll(self, cx: &mut Context) -> Poll<Result<Self::Ok, Self::Error>>;
}

impl<F, T, E> TryFuture for F
where
F: Future<Output = Result<T, E>>,
{
type Ok = T;
type Error = E;

fn try_poll(&mut self, cx: &Context) -> Poll<F::Output> {
self.poll(cx)
}
}
}

TryFuture是一个 trait,我们可以为任意的类型<F, T, E>实现这个 trait,其中F实现了Future trait,它的Output类型是Result<T,E>。它只有一个实现者。那个实现者定义了一个try_poll函数,该函数与Future trait 上的poll有相同的签名,它只是简单地调用了poll方法。

AndThen

下面是我们可能是根据现有工具处理的方式。

fn main() {
let files_future = get_user(1).then(|result| {
match result {
Ok(user) => get_files_for_user(user),
Err(err) => future::ready(Err(err)),
}
});

match block_on(files_future) {
Ok(files) => println!("User Files: {}", files),
Err(err) => println!("There was an error: {}", err),:w
};
}

但是假定你想要链接更多的 future,那么代码就会非常复杂。
and_then组合子将会把类型Future<Output = Result<T, E>>映射到Future<Output = Result<U, E>>,其中我们把T变为了U。
mod future {
pub trait TryFuture {
// ...

fn and_then<Fut, F>(self, f: F) -> AndThen<Self, F>
where
F: FnOnce(Self::Ok) -> Fut,
Fut: Future,
Self: Sized,
{
AndThen {
future: self,
f: Some(f),
}
}
}

// ...
pub struct AndThen<Fut, F> {
future: Fut,
f: Option<F>,
}

impl<Fut, NextFut, F> Future for AndThen<Fut, F>
where
Fut: TryFuture,
NextFut: TryFuture<Error = Fut::Error>,
F: FnOnce(Fut::Ok) -> NextFut,
{
type Output = Result<NextFut::Ok, Fut::Error>;

fn poll(&mut self, cx: &Context) -> Poll<Self::Output> {
match self.future.try_poll(cx) {
Poll::Ready(Ok(val)) => {
let f = self.f.take().unwrap();
f(val).try_poll(cx)
}
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => Poll::Pending,
}
}
}
}

事实上,这和then组合子的实现基本一致。只有一些关键的区别需要注意:

  • 函数定义在 TryFuture trait 中
  • type Output = Result<NextFut::Ok, Fut::Error>;表明 AndThen future 的输出拥有新的 future 的值类型,以及在它之前的 future 的错误类型。换句话说,如果先前的 future 的输出包含一个错误类型,那么这个闭包将不会被执行。
  • 我们调用的是try_poll而不是poll。

像这样来链接组合子的时候,它们的类型前面可能会变得很长且在编译错误信息中难以阅读。and_then函数要求 future 调用时的错误类型和由闭包返回的类型必须是相同的。

MapErr

看看下面的代码:

let my_future = future::ready(1)
.map(Ok);

这里无法编译成功,提示无法推断在枚举 Result 上声明的类型参数 E 的类型。
在编译错误的提示中可以看到下面的描述:
282 |     let my_future = future::ready(1)
| --------- consider giving `my_future` the explicit type `future::Map<future::Ready<i32>, fn(i32) -> Result<i32, E> {Result::<i32, E>::Ok}>`, where the type parameter `E` is specified
283 | // .then(|val| future::ready(val + 1))
284 | .map(Ok);
| ^^ cannot infer type for type parameter `E` declared on the enum `Result`

看到返回的Map类型的函数泛型F的返回值中没有对错误情况E泛型的描述,所以无法正常编译。

需要添加对Result类型的转换处理:

mod future {
pub trait TryFuture {
// ...

fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
where
F: FnOnce(Self::Error) -> E,
Self: Sized,
{
MapErr {
future: self,
f: Some(f),
}
}
}

// ...
pub struct MapErr<Fut, F> {
future: Fut,
f: Option<F>,
}

impl<Fut, F, E> Future for MapErr<Fut, F>
where
Fut: TryFuture,
F: FnOnce(Fut::Error) -> E,
{
type Output = Result<Fut::Ok, E>;

fn poll(&mut self, cx: &Context) -> Poll<Self::Output> {
match self.future.try_poll(cx) {
Poll::Ready(result) => {
let f = self.f.take().unwrap();
Poll::Ready(result.map_err(f))
}
Poll::Pending => Poll::Pending,
}
}
}
}

fn main() {
let my_future = future::ready(1)
.map(Ok)
.map_err(|_: ()| 5);

println!("Output: {:?}", block_on(my_future));
}

这里添加该组合子对Result类型中的泛型E部分进行补充完整。

六、简单的例子

使用tokio的包:

[dependencies]
tokio = { version = "1", features = ["full"] }

future写法

实现Future trait

#[derive(Default)]
struct Test {
data: u32,
}

impl Future for Test {
type Output = u32;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.get_mut();
match this.data {
3 => Poll::Ready(3),
_ => {
println!("{}", this.data);
this.data += 1;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
}

fn main() {

let test = Test::default();
let f = futures::executor::block_on(test);
println!("{}", f);
}