multithreading - Rayon 中的每线程初始化

标签 multithreading rust rayon

我正在尝试使用 Rayon 的 par_iter() 优化我的功能。

单线程版本是这样的:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {

    let result = txs.iter().map(|tx| {

         tx.verify_and_store(store)

    }).collect();

    ...
}

每个 Store 实例只能由一个线程使用,但是 Store 的多个实例可以并发使用,所以我可以通过 clone< 使它成为多线程-ing store:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {

    let result = txs.par_iter().map(|tx| {

         let mut local_store = store.clone();

         tx.verify_and_store(&mut local_store)

    }).collect();

    ...
}

但是,这会在每次 迭代时克隆store,这太慢了。我想每个线程使用一个商店实例。

Rayon 有可能吗?还是应该求助于手动线程和工作队列?

最佳答案

可以使用线程局部变量来确保 local_store不会在给定线程中多次创建。

例如,编译 ( full source ):

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    use std::cell::RefCell;
    thread_local!(static STORE: RefCell<Option<Store>> = RefCell::new(None));

    let mut result = Vec::new();

    txs.par_iter().map(|tx| {
        STORE.with(|cell| {
            let mut local_store = cell.borrow_mut();
            if local_store.is_none() {
                *local_store = Some(store.clone());
            }
            tx.verify_and_store(local_store.as_mut().unwrap())
        })
    }).collect_into(&mut result);
}

但是,这段代码有两个问题。一,如果store的克隆par_iter() 时需要做点什么完成后,例如刷新缓冲区,这根本不会发生 - 他们的 Drop只会在 Rayon 的工作线程退出时调用,甚至是 is not guaranteed .

第二个也是更严重的问题是 store 的克隆每个工作线程只创建一次。如果 Rayon 缓存了它的线程池(我相信它确实如此),这意味着稍后对 verify_and_store 的不相关调用将继续使用最后已知的 store 克隆,这可能与当前商店无关。

这可以通过稍微复杂化代码来纠正:

  • 将克隆的变量存储在 Mutex<Option<...>> 中而不是 Option , 以便调用 par_iter() 的线程可以访问它们.这将在每次访问时产生互斥锁,但该锁是无竞争的,因此成本低。

  • 使用Arc围绕互斥锁,以便收集对向量中创建的存储克隆的引用。此向量用于通过将商店重置为 None 来清理商店迭代完成后。

  • 将整个调用包装在一个不相关的互斥锁中,以便对 verify_and_store 的两个并行调用不要最终看到彼此的商店克隆。 (如果在迭代之前创建并安装了一个新的线程池,这可能是可以避免的。)希望这个序列化不会影响 verify_and_store 的性能。 ,因为每次调用都会使用整个线程池。

结果并不漂亮,但它可以编译,仅使用安全代码,并且似乎可以工作:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    use std::sync::{Arc, Mutex};
    type SharedStore = Arc<Mutex<Option<Store>>>;

    lazy_static! {
        static ref STORE_CLONES: Mutex<Vec<SharedStore>> = Mutex::new(Vec::new());
        static ref NO_REENTRY: Mutex<()> = Mutex::new(());
    }
    thread_local!(static STORE: SharedStore = Arc::new(Mutex::new(None)));

    let mut result = Vec::new();
    let _no_reentry = NO_REENTRY.lock();

    txs.par_iter().map({
        |tx| {
            STORE.with(|arc_mtx| {
                let mut local_store = arc_mtx.lock().unwrap();
                if local_store.is_none() {
                    *local_store = Some(store.clone());
                    STORE_CLONES.lock().unwrap().push(arc_mtx.clone());
                }
                tx.verify_and_store(local_store.as_mut().unwrap())
            })
        }
    }).collect_into(&mut result);

    let mut store_clones = STORE_CLONES.lock().unwrap();
    for store in store_clones.drain(..) {
        store.lock().unwrap().take();
    }
}

关于multithreading - Rayon 中的每线程初始化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42647900/

相关文章:

multithreading - 如何更改 Rayon 使用的线程数?

c# - C#中的线程问题

java - 动态加载 GridPane 时显示 ProgressBar

rust - PI常数不明确

rust - 如何固定RefCell的内容?

parallel-processing - 如何使用 Rayon 检测整数总和的溢出?

parallel-processing - 为什么基于人造丝的并行处理比串行处理需要更多时间?

java - 在构造函数中设置线程的名称

multithreading - 使用 OpenMP 加速和调度

rust - 为什么在以 `Self: Sized` 为界时不能调用特征对象上的函数?