The async example很有用,但是对于Rust和Tokio来说是新手,我正在努力找出如何立即执行N个请求,使用向量中的URL并为每个URL创建响应HTML的迭代器作为字符串的方法。
怎么办呢?
最佳答案
并发请求
从reqwest 0.10开始:
use futures::{stream, StreamExt}; // 0.3.5
use reqwest::Client; // 0.10.6
use tokio; // 0.2.21, features = ["macros"]
const CONCURRENT_REQUESTS: usize = 2;
#[tokio::main]
async fn main() {
let client = Client::new();
let urls = vec!["https://api.ipify.org"; 2];
let bodies = stream::iter(urls)
.map(|url| {
let client = &client;
async move {
let resp = client.get(url).send().await?;
resp.bytes().await
}
})
.buffer_unordered(CONCURRENT_REQUESTS);
bodies
.for_each(|b| async {
match b {
Ok(b) => println!("Got {} bytes", b.len()),
Err(e) => eprintln!("Got an error: {}", e),
}
})
.await;
}
stream::iter(urls)
stream::iter
收集字符串并将其转换为
Stream
。.map(|url| {
StreamExt::map
在流中的每个元素上运行一个异步函数,并将该元素转换为新类型。
let client = &client; async move {
取得对
Client
的明确引用,然后将引用(不是原始的Client
)移到匿名异步块中。let resp = client.get(url).send().await?;
使用
Client
的连接池启动异步GET请求,然后等待该请求。resp.bytes().await
请求并等待响应的字节。
.buffer_unordered(N);
StreamExt::buffer_unordered
将 future 流转换为那些 future 值(value)流,同时执行 future 。
bodies .for_each(|b| { async { match b { Ok(b) => println!("Got {} bytes", b.len()), Err(e) => eprintln!("Got an error: {}", e), } } }) .await;
StreamExt::for_each
将流转换回单个将来,打印出沿途收到的数据量,然后等待将来完成。
也可以看看:
无限执行
如果愿意,还可以将迭代器转换为 future 迭代器,并使用
future::join_all
:use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11
#[tokio::main]
async fn main() {
let client = Client::new();
let urls = vec!["https://api.ipify.org"; 2];
let bodies = future::join_all(urls.into_iter().map(|url| {
let client = &client;
async move {
let resp = client.get(url).send().await?;
resp.bytes().await
}
}))
.await;
for b in bodies {
match b {
Ok(b) => println!("Got {} bytes", b.len()),
Err(e) => eprintln!("Got an error: {}", e),
}
}
}
我鼓励使用第一个示例,因为您通常想限制并发性,buffer
和buffer_unordered
可以帮助您。平行要求
并发请求通常就足够了,但是有时候需要并行请求。在这种情况下,您需要产生一个任务。
use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]
const PARALLEL_REQUESTS: usize = 2;
#[tokio::main]
async fn main() {
let urls = vec!["https://api.ipify.org"; 2];
let client = Client::new();
let bodies = stream::iter(urls)
.map(|url| {
let client = client.clone();
tokio::spawn(async move {
let resp = client.get(url).send().await?;
resp.bytes().await
})
})
.buffer_unordered(PARALLEL_REQUESTS);
bodies
.for_each(|b| async {
match b {
Ok(Ok(b)) => println!("Got {} bytes", b.len()),
Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
}
})
.await;
}
主要区别在于:tokio::spawn
在单独的任务中执行工作。 reqwest::Client
。作为recommended,我们克隆了一个共享客户端以使用连接池。 也可以看看:
关于rust - 如何使用reqwest执行并行异步HTTP GET请求?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62556930/