我尝试实现自己的QueueProcessorFactory,它工作得很好,除了一件事我无法理解。在我尝试一条消息 5 次(默认)后,它会运行CopyMessageToPoisonQueueAsync,然后DeleteMessageAsync。
到目前为止一切顺利,但 10 分钟后,消息再次出现在队列中,出队计数为 5,并且它也在有害队列中,然后执行相同的过程,CopyMessageToPoisonQueueAsync,< em>DeleteMessageAsync,位置队列中的一个额外项目与已复制的项目完全相同,10 分钟后执行相同的过程,但出队计数为 6。我应该在删除时更改 ExpirationTime 并将其设置为现在,还是执行我还想念其他东西吗?
这是我的代码:
class Program
{
static void Main()
{
var config = new JobHostConfiguration();
config.Queues.MaxPollingInterval = TimeSpan.FromSeconds(3);
config.Queues.QueueProcessorFactory = new CustomQueueProcessorFactory();
var host = new JobHost(config);
host.RunAndBlock();
}
}
public class CustomQueueProcessorFactory : IQueueProcessorFactory
{
public List<CustomQueueProcessor> CustomQueueProcessors = new List<CustomQueueProcessor>();
public QueueProcessor Create(QueueProcessorFactoryContext context)
{
CustomQueueProcessor processor = new CustomQueueProcessor(context);
CustomQueueProcessors.Add(processor);
return processor;
}
public class CustomQueueProcessor : QueueProcessor
{
public CustomQueueProcessor(QueueProcessorFactoryContext context)
: base(context)
{
}
public override Task<bool> BeginProcessingMessageAsync(CloudQueueMessage message, CancellationToken cancellationToken)
{
return base.BeginProcessingMessageAsync(message, cancellationToken);
}
public override Task CompleteProcessingMessageAsync(CloudQueueMessage message, FunctionResult result, CancellationToken cancellationToken)
{
return base.CompleteProcessingMessageAsync(message, result, cancellationToken);
}
protected override Task CopyMessageToPoisonQueueAsync(CloudQueueMessage message, CloudQueue poisonQueue, CancellationToken cancellationToken)
{
return base.CopyMessageToPoisonQueueAsync(message, poisonQueue, cancellationToken);
}
protected override Task DeleteMessageAsync(CloudQueueMessage message, CancellationToken cancellationToken)
{
return base.DeleteMessageAsync(message, cancellationToken);
}
protected override async Task ReleaseMessageAsync(CloudQueueMessage message, FunctionResult result, TimeSpan visibilityTimeout, CancellationToken cancellationToken)
{
visibilityTimeout = TimeSpan.FromSeconds(2);
await base.ReleaseMessageAsync(message, result, visibilityTimeout, cancellationToken);
}
}
}
如果我添加一些 ConsoleWritelines,我会得到以下输出:
由于重复而省略开头 。 。 .
BeginProcessingMessageAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:5 日期:2017-06-26 13:33:42 执行“Functions.ProcessQueueMessage”(原因=“在“01testqueue”上检测到新队列消息。”,Id=17405a55-6d28-48b2-a874-718c0b741f61) 测试QueueProcessorFactory 执行函数时出现异常:Functions.ProcessQueueMessage Microsoft.Azure.WebJobs.Host.FunctionInitationException:执行函数时出现异常:Functions.ProcessQueueMessage ---> System.Exception:Derp! 在WebJobTest1.Functions.ProcessQueueMessage(字符串消息,TextWriter日志)
...消息被省略...
CompleteProcessingMessageAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出列计数:5 CopyMessageToPoisonQueueAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出列计数:5 消息已达到 MaxDequeueCount 5。正在将消息移至队列“01testqueue-poison”。 CopyMessageToPoisonQueueAsync 消息:da643007-954a-4296-9e9a-54ebb0aec6c5 出列计数:5 十分钟等待时间: BeginProcessingMessageAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:6 日期:2017-06-26 13:43:46 执行“Functions.ProcessQueueMessage”(原因=“在“01testqueue”上检测到新队列消息。”,Id=c22fe457-cb70-4cc8-a8a4-550cd44a8345) 测试QueueProcessorFactory 执行函数时出现异常:Functions.ProcessQueueMessage CompleteProcessingMessageAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:6 CopyMessageToPoisonQueueAsync 消息:d3c88182-ff39-4f81-8c29-b4ce0b2062ad 出队计数:6 消息已达到 MaxDequeueCount 5。正在将消息移至队列“01testqueue-poison”。 CopyMessageToPoisonQueueAsync 消息:c43fd9fb-c2d7-4745-91ae-33cdc407ede6 出队计数:6
最佳答案
根据您的描述,我认为这是由于将 Storage SDK 8.x 与 WebJobs SDK 一起使用时的已知问题造成的。以下是类似的问题:
Poison messages stay undeleted-but-invisible with the latest WindowsAzure.Storage 8.1.1
WebJobs V1.1.2 fails to remove poison messages from queue with V8.0.1 of Storage
根据我的测试,这个问题暂时还没有得到解决。您可以降级存储 SDK 或更改 CopyMessageToPoisonQueueAsync
,如下所示:
protected override Task CopyMessageToPoisonQueueAsync(CloudQueueMessage message, CloudQueue poisonQueue, CancellationToken cancellationToken)
{
var newMessage = new CloudQueueMessage(message.Id, message.PopReceipt);
newMessage.SetMessageContent(message.AsBytes);
return base.CopyMessageToPoisonQueueAsync(newMessage, poisonQueue, cancellationToken);
}
关于azure - 在 Azure Webjob 中,使用 CustomQueueProcessorFactory 时,即使调用 DeleteMessageAsync 后消息也会返回到队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44759133/