我正在使用 futures-rs powered version of the Rusoto AWS Kinesis library .我需要生成 AWS Kinesis 请求的深层管道以实现高吞吐量,因为 Kinesis 对每个 HTTP 请求有 500 条记录的限制。结合发送请求的 50 毫秒延迟,我需要开始生成许多并发请求。我希望在某个地方创建大约 100 个飞行请求。
鲁索托 put_records
函数签名如下所示:
fn put_records(
&self,
input: &PutRecordsInput,
) -> RusotoFuture<PutRecordsOutput, PutRecordsError>
RusotoFuture
是这样定义的包装器:
/// Future that is returned from all rusoto service APIs.
pub struct RusotoFuture<T, E> {
inner: Box<Future<Item = T, Error = E> + 'static>,
}
内部Future
被包裹但是 RusutoFuture
仍然实现 Future::poll()
, 所以我相信它与 futures-rs
兼容生态系统。 RusotoFuture
提供同步调用:
impl<T, E> RusotoFuture<T, E> {
/// Blocks the current thread until the future has resolved.
///
/// This is meant to provide a simple way for non-async consumers
/// to work with rusoto.
pub fn sync(self) -> Result<T, E> {
self.wait()
}
}
我可以发出请求并 sync()
它,从 AWS 获取结果。我想创建许多请求,将它们放入某种队列/列表中,然后收集完成的请求。如果请求出错,我需要重新发出请求(这在 Kinesis 中有些正常,尤其是在达到分片吞吐量限制时)。如果请求成功完成,我应该发出一个包含新数据的请求。我可以为每个请求生成一个线程并同步它,但是当我运行异步 IO 线程时,这似乎效率很低。
我试过使用 futures::sync::mpsc::channel
从我的应用程序线程(不是从 Tokio react 器内部运行)但是每当我克隆 tx
它生成自己的缓冲区,消除了 send
上的任何类型的背压:
fn kinesis_pipeline(client: DefaultKinesisClient, stream_name: String, num_puts: usize, puts_size: usize) {
use futures::sync::mpsc::{ channel, spawn };
use futures::{ Sink, Future, Stream };
use futures::stream::Sender;
use rusoto_core::reactor::DEFAULT_REACTOR;
let client = Arc::new(KinesisClient::simple(Region::UsWest2));
let data = FauxData::new(); // a data generator for testing
let (mut tx, mut rx) = channel(1);
for rec in data {
tx.clone().send(rec);
}
}
没有克隆,我有错误:
error[E0382]: use of moved value: `tx`
--> src/main.rs:150:9
|
150 | tx.send(rec);
| ^^ value moved here in previous iteration of loop
|
= note: move occurs because `tx` has type `futures::sync::mpsc::Sender<rusoto_kinesis::PutRecordsRequestEntry>`, which does not implement the `Copy` trait
我也看过 futures::mpsc::sync::spawn
根据建议,但它需要 rx
的所有权(作为 Stream
)并没有解决我的 Copy
问题的 tx
导致无限行为。
我希望能得到 channel
/spawn
使用工作,我将有一个需要 RusotoFuture
的系统s,等待它们完成,然后为我提供一种从我的应用程序线程中获取完成结果的简单方法。
最佳答案
据我所知,您的问题是 channel
不是 Sender
的单个克隆吗?容量加一,就是克隆Sender
对于您要发送的每件元素。
您在没有 clone
的情况下看到的错误来自您对 Sink::send
的错误使用界面。与 clone
你实际上应该看到警告:
warning: unused `futures::sink::Send` which must be used: futures do nothing unless polled
也就是说:您当前的代码实际上并没有发送任何东西!
为了应用背压,你需要链接那些 send
电话;每一个都应该等到前一个完成(你也需要等待最后一个!);成功后你会得到 Sender
后退。最好的方法是生成一个 Stream
从你的迭代器使用 iter_ok
并将其传递给 send_all
.
现在你有了一个 future SendAll
你需要“开车”。如果您忽略结果并对错误 (.then(|r| { r.unwrap(); Ok::<(), ()>(()) })
) panic ,您可以将其作为单独的任务生成,但也许您想将其集成到您的主应用程序中(即在 Box
中返回它)。
// this returns a `Box<Future<Item = (), Error = ()>>`. you may
// want to use a different error type
Box::new(tx.send_all(iter_ok(data)).map(|_| ()).map_err(|_| ()))
RusotoFuture::sync
和 Future::wait
不要使用 Future::wait
: 它已经在一个分支中被弃用了,它通常不会做你真正想要的。我怀疑RusotoFuture
意识到这些问题,所以我建议避免 RusotoFuture::sync
.
克隆Sender
增加 channel 容量
正如您所说的正确,克隆 Sender
将容量增加一。
这似乎是为了提高性能:A Sender
以未阻塞(“unparked”)状态开始;如果 Sender
没有被阻止它可以发送一个项目而不被阻止。但是,如果队列中的项目数在 Sender
时达到配置的限制。发送一个项目,Sender
被阻塞(“停放”)。 (从队列中删除项目将在特定时间解锁 Sender
。)
这意味着在内部队列达到限制后每个Sender
仍然可以发送一件元素,这会导致容量增加的记录效果,但前提是实际上所有 Sender
我们正在发送元素 - 未使用 Sender
s 不会增加观察到的容量。
性能提升来自于这样一个事实,即只要您没有达到限制,它就不需要停放和通知任务(这非常繁重)。
mpsc
顶部的私有(private)文档模块描述了更多细节。
关于asynchronous - 控制衍生 future 的数量以产生背压,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48267479/