c# - 听取使用 IEnumerable<T> 时用于 BlockingCollection 的 Queue.Peek() 的模拟

标签 c# multithreading wcf .net-4.0 task-parallel-library

我正在使用 Pipelines pattern将消息消费者与生产者分离的实现,以避免消费者缓慢的问题。

消息处理阶段出现异常[1]它将丢失并且不会被分派(dispatch)到其他服务/层 [2] .我该如何处理 [3] 中的此类问题所以消息不会丢失,重要的是!消息的顺序不会混淆,因此上层服务/层将按照消息进来的顺序获取消息。我有一个想法涉及另一个中间层 Queue但它似乎很复杂?遗憾BlockingCollection<T>不暴露 Queue.Peek() 的任何类似物方法,这样我就可以阅读下一条可用消息,如果处理成功,请执行 Dequeue()

private BlockingCollection<IMessage> messagesQueue;    

// TPL Task does following:
// Listen to new messages and as soon as any comes in - process it
foreach (var cachedMessage in 
             messagesQueue.GetConsumingEnumerable(cancellation))
{    
    const int maxRetries = 3;
    int retriesCounter = 0;
    bool isSent = false;

    // On this point a message already is removed from messagesQueue
    while (!isSent && retriesCounter++ <= maxRetries)
    {
        try
        {
           // [1] Preprocess a message
           // [2] Dispatch to an other service/layer    
           clientProxyCallback.SendMessage(cachedMessage);
           isSent = true;
        }                                
        catch(Exception exception)
        {
           // [3]   
           // logging
           if (!isSent && retriesCounter < maxRetries)
           {
              Thread.Sleep(NSeconds);
           }
        }            
    
        if (!isSent && retriesCounter == maxRetries)
        {
           // just log, message is lost on this stage!
        }
    }
}

编辑:忘了说这是 IIS 托管的 WCF 服务,它通过客户端回调协定将消息分派(dispatch)回 Silverlight 客户端 WCF 代理。

EDIT2: 下面是我将如何使用 Peek() 执行此操作, 我错过了什么吗?

bool successfullySent = true;
try
{
   var item = queue.Peek();
   PreProcessItem(item);
   SendItem(item);       
}
catch(Exception exception)
{
   successfullySent = false;
}
finally
{
   if (successfullySent)
   {
       // just remove already sent item from the queue
       queue.Dequeue();
   }
}

EDIT3: 当然,我可以使用 while 循环、bool 标志、Queue 等旧式方法和 AutoResetEvent , 但我只是想知道是否可以使用 BlockingCollectionGetConsumingEnumerable()我认为像 Peek 这样的设施将是 与使用可枚举一起使用时非常有用,因为否则所有管道模式实现都会示例新的东西,如 BlockingCollectionGetConsumingEnumerable()看起来不耐用,我必须回到旧方法。

最佳答案

你应该考虑中间队列。

BlockingCollection<T>由于其性质,不能“窥视”项目 - 可以有多个消费者。其中一个可以偷看一个项目,另一个可以它 - 因此,第一个将尝试拿取已经被拿走的项目。

关于c# - 听取使用 IEnumerable<T> 时用于 BlockingCollection 的 Queue.Peek() 的模拟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13581945/

相关文章:

c# - 使用 Linq to sql 将 List<String> 插入到 ntext 字段

java - 是否可以在 C# 中实现 JAVA 回调模式

c# - .Net MAUI 是否与 .NET Standard 2.0 兼容?

java - 为什么在 Java 中我的 CPU 绑定(bind)线程会导致内核空间中的操作?

javascript - 保护客户端 Web 服务的策略

javascript - Ajax 调用具有多个返回值的服务

c# - 我如何从组合框中获取值并将其显示到文本框

.Net 和 Mono 中的 C# Task.WaitAll()

c++ - 更多线程,更好的性能?

.net - WCF wsHttpBinding 的 Windows 身份验证有多安全?