c# - 监听 IEnumerable<T> 时 BlockingCollection 的 Queue.Peek() 类比

原文 标签 c# multithreading wcf .net-4.0 task-parallel-library

我正在使用 Pipelines pattern实现将消息消费者与生产者分离,以避免消费缓慢的问题。
如果消息处理阶段出现任何异常 [1]它将丢失并且不会分派(dispatch)到其他服务/层 [2] .我该如何处理 [3] 中的此类问题所以消息不会丢失,什么是重要的!消息的顺序不会混淆,因此上层服务/层将按照它们进入的顺序获取消息。我有一个想法,它涉及到另一个中间 Queue但这似乎很复杂?不幸的是 BlockingCollection<T>不公开 Queue.Peek() 的任何类似物方法,这样我就可以读取下一条可用消息,如果处理成功,请执行 Dequeue()

private BlockingCollection<IMessage> messagesQueue;    

// TPL Task does following:
// Listen to new messages and as soon as any comes in - process it
foreach (var cachedMessage in 
             messagesQueue.GetConsumingEnumerable(cancellation))
{    
    const int maxRetries = 3;
    int retriesCounter = 0;
    bool isSent = false;

    // On this point a message already is removed from messagesQueue
    while (!isSent && retriesCounter++ <= maxRetries)
    {
        try
        {
           // [1] Preprocess a message
           // [2] Dispatch to an other service/layer    
           clientProxyCallback.SendMessage(cachedMessage);
           isSent = true;
        }                                
        catch(Exception exception)
        {
           // [3]   
           // logging
           if (!isSent && retriesCounter < maxRetries)
           {
              Thread.Sleep(NSeconds);
           }
        }            
    
        if (!isSent && retriesCounter == maxRetries)
        {
           // just log, message is lost on this stage!
        }
    }
}
编辑 :忘了说这是 IIS 托管的 WCF 服务,它通过客户端回调契约(Contract)将消息发送回 Silverlight 客户端 WCF 代理。
编辑2:下面是我将如何使用 Peek() ,我错过了什么吗?
bool successfullySent = true;
try
{
   var item = queue.Peek();
   PreProcessItem(item);
   SendItem(item);       
}
catch(Exception exception)
{
   successfullySent = false;
}
finally
{
   if (successfullySent)
   {
       // just remove already sent item from the queue
       queue.Dequeue();
   }
}
EDIT3:当然,我可以使用 while 循环、 bool 标志、Queue 的旧式方法和 AutoResetEvent ,但我只是想知道是否可以使用 BlockingCollectionGetConsumingEnumerable()我认为像 Peek 这样的设施将是
与消费枚举一起使用时非常有用,因为否则所有管道模式实现示例都是新的东西,如 BlockingCollectionGetConsumingEnumerable()看起来不耐用,我不得不回到旧方法。

最佳答案

您应该考虑中间队列。
BlockingCollection<T>由于其性质,不能“偷看”元素 - 可以有多个消费者。其中一个可以偷看一个项目,另一个可以拿走它 - 因此,第一个将尝试拿走已经被拿走的项目。

关于c# - 监听 IEnumerable<T> 时 BlockingCollection 的 Queue.Peek() 类比,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13581945/

相关文章:

c++11如何插入简单的内存屏障?

java - 如何使用 hibernate 4 实现自定义线程安全序列?

c# - 对父窗体或目标控件使用 Invoke 有什么区别吗?

c# - 如何在不同的线程池中处理异步回调?

c# - 将字符串转换为日期 - C#

c# - 使用Parallel.ForEach获得不同的求和结果

WCF 这可能是由于服务端点绑定(bind)未使用 HTTP 协议(protocol)

c# - 等于与另一个可为空类型的可为空类型的比较

WCF:许多方法上的相同故障契约(Contract)

c# - 将 OperationContext.Current 存储到静态变量时无法访问