redis - 如何在异步 tokio 运行时中将 future::join_all 与多路复用的 redis 一起使用

标签 redis rust async-await rust-tokio

我正在尝试使用 Rust redis client在异步复用模式下,用tokio作为异步运行时,以及要加入的动态数量的 future 。

我在一定数量的 future 上使用 future::join3 取得了成功,但我想多路复用更多命令(编译时不必知 Prop 体大小,但即使那将是一个改进)。

这是使用 future::join3 时的工作示例;该示例正确打印 好的(一些("PONG")) 好的(一些("PONG")) 好的(一些("PONG"))

Cargo.toml

[package]
name = "redis_sample"
version = "0.1.0"
authors = ["---"]
edition = "2018"


[dependencies]
redis = { version = "0.17.0", features = ["aio", "tokio-comp", "tokio-rt-core"] }
tokio = { version = "0.2.23", features = ["full"] }
futures = "0.3.8"

src/main.rs

use futures::future;
use redis::RedisResult;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
    let mut redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

    let results: (RedisResult<Option<String>>, RedisResult<Option<String>>, RedisResult<Option<String>>) = future::join3(
        redis::cmd("PING").query_async(&mut redis_connection.clone()),
        redis::cmd("PING").query_async(&mut redis_connection.clone()),
        redis::cmd("PING").query_async(&mut redis_connection),
    ).await;

    println!("{:?} {:?} {:?}", results.0, results.1, results.2);

    Ok(())
}

现在我想做同样的事情,但是使用 n 命令(比如说 10,但理想情况下我想将其调整为生产性能)。这是我所能得到的,但我无法克服借用规则;我尝试在 Vec 中存储一些中介(redis Cmd 或 future 本身)以延长它们的生命,但这还有其他问题(有多个 mut 引用)。

Cargo.toml 是一样的;这是 main.rs

use futures::{future, Future};
use std::pin::Pin;
use redis::RedisResult;

const BATCH_SIZE: usize = 10;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
    let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

    let mut commands: Vec<Pin<Box<dyn Future<Output = RedisResult<Option<String>>>>>> = vec![];
    for _ in 0..BATCH_SIZE {
        commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
    }
    let results = future::join_all(commands).await;

    println!("{:?}", results);

    Ok(())
}

我收到了两个编译器警告(创建了一个在使用中被释放的临时文件),我不知道如何继续使用这段代码。我不是 100% 迷上了使用 Pin,但没有它我什至无法存储 future 。

完整的编译器输出:

   Compiling redis_sample v0.1.0 (/Users/gyfis/Documents/programming/rust/redis_sample)
error[E0716]: temporary value dropped while borrowed
  --> redis_sample/src/main.rs:14:32
   |
14 |         commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
   |                                ^^^^^^^^^^^^^^^^^^                                              - temporary value is freed at the end of this statement
   |                                |
   |                                creates a temporary which is freed while still in use
...
21 | }
   | - borrow might be used here, when `commands` is dropped and runs the `Drop` code for type `std::vec::Vec`
   |
   = note: consider using a `let` binding to create a longer lived value

error[E0716]: temporary value dropped while borrowed
  --> redis_sample/src/main.rs:14:69
   |
14 |         commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
   |                                                                     ^^^^^^^^^^^^^^^^^^^^^^^^   - temporary value is freed at the end of this statement
   |                                                                     |
   |                                                                     creates a temporary which is freed while still in use
...
21 | }
   | - borrow might be used here, when `commands` is dropped and runs the `Drop` code for type `std::vec::Vec`
   |
   = note: consider using a `let` binding to create a longer lived value

error: aborting due to 2 previous errors

For more information about this error, try `rustc --explain E0716`.
error: could not compile `redis_sample`.

感谢任何帮助!

最佳答案

这应该可行,我只是延长了 redis_connection 的生命周期。

use futures::{future, Future};
use std::pin::Pin;
use redis::RedisResult;

const BATCH_SIZE: usize = 10;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
    let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

    let mut commands: Vec<Pin<Box<dyn Future<Output = RedisResult<Option<String>>>>>> = vec![];
    for _ in 0..BATCH_SIZE {
        let mut redis_connection = redis_connection.clone();
        commands.push(Box::pin(async move {
            redis::cmd("PING").query_async(&mut redis_connection).await
        }));
    }
    let results = future::join_all(commands).await;

    println!("{:?}", results);

    Ok(())
}

因为你在一个函数体内,你甚至不需要装箱 futures,类型推断可以完成所有工作:

use futures::future;
use redis::RedisResult;

const BATCH_SIZE: usize = 10;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
    let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

    let mut commands = vec![];
    for _ in 0..BATCH_SIZE {
        let mut redis_connection = redis_connection.clone();
        commands.push(async move {
            redis::cmd("PING").query_async::<_, Option<String>>(&mut redis_connection).await
        });
    }
    let results = future::join_all(commands).await;

    println!("{:?}", results);

    Ok(())
}

关于redis - 如何在异步 tokio 运行时中将 future::join_all 与多路复用的 redis 一起使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65380313/

相关文章:

rust - 如何同步返回在异步 Future 中计算的值?

c# - 异步/等待与线程场景

c# - PostSubmitter 的异步 CTP

rust - 是否可以返回或什么都不退?

rust - Rust 生命周期会影响编译程序的语义吗?

python - 获取带分数的zrange

python - Redis - 错误 : value is not a valid float

redis-cli 删除键模式不删除记录

ruby-on-rails - Sidekiq 作业已排队但未在 Heroku 上处理

rust - 为什么 `use std::{ self, ... };` 不编译?