c# - 明显的 BufferBlock.Post/Receive/ReceiveAsync race/bug

标签 c# task-parallel-library async-await dataflow tpl-dataflow

交叉发布到 http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9

我知道...我并没有真正发挥 TplDataflow 的最大潜力。 ATM 我只是使用 BufferBlock 作为消息传递的安全队列,其中生产者和消费者以不同的速率运行。我看到一些奇怪的行为,让我不知所措 继续。

private BufferBlock<object> messageQueue = new BufferBlock<object>();

public void Send(object message)
{
    var accepted=messageQueue.Post(message);
    logger.Info("Send message was called qlen = {0} accepted={1}",
    messageQueue.Count,accepted);
}

public async Task<object> GetMessageAsync()
{
    try
    {
        var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
        //despite messageQueue.Count>0 next line 
        //occasionally does not execute
        logger.Info("message received");
        //.......
    }
    catch(TimeoutException)
    {
        //do something
    }
}

在上面的代码中(它是 2000 行分布式解决方案的一部分),Send 每 100 毫秒左右被周期性调用一次。这意味着一个项目被 Posted 到 messageQueue 大约每秒 10 次。这是经过验证的。但是,偶尔会出现 ReceiveAsync 未在超时内完成(即 Post 未导致 ReceiveAsync 完成)和 TimeoutException 在 30 秒后被提升。此时,messageQueue.Count 为数百。这是出乎意料的。在较慢的发布速度(1 个帖子/秒)下也观察到此问题,并且通常在 1000 个项目通过 BufferBlock 之前发生。

因此,为了解决这个问题,我使用了以下代码,该代码有效,但在接收时偶尔会导致 1 秒的延迟(由于出现上述错误)

    public async Task<object> GetMessageAsync()
    {
        try
        {
            object m;
            var attempts = 0;
            for (; ; )
            {
                try
                {
                    m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
                }
                catch (TimeoutException)
                {
                    attempts++;
                    if (attempts >= 30) throw;
                    continue;
                }
                break;

            }

            logger.Info("message received");
            //.......
        }
        catch(TimeoutException)
        {
            //do something
        }
   }

在我看来,这看起来像是 TDF 中的竞争条件,但我无法弄清楚为什么在我以类似方式使用 BufferBlock 的其他地方不会发生这种情况。实验性地从 ReceiveAsync 更改为 Receive 没有帮助。我没有检查过,但我想孤立地看,上面的代码可以完美运行。这是我在“TPL 数据流简介”中看到的一种模式 tpldataflow.docx .

我该怎么做才能弄清楚这个问题?是否有任何指标可以帮助推断正在发生的事情?如果我不能创建可靠的测试用例,我还能提供什么信息?

帮助!

最佳答案

Stephen 似乎认为以下是解决方案

var m = await messageQueue.ReceiveAsync();

instead of:

var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));

你能证实还是否认这一点?

关于c# - 明显的 BufferBlock.Post/Receive/ReceiveAsync race/bug,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10068451/

相关文章:

c# - 如何正确管理 TPL 数据流中的完成

javascript - 在 NodeJS 中安排一个异步函数

c# - 使用 System.Data.SQLite 打开 conn 时出错,尝试加载格式不正确的程序。互操作.084

c# - 将 Azure Functions 与事件中心集成

c# - 自定义 TaskScheduler、SynchronizationContext?

javascript - React async wait 函数按执行顺序阻止 useDispatch

c# - MVC Controller 的操作中的异步/等待

c# - .net core/standard string.ToLower() 没有文化参数

c# - 如何在自定义目标之后运行构建后事件?

wpf - 使用 Task 与 Dispatcher 进行 ui 线程操作