具有 session 的 Azure 服务总线/函数,不等待函数完成

标签 azure azure-functions azureservicebus

您好,有一个 Azure 服务总线和一个 Azure 函数,可以在启用 session 的情况下获取消息并处理它们。该函数本身读取余额表以获取当前余额,然后根据消息中的金额添加/减去余额,然后将新记录插入到包含金额和新余额的交易表中,就像您看到的那样在银行交易 list 上。然后它还会更新余额表中的记录。

问题是,当消息大量快速传入时,第一个消息会获取余额并开始进行处理,但在完成之前,下一个消息会获取相同的余额,而不是等待第一个完成,然后使用新的天平。

这是一个例子......

Example

第一列是金额,最后一列是余额,正如您所看到的,前 2 条消息都获取了 7500.87 英镑的余额,并将其金额添加到其中。因此余额最终为 26,791.22 英镑,而不是 70,624.73 英镑

函数签名如下...

[FunctionName("TransactionsQueueTrigger")]
public async Task Run([ServiceBusTrigger("transactions", Connection = "test_SERVICEBUS", IsSessionsEnabled = true)] string myQueueItem, ILogger log)
{
    // do some work on the balance
}

这是host.json

{
    "version": "2.0",
    "logging": {
        "applicationInsights": {
            "samplingExcludedTypes": "Request",
            "samplingSettings": {
                "isEnabled": true
            }
        }
    }
}

session 标识符仅适用于每个帐户,因此我可以为不同帐户同时运行这些 session 标识符,但不能为同一帐户同时运行。

那么,我可以强制该函数等待第一个函数完成(对于该 session ),然后再开始另一个函数吗?

最佳答案

The problem, is that when messages are coming in thick and fast, the first one picks up the Balance and begins to do the processing,

首先,尝试将逻辑放入异步方法中,并使用await使其等待:

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;

namespace FunctionApp113
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task Run([ServiceBusTrigger("myqueue2", Connection = "str", IsSessionsEnabled = true)]string myQueueItem, ILogger log)
        {
            log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
            log.LogInformation($"Is loading...");
            await testc();
            log.LogInformation($"Completed!");
        }
        public static async Task testc()
        {
            //Some logic here.
            await Task.Delay(2000);
        }
    }
}

如果还是不行,请尝试使用下面的host.json:

{
    "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensions": {
    "serviceBus": {
      "messageHandlerOptions": {
        "maxConcurrentCalls": 1
      }
    }
  }
}

此外,这可能来自 azure function 应用自动横向扩展,因此请尝试将最大横向扩展限制设置为 1:

enter image description here

关于具有 session 的 Azure 服务总线/函数,不等待函数完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67504987/

相关文章:

ruby-on-rails - 如何使用 Azure Pipelines 将 Ruby 部署到 Azure 应用服务?

Azure函数无法连接到Azure SQL数据库

azure - 如何向 Azure Functions 提供私有(private) NuGet 源凭据?

c# - 如何使用 Azure.Messaging.ServiceBus 库添加自定义属性?

Azure ServiceBus BrokeredMessage 正文为 null

Azure 表存储 - 分布式锁定

Azure 通知中心 : The supplied notification payload is invalid

Azure DevOps 管道失败 : Sequence was not expected

azure - 使用数组中的元素加入流分析

c# - 正确使用Azure Durable Function - 序列化复杂对象