asynchronous - 控制衍生 future 的数量以产生背压

标签 asynchronous rust future amazon-kinesis

我正在使用 futures-rs powered version of the Rusoto AWS Kinesis library .我需要生成 AWS Kinesis 请求的深层管道以实现高吞吐量,因为 Kinesis 对每个 HTTP 请求有 500 条记录的限制。结合发送请求的 50 毫秒延迟,我需要开始生成许多并发请求。我希望在某个地方创建大约 100 个飞行请求。

鲁索托 put_records函数签名如下所示:

fn put_records(
    &self,
    input: &PutRecordsInput,
) -> RusotoFuture<PutRecordsOutput, PutRecordsError>

RusotoFuture是这样定义的包装器:

/// Future that is returned from all rusoto service APIs.
pub struct RusotoFuture<T, E> {
    inner: Box<Future<Item = T, Error = E> + 'static>,
}

内部Future被包裹但是 RusutoFuture仍然实现 Future::poll() , 所以我相信它与 futures-rs 兼容生态系统。 RusotoFuture提供同步调用:

impl<T, E> RusotoFuture<T, E> {
    /// Blocks the current thread until the future has resolved.
    ///
    /// This is meant to provide a simple way for non-async consumers
    /// to work with rusoto.
    pub fn sync(self) -> Result<T, E> {
        self.wait()
    }
}

我可以发出请求并 sync()它,从 AWS 获取结果。我想创建许多请求,将它们放入某种队列/列表中,然后收集完成的请求。如果请求出错,我需要重新发出请求(这在 Kinesis 中有些正常,尤其是在达到分片吞吐量限制时)。如果请求成功完成,我应该发出一个包含新数据的请求。我可以为每个请求生成一个线程并同步它,但是当我运行异步 IO 线程时,这似乎效率很低。

我试过使用 futures::sync::mpsc::channel 从我的应用程序线程(不是从 Tokio react 器内部运行)但是每当我克隆 tx它生成自己的缓冲区,消除了 send 上的任何类型的背压:

fn kinesis_pipeline(client: DefaultKinesisClient, stream_name: String, num_puts: usize, puts_size: usize) {
    use futures::sync::mpsc::{ channel, spawn };
    use futures::{ Sink, Future, Stream };
    use futures::stream::Sender;
    use rusoto_core::reactor::DEFAULT_REACTOR;

    let client = Arc::new(KinesisClient::simple(Region::UsWest2));
    let data = FauxData::new(); // a data generator for testing

    let (mut tx, mut rx) = channel(1);

    for rec in data {
        tx.clone().send(rec);
    }
}

没有克隆,我有错误:

error[E0382]: use of moved value: `tx`
   --> src/main.rs:150:9
    |
150 |         tx.send(rec);
    |         ^^ value moved here in previous iteration of loop
    |
    = note: move occurs because `tx` has type `futures::sync::mpsc::Sender<rusoto_kinesis::PutRecordsRequestEntry>`, which does not implement the `Copy` trait

我也看过 futures::mpsc::sync::spawn 根据建议,但它需要 rx 的所有权(作为 Stream )并没有解决我的 Copy 问题的 tx导致无限行为。

我希望能得到 channel/spawn使用工作,我将有一个需要 RusotoFuture 的系统s,等待它们完成,然后为我提供一种从我的应用程序线程中获取完成结果的简单方法。

最佳答案

据我所知,您的问题是 channel 不是 Sender 的单个克隆吗?容量加一,就是克隆Sender对于您要发送的每件元素。

您在没有 clone 的情况下看到的错误来自您对 Sink::send 的错误使用界面。与 clone你实际上应该看到警告:

warning: unused `futures::sink::Send` which must be used: futures do nothing unless polled

也就是说:您当前的代码实际上并没有发送任何东西!

为了应用背压,你需要链接那些 send电话;每一个都应该等到前一个完成(你也需要等待最后一个!);成功后你会得到 Sender后退。最好的方法是生成一个 Stream从你的迭代器使用 iter_ok 并将其传递给 send_all .

现在你有了一个 future SendAll你需要“开车”。如果您忽略结果并对错误 (.then(|r| { r.unwrap(); Ok::<(), ()>(()) })) panic ,您可以将其作为单独的任务生成,但也许您想将其集成到您的主应用程序中(即在 Box 中返回它)。

// this returns a `Box<Future<Item = (), Error = ()>>`. you may
// want to use a different error type
Box::new(tx.send_all(iter_ok(data)).map(|_| ()).map_err(|_| ()))

RusotoFuture::syncFuture::wait

不要使用 Future::wait : 它已经在一个分支中被弃用了,它通常不会做你真正想要的。我怀疑RusotoFuture意识到这些问题,所以我建议避免 RusotoFuture::sync .

克隆Sender增加 channel 容量

正如您所说的正确,克隆 Sender将容量增加一。

这似乎是为了提高性能:A Sender以未阻塞(“unparked”)状态开始;如果 Sender没有被阻止它可以发送一个项目而不被阻止。但是,如果队列中的项目数在 Sender 时达到配置的限制。发送一个项目,Sender被阻塞(“停放”)。 (从队列中删除项目将在特定时间解锁 Sender。)

这意味着在内部队列达到限制后每个Sender仍然可以发送一件元素,这会导致容量增加的记录效果,但前提是实际上所有 Sender我们正在发送元素 - 未使用 Sender s 不会增加观察到的容量。

性能提升来自于这样一个事实,即只要您没有达到限制,它就不需要停放和通知任务(这非常繁重)。

mpsc 顶部的私有(private)文档模块描述了更多细节。

关于asynchronous - 控制衍生 future 的数量以产生背压,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48267479/

相关文章:

c# - 如何处理在一个平台上同步但在另一个平台上异步的代码

Rust:从 Slice 的基准内置排序与编译排序代码相比有 x16 差异?

scala - 如何在Scala中对Future进行投票?

rust - 为什么我的 Future 实现在被轮询一次并且 NotReady 后被阻止?

c++ - C++:在不超时的情况下使用带有超时的future.get

javascript - 了解 JavaScript 中另一个函数的 ajax 调用结果

error-handling - 如何以惯用的 Rust 方式处理来自 libc 函数的错误?

java - ThreadPoolExecutor, future : correlating requests and responses

bash - 异步 bash 脚本

module - 如何从父文件夹模块或兄弟文件夹模块访问模块?