以下代码有效,可以在Playground中进行测试
use std::{thread, time::Duration};
use rand::Rng;
fn main() {
let mut hiv = Vec::new();
let (sender, receiver) = crossbeam_channel::unbounded();
// make workers
for t in 0..5 {
println!("Make worker {}", t);
let receiver = receiver.clone(); // clone for this thread
let handler = thread::spawn(move || {
let mut rng = rand::thread_rng(); // each thread have one
loop {
let r = receiver.recv();
match r {
Ok(x) => {
let s = rng.gen_range(100..1000);
thread::sleep(Duration::from_millis(s));
println!("w={} r={} working={}", t, x, s);
},
_ => { println!("No more work for {} --- {:?}.", t, r); break},
}
}
});
hiv.push(handler);
}
// Generate jobs
for x in 0..10 {
sender.send(x).expect("all threads hung up :(");
}
drop(sender);
// wait for jobs to finish.
println!("Wait for all threads to finish.\n");
for h in hiv {
h.join().unwrap();
}
println!("join() done. Work Finish.");
}
我的问题如下:
我可以使用 threadpool 删除样板代码吗? , rayon或者其他 Rust 箱子?
我知道我可以自己实现,但想知道是否有一些具有相同功能的 crate ?
根据我的研究,当你“发送”代码并执行它时,线程池/人造丝很有用,但我还没有找到方法让 N 个线程拥有一些需要记住的代码/逻辑?
基本思想在 let mut rng = rand::thread_rng();
中,这是每个线程需要拥有自己的实例。
代码还有其他问题,请指出。
最佳答案
是的,您可以使用 Rayon 消除大量代码并使剩余代码更具可读性,如下要点所示:
https://gist.github.com/BillBarnhill/db07af903cb3c3edb6e715d9cedae028
由于所有权规则,工作池模型在 Rust 中并不是很好。因此,并行迭代器通常是更好的选择。
我最初忘记针对每个线程上下文解决您主要关心的问题。您可以了解如何使用 ThreadLocal 存储每个线程上下文!在这个答案中:
https://stackoverflow.com/a/42656422/204343
我会尝试回来编辑代码以反射(reflect) ThreadLocal!一旦我有更多时间就使用。
由于 thread_id_value,要点需要每晚,但这几乎是稳定的,如果需要可以删除。
真正的问题是要点有时机,并将 main_new 与 main_original 进行比较,结果令人惊讶。也许并不奇怪,Rayon 具有良好的调试支持。
在调试构建时,计时输出为:
main_new duration: 1.525667954s
main_original duration: 1.031234059s
您可以看到 main_new 的运行时间几乎延长了 50%。
在发布时,main_new 速度要快一些:
main_new duration: 1.584190936s
main_original duration: 1.5851124s
下面是要点的精简版本,仅包含新代码。
#![feature(thread_id_value)]
use std::{thread, time::Duration, time::Instant};
use rand::Rng;
#[allow(unused_imports)]
use rayon::prelude::*;
fn do_work(x : u32) -> String {
let mut rng = rand::thread_rng(); // each thread have one
let s = rng.gen_range(100..1000);
let thread_id = thread::current().id();
let t = thread_id.as_u64();
thread::sleep(Duration::from_millis(s));
format!("w={} r={} working={}", t, x, s)
}
fn process_work_product(output : String) {
println!("{}", output);
}
fn main() {
// bit hacky, but lets set number of threads to 5
rayon::ThreadPoolBuilder::new()
.num_threads(4)
.build_global()
.unwrap();
let x = 0..10;
x.into_par_iter()
.map(do_work)
.for_each(process_work_product);
}
关于Rust 线程池每个线程中都有初始化代码?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74833740/