c# - Azure 服务总线 - 使用 OnMessage() 方法接收消息

标签 c# azureservicebus

MS documentation 之后从订阅中接收消息并不困难。但是,如果我希望我的应用程序在每次发布新消息时都收到一条消息 - 持续轮询。因此 SubscriptionClient 类的 OnMessage() 方法。

MS documentation说:...当调用 OnMessage 时,客户端启动一个内部消息泵,不断轮询队列或订阅。这个消息泵由一个发出 Receive() 调用的无限循环组成。如果调用超时,它会发出下一个 Receive() 调用。..."

但是当应用程序运行时,调用OnMessage() 方法的那一刻只会收到最新的消息。发布新消息时,持续轮询似乎不起作用。在尝试了很多不同的方法之后我唯一能让它工作并让应用程序在收到新消息时使用react的方法是将代码放入一个具有无限循环的单独任务中。这在很多层面上似乎都是完全错误的!(参见下面的代码)。

任何人都可以帮助我更正我的代码或发布一个工作示例以在没有循环的情况下完成相同的功能吗?谢谢!

 public void ReceiveMessageFromSubscription(string topicName, string subscriptionFilter)
        {
            var newMessage = new MessageQueue();
            int i = 0;

            Task listener = Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    SubscriptionClient Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);

                    Dictionary<string, string> retrievedMessage = new Dictionary<string, string>();

                    OnMessageOptions options = new OnMessageOptions();
                                     options.AutoComplete = false;
                                     options.AutoRenewTimeout = TimeSpan.FromMinutes(1);

                    Client.OnMessage((message) =>
                    {
                        try
                        {
                            retrievedMessage.Add("messageGuid", message.Properties["MessageGuid"].ToString());
                            retrievedMessage.Add("instanceId", message.Properties["InstanceId"].ToString());
                            retrievedMessage.Add("pId", message.Properties["ProcessId"].ToString());
                            retrievedMessage.Add("processKey", message.Properties["ProcessKey"].ToString());
                            retrievedMessage.Add("message", message.Properties["Message"].ToString());

                            newMessage.AnnounceNewMessage(retrievedMessage); // event ->

                            message.Complete(); // Remove message from subscription.
                        }
                        catch (Exception ex)
                        {
                            string exmes = ex.Message;
                            message.Abandon();
                        }

                    }, options);

                    retrievedMessage.Clear();

                    i++;

                    Thread.Sleep(3000);
                }

            });
        }

最佳答案

您的代码有一些问题需要解决 -

  • 它失败了,我假设您的应用程序然后退出 - 或者在 至少正在监听消息的线程终止。
  • 您的 while 循环不断重复代码以连接消息处理程序, 你只需要这样做一次。
  • 您需要一种方法来使调用堆栈保持事件状态并防止您的应用程序对您的对象进行垃圾回收。

下面的内容将助您走向成功。祝你好运。

 ManualResetEvent CompletedResetEvent = new ManualResetEvent(false);
    SubscriptionClient Client;

    public void ReceiveMessagesFromSubscription(string topicName, string subscriptionFilter, string connectionString)
    {
        Task listener = Task.Factory.StartNew(() =>
        {
            // You only need to set up the below once. 
            Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);

            OnMessageOptions options = new OnMessageOptions();
            options.AutoComplete = false;
            options.AutoRenewTimeout = TimeSpan.FromMinutes(1);
            options.ExceptionReceived += LogErrors;

            Client.OnMessage((message) =>
            {
                try
                {
                    Trace.WriteLine("Got the message with ID {0}", message.MessageId);
                    message.Complete(); // Remove message from subscription.
                }
                catch (Exception ex)
                {
                    Trace.WriteLine("Exception occurred receiving a message: {0}" + ex.ToString());
                    message.Abandon(); // Failed. Leave the message for retry or max deliveries is exceeded and it dead letters.
                }

            }, options);

            CompletedResetEvent.WaitOne();
        });
    }

    /// <summary>
    /// Added in rudimentary exception handling .
    /// </summary>
    /// <param name="sender">The sender.</param>
    /// <param name="ex">The <see cref="ExceptionReceivedEventArgs"/> instance containing the event data.</param>
    private void LogErrors(object sender, ExceptionReceivedEventArgs ex)
    {
        Trace.WriteLine("Exception occurred in OnMessage: {0}" + ex.ToString());
    }

    /// <summary>
    /// Call this to stop the messages arriving from subscription.
    /// </summary>
    public void StopMessagesFromSubscription()
    {
        Client.Close(); // Close the message pump down gracefully
        CompletedResetEvent.Set(); // Let the execution of the listener complete and terminate gracefully 
    }

或者,您可以自己使用 ReceiveBatch 以更传统的方式将消息分块:

var messages = await queueClient.ReceiveBatchAsync(10, TimeSpan.FromSeconds(30),
                                                       cancellationToken);

关于c# - Azure 服务总线 - 使用 OnMessage() 方法接收消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40596942/

相关文章:

c# - ASP.NET 网站管理工具访问规则按钮丢失

c# - 在 NHibernate 中进行渴望加入的最佳方法是什么?

c# - 为什么不能解构为只有一个值的元组?

c# - 将 System::String^(C# 字符串)转换为 C++ std::string

c# - DocumentDb 事务作为外部事务范围的一部分

azure - 如何使用 terraform 动态添加多个 IP 到 azure servicebus 防火墙

azureservicebus - 如何创建服务总线触发器 webjob?

powershell - 用于创建 Azure 服务总线队列和主题的 Azure powershell cmdlet

c# - 基于 Azure 服务总线订阅中的系统属性的 SQLFilter

azure - 如何从前端获取来自azure服务总线主题的通知?