我们有一个云服务,使用辅助角色来处理从 Azure 服务总线上设置的主题接收的消息。
消息本身似乎完好无损地到达,并且通常被正确接收和处理。然而,在某些情况下,消息似乎停止处理(日志记录突然结束,并且在我们的 WadLogsTable 中看不到更多对正在处理的消息的引用)。根据我的研究,发生这种情况可能是由于辅助角色保持连接打开和空闲的时间超过几秒。我该如何防止这些需要长时间处理的消息被放弃?
我们的 worker 角色的代码如下。
public class WorkerRole : RoleEntryPoint
{
private static StandardKernel _kernel;
private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);
private BaseRepository<CallData> _callDataRepository;
private BaseRepository<CallLog> _callLogRepository;
private SubscriptionClient _client;
private NamespaceManager _nManager;
private OnMessageOptions _options;
private BaseRepository<Site> _siteRepository;
public override void Run()
{
try
{
List<CallInformation> callInfo;
Trace.WriteLine("Starting processing of messages");
// Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump.
_client.OnMessage(message =>
{
// Process message from subscription.
Trace.TraceInformation("Call Received. Ready to process message ");
message.RenewLock();
callInfo = message.GetBody<List<CallInformation>>();
writeCallData(callInfo);
Trace.TraceInformation("Call Processed. Clearing from topic.");
}, _options);
}
catch (Exception e)
{
Trace.TraceInformation("Error: " + e.Message + "---" + e.StackTrace);
}
}
private void writeCallData(List<CallInformation> callList)
{
try
{
Trace.TraceInformation("Calls received: " + callList.Count);
foreach (var callInfo in callList)
{
Trace.TraceInformation("Unwrapping call...");
var call = callInfo.CallLog.Unwrap();
Trace.TraceInformation("Begin Processing: Local Call " + call.ID + " with " + callInfo.DataPoints.Length + " datapoints");
Trace.TraceInformation("Inserting Call...");
_callLogRepository.ExecuteSqlCommand(/*SNIP: Insert call*/);
Trace.TraceInformation("Call entry written. Now building datapoint list...");
var datapoints = callInfo.DataPoints.Select(datapoint => datapoint.Unwrap()).ToList();
Trace.TraceInformation("datapoint list constructed. Processing datapoints...");
foreach (var data in datapoints)
{
/*SNIP: Long running code. Insert our datapoints one at a time. Sometimes our messages die in the middle of this foreach. */
}
Trace.TraceInformation("All datapoints written for call with dependable ID " + call.Call_ID);
Trace.TraceInformation("Call Processed successfully.");
}
}
catch (Exception e)
{
Trace.TraceInformation("Call Processing Failed. " + e.Message);
}
}
public override bool OnStart()
{
try
{
var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
_nManager = NamespaceManager.CreateFromConnectionString(connectionString);
_nManager.Settings.OperationTimeout = new TimeSpan(0,0,10,0);
var topic = new TopicDescription("MyTopic")
{
DuplicateDetectionHistoryTimeWindow = new TimeSpan(0, 0, 10, 0),
DefaultMessageTimeToLive = new TimeSpan(0, 0, 10, 0),
RequiresDuplicateDetection = true,
};
if (!_nManager.TopicExists("MyTopic"))
{
_nManager.CreateTopic(topic);
}
if (!_nManager.SubscriptionExists("MyTopic", "AllMessages"))
{
_nManager.CreateSubscription("MyTopic", "AllMessages");
}
_client = SubscriptionClient.CreateFromConnectionString(connectionString, "MyTopic", "AllMessages",
ReceiveMode.ReceiveAndDelete);
_options = new OnMessageOptions
{
AutoRenewTimeout = TimeSpan.FromMinutes(5),
};
_options.ExceptionReceived += LogErrors;
CreateKernel();
_callLogRepository.ExecuteSqlCommand(/*SNIP: Background processing*/);
}
catch (Exception e)
{
Trace.TraceInformation("Error on roleStart:" + e.Message + "---" + e.StackTrace);
}
return base.OnStart();
}
public override void OnStop()
{
// Close the connection to Service Bus Queue
_client.Close();
_completedEvent.Set();
}
void LogErrors(object sender, ExceptionReceivedEventArgs e)
{
if (e.Exception != null)
{
Trace.TraceInformation("Error: " + e.Exception.Message + "---" + e.Exception.StackTrace);
_client.Close();
}
}
public IKernel CreateKernel()
{
_kernel = new StandardKernel();
/*SNIP: Bind NInjectable repositories */
return _kernel;
}
}
最佳答案
您的Run
方法不会无限期地继续下去。它应该看起来像这样:
public override void Run()
{
try
{
Trace.WriteLine("WorkerRole entrypoint called", "Information");
while (true)
{
// Add code here that runs in the role instance
}
}
catch (Exception e)
{
Trace.WriteLine("Exception during Run: " + e.ToString());
// Take other action as needed.
}
}
取自 docs :
The Run is considered the Main method for your application. Overriding the Run method is not required; the default implementation never returns. If you do override the Run method, your code should block indefinitely. If the Run method returns, the role is automatically recycled by raising the Stopping event and calling the OnStop method so that your shutdown sequences may be executed before the role is taken offline.
关于c# - Azure 主题辅助角色在 60 秒后停止处理消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38532968/