我从互联网上收集了以下示例:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
// Transaction enum
enum Transaction {
Widthdrawl(String, f64),
Deposit(String, f64),
}
fn main() {
//A banking send receive example...
// set the number of customers
let n_customers = 10;
// Create a "customer" and a "banker"
let (customers, banker) = mpsc::channel();
let handles = (0..n_customers + 1)
.into_iter()
.map(|i| {
// Create another "customer"
let customer = customers.clone();
// Create the customer thread
let handle = thread::Builder::new()
.name(format!("{}{}", "thread", i).into())
.spawn(move || {
// Define Transaction
let trans_type = match i % 2 {
0 => Transaction::Deposit(
thread::current().name().unwrap().to_string(),
(i + 5) as f64 * 10.0,
),
_ => Transaction::Widthdrawl(
thread::current().name().unwrap().to_string(),
(i + 10) as f64 * 5.0,
),
};
// Send the Transaction
customer.send(trans_type).unwrap();
});
handle
})
.collect::<Vec<Result<thread::JoinHandle<_>, _>>>();
// Wait for threads to finish
for handle in handles {
handle.unwrap().join().unwrap()
}
// Create a bank thread
let bank = thread::spawn(move || {
// Create a value
let mut balance: f64 = 10000.0;
println!("Initially, Bank value: {}", balance);
// Perform the transactions in order
//banker.recv_timeout(Duration::new(5, 0)); <-- TIMEOUT line...
banker.into_iter().for_each(|i| {
let mut customer_name: String = "None".to_string();
match i {
// Subtract for Widthdrawls
Transaction::Widthdrawl(cust, amount) => {
customer_name = cust;
println!(
"Customer name {} doing withdrawal of amount {}",
customer_name, amount
);
balance = balance - amount;
}
// Add for deposits
Transaction::Deposit(cust, amount) => {
customer_name = cust;
println!(
"Customer name {} doing deposit of amount {}",
customer_name, amount
);
balance = balance + amount;
}
}
println!("Customer is {}, Bank value: {}", customer_name, balance);
});
});
// Let the bank finish
bank.join().unwrap(); //THE THREAD DOES NOT END!!
}
bank
线程从不加入,因此不结束主线程。如果删除
join
并取消注释上面的超时行,则银行线程有时不等待客户线程发送(我认为可以)。//banker.recv_timeout(Duration::new(5, 0)); <-- TIMEOUT line...
银行线程未加入的原因可能是什么,或者使银行了解不会再收到更多客户消息的更好方法是什么? (因为我认为
timeout()
在这里可能不是一种可靠的方法)。
最佳答案
完成所有生产者后,我需要删除tx
channel ,然后在那之后消费者停止:
// Wait for threads to finish
for handle in handles {
handle.unwrap().join().unwrap()
}
drop(customers);
// Create a bank thread
关于multithreading - 为什么包含MPSC channel 的线程永远不会加入?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60931769/