c# - Azure消息总线异步检索数据并通过 Controller 操作返回数据

标签 c# azure asp.net-core async-await azureservicebus

我有一个 .NetCore 项目,用于向 Azure 服务总线发送消息和从 Azure 服务总线接收消息。

要接收消息,以下方法效果很好:

public static void Main(string[] args)
        {
            MainAsync().GetAwaiter().GetResult();
        }

        private static async Task MainAsync()
        {
            queueClient = new QueueClient(ServiceBusConnectionString, QueueName, ReceiveMode.PeekLock);
            ReceiveMessages();

            Console.WriteLine("Press any key to stop receiving messages.");
            Console.ReadKey();

            // Close the client after the ReceiveMessages method has exited.
            await queueClient.CloseAsync();
        }

        // Receives messages from the queue in a loop
        private static void ReceiveMessages()
        {
            try
            {
                // Register a OnMessage callback
                queueClient.RegisterMessageHandler(
                    async (message, token) =>
                    {
                        // Process the message
                        Console.WriteLine($"Received message: SequenceNumber:{message.SequenceNumber} Body:{message.GetBody<string>()}");

                        // Complete the message so that it is not received again.
                        // This can be done only if the queueClient is opened in ReceiveMode.PeekLock mode.
                        await queueClient.CompleteAsync(message.LockToken);
                    },
                    new RegisterHandlerOptions() {MaxConcurrentCalls = 1, AutoComplete = false});
            }
            catch (Exception exception)
            {
                Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
            }
        }

我正在尝试将此控制台应用程序移植到 webapi 项目。其想法是,当调用 api 时,它将返回队列中的当前消息。

我这样做的方式如下:

[HttpGet]
        public async Task<IActionResult> Get()
        {
            queueClient = new QueueClient(ServiceBusConnectionString, QueueName, ReceiveMode.PeekLock);

            MainAsync().GetAwaiter().GetResult();
            // Close the client after the ReceiveMessages method has exited.
            await queueClient.CloseAsync();
            return Ok(nameList);
        }
        private static async Task MainAsync()
        {
            queueClient = new QueueClient(ServiceBusConnectionString, QueueName, ReceiveMode.PeekLock);
            ReceiveMessages();

            //Console.WriteLine("Press any key to stop receiving messages.");
            //Console.ReadKey();

            //// Close the client after the ReceiveMessages method has exited.
            //await queueClient.CloseAsync();
        }

        // Receives messages from the queue in a loop
        private static void ReceiveMessages()
        {
            try
            {
                // Register a OnMessage callback
                queueClient.RegisterMessageHandler(
                    async (message, token) =>
                    {
                        // Process the message
                        nameList.Add(
                            $"Received message: SequenceNumber:{message.SequenceNumber} Body:{message.GetBody<string>()}");
                        //reList.Add(message.GetBody<string>());
                        // Complete the message so that it is not received again.
                        // This can be done only if the queueClient is opened in ReceiveMode.PeekLock mode.
                        await queueClient.CompleteAsync(message.LockToken);
                    },
                    new RegisterHandlerOptions() {MaxConcurrentCalls = 1, AutoComplete = false});
            }
            catch (Exception exception)
            {
                Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
            }

        }

最后是queueClient.CloseAsync();在从队列中检索消息之前就被调用。

我尝试了各种方法来完成此任务,但看起来我的异步编程根本不起作用。

非常感谢任何对此的帮助。

最佳答案

您正在注册一个处理程序,然后您要返回。队列中的消息在您返回后开始处理。OnMessage 对于“pub/sub”结构有意义。您可以使用“pub/sub”来当队列进入时将响应返回给客户端,但是您需要将传入的消息推送到客户端。(例如使用 SignalR)。您希望在请求后返回队列中的所有消息。您可以从以下位置接收消息队列一一(在带有 Receive 方法的循环中),然后您将返回响应。

   while (true) 
    { 
        try 
        { 
            //receive messages from Queue 
            message = queueClient.Receive(TimeSpan.FromSeconds(5)); 
            if (message != null) 
            { 
                Console.WriteLine(string.Format("Message received: Id = {0}, Body = {1}", message.MessageId, message.GetBody<string>())); 
                // Further custom message processing could go here… 
                message.Complete(); 
            } 
            else 
            { 
                //no more messages in the queue 
                break; 
            } 
        } 
        catch (MessagingException e) 
        { 
            if (!e.IsTransient) 
            { 
                Console.WriteLine(e.Message); 
                throw; 
            } 
            else 
            { 
                HandleTransientErrors(e); 
            } 
        } 
    } 
    queueClient.Close(); 
}

关于c# - Azure消息总线异步检索数据并通过 Controller 操作返回数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42757926/

相关文章:

asp.net-core - flutter websocket连接问题

c# - 当属性 getter/setter 需要 LinkDemand 时,如何安抚 FxCop?

c# - 用多个 C# .dll 包装多个 C++ .dll

c# - Entity Framework 核心 3.1 与 NetTopologySuite.Geometries.Point : SqlException: The supplied value is not a valid instance of data type geography

c# - 如何使用 Join 和 Where 返回 IList?

azure - 如何使用 Azure.Data.Tables.TableClient 进行事务?

Azure SQL Server REST API

azure - 使用 net.tcp 端点监视 WCF 服务

不依赖 IIS 的 ASP.NET Web API 应用程序

c# - 如何从 Razor Pages 应用程序中的插件动态加载页面?