multithreading - 为什么包含MPSC channel 的线程永远不会加入?

标签 multithreading rust

我从互联网上收集了以下示例:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Transaction enum
enum Transaction {
    Widthdrawl(String, f64),
    Deposit(String, f64),
}

fn main() {
    //A banking send receive example...
    // set the number of customers
    let n_customers = 10;

    // Create a "customer" and a "banker"
    let (customers, banker) = mpsc::channel();

    let handles = (0..n_customers + 1)
        .into_iter()
        .map(|i| {
            // Create another "customer"
            let customer = customers.clone();

            // Create the customer thread
            let handle = thread::Builder::new()
                .name(format!("{}{}", "thread", i).into())
                .spawn(move || {
                    // Define Transaction
                    let trans_type = match i % 2 {
                        0 => Transaction::Deposit(
                            thread::current().name().unwrap().to_string(),
                            (i + 5) as f64 * 10.0,
                        ),
                        _ => Transaction::Widthdrawl(
                            thread::current().name().unwrap().to_string(),
                            (i + 10) as f64 * 5.0,
                        ),
                    };

                    // Send the Transaction
                    customer.send(trans_type).unwrap();
                });

            handle
        })
        .collect::<Vec<Result<thread::JoinHandle<_>, _>>>();

    // Wait for threads to finish
    for handle in handles {
        handle.unwrap().join().unwrap()
    }

    // Create a bank thread
    let bank = thread::spawn(move || {
        // Create a value
        let mut balance: f64 = 10000.0;
        println!("Initially, Bank value: {}", balance);

        // Perform the transactions in order
        //banker.recv_timeout(Duration::new(5, 0)); <-- TIMEOUT line...

        banker.into_iter().for_each(|i| {
            let mut customer_name: String = "None".to_string();
            match i {
                // Subtract for Widthdrawls
                Transaction::Widthdrawl(cust, amount) => {
                    customer_name = cust;
                    println!(
                        "Customer name {} doing withdrawal of amount {}",
                        customer_name, amount
                    );
                    balance = balance - amount;
                }
                // Add for deposits
                Transaction::Deposit(cust, amount) => {
                    customer_name = cust;
                    println!(
                        "Customer name  {} doing deposit of amount {}",
                        customer_name, amount
                    );
                    balance = balance + amount;
                }
            }

            println!("Customer is {}, Bank value: {}", customer_name, balance);
        });
    });

    // Let the bank finish
    bank.join().unwrap(); //THE THREAD DOES NOT END!!
}
bank线程从不加入,因此不结束主线程。

如果删除join并取消注释上面的超时行,则银行线程有时不等待客户线程发送(我认为可以)。
//banker.recv_timeout(Duration::new(5, 0)); <-- TIMEOUT line...

银行线程未加入的原因可能是什么,或者使银行了解不会再收到更多客户消息的更好方法是什么? (因为我认为timeout()在这里可能不是一种可靠的方法)。

最佳答案

完成所有生产者后,我需要删除tx channel ,然后在那之后消费者停止:

// Wait for threads to finish
for handle in handles {
    handle.unwrap().join().unwrap()
}

drop(customers);

// Create a bank thread

关于multithreading - 为什么包含MPSC channel 的线程永远不会加入?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60931769/

相关文章:

.net - ConcurrentBag(T) 是否针对单线程场景进行了优化?如果是这样,为什么要并发?

Python 多处理和套接字未关闭

rust - 如何用重复的 u16 值填充 [u8] 数组?

rust - 从子 iter().map 中的父 iter().map 访问变量会抛出 'error[E0597]: ` x` does not live long enough' on compile'

multithreading - 在Perl中使用多线程时,应如何更新哈希哈希?

c - 使用 POSIX 消息队列的单进程线程安全

rust - 什么时候应该使用智能指针?

reference - 为什么我不能从闭包中返回对外部变量的可变引用?

rust - 如何返回值而不出现 'cannot move out of borrowed content'错误?

multithreading - C++11 编译器是否允许引入额外的原子变量负载?