我正在使用 Pipelines pattern实现将消息消费者与生产者分离,以避免消费缓慢的问题。
如果消息处理阶段出现任何异常 [1]
它将丢失并且不会分派(dispatch)到其他服务/层 [2]
.我该如何处理 [3]
中的此类问题所以消息不会丢失,什么是重要的!消息的顺序不会混淆,因此上层服务/层将按照它们进入的顺序获取消息。我有一个想法,它涉及到另一个中间 Queue
但这似乎很复杂?不幸的是 BlockingCollection<T>
不公开 Queue.Peek()
的任何类似物方法,这样我就可以读取下一条可用消息,如果处理成功,请执行 Dequeue()
private BlockingCollection<IMessage> messagesQueue;
// TPL Task does following:
// Listen to new messages and as soon as any comes in - process it
foreach (var cachedMessage in
messagesQueue.GetConsumingEnumerable(cancellation))
{
const int maxRetries = 3;
int retriesCounter = 0;
bool isSent = false;
// On this point a message already is removed from messagesQueue
while (!isSent && retriesCounter++ <= maxRetries)
{
try
{
// [1] Preprocess a message
// [2] Dispatch to an other service/layer
clientProxyCallback.SendMessage(cachedMessage);
isSent = true;
}
catch(Exception exception)
{
// [3]
// logging
if (!isSent && retriesCounter < maxRetries)
{
Thread.Sleep(NSeconds);
}
}
if (!isSent && retriesCounter == maxRetries)
{
// just log, message is lost on this stage!
}
}
}
编辑 :忘了说这是 IIS 托管的 WCF 服务,它通过客户端回调契约(Contract)将消息发送回 Silverlight 客户端 WCF 代理。编辑2:下面是我将如何使用
Peek()
,我错过了什么吗?bool successfullySent = true;
try
{
var item = queue.Peek();
PreProcessItem(item);
SendItem(item);
}
catch(Exception exception)
{
successfullySent = false;
}
finally
{
if (successfullySent)
{
// just remove already sent item from the queue
queue.Dequeue();
}
}
EDIT3:当然,我可以使用 while 循环、 bool 标志、Queue
的旧式方法和 AutoResetEvent
,但我只是想知道是否可以使用 BlockingCollection
和 GetConsumingEnumerable()
我认为像 Peek
这样的设施将是与消费枚举一起使用时非常有用,因为否则所有管道模式实现示例都是新的东西,如
BlockingCollection
和 GetConsumingEnumerable()
看起来不耐用,我不得不回到旧方法。
最佳答案
您应该考虑中间队列。BlockingCollection<T>
由于其性质,不能“偷看”元素 - 可以有多个消费者。其中一个可以偷看一个项目,另一个可以拿走它 - 因此,第一个将尝试拿走已经被拿走的项目。
关于c# - 监听 IEnumerable<T> 时 BlockingCollection 的 Queue.Peek() 类比,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13581945/