Rust 线​​程池每个线程中都有初始化代码?

标签 rust threadpool rayon

以下代码有效,可以在Playground中进行测试

use std::{thread, time::Duration};
use rand::Rng;

fn main() {
    let mut hiv = Vec::new();
    let (sender, receiver) = crossbeam_channel::unbounded();
    
    // make workers
    for t in 0..5 {
        println!("Make worker {}", t);
        
        let receiver = receiver.clone();  // clone for this thread
        
        let handler = thread::spawn(move || {
            let mut rng = rand::thread_rng(); // each thread have one
            
            loop {
                let r = receiver.recv();
                match r {
                    Ok(x) => {
                        let s = rng.gen_range(100..1000);
                
                        thread::sleep(Duration::from_millis(s));
                        println!("w={} r={} working={}", t, x, s);
                    },
                    _ => { println!("No more work for {} --- {:?}.", t, r); break},
                }
            }
            
        });
        
        hiv.push(handler);
    }
    
    // Generate jobs
    for x in 0..10 {
        sender.send(x).expect("all threads hung up :(");
    }
    drop(sender);
    
    // wait for jobs to finish.
    println!("Wait for all threads to finish.\n");
    for h in hiv {
        h.join().unwrap();
    }
    println!("join() done. Work Finish.");
}

我的问题如下:
我可以使用 threadpool 删除样板代码吗? , rayon或者其他 Rust 箱子?

我知道我可以自己实现,但想知道是否有一些具有相同功能的 crate ?

根据我的研究,当你“发送”代码并执行它时,线程池/人造丝很有用,但我还没有找到方法让 N 个线程拥有一些需要记住的代码/逻辑?

基本思想在 let mut rng = rand::thread_rng(); 中,这是每个线程需要拥有自己的实例。

代码还有其他问题,请指出。

最佳答案

是的,您可以使用 Rayon 消除大量代码并使剩余代码更具可读性,如下要点所示:

https://gist.github.com/BillBarnhill/db07af903cb3c3edb6e715d9cedae028

由于所有权规则,工作池模型在 Rust 中并不是很好。因此,并行迭代器通常是更好的选择。

我最初忘记针对每个线程上下文解决您主要关心的问题。您可以了解如何使用 ThreadLocal 存储每个线程上下文!在这个答案中:

https://stackoverflow.com/a/42656422/204343

我会尝试回来编辑代码以反射(reflect) ThreadLocal!一旦我有更多时间就使用。

由于 thread_id_value,要点需要每晚,但这几乎是稳定的,如果需要可以删除。

真正的问题是要点有时机,并将 main_new 与 main_original 进行比较,结果令人惊讶。也许并不奇怪,Rayon 具有良好的调试支持。

在调试构建时,计时输出为:

main_new duration: 1.525667954s
main_original duration: 1.031234059s

您可以看到 main_new 的运行时间几乎延长了 50%。

在发布时,main_new 速度要快一些:

main_new duration: 1.584190936s
main_original duration: 1.5851124s

下面是要点的精简版本,仅包含新代码。

#![feature(thread_id_value)]

use std::{thread, time::Duration, time::Instant};
use rand::Rng;

#[allow(unused_imports)]
use rayon::prelude::*;

fn do_work(x : u32) -> String {
    let mut rng = rand::thread_rng(); // each thread have one
    let s = rng.gen_range(100..1000);
    let thread_id = thread::current().id();
    
    let t = thread_id.as_u64();
    
    thread::sleep(Duration::from_millis(s));
    format!("w={} r={} working={}", t, x, s)
}

fn process_work_product(output : String) {
    println!("{}", output);
}

fn main() {
    // bit hacky, but lets set number of threads to 5
    rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build_global()
        .unwrap();
    
    let x = 0..10;
    x.into_par_iter()
        .map(do_work)
        .for_each(process_work_product);
}

关于Rust 线​​程池每个线程中都有初始化代码?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74833740/

相关文章:

rust - 删除存储在 FFI 中的 Rust void 指针

java - 线程池中线程之间的通信

string - 正则表达式捕获不会像我认为的那样长

java - 如何使这个非阻塞服务器成为多线程的?

java - 当线程池中没有空闲线程并且我们向池中提交任务时会发生什么?

memory - Rust-使用Rayon进行排列-矢量内存分配错误

parallel-processing - 如何使用 Rayon 检测整数总和的溢出?

rust - 如何满足 Iterator trait bound 以便在这里使用 Rayon?

rust - 如何将一个 HashSet 的所有值插入另一个 HashSet?

rust - 如何在 Rust 中正确表示基于堆栈的语言?