我们正在利用 Azure ServiceBus 队列来处理大量客户端请求。然而,OnMessage
调用似乎是一个阻塞调用,但是如果它确实是一个阻塞调用,那么这个调用的阻塞是非常不一致的。
我试图完成的是从 Web 服务应用程序永久地观察队列(以允许从正在运行的应用程序中挖掘指标)
我正在创建下面的订阅:
protected virtual void Subscribe(string queueName, Func<QueueRequest, bool> callback)
{
var client = GetClient(queueName, PollingTimeout);
var transformCallback = new Action<BrokeredMessage>((message) =>
{
try
{
var request = message.ToQueueRequest();
if (callback(request))
{
message.Complete();
}
else
{
message.Abandon();
}
}
catch (Exception ex)
{
//TODO: Log the error
message.Abandon();
}
});
var options = new OnMessageOptions
{
MaxConcurrentCalls = _config.GetInt("MaxThreadsPerQueue"),
AutoComplete = false
};
options.ExceptionReceived += OnMessageError;
client.OnMessage(transformCallback, options);
}
如果我只调用一次订阅,应用程序将停止监视队列,从而停止处理消息。但是,如果我在我的订阅调用周围放置一个 while 循环。因此,我非常犹豫地写了下面的代码片段,以便在 OnMessage
完成后重新订阅。
protected void MonitorQueue()
{
IsRunning = true;
while (IsRunning)
{
try
{
Log.Info("MonitoringThread: OnMessage beginning logging for {0}", QueueName);
QueueClient.Subscribe(QueueName, Processor);
Log.Info("MonitoringThread: OnMessage ended logging for {0}", QueueName);
}
catch (Exception ex)
{
IsRunning = false;
Log.Error("MonitoringThread: Error in subscription for {0}: ", ex, QueueName);
}
if (SleepBeforeReinit > 0 && IsRunning)
{
Thread.Sleep(SleepBeforeReinit);
}
}
}
这解决了消息由于未被提取而过期为死信的问题,但这会导致其他问题。
由于 OnMessage 是一项计费操作,当我看到日志文件告诉我队列开始和结束的间隔不到一秒并且我的日志文件的大小增加非常时,我很担心。
我将我的 MessagingFactory
设置为有 1 天的 OperationTimeout
,但这似乎并没有像我预期的那样影响订阅打开/关闭状态的频率.
我已经看到很多示例作为 worker 角色执行此操作,但是这不会完成我们正在尝试做的事情。我目前正在从我们的 Web 应用程序的 Global.asax.cs 连接它。非常感谢任何建议!
最佳答案
OnMessage 和 OnMessageAsync 不阻塞调用。这些需要实例化一次,并将继续订阅队列,直到应用程序终止。
有关详细信息,请参阅相关帖子:Azure Service Bus, determine if OnMessage stops processing
关于c# - Azure 服务总线消息是否阻止调用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31305524/