rust - 将异步函数传递给另一段代码(无法满足编译器的要求)

标签 rust async-await

我目前正在跨线程发送闭包/函数。

这对于同步功能非常适用。

我特别路过pub type WSMethod<T> = Box<dyn Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Result<String, BoxError> + Send + Sync>;
发送示例函数

pub fn update_league(req: WSReq, conn: PgConn, _: &mut WSConnections_, _: Uuid) -> Result<String, BoxError>{
    let deserialized = serde_json::from_value(req.data)?;
    let league = db::update_league(&conn, deserialized)?;
    let resp_msg = WSMsgOut::resp(req.message_id, req.method, league);
    serde_json::to_string(&resp_msg).map_err(|e| e.into())
}

但是现在我想切换到发送异步功能,

IE。
pub async fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
    let deserialized: Vec<NewCompetition> = serde_json::from_value(req.data)?;
    let competitions_out= db::upsert_competitions(&conn, deserialized.into_iter().map(transform_from).collect_vec())?;
    if let Some(ws_user) = ws_conns.lock().await.get_mut(&user_ws_id){
        sub_to_competitions(ws_user, competitions_out.iter().map(|c| &c.competition_id)).await;
    }
    publish_competitions(ws_conns, &competitions_out).await;
    let resp_msg = WSMsgOut::resp(req.message_id, req.method, competitions_out);
    serde_json::to_string(&resp_msg).map_err(|e| e.into())
}

这是完全相同的函数签名,只是异步的。

在将函数装箱以便可以将它们发送到的地方,出现此错误
Box::new(upsert_competitions))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected enum `std::result::Result`, found opaque type

满的:
288 | pub async fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
    |                                                                                                                ------------------------ the `Output` of this `async fn`'s found opaque type
    |
    = note:     expected enum `std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>>`
            found opaque type `impl core::future::future::Future`
    = note: required for the cast to the object type `dyn for<'r> std::ops::Fn(warp_ws_server::WSReq, diesel::r2d2::PooledConnection<diesel::r2d2::ConnectionManager<diesel::PgConnection>>, &'r mut std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>, uuid::Uuid) -> std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>> + std::marker::Send + std::marker::Sync`

我尝试将.await附加到传递的方法的调用站点method(req, conn, ws_conns, user_ws_id).await上。

由于未为Future实现Result,因此这会导致编译器错误。
所以

我将类型更改为:Box<dyn Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Result<String, BoxError> + Send + Sync>-> Box<dyn (Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Future<Output=Result<String, BoxError>>) + Send + Sync>
它提示 future 的大小,因此我将Future装箱,然后是另一个错误(请参阅取消固定),因此我将错误固定。

最终导致Box<dyn (Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync >>) + Send + Sync>
现在的错误是
Box::new(upsert_competitions)^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `std::pin::Pin`, found opaque type
expected struct `std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>>> + std::marker::Send + std::marker::Sync>>`
            found opaque type `impl core::future::future::Future

我不知道该怎么走。
我不认为我应该固定/装箱函数结果,我想固定/装箱调用函数时返回的将来,但是我不认为我可以做到这一点,

因为我肯定希望在我调用func时创建/固定 future ,而不是更早。

我也尝试过类似的东西

基于上述错误的Box::new(Pin::new(Box::new(upsert_competitions))))

它给了我期望的Fn<blah> ....而不是Pin<Box<....
完整的最新代码的来源:

CLosure type-def

closure being successfully passed as a regular function

closure being unsuccessfully passed as an async func

closure being called

编辑:

最新更新(进行了错误处理)
pub fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync>>{
    async fn hmmm(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
        let deserialized: Vec<NewCompetition> = serde_json::from_value(req.data).expect("fuck");
        println!("{:?}", &deserialized);
        let competitions_out= db::upsert_competitions(&conn, deserialized.into_iter().map(transform_from).collect_vec()).expect("fuck");
        // assume anything upserted the user wants to subscribe to
        if let Some(ws_user) = ws_conns.lock().await.get_mut(&user_ws_id){
            sub_to_competitions(ws_user, competitions_out.iter().map(|c| &c.competition_id)).await;
        }
        // TODO ideally would return response before awaiting publishing going out
        publish_competitions(ws_conns, &competitions_out).await;
        println!("{:?}", &competitions_out);
        let resp_msg = WSMsgOut::resp(req.message_id, req.method, competitions_out);
        let out = serde_json::to_string(&resp_msg).map_err(|e| e.into());
        out
    }
    Box::pin(hmmm(req, conn, ws_conns, user_ws_id))
}
305 |     Box::pin(hmmm(req, conn, ws_conns, user_ws_id))
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future returned by 
`hmmm` is not `Sync`

所以现在只需要弄清楚如何使这个 future 同步
note: future is not `Sync` as this value is used across an await

给我很好的线索
299 |         publish_competitions(ws_conns, &competitions_out).await;
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await

发生在此处,稍后可能会使用conn
得出结论,我必须在内部异步函数之外继续使用conn,而不是在等待期间使用。

在等待期间固定变量后,我现在到达
error[E0621]: explicit lifetime required in the type of `ws_conns`
   --> src/handlers.rs:305:5
    |
289 | pub fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync>>{
    |                                                                ------------------- help: add explicit lifetime `'static` to the type of `ws_conns`: `&'static mut std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>`
...
305 |     Box::pin(hmmm(req, competitions_out, ws_conns, user_ws_id))
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ lifetime `'static` required

尝试进行&'static引用,但最终我指出了哪里不对。

我还尝试使用upsert_competitions<U: lock_api::RawMutex + 'static>泛型类型来代替,

但是,对于lock_api::mutex::RawMutex而言,未实现获取特征std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>的功能

我需要找到一个实现.lock()的U,但这也是Arc实现的一个特征。

最佳答案

异步函数的返回类型在转换为Fn时包装在Future中,而不是固定的Future中,因为您只需将其固定即可开始轮询。从一开始就创建固定的 future 将使从多个异步功能构建组合的 future 的过程效率降低且更加复杂。因此正确的类型是pub type WSMethod<T> = Box<dyn Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> [[UNNAMED TYPE implementing Future]]<Result<String, BoxError> + Send + Sync>>;,但是您不能命名该类型[[[UNNAMED TYPE实现Future]],因此您需要手动将其装箱。最简单的方法是将来使用FutureExt中的boxed方法。

因此,您需要将更改类型为Box<dyn (Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync >>) + Send + Sync>的方法与将对方法的引用替换为Box::new(|req, conn, connections, uuid| upsert_competitions(req, conn, connections, uuid).boxed())的方法结合起来

关于rust - 将异步函数传递给另一段代码(无法满足编译器的要求),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61580611/

相关文章:

Angular 6 Material - 等待 Mat Dialog 关闭

javascript - TypeScript:异步函数中的参数关键字

c# - 异步读取文件的正确方法

asynchronous - 在Dart中以同步方式运行异步代码

rust - 了解 Rust 中 "super"关键字的具体用法

rust - 是否可以从子结构调用父结构的方法?

rust - Rust 在哪里存储所有这些字节?

rust - 为什么特质不能 self 构建?

rust - 在子类型上获取 Rc<RefCell<dyn T>>>

javascript - 如何将异步迭代器转换为数组?