我正在使用 producer/consumer pattern 实现数据链路层.数据链路层有自己的线程和状态机,用于通过线路(以太网、RS-232...)传送数据链路协议(protocol)。物理层的接口(interface)表示为 System.IO.Stream。另一个线程将消息写入数据链接对象并从中读取消息。
数据链路对象处于空闲状态,必须等待以下四种情况之一:
- 收到一个字节
- 一条消息来自网络线程
- 保活计时器已过期
- 所有通信都被网络层取消了
我很难找出最好的方法来做到这一点,而不会将通信拆分为读/写线程(从而显着增加复杂性)。以下是我如何从 4 中得到 3:
// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.
stream.ReadTimeout = 10000;
await stream.ReadAsync(buf, 0, 1, cts.Token);
或
BlockingCollection<byte[]> SendQueue = new ...;
...
// Check for a message from network layer. Timeout after 10 seconds.
// Monitor cancellation token.
SendQueue.TryTake(out msg, 10000, cts.Token);
我应该怎么做才能阻塞线程,等待所有四个条件?欢迎所有建议。我不喜欢任何架构或数据结构。
编辑:******** 感谢大家的帮助。这是我的解决方案 ********
首先,我认为生产者/消费者队列没有异步实现。所以我实现了类似于 this stackoverflow post 的东西.
我需要一个外部和内部取消源来分别停止消费者线程和取消中间任务,similar to this article .
byte[] buf = new byte[1];
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource())
{
CancellationToken internalToken = internalTokenSource.Token;
CancellationToken stopToken = stopTokenSource.Token;
using (CancellationTokenSource linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken))
{
CancellationToken ct = linkedCts.Token;
Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct);
Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct);
Task keepAliveTask = Task.Delay(m_keepAliveTime, ct);
// Wait for at least one task to complete
await Task.WhenAny(readTask, msgTask, keepAliveTask);
// Next cancel the other tasks
internalTokenSource.Cancel();
try {
await Task.WhenAll(readTask, msgTask, keepAliveTask);
} catch (OperationCanceledException e) {
if (e.CancellationToken == stopToken)
throw;
}
if (msgTask.IsCompleted)
// Send the network layer message
else if (readTask.IsCompleted)
// Process the byte from the physical layer
else
Contract.Assert(keepAliveTask.IsCompleted);
// Send a keep alive message
}
}
最佳答案
我会选择你的选项二,等待 4 个条件中的任何一个发生。假设您已经将 4 个任务作为可等待的方法:
var task1 = WaitForByteReceivedAsync();
var task2 = WaitForMessageAvailableAsync();
var task3 = WaitForKeepAliveTimerAsync();
var task4 = WaitForCommunicationCancelledAsync();
// now gather them
IEnumerable<Task<bool>> theTasks = new List<IEnumerable<Task<bool>>>{
task1, task2, task3, task4
};
// Wait for any of the things to complete
var result = await Task.WhenAny(theTasks);
上面的代码将在第一个任务完成后立即恢复,并忽略其他 3 个。
注意:
在the documentation for WhenAny ,它说:
返回的任务将始终以 RanToCompletion 状态结束,其结果设置为要完成的第一个任务。即使要完成的第一个任务以“已取消”或“故障”状态结束也是如此。
因此,在相信所发生的事情之前,请务必进行最终检查:
if(result.Result.Result == true)
... // First Result is the task, the second is the bool that the task returns
关于C#等待生产者/消费者中的多个事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35749735/