c# - Azure 主题辅助角色在 60 秒后停止处理消息

标签 c# azure azure-worker-roles azureservicebus

我们有一个云服务,使用辅助角色来处理从 Azure 服务总线上设置的主题接收的消息。

消息本身似乎完好无损地到达,并且通常被正确接收和处理。然而,在某些情况下,消息似乎停止处理(日志记录突然结束,并且在我们的 WadLogsTable 中看不到更多对正在处理的消息的引用)。根据我的研究,发生这种情况可能是由于辅助角色保持连接打开和空闲的时间超过几秒。我该如何防止这些需要长时间处理的消息被放弃?

我们的 worker 角色的代码如下。

public class WorkerRole : RoleEntryPoint
{
    private static StandardKernel _kernel;
    private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);
    private BaseRepository<CallData> _callDataRepository;
    private BaseRepository<CallLog> _callLogRepository;

    private SubscriptionClient _client;
    private NamespaceManager _nManager;
    private OnMessageOptions _options;
    private BaseRepository<Site> _siteRepository;

    public override void Run()
    {
        try
        {
            List<CallInformation> callInfo;
            Trace.WriteLine("Starting processing of messages");

            // Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump.

            _client.OnMessage(message =>
            {
                // Process message from subscription.
                Trace.TraceInformation("Call Received. Ready to process message ");
                message.RenewLock();
                callInfo = message.GetBody<List<CallInformation>>();
                writeCallData(callInfo);


                Trace.TraceInformation("Call Processed. Clearing from topic.");
            }, _options);
        }
        catch (Exception e)
        {
            Trace.TraceInformation("Error: " + e.Message + "---" + e.StackTrace);
        }
    }

    private void writeCallData(List<CallInformation> callList)
    {
        try
        {
            Trace.TraceInformation("Calls received: " + callList.Count);
            foreach (var callInfo in callList)
            {
                Trace.TraceInformation("Unwrapping call...");
                var call = callInfo.CallLog.Unwrap();
                Trace.TraceInformation("Begin Processing: Local Call " + call.ID + " with " + callInfo.DataPoints.Length + " datapoints");
                Trace.TraceInformation("Inserting Call...");
                _callLogRepository.ExecuteSqlCommand(/*SNIP: Insert call*/);
                    Trace.TraceInformation("Call entry written. Now building datapoint list...");
                    var datapoints = callInfo.DataPoints.Select(datapoint => datapoint.Unwrap()).ToList();
                    Trace.TraceInformation("datapoint list constructed. Processing datapoints...");
                    foreach (var data in datapoints)
                    {
                        /*SNIP: Long running code. Insert our datapoints one at a time. Sometimes our messages die in the middle of this foreach. */
                    }
                    Trace.TraceInformation("All datapoints written for call with dependable ID " + call.Call_ID);
                Trace.TraceInformation("Call Processed successfully.");
            }
        }
        catch (Exception e)
        {
            Trace.TraceInformation("Call Processing Failed. " + e.Message);
        }
    }

    public override bool OnStart()
    {
        try
        {
            var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
            _nManager = NamespaceManager.CreateFromConnectionString(connectionString);
            _nManager.Settings.OperationTimeout = new TimeSpan(0,0,10,0);
            var topic = new TopicDescription("MyTopic")
            {
                DuplicateDetectionHistoryTimeWindow = new TimeSpan(0, 0, 10, 0),
                DefaultMessageTimeToLive = new TimeSpan(0, 0, 10, 0),
                RequiresDuplicateDetection = true,
            };
            if (!_nManager.TopicExists("MyTopic"))
            {
                _nManager.CreateTopic(topic);
            }
            if (!_nManager.SubscriptionExists("MyTopic", "AllMessages"))
            {
                _nManager.CreateSubscription("MyTopic", "AllMessages");
            }
            _client = SubscriptionClient.CreateFromConnectionString(connectionString, "MyTopic", "AllMessages",
                ReceiveMode.ReceiveAndDelete);
            _options = new OnMessageOptions
            {
                    AutoRenewTimeout = TimeSpan.FromMinutes(5),

            };
            _options.ExceptionReceived += LogErrors;
            CreateKernel();

            _callLogRepository.ExecuteSqlCommand(/*SNIP: Background processing*/);
        }
        catch (Exception e)
        {
            Trace.TraceInformation("Error on roleStart:" + e.Message + "---" + e.StackTrace);
        }
        return base.OnStart();
    }

    public override void OnStop()
    {
        // Close the connection to Service Bus Queue
        _client.Close();
        _completedEvent.Set();
    }

    void LogErrors(object sender, ExceptionReceivedEventArgs e)
    {
        if (e.Exception != null)
        {
            Trace.TraceInformation("Error: " + e.Exception.Message + "---" + e.Exception.StackTrace);
            _client.Close();
        }
    }

    public IKernel CreateKernel()
    {
        _kernel = new StandardKernel();
        /*SNIP: Bind NInjectable repositories */
        return _kernel;
    }
}

最佳答案

您的Run方法不会无限期地继续下去。它应该看起来像这样:

public override void Run()
{
   try
   {
      Trace.WriteLine("WorkerRole entrypoint called", "Information");
      while (true)
      {
         // Add code here that runs in the role instance
      }

   }
   catch (Exception e)
   {
      Trace.WriteLine("Exception during Run: " + e.ToString());
      // Take other action as needed.
   }
}

取自 docs :

The Run is considered the Main method for your application. Overriding the Run method is not required; the default implementation never returns. If you do override the Run method, your code should block indefinitely. If the Run method returns, the role is automatically recycled by raising the Stopping event and calling the OnStop method so that your shutdown sequences may be executed before the role is taken offline.

关于c# - Azure 主题辅助角色在 60 秒后停止处理消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38532968/

相关文章:

Azure 辅助角色还是 Web 应用程序?

c# - 获取 IEnumerator 的元素到列表

node.js - 无法使用 Sequelize 从 PG DB(在 Azure 中)检索数据

azure - 使用 Azure 函数查询 SQL 表后在应用程序见解中生成警报

c# - 从 VS2017 发布 Azure 函数不会在 azure 门户上显示文件?

azure - 我需要 Azure WorkerRole 错误处理指南

c# - 使用 JWT token 的 ASP.NET Core 网站到 WebApi 身份验证

c# - 如何转换为 C# - 将 C 中的结构传递给函数?

c# - 我应该如何处理引用实体的持久性?

multithreading - Azure WorkerRole 与启动新线程的优点