rust - 为什么即使在未来解决之后,在循环中使用带有 Tokio future 的克隆 super 客户端也会阻塞?

标签 rust rust-tokio hyper

我有一个以固定时间间隔更新缓存数据的服务。每 N 秒它会使用循环( tokio::run(future_update(http_client.clone())) )触发 future ,但它不会返回到 future 解决的父函数。循环阻塞,我只得到一次迭代。

当我创建一个新的超 HTTP 客户端而不是传递一个克隆的客户端时,一切正常。它不工作 Arc<Client>要么。

pub fn trigger_cache_reload(http_client: Arc<Client<HttpConnector, Body>>) {
    let load_interval_sec = get_load_interval_sec(conf.load_interval_seconds.clone());

    std::thread::spawn(move || loop {
        let http_client = http_client.clone();

        info!("Woke up");
        tokio::run(pipeline(http_client));
        info!(
            "Pipeline run complete. Huuhh Now I need sleep of {} secs. Sleeping",
            load_interval_sec
        );
        std::thread::sleep(std::time::Duration::from_secs(load_interval_sec));
    });
}

fn pipeline(
    client: Arc<Client<HttpConnector, Body>>,
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
    let res = fetch_message_payload() //return type of this call is Box<dyn Future<Item = (), Error = Error> + Send>
        .map_err(Error::from)
        .and_then(|_| {
            //let client = hyper::Client::builder().max_idle_per_host(1).build_http();
            //if i create new client here every time and use it then all working is fine.
            refresh_cache(client) //return type of this call is Box<dyn Future<Item = (), Error = Error> + Send>
                .map_err(Error::from)
                .and_then(|arg| {
                    debug!("refresh_cache completed");
                    Ok(arg)
                })
        });

    let res = res.or_else(|e| {
        error!("error {:?}", e);
        Ok(())
    });
    Box::new(res)
}

调用trigger_cache_reload后有一次,我得到 "woke up"日志消息。我也得到 "refresh_cache completed"在未来成功完成一段时间后记录消息。我没有得到 "sleeping"带或不带 Arc 的日志消息.

如果我每次都在未来创建一个新客户端,我就能得到 "sleeping"记录消息。

最佳答案

tokio::run 每次调用它时都会创建一个全新的事件循环和线程池(reactor + executor)。这真的不是你想要做的。

super 客户端会将其状态绑定(bind)到先前的事件循环,并且如果轮询新的事件循环则无法取得进展,因为旧的事件循环将在 run 完成后被销毁。这就是为什么新客户端可以工作,但您不能重复使用旧客户端。

这里有两种解决方案:

  • 如果您的应用程序的其余部分不使用 tokio,我将只使用同步 reqwest::Client .如果您不需要大量并发,同步解决方案在这里会容易得多。

  • 如果您使用的是 tokio,请使用 tokio::spawn在另一个 Future 中与 tokio_timer::Timeout 一起运行检查,然后在事件循环中等待指定的时间量。

异步/等待示例

新的 async/await 支持使得这样的代码更容易编写。

这个例子目前只适用于 nightly 编译器 tokio-0.3.0-alpha.2 和当前的 hyper master 分支:

[dependencies]
tokio = "0.3.0-alpha.2"
tokio-timer = "0.3.0-alpha.2"
hyper = { git = "https://github.com/hyperium/hyper.git" }
use tokio::timer::Interval;
use hyper::{Client, Uri};

use std::time::Duration;

#[tokio::main]
async fn main() {
    let client = Client::new();
    let second_interval = 120;
    let mut interval = Interval::new_interval(Duration::from_secs(second_interval));
    let uri = Uri::from_static("http://httpbin.org/ip");

    loop {
        let res = Client.get(uri.clone()).await.unwrap();
        // Do what you need to with the response...
        interval.next().await;
    }
}

关于rust - 为什么即使在未来解决之后,在循环中使用带有 Tokio future 的克隆 super 客户端也会阻塞?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57589869/

相关文章:

rust - 处理 std::borrow::Cow 的集合

generics - 如何初始化常量泛型数组?

rust - 使用 tokio-tungstenite 时如何获取 header ?

asynchronous - Tokio react 器是否轮询每个组合器之间所有可能的 poll() 函数?

rust - 如何从 FuturesUnordered 返回错误?

rust - 创建具有自定义错误类型的 super 服务

json - 使用 serde_json 解析对象内部的对象

rust - 如何在立即丢弃向量时将值移出向量?

rust - 如何用ndarray初始化一个常量矩阵?

rust - 如何在 Hyper 处理程序之间共享 HashMap?