rust - 如何使用reqwest执行并行异步HTTP GET请求?

标签 rust rust-tokio reqwest

The async example很有用,但是对于Rust和Tokio来说是新手,我正在努力找出如何立即执行N个请求,使用向量中的URL并为每个URL创建响应HTML的迭代器作为字符串的方法。

怎么办呢?

最佳答案

并发请求
从reqwest 0.10开始:

use futures::{stream, StreamExt}; // 0.3.5
use reqwest::Client; // 0.10.6
use tokio; // 0.2.21, features = ["macros"]

const CONCURRENT_REQUESTS: usize = 2;

#[tokio::main]
async fn main() {
    let client = Client::new();

    let urls = vec!["https://api.ipify.org"; 2];

    let bodies = stream::iter(urls)
        .map(|url| {
            let client = &client;
            async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            }
        })
        .buffer_unordered(CONCURRENT_REQUESTS);

    bodies
        .for_each(|b| async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        })
        .await;
}

stream::iter(urls)

stream::iter
收集字符串并将其转换为 Stream
.map(|url| {

StreamExt::map
在流中的每个元素上运行一个异步函数,并将该元素转换为新类型。
let client = &client;
async move {

取得对Client的明确引用,然后将引用(不是原始的Client)移到匿名异步块中。
let resp = client.get(url).send().await?;

使用Client的连接池启动异步GET请求,然后等待该请求。
resp.bytes().await

请求并等待响应的字节。
.buffer_unordered(N);

StreamExt::buffer_unordered
将 future 流转换为那些 future 值(value)流,同时执行 future 。
bodies
    .for_each(|b| {
        async {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        }
    })
    .await;

StreamExt::for_each
将流转换回单个将来,打印出沿途收到的数据量,然后等待将来完成。
也可以看看:
  • Join futures with limited concurrency
  • How to merge iterator of streams?
  • How do I synchronously return a value calculated in an asynchronous Future in stable Rust?
  • What is the difference between `then`, `and_then` and `or_else` in Rust futures?

  • 无限执行
    如果愿意,还可以将迭代器转换为 future 迭代器,并使用 future::join_all :
    use futures::future; // 0.3.4
    use reqwest::Client; // 0.10.1
    use tokio; // 0.2.11
    
    #[tokio::main]
    async fn main() {
        let client = Client::new();
    
        let urls = vec!["https://api.ipify.org"; 2];
    
        let bodies = future::join_all(urls.into_iter().map(|url| {
            let client = &client;
            async move {
                let resp = client.get(url).send().await?;
                resp.bytes().await
            }
        }))
        .await;
    
        for b in bodies {
            match b {
                Ok(b) => println!("Got {} bytes", b.len()),
                Err(e) => eprintln!("Got an error: {}", e),
            }
        }
    }
    
    我鼓励使用第一个示例,因为您通常想限制并发性,bufferbuffer_unordered可以帮助您。
    平行要求
    并发请求通常就足够了,但是有时候需要并行请求。在这种情况下,您需要产生一个任务。
    use futures::{stream, StreamExt}; // 0.3.8
    use reqwest::Client; // 0.10.9
    use tokio; // 0.2.24, features = ["macros"]
    
    const PARALLEL_REQUESTS: usize = 2;
    
    #[tokio::main]
    async fn main() {
        let urls = vec!["https://api.ipify.org"; 2];
    
        let client = Client::new();
    
        let bodies = stream::iter(urls)
            .map(|url| {
                let client = client.clone();
                tokio::spawn(async move {
                    let resp = client.get(url).send().await?;
                    resp.bytes().await
                })
            })
            .buffer_unordered(PARALLEL_REQUESTS);
    
        bodies
            .for_each(|b| async {
                match b {
                    Ok(Ok(b)) => println!("Got {} bytes", b.len()),
                    Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
                    Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
                }
            })
            .await;
    }
    
    主要区别在于:
  • 我们使用 tokio::spawn 在单独的任务中执行工作。
  • 我们必须为每个任务分配自己的reqwest::Client。作为recommended,我们克隆了一个共享客户端以使用连接池。
  • 当无法加入任务时,还有另外一种错误情况。

  • 也可以看看:
  • What is the difference between concurrent programming and parallel programming?
  • What is the difference between concurrency and parallelism?
  • What is the difference between concurrency, parallelism and asynchronous methods?
  • 关于rust - 如何使用reqwest执行并行异步HTTP GET请求?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62556930/

    相关文章:

    node.js - 如何在 Rust 的 wasm_bindgen 函数中发出 HTTP 请求?

    rust - 不能在返回 `?` 的函数中使用 `()` 运算符

    rust - 通过 32 位整数索引向量

    rust - 只是从 "if let"得到一个 bool 结果?

    rust - 根据编译目标操作系统,Rust 中将不同类型的值分配给变量的惯用方式是什么?

    rust - Tokio 的简单 TCP 回显服务器示例(在 GitHub 和 API 引用上)有什么好的详细解释?

    multithreading - 从 Rust 的 channel 迭代器中获取第一个接收到的值

    android - 用于Android的Tokio和Reqwest的Rust JNI异步回调

    rust - 有没有更好的方法来为拥有 DIsplay 事物集合的结构实现 Display?

    rust - 构造一个结构体,其中包含来自 'from_iter' 的 i32 通用迭代器?