rust - 为什么从 Actix Web 处理程序中的 Rusoto S3 流读取会导致死锁?

标签 rust actix-web rusoto

我正在使用 actix_webrusoto_s3 编写应用程序。

当我直接从 main 运行 actix 请求之外的命令时,它运行良好,并且 get_object 按预期工作。当它被封装在 actix_web 请求中时,流将永远被阻塞。

我有一个为所有请求共享的客户端,它被封装到 Arc 中(这发生在 actix 数据内部)。

完整代码:

fn index(
    _req: HttpRequest,
    path: web::Path<String>,
    s3: web::Data<S3Client>,
) -> impl Future<Item = HttpResponse, Error = actix_web::Error> {
    s3.get_object(GetObjectRequest {
        bucket: "my_bucket".to_owned(),
        key: path.to_owned(),
        ..Default::default()
    })
    .and_then(move |res| {
        info!("Response {:?}", res);
        let mut stream = res.body.unwrap().into_blocking_read();
        let mut body = Vec::new();
        stream.read_to_end(&mut body).unwrap();
        match process_file(body.as_slice()) {
            Ok(result) => Ok(result),
            Err(error) => Err(RusotoError::from(error)),
        }
    })
    .map_err(|e| match e {
        RusotoError::Service(GetObjectError::NoSuchKey(key)) => {
            actix_web::error::ErrorNotFound(format!("{} not found", key))
        }
        error => {
            error!("Error: {:?}", error);
            actix_web::error::ErrorInternalServerError("error")
        }
    })
    .from_err()
    .and_then(move |img| HttpResponse::Ok().body(Body::from(img)))
}

fn health() -> HttpResponse {
    HttpResponse::Ok().finish()
}

fn main() -> std::io::Result<()> {
    let name = "rust_s3_test";
    env::set_var("RUST_LOG", "debug");
    pretty_env_logger::init();
    let sys = actix_rt::System::builder().stop_on_panic(true).build();
    let prometheus = PrometheusMetrics::new(name, "/metrics");
    let s3 = S3Client::new(Region::Custom {
        name: "eu-west-1".to_owned(),
        endpoint: "http://localhost:9000".to_owned(),
    });
    let s3_client_data = web::Data::new(s3);

    Server::build()
        .bind(name, "0.0.0.0:8080", move || {
            HttpService::build().keep_alive(KeepAlive::Os).h1(App::new()
                .register_data(s3_client_data.clone())
                .wrap(prometheus.clone())
                .wrap(actix_web::middleware::Logger::default())
                .service(web::resource("/health").route(web::get().to(health)))
                .service(web::resource("/{file_name}").route(web::get().to_async(index))))
        })?
        .start();
    sys.run()
}

stream.read_to_end 中,线程被阻塞并且永远不会被解析。

我尝试过为每个请求克隆客户端并为每个请求创建一个新客户端,但我在所有情况下都得到了相同的结果。

我做错了什么吗?

如果我不异步使用它,它会工作...

s3.get_object(GetObjectRequest {
    bucket: "my_bucket".to_owned(),
    key: path.to_owned(),
    ..Default::default()
})
.sync()
.unwrap()
.body
.unwrap()
.into_blocking_read();
let mut body = Vec::new();
io::copy(&mut stream, &mut body);

这是 Tokio 的问题吗?

最佳答案

let mut stream = res.body.unwrap().into_blocking_read();

检查 implementation of into_blocking_read() : 它调用 .wait()。您不应在 Future 中调用阻塞代码。

由于 Rusoto 的 body 是一个 Stream,所以有一种方法可以异步读取它:

.and_then(move |res| {
    info!("Response {:?}", res);
    let stream = res.body.unwrap();

    stream.concat2().map(move |file| {
        process_file(&file[..]).unwrap()
    })
    .map_err(|e| RusotoError::from(e)))
})

process_file 不应阻塞封闭的 Future。如果需要阻塞,可以考虑在新线程上运行或者用tokio_threadpool's blocking封装.

注意:您可以在您的实现中使用 tokio_threadpool 的blocking,但我建议您先了解它是如何工作的。


如果你不打算将整个文件加载到内存中,你可以使用for_each:

stream.for_each(|part| {
    //process each part in here 
    //Warning! Do not add blocking code here either.
})

另见:

关于rust - 为什么从 Actix Web 处理程序中的 Rusoto S3 流读取会导致死锁?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56846492/

相关文章:

rust - 如何访问数组类型结构的元素

types - 什么时候需要使用类型注解?

rust - 在 actix-web 服务器和异步闭包之间共享状态

rust - 如何在 actix-web 路由中缓存或内存数据?

rust - 如何将 `futures::Stream` 包装到任何实现 `Write` 的东西中?

rust - 在删除 Rust Future 时 panic 运行异步代码

closures - 如何在 FnMut 上下文中使用盒装闭包?

error-handling - 对于没有返回值的语句,忽略错误的正确方法是什么?

rust - 尽管已实现,但尚未实现特征Serialize?

amazon-s3 - 使用 rusoto 将字符串上传到 S3