concurrency - 为什么 `futures::channel::mpsc`只能通知一个发送者?

标签 concurrency rust rust-tokio

我正在阅读 futures-preview 0.3 源代码以了解如何正确地“通知任何”。在 mpsc::channel(有界)中,多个发送者可能会等待接收(在缓冲区已满的情况下)。

研究 next_message 的实现和 unpark_one , 收件人似乎每一张收据只通知一个发件人。

我怀疑这是否适用于 select! ,因为 select! 可能会导致错误通知。但是,我无法产生问题案例。

这是我试图混淆 mpsc 的尝试:

[package]
name = "futures-mpsc-test"
version = "0.1.0"
edition = "2018"

[dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] }
tokio = "0.1.11"

还有这个:

#![feature(async_await, await_macro, futures_api, pin)]

use std::collections::HashSet;

use futures::prelude::*;

use futures::channel::mpsc::{channel, Sender};
use futures::channel::oneshot;
use futures::select;

async fn main2() {
    let channel_len = 1;
    let num_false_wait = 1000;
    let num_expected_messages = 100;

    let (mut send, mut recv) = channel(channel_len);
    // One extra capacity per sender. Fill the extras.
    await!(send.send(-2)).unwrap();

    // Fill buffers
    for _ in 0..channel_len {
        await!(send.send(-1)).unwrap();
    }

    // False waits. Should resolve and produce false waiters.
    for _ in 0..num_false_wait {
        await!(false_wait(&send));
    }

    // True messages.
    {
        let mut send = send.clone();
        await!(send.send(-2)).unwrap();
        tokio::spawn(async move {
            for i in 0..num_expected_messages {
                await!(send.send(i)).unwrap();
            }
            Ok(())
        }.boxed().compat());
    }

    // Drain receiver until all true messages are received.
    let mut expects = (0..num_expected_messages).collect::<HashSet<_>>();
    while !expects.is_empty() {
        let i = await!(recv.next()).unwrap();
        expects.remove(&i);
        eprintln!("Received: {}", i);
    }
}

// If `send` is full, it will produce false waits.
async fn false_wait(send: &Sender<i32>) {
    let (wait_send, wait_recv) = oneshot::channel();
    let mut send = send.clone();
    await!(send.send(-2)).unwrap();
    tokio::spawn(async move {
        let mut sending = send.send(-3);
        let mut fallback = future::ready(());
        select! {
            sending => {
                sending.unwrap();
            },
            fallback => {
                eprintln!("future::ready is selected");
            },
        };
        wait_send.send(()).unwrap();
        Ok(())
    }.boxed().compat());
    await!(wait_recv).unwrap();
}

fn main() {
    tokio::run(async {
        await!(main2());
        Ok(())
    }.boxed().compat());
}

我希望这会发生:

  1. 缓冲区由-1填充。因此,以后的发件人将被阻止。
  2. 既有“真服务员”也有“假服务员”。 假服务员已经退出,因为 select! 的另一臂 立即完成。
  3. 在每次调用 await!(recv.next()) 时,最多 一个 等待发件人 通知。如果一个假服务员被通知,没有人可以推到缓冲区, 即使缓冲区有空房间。
  4. 如果所有元素都在没有真正通知的情况下被耗尽, 整个系统都卡住了。

尽管如我所料,main2 异步函数已成功完成。为什么?

最佳答案

进一步调查 futures 源代码解决了我的问题。终于不能这样混淆mpsc了。

重点是,mpsc 的大小是灵活的,可以增长到比最初指定的更多。此行为是 mentioned in the docs :

The channel's capacity is equal to buffer + num-senders. In other words, each sender gets a guaranteed slot in the channel capacity, and on top of that there are buffer "first come, first serve" slots available to all senders.

是的,我在做实验之前先读过这个,但当时我无法弄清楚这个的重要性。

固定缓冲区问题

考虑一个典型的有界队列实现,其中队列的大小不能增长超过最初指定的大小。规范是这样的:

  • 当队列为空时,接收者阻塞。
  • 当队列已满(即大小达到界限)时,发送者将阻塞。

在这种情况下,如果队列已满,多个发送者正在等待一个资源(队列的大小)。

在多线程编程中,这是通过 notify_one 等原语完成的。然而,在 futures 中,这是容易出错的:与多线程编程不同,通知的任务不一定使用资源,因为任务可能已经放弃获取资源(由于像 select! 这样的构造或 Deadline ) 然后规范就被破坏了(队列未满,但所有事件的发件人都被阻止)。

mpsc 灵活

如上所述,futures::channel::mpsc::channel 的缓冲区大小并不严格。规范总结如下:

  • message_queue.len() == 0 时,接收者阻塞。
  • message_queue.len() >= buffer 时,发件人可能阻止。
  • message_queue.len() >= buffer + num_senders 时,发件人阻止。

在这里,num_senders 基本上Sender 的克隆数,但在某些情况下会更多。更准确地说,num_sendersSenderTask 的个数。

那么,我们如何避免资源共享呢?我们有额外的状态:

  • 每个发送者(SenderTask 的实例)都有 is_parked bool 状态。
  • 该 channel 有另一个名为 parked_queue 的队列,一个 Arc 队列引用 SenderTask

channel 维护以下不变量:

  • message_queue.len() <= buffer + num_parked_senders 。请注意,我们不知道 num_parked_senders 的值。
  • parked_queue.len() == min(0, message_queue.len() - buffer)
  • 每个驻留的发件人在 parked_queue 中至少有一条消息。

这是通过以下算法完成的:

  • 对于接收,
    • 它从 SenderTask 弹出一个 parked_queue,如果发件人已停放,则取消停放。
  • 对于发送,
    • 它总是等待 is_parked 变为 false。如果 message_queue.len() < bufferparked_queue.len() == 0 ,则所有发件人都未停放。因此,我们可以保证在这种情况下取​​得进展。
    • 如果 is_parkedfalse ,无论如何将消息推送到队列。
    • 在那之后,如果是 message_queue.len() <= buffer ,它不需要做任何进一步的事情。
    • 如果是 message_queue.len() > buffer ,发送者将被解除停放并推送到 parked_queue

您可以轻松地检查上述算法中是否保持了不变量。

令人惊讶的是,发送者不再等待共享资源。相反,发送方等待其 is_parked 状态。即使发送任务在完成之前被丢弃,它也只是在 parked_queue 中停留一段时间,不会阻塞任何东西。多聪明啊!

关于concurrency - 为什么 `futures::channel::mpsc`只能通知一个发送者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53245906/

相关文章:

Java 互斥锁与 smp

java - 使用回调函数在线程中注册事件

rust - 在 Rust 中,如何在 BigInt 上使用已实现的特征 FromStr?

rust - 为什么可变引用没有移到这里?

timer - 如何使用 Tokio 产生许多可取消的计时器?

java - 在 Java 中更新(和迭代) map

java - 如何以 "OOP"方式处理同时独立运行的多个类实例?

Rust Clap 自定义标题

stream - 延迟 Tokio Stream

rust - 丢弃最后一个发件人但收信人仍处于事件状态时,是否可以在Tokio MPSC中保留项目?