我正在阅读 futures-preview
0.3 源代码以了解如何正确地“通知任何”。在 mpsc::channel
(有界)中,多个发送者可能会等待接收(在缓冲区已满的情况下)。
研究 next_message
的实现和 unpark_one
, 收件人似乎每一张收据只通知一个发件人。
我怀疑这是否适用于 select!
,因为 select!
可能会导致错误通知。但是,我无法产生问题案例。
这是我试图混淆 mpsc
的尝试:
[package]
name = "futures-mpsc-test"
version = "0.1.0"
edition = "2018"
[dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] }
tokio = "0.1.11"
还有这个:
#![feature(async_await, await_macro, futures_api, pin)]
use std::collections::HashSet;
use futures::prelude::*;
use futures::channel::mpsc::{channel, Sender};
use futures::channel::oneshot;
use futures::select;
async fn main2() {
let channel_len = 1;
let num_false_wait = 1000;
let num_expected_messages = 100;
let (mut send, mut recv) = channel(channel_len);
// One extra capacity per sender. Fill the extras.
await!(send.send(-2)).unwrap();
// Fill buffers
for _ in 0..channel_len {
await!(send.send(-1)).unwrap();
}
// False waits. Should resolve and produce false waiters.
for _ in 0..num_false_wait {
await!(false_wait(&send));
}
// True messages.
{
let mut send = send.clone();
await!(send.send(-2)).unwrap();
tokio::spawn(async move {
for i in 0..num_expected_messages {
await!(send.send(i)).unwrap();
}
Ok(())
}.boxed().compat());
}
// Drain receiver until all true messages are received.
let mut expects = (0..num_expected_messages).collect::<HashSet<_>>();
while !expects.is_empty() {
let i = await!(recv.next()).unwrap();
expects.remove(&i);
eprintln!("Received: {}", i);
}
}
// If `send` is full, it will produce false waits.
async fn false_wait(send: &Sender<i32>) {
let (wait_send, wait_recv) = oneshot::channel();
let mut send = send.clone();
await!(send.send(-2)).unwrap();
tokio::spawn(async move {
let mut sending = send.send(-3);
let mut fallback = future::ready(());
select! {
sending => {
sending.unwrap();
},
fallback => {
eprintln!("future::ready is selected");
},
};
wait_send.send(()).unwrap();
Ok(())
}.boxed().compat());
await!(wait_recv).unwrap();
}
fn main() {
tokio::run(async {
await!(main2());
Ok(())
}.boxed().compat());
}
我希望这会发生:
- 缓冲区由
-1
填充。因此,以后的发件人将被阻止。 - 既有“真服务员”也有“假服务员”。
假服务员已经退出,因为
select!
的另一臂 立即完成。 - 在每次调用
await!(recv.next())
时,最多 一个 等待发件人 通知。如果一个假服务员被通知,没有人可以推到缓冲区, 即使缓冲区有空房间。 - 如果所有元素都在没有真正通知的情况下被耗尽, 整个系统都卡住了。
尽管如我所料,main2
异步函数已成功完成。为什么?
最佳答案
进一步调查 futures
源代码解决了我的问题。终于不能这样混淆mpsc了。
重点是,mpsc
的大小是灵活的,可以增长到比最初指定的更多。此行为是 mentioned in the docs :
The channel's capacity is equal to
buffer + num-senders
. In other words, each sender gets a guaranteed slot in the channel capacity, and on top of that there are buffer "first come, first serve" slots available to all senders.
是的,我在做实验之前先读过这个,但当时我无法弄清楚这个的重要性。
固定缓冲区问题
考虑一个典型的有界队列实现,其中队列的大小不能增长超过最初指定的大小。规范是这样的:
- 当队列为空时,接收者阻塞。
- 当队列已满(即大小达到界限)时,发送者将阻塞。
在这种情况下,如果队列已满,多个发送者正在等待一个资源(队列的大小)。
在多线程编程中,这是通过 notify_one
等原语完成的。然而,在 futures
中,这是容易出错的:与多线程编程不同,通知的任务不一定使用资源,因为任务可能已经放弃获取资源(由于像 select!
这样的构造或 Deadline
) 然后规范就被破坏了(队列未满,但所有事件的发件人都被阻止)。
mpsc
灵活
如上所述,futures::channel::mpsc::channel
的缓冲区大小并不严格。规范总结如下:
- 当
message_queue.len() == 0
时,接收者阻塞。 - 当
message_queue.len() >= buffer
时,发件人可能阻止。 - 当
message_queue.len() >= buffer + num_senders
时,发件人阻止。
在这里,num_senders
基本上是 Sender
的克隆数,但在某些情况下会更多。更准确地说,num_senders
是 SenderTask
的个数。
那么,我们如何避免资源共享呢?我们有额外的状态:
- 每个发送者(
SenderTask
的实例)都有is_parked
bool 状态。 - 该 channel 有另一个名为
parked_queue
的队列,一个Arc
队列引用SenderTask
。
channel 维护以下不变量:
-
message_queue.len() <= buffer + num_parked_senders
。请注意,我们不知道num_parked_senders
的值。 -
parked_queue.len() == min(0, message_queue.len() - buffer)
- 每个驻留的发件人在
parked_queue
中至少有一条消息。
这是通过以下算法完成的:
- 对于接收,
- 它从
SenderTask
弹出一个parked_queue
,如果发件人已停放,则取消停放。
- 它从
- 对于发送,
- 它总是等待
is_parked
变为false
。如果message_queue.len() < buffer
为parked_queue.len() == 0
,则所有发件人都未停放。因此,我们可以保证在这种情况下取得进展。 - 如果
is_parked
是false
,无论如何将消息推送到队列。 - 在那之后,如果是
message_queue.len() <= buffer
,它不需要做任何进一步的事情。 - 如果是
message_queue.len() > buffer
,发送者将被解除停放并推送到parked_queue
。
- 它总是等待
您可以轻松地检查上述算法中是否保持了不变量。
令人惊讶的是,发送者不再等待共享资源。相反,发送方等待其 is_parked
状态。即使发送任务在完成之前被丢弃,它也只是在 parked_queue
中停留一段时间,不会阻塞任何东西。多聪明啊!
关于concurrency - 为什么 `futures::channel::mpsc`只能通知一个发送者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53245906/