multithreading - 使用 channel 和线程时我要等待或加入什么?

标签 multithreading rust

这是一个示例,但我应该等待什么来决定何时完成。我们有没有更好的方法来等待 channel 为空并且所有线程都已完成?完整示例位于 http://github.com/posix4e/rust_webcrawl

loop {
    let n_active_threads = running_threads.compare_and_swap(0, 0, Ordering::SeqCst);
    match rx.try_recv() {
        Ok(new_site) => {
            let new_site_copy = new_site.clone();
            let tx_copy = tx.clone();
            counter += 1;

            print!("{} ", counter);
            if !found_urls.contains(&new_site) {
                found_urls.insert(new_site);
                running_threads.fetch_add(1, Ordering::SeqCst);
                let my_running_threads = running_threads.clone();
                pool.execute(move || {
                    for new_url in get_websites_helper(new_site_copy) {
                        if new_url.starts_with("http") {
                            tx_copy.send(new_url).unwrap();
                        }
                    }
                    my_running_threads.fetch_sub(1, Ordering::SeqCst);
                });
            }
        }
        Err(TryRecvError::Empty) if n_active_threads == 0 => break,
        Err(TryRecvError::Empty) => {
             writeln!(&mut std::io::stderr(),
                "Channel is empty, but there are {} threads running",
                n_active_threads);
              thread::sleep_ms(10);
        },
        Err(TryRecvError::Disconnected) => unreachable!(),
    }
}

最佳答案

这实际上是一个非常复杂的问题,一个极有可能出现竞争条件的问题!据我了解,您:

  1. 拥有无限队列
  2. 有一组对队列项进行操作的 worker
  3. 工作人员可以将未知数量的项目放回队列
  4. 想知道什么时候一切都“完成”了

一个明显的问题是它可能永远不会完成。如果每个工作人员都将一个项目放回队列,就会出现无限循环。

话虽如此,我觉得解决方案是跟踪

  1. 有多少项目在排队
  2. 有多少项目正在进行中

当这两个值都为零时,您就完成了。说起来容易做起来难...

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize,Ordering};
use std::sync::mpsc::{channel,TryRecvError};
use std::thread;

fn main() {
    let running_threads = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = channel();

    // We prime the channel with the first bit of work
    tx.send(10).unwrap();

    loop {
        // In an attempt to avoid a race condition, we fetch the
        // active thread count before checking the channel. Otherwise,
        // we might read nothing from the channel, and *then* a thread
        // finishes and added something to the queue.
        let n_active_threads = running_threads.compare_and_swap(0, 0, Ordering::SeqCst);

        match rx.try_recv() {
            Ok(id) => {
                // I lie a bit and increment the counter to start
                // with. If we let the thread increment this, we might
                // read from the channel before the thread ever has a
                // chance to run!
                running_threads.fetch_add(1, Ordering::SeqCst);

                let my_tx = tx.clone();
                let my_running_threads = running_threads.clone();

                // You could use a threadpool, but I'm spawning
                // threads to only rely on stdlib.
                thread::spawn(move || {
                    println!("Working on {}", id);

                    // Simulate work
                    thread::sleep_ms(100);

                    if id != 0 {
                        my_tx.send(id - 1).unwrap();
                        // Send multiple sometimes
                        if id % 3 == 0 && id > 2 {
                            my_tx.send(id - 2).unwrap();
                        }
                    }

                    my_running_threads.fetch_sub(1, Ordering::SeqCst);
                });
            },
            Err(TryRecvError::Empty) if n_active_threads == 0 => break,
            Err(TryRecvError::Empty) => {
                println!("Channel is empty, but there are {} threads running", n_active_threads);
                // We sleep a bit here, to avoid quickly spinning
                // through an empty channel while the worker threads
                // work.
                thread::sleep_ms(1);
            },
            Err(TryRecvError::Disconnected) => unreachable!(),
        }
    }
}

我不保证这个实现是完美的(我可能应该保证它是坏的,因为线程很难)。一个重要的警告是,我并不十分了解 Ordering 的所有变体的含义。 ,所以我选择了看起来能提供最强保证的那个。

关于multithreading - 使用 channel 和线程时我要等待或加入什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29621088/

相关文章:

.NET 远程处理线程模型

rust - 包含模块 Rust

rust - 如何编写返回 Vec<Path> 的函数?

rust - 如何有条件地将类型分配给引用

rust - 有没有办法在 Rust 代码中使用 unistd.h 中的函数?

postgresql - 使用 Diesel 执行插入或更新

java - 如何处理多个扫描器(多个资源)的多个线程?

c# - Control.Invoke 输入参数

android - 获取游标的问题!

java - 显式锁和条件变量 Java 生产者消费者