rust - 使用 Hyper 同时获取多个 URL

标签 rust hyper rust-tokio

我正在尝试调整 Hyper basic client example同时获取多个 URL。

这是我目前拥有的代码:

extern crate futures;
extern crate hyper;
extern crate tokio_core;

use std::io::{self, Write};
use std::iter;
use futures::{Future, Stream};
use hyper::Client;
use tokio_core::reactor::Core;

fn get_url() {
    let mut core = Core::new().unwrap();
    let client = Client::new(&core.handle());
    let uris: Vec<_> = iter::repeat("http://httpbin.org/ip".parse().unwrap()).take(50).collect();
    for uri in uris {
        let work = client.get(uri).and_then(|res| {
            println!("Response: {}", res.status());

            res.body().for_each(|chunk| {
                io::stdout()
                    .write_all(&chunk)
                    .map_err(From::from)
            })
        });
        core.run(work).unwrap();
    }
}

fn main() {
    get_url();
}

好像不是并发的(需要很长时间才能完成),我是不是把工作交给了核心?

最佳答案

am I giving the work to the core in the wrong way?

是的,您正在向 Tokio 发出一个请求,并要求它在开始下一个请求之前完成。您采用了异步代码并强制其顺序执行。

您需要为 react 器提供一个单一的 future ,以执行不同类型的并发工作。

super 0.14

use futures::prelude::*;
use hyper::{body, client::Client};
use std::{
    io::{self, Write},
    iter,
};
use tokio;

const N_CONCURRENT: usize = 1;

#[tokio::main]
async fn main() {
    let client = Client::new();

    let uri = "http://httpbin.org/ip".parse().unwrap();
    let uris = iter::repeat(uri).take(50);

    stream::iter(uris)
        .map(move |uri| client.get(uri))
        .buffer_unordered(N_CONCURRENT)
        .then(|res| async {
            let res = res.expect("Error making request: {}");
            println!("Response: {}", res.status());

            body::to_bytes(res).await.expect("Error reading body")
        })
        .for_each(|body| async move {
            io::stdout().write_all(&body).expect("Error writing body");
        })
        .await;
}

N_CONCURRENT 设置为 1:

real    1.119   1119085us
user    0.012   12021us
sys     0.011   11459us

并设置为 10:

real    0.216   216285us
user    0.014   13596us
sys     0.021   20640us

cargo .toml

[dependencies]
futures = "0.3.17"
hyper = { version = "0.14.13", features = ["client", "http1", "tcp"] }
tokio = { version = "1.12.0", features = ["full"] }

super 0.12

use futures::{stream, Future, Stream}; // 0.1.25
use hyper::Client; // 0.12.23
use std::{
    io::{self, Write},
    iter,
};
use tokio; // 0.1.15

const N_CONCURRENT: usize = 1;

fn main() {
    let client = Client::new();

    let uri = "http://httpbin.org/ip".parse().unwrap();
    let uris = iter::repeat(uri).take(50);

    let work = stream::iter_ok(uris)
        .map(move |uri| client.get(uri))
        .buffer_unordered(N_CONCURRENT)
        .and_then(|res| {
            println!("Response: {}", res.status());
            res.into_body()
                .concat2()
                .map_err(|e| panic!("Error collecting body: {}", e))
        })
        .for_each(|body| {
            io::stdout()
                .write_all(&body)
                .map_err(|e| panic!("Error writing: {}", e))
        })
        .map_err(|e| panic!("Error making request: {}", e));

    tokio::run(work);
}

N_CONCURRENT 设置为 1:

real    0m2.279s
user    0m0.193s
sys     0m0.065s

并设置为 10:

real    0m0.529s
user    0m0.186s
sys     0m0.075s

另见:

关于rust - 使用 Hyper 同时获取多个 URL,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49087958/

相关文章:

asynchronous - 串的Rust future

enums - 无法更改枚举的一个值,因为它是对不可变变量的重新分配

rust - `flat_map` 如何影响我的代码?

ssl - 如何在 Rust 中使用客户端证书发出请求

rust - 在 super 响应中发送特定数量的字节的最佳方法是什么?

Rust:预期类型 [X],但发现类型 [X]

stream - 如何在 Rust 中的 future 和流之间进行选择?

c - 如何将C结构传递给Rust?

rust - 为什么 Rust 在 LLVM IR 闭包环境中将闭包捕获的 i64 存储为 i64*s?

post - 如何使用带有 hyper 的 multipart/form-data 发布图像?