C#等待生产者/消费者中的多个事件

标签 c# asynchronous producer-consumer

我正在使用 producer/consumer pattern 实现数据链路层.数据链路层有自己的线程和状态机,用于通过线路(以太网、RS-232...)传送数据链路协议(protocol)。物理层的接口(interface)表示为 System.IO.Stream。另一个线程将消息写入数据链接对象并从中读取消息。

数据链路对象处于空闲状态,必须等待以下四种情况之一:

  1. 收到一个字节
  2. 一条消息来自网络线程
  3. 保活计时器已过期
  4. 所有通信都被网络层取消了

我很难找出最好的方法来做到这一点,而不会将通信拆分为读/写线程(从而显着增加复杂性)。以下是我如何从 4 中得到 3:

// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.
stream.ReadTimeout = 10000;
await stream.ReadAsync(buf, 0, 1, cts.Token);

BlockingCollection<byte[]> SendQueue = new ...;
...
// Check for a message from network layer. Timeout after 10 seconds.
// Monitor cancellation token.
SendQueue.TryTake(out msg, 10000, cts.Token);

我应该怎么做才能阻塞线程,等待所有四个条件?欢迎所有建议。我不喜欢任何架构或数据结构。

编辑:******** 感谢大家的帮助。这是我的解决方案 ********

首先,我认为生产者/消费者队列没有异步实现。所以我实现了类似于 this stackoverflow post 的东西.

我需要一个外部和内部取消源来分别停止消费者线程和取消中间任务,similar to this article .

byte[] buf = new byte[1];
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource())
{
    CancellationToken internalToken = internalTokenSource.Token;
    CancellationToken stopToken = stopTokenSource.Token;
    using (CancellationTokenSource linkedCts =
        CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken))
    {
        CancellationToken ct = linkedCts.Token;
        Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct);
        Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct);
        Task keepAliveTask = Task.Delay(m_keepAliveTime, ct);

        // Wait for at least one task to complete
        await Task.WhenAny(readTask, msgTask, keepAliveTask);

        // Next cancel the other tasks
        internalTokenSource.Cancel();
        try {
            await Task.WhenAll(readTask, msgTask, keepAliveTask);
        } catch (OperationCanceledException e) {
            if (e.CancellationToken == stopToken)
                throw;
        }

        if (msgTask.IsCompleted)
            // Send the network layer message
        else if (readTask.IsCompleted)
            // Process the byte from the physical layer
        else
            Contract.Assert(keepAliveTask.IsCompleted);
            // Send a keep alive message
    }
}

最佳答案

我会选择你的选项二,等待 4 个条件中的任何一个发生。假设您已经将 4 个任务作为可等待的方法:

var task1 = WaitForByteReceivedAsync();
var task2 = WaitForMessageAvailableAsync();
var task3 = WaitForKeepAliveTimerAsync();
var task4 = WaitForCommunicationCancelledAsync();

// now gather them
IEnumerable<Task<bool>> theTasks = new List<IEnumerable<Task<bool>>>{
task1, task2, task3, task4
};

// Wait for any of the things to complete
var result = await Task.WhenAny(theTasks);

上面的代码将在第一个任务完成后立即恢复,并忽略其他 3 个。

注意:

the documentation for WhenAny ,它说:

返回的任务将始终以 RanToCompletion 状态结束,其结果设置为要完成的第一个任务。即使要完成的第一个任务以“已取消”或“故障”状态结束也是如此。

因此,在相信所发生的事情之前,请务必进行最终检查:

if(result.Result.Result == true) 
... // First Result is the task, the second is the bool that the task returns

关于C#等待生产者/消费者中的多个事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35749735/

相关文章:

c# - 如何使用 LINQ 计算列表中的重复项

objective-c - 如何使用 xCode 6 中的期望设置异步单元测试?

java - 具有批量和刷新功能的生产者/消费者

c - 如何处理生产者-消费者模式的 FIFO 队列

c# - 在 .NET 中检测 Windows 版本

c# - ServiceStack.OrmLite.Sqlite.Core 无法加载正确的 Sqlite DLL

c# - Entity Framework 7 和登录失败

Python,非阻塞线程

python - 使用 Python 的 Salesforce Bulk API 批量下载

go - 在汇合的kafka go中阅读来自kafka主题的消息时,如何使用确认?