multithreading - 一个 Rust 闭包可以被多个线程使用吗?

标签 multithreading concurrency closures rust

我希望能够让多个线程计算同一个闭包。我想到的应用程序是并行化数值积分,因此函数域可以很容易地分成 N 个 block 并交给线程的情况。

这是一个简单的函数,它多次计算提供的闭包并对结果取平均值:

use std::sync::mpsc;
use std::thread;

const THREAD_COUNT: u64 = 4;

fn average<F: Fn(f64) -> f64>(f: F) -> f64 {
    let (tx, rx) = mpsc::channel();
    for id in 0..THREAD_COUNT {
        let thread_tx = tx.clone();
        thread::spawn(move || {
            thread_tx.send(f(id as f64));
        });
    }

    let mut total = 0.0;
    for id in 0..THREAD_COUNT {
        total += rx.recv().unwrap();
    }
    total / THREAD_COUNT as f64
}

fn main() {
    average(|x: f64| -> f64 { x });
}

当我编译时出现这个错误:

error[E0277]: `F` cannot be sent between threads safely
  --> src/main.rs:10:9
   |
10 |         thread::spawn(move || {
   |         ^^^^^^^^^^^^^ `F` cannot be sent between threads safely
   |
   = help: within `[closure@src/main.rs:10:23: 12:10 thread_tx:std::sync::mpsc::Sender<f64>, f:F, id:u64]`, the trait `std::marker::Send` is not implemented for `F`
   = help: consider adding a `where F: std::marker::Send` bound
   = note: required because it appears within the type `[closure@src/main.rs:10:23: 12:10 thread_tx:std::sync::mpsc::Sender<f64>, f:F, id:u64]`
   = note: required by `std::thread::spawn`

所以我将 + Send 添加到 F 的边界并得到一个新错误:

error[E0310]: the parameter type `F` may not live long enough
  --> src/main.rs:10:9
   |
6  | fn average<F: Fn(f64) -> f64 + Send>(f: F) -> f64 {
   |            -- help: consider adding an explicit lifetime bound `F: 'static`...
...
10 |         thread::spawn(move || {
   |         ^^^^^^^^^^^^^
   |
note: ...so that the type `[closure@src/main.rs:10:23: 12:10 thread_tx:std::sync::mpsc::Sender<f64>, f:F, id:u64]` will meet its required lifetime bounds
  --> src/main.rs:10:9
   |
10 |         thread::spawn(move || {
   |         ^^^^^^^^^^^^^

所以我将 + 'static 添加到 F 并得到这个:

error[E0382]: capture of moved value: `f`
  --> src/main.rs:11:28
   |
10 |         thread::spawn(move || {
   |                       ------- value moved (into closure) here
11 |             thread_tx.send(f(id as f64));
   |                            ^ value captured here after move
   |
   = note: move occurs because `f` has type `F`, which does not implement the `Copy` trait

所以我将 + Copy 添加到 F 并得到:

error: the trait `core::marker::Copy` is not implemented for the type `[closure@src/test.rs:115:11: 115:26]

似乎每个线程都想要它自己的闭包副本(因为 move)但是闭包没有实现 Copy 所以运气不好。这对我来说似乎很奇怪,因为如果闭包从不改变状态,那么多线程访问它们的安全问题是什么?

我可以通过提供常规函数而不是闭包来使代码工作,但这使我的代码变得非通用,即它仅适用于特定功能而不适用于 Fn(f64) -> f64。对于我正在执行的集成类型,集成的函数通常有某些固定变量与集成变量混合,因此用闭包捕获固定变量似乎很自然。

是否有某种方法可以使这种多线程函数评估以通用方式工作?我只是在想错事吗?

最佳答案

最终的问题围绕着谁拥有闭包。编写的代码声明闭包的所有权已转移给 average。然后此函数尝试将闭包提供给多个线程,如您所见,它失败了,因为您不能将一个项目提供给多个 child 。

but closures don't implement Copy so no luck

截至Rust 1.26.0 ,闭包确实实现了CloneCopy(如果所有捕获的变量都实现了的话)。这意味着您的最终示例代码现在可以按原样运行:

fn average<F: Fn(f64) -> f64 + Send + 'static + Copy>(f: F) -> f64 { /* ... */ }

但是,您的闭包可能不会实现CopyClone

您不能给出对 average 拥有的闭包的引用,因为使用 thread::spawn 创建的线程可能比对 average 的调用生命周期更长>。当 average 退出时,任何堆栈分配的变量都将被销毁。对它们的任何使用都会导致内存不安全,而 Rust 旨在防止这种情况。

一种解决方案是使用 Arc .这将允许在多线程上下文中单个资源的多个共享所有者。当包装的闭包被克隆时,只会创建一个新的引用。当所有引用消失时,对象被释放。

use std::{
    sync::{mpsc, Arc},
    thread,
};

const THREAD_COUNT: u64 = 4;

fn average<F>(f: F) -> f64
where
    F: Fn(f64) -> f64 + Send + Sync + 'static,
{
    let (tx, rx) = mpsc::channel();
    let f = Arc::new(f);

    for id in 0..THREAD_COUNT {
        let thread_tx = tx.clone();
        let f = f.clone();
        thread::spawn(move || {
            thread_tx.send(f(id as f64)).unwrap();
        });
    }

    let mut total = 0.0;
    for _ in 0..THREAD_COUNT {
        total += rx.recv().unwrap();
    }

    total / THREAD_COUNT as f64
}

fn main() {
    average(|x| x);
}

更标准的解决方案是使用作用域线程。这些线程保证在特定时间退出,这允许您将比线程生命周期更长的引用传递给线程。

另见:

关于multithreading - 一个 Rust 闭包可以被多个线程使用吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36211389/

相关文章:

java - 在多线程编程中,同步是否剥夺了并发执行的好处

grails - Grails Controller 中 Action 和方法的区别

c - Delphi 是否有任何等效于 C 的 volatile 变量?

java - 如何防止客户端发送文件后服务器接收到空输出(多线程java套接字)

c# - 如何检查我的多线程代码是否确实在多个线程上运行?

java - 如何制作不区分大小写的 ConcurrentMap?

java - 使用 ConcurrentHashMap,什么时候需要同步?

clojure - ClojureScript 如何编译闭包?

Python列表理解覆盖值

c++ - 为什么我的多线程作业队列崩溃?