multithreading - 如何等待 tokio 任务完成?

标签 multithreading rust rust-tokio

我正在尝试写信给 HashMap使用Arc<Mutex<T>>模式作为网站抓取练习的一部分,灵感来自 the Rust cookbook .

第一部分使用tokio运行。我无法完成正在完成的任务并返回 HashMap因为它只是挂着。

type Db = Arc<Mutex<HashMap<String, bool>>>;

pub async fn handle_async_tasks(db: Db) -> BoxResult<HashMap<String, bool>> {
    let links = NodeUrl::new("https://www.inverness-courier.co.uk/")
        .await
        .unwrap();

    let arc = db.clone();

    let mut handles = Vec::new();

    for link in links.links_with_paths {
        let x = arc.clone();
        handles.push(tokio::spawn(async move {
            process(x, link).await;
        }));
    }

    //  for handle in handles {
    //     handle.await.expect("Task panicked!");
    //  } < I tried this as well>

    futures::future::join_all(handles).await;

    let readables = arc.lock().await;

    for (key, value) in readables.clone().into_iter() {
        println!("Checking db: k, v ==>{} / {}", key, value);
    }

    let clone_db = readables.clone();

    return Ok(clone_db);
}

async fn process(db: Db, url: Url) {
    let mut db = db.lock().await;
    println!("checking {}", url);

    if check_link(&url).await.is_ok() {
        db.insert(url.to_string(), true);
    } else {
        db.insert(url.to_string(), false);
    }
}

async fn check_link(url: &Url) -> BoxResult<bool> {
    let res = reqwest::get(url.as_ref()).await?;
    Ok(res.status() != StatusCode::NOT_FOUND)
}

pub struct NodeUrl {
    domain: String,
    pub links_with_paths: Vec<Url>,
}

#[tokio::main]
async fn main() {
    let db: Db = Arc::new(Mutex::new(HashMap::new()));

    let db = futures::executor::block_on(task::handle_async_tasks(db));
}

我想退回HashMapmain()主线程被阻塞的地方。如何等待所有异步线程进程完成并返回 HashMap

最佳答案

let links = NodeUrl::new("https://www.some-site.com/.co.uk/").await.unwrap();

这对我来说似乎不是一个有效的 URL。

async fn process(db: Db, url: Url) {
    let mut db = db.lock().await;
    println!("checking {}", url);

    if check_link(&url).await.is_ok() {
         db.insert(url.to_string(), true);
    } else {
         db.insert(url.to_string(), false);
    }
}

这是一个很大的问题。在整个请求期间,您持有数据库的排他锁。这使得您的应用程序有效地串行。 reqwest 中的默认超时为 30 秒。因此,如果服务器没有响应并且您有很多链接需要浏览该程序,那么该程序可能看起来“挂起”。

您应该只获得尽可能短的数据库锁定 - 只是为了执行插入:

async fn process(db: Db, url: Url) {
    println!("checking {}", url);

    if check_link(&url).await.is_ok() {
         let mut db = db.lock().await;
         db.insert(url.to_string(), true);
    } else {
         let mut db = db.lock().await;
         db.insert(url.to_string(), false);
    }
}

或者更好的是,消除无用的 if:

async fn process(db: Db, url: Url) {
    println!("checking {}", url);
    let valid = check_link(&url).await.is_ok();
    db.lock().await.insert(url.to_string(), valid);
}

最后,您没有显示您的 main 函数,您调用 handle_async_tasks 或运行其他内容的方式可能有问题。

关于multithreading - 如何等待 tokio 任务完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70154267/

相关文章:

java - 在固定的时间段内等待多个 ExecutorServices 的终止

python - apache arrow 如何促进 "No overhead for cross-system communication"?

rust - 如何通过 futures:stream::Stream 发送 bytes::bytes::Bytes?

linux - 为什么使用Tokio进行异步网络IO的Rust项目为什么要对文件描述符5进行数千次写入?

java - 安卓 : Get data in real-time instead of using Async-Task

c# - 在 Parallel.ForEach 中使用 Convert.ChangeType 发生死锁

rust - 从 Rust 中的元组内的自有盒子中借用

rust - 为什么做 dbg!和打印! ("{:?}") 显示不同的输出?

rust - 即使我正在发送换行符,TCP 回显服务器也从不回复

c# - 我是否需要在异步BeginReceive回调中同步TCP/UDP客户端