Rust-高级-Crate之crossbeam

  1. 基础
  2. 管道
  3. 线程间数据传递

基础

使用 crossbeam crate 为并发和并行编程提供了数据结构和函数。
Scope::spawn 生成一个新的作用域线程,该线程确保传入 crossbeam::scope 函数的闭包在返回之前终止

fn main() {
let arr = &[1, 25, -4, 10];
let max = find_max(arr);
assert_eq!(max, Some(25));
}

fn find_max(arr: &[i32]) -> Option<i32> {
const THRESHOLD: usize = 2;

if arr.len() <= THRESHOLD {
return arr.iter().cloned().max();
}

let mid = arr.len() / 2;
let (left, right) = arr.split_at(mid);

crossbeam::scope(|s| {
let thread_l = s.spawn(|_| find_max(left));
let thread_r = s.spawn(|_| find_max(right));

let max_l = thread_l.join().unwrap()?;
let max_r = thread_r.join().unwrap()?;

Some(max_l.max(max_r))
}).unwrap()
}

管道

下面的实例使用 crossbeam 和 crossbeam-channel 两个 crate 创建了一个并行的管道。数据在从源到接收器的过程中由两个工作线程并行处理。

我们使用容量由 crossbeam_channel::bounded 分配的有界信道。

通过迭代器 crossbeam_channel::Receiver::iter 方法从信道读取数据,
这将会造成阻塞,要么等待新消息,要么直到信道关闭。因为信道是在 crossbeam::scope 范围内创建的,我们必须通过 drop 手动关闭它们,以防止整个程序阻塞工作线程的 for 循环。你可以将对 drop 的调用视作不再发送消息的信号。

fn channel() {
let (snd1, rcv1) = bounded(1);
let (snd2, rcv2) = bounded(1);
let n_msgs = 4;
let n_workers = 2;

crossbeam::scope(|s| {
// 生产者线程
s.spawn(|_| {
for i in 0..n_msgs {
snd1.send(i).unwrap();
println!("Source sent {}", i);
}
// 关闭信道 —— 这是退出的必要条件
// for 巡海在工作线程中
drop(snd1);
});

// 由 2 个线城并行处理
for _ in 0..n_workers {
// 从数据源发送数据到接收器,接收器接收数据
let (sendr, recvr) = (snd2.clone(), rcv1.clone());
// 在不同的线程中衍生工人
s.spawn(move |_| {
thread::sleep(Duration::from_millis(500));
// 接收数据,直到信道关闭前
for msg in recvr.iter() {
println!("Worker {:?} received {}.",
thread::current().id(), msg);
sendr.send(msg * 2).unwrap();
}
});
}
// 关闭信道,否则接收器不会关闭
// 退出 for 循坏
drop(snd2);

// 接收器
for msg in rcv2.iter() {
println!("Sink received {}", msg);
}
}).unwrap();
}

线程间数据传递

这个实例示范了在单生产者、单消费者(SPSC)环境中使用 crossbeam-channel。使用 crossbeam::scope 和 Scope::spawn 来管理生产者线程。在两个线程之间,使用 crossbeam_channel::unbounded 信道交换数据,这意味着可存储消息的数量没有限制。生产者线程在消息之间休眠半秒。

use std::{thread, time};
use crossbeam_channel::unbounded;

fn main() {
let (snd, rcv) = unbounded();
let n_msgs = 5;
crossbeam::scope(|s| {
s.spawn(|_| {
for i in 0..n_msgs {
snd.send(i).unwrap();
thread::sleep(time::Duration::from_millis(100));
}
});
}).unwrap();
for _ in 0..n_msgs {
let msg = rcv.recv().unwrap();
println!("Received {}", msg);
}
}