azureservicebus - 为什么 RegisterMessageHandler 不适用于特定主题名称?

标签 azureservicebus azure-servicebus-subscriptions

我不明白为什么下面引用的以下处理程序 (processMessageAsync) 没有针对特定主题名称触发,但针对其他主题名称触发:

subscriptionClient.RegisterMessageHandler(processMessageAsync, msgOptions)

以下是我的订阅者类:
open System
open System.Linq
open System.Threading
open System.Text
open System.Threading.Tasks
open Microsoft.Azure.ServiceBus

type Subscriber(connectionString:string, topic:string, subscription:string) =

    let mutable subscriptionClient : SubscriptionClient = null

    let exceptionReceivedHandler (args:ExceptionReceivedEventArgs) =
        printfn "Got an exception: %A" args.Exception
        Task.CompletedTask

    let processMessageAsync (message:Message) (_:CancellationToken) = 

        try

            let _ = Encoding.UTF8.GetString(message.Body)
            subscriptionClient.CompleteAsync(message.SystemProperties.LockToken) |> Async.AwaitTask |> Async.RunSynchronously

            Task.CompletedTask

        with
            _ -> Task.CompletedTask

    member x.Listen() =

        async {

            subscriptionClient <- new SubscriptionClient(connectionString, topic, subscription)
            subscriptionClient.OperationTimeout <- TimeSpan.FromMinutes(3.0)

            let! rulesFound     = subscriptionClient.GetRulesAsync() |> Async.AwaitTask
            let  hasDefaultRule = rulesFound.Any(fun r -> r.Name = RuleDescription.DefaultRuleName)

            if hasDefaultRule then
                do! subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName) |> Async.AwaitTask

            let msgOptions = MessageHandlerOptions(fun args -> exceptionReceivedHandler(args))
            msgOptions.AutoComplete         <- false
            msgOptions.MaxAutoRenewDuration <- TimeSpan.FromMinutes(1.0)
            msgOptions.MaxConcurrentCalls   <- 1

            subscriptionClient.RegisterMessageHandler(processMessageAsync, msgOptions)
        }

    member x.CloseAsync() =

        async {

            do! subscriptionClient.CloseAsync() |> Async.AwaitTask
        }

这是我尝试运行订阅者的方式:
open System
open Subscription.Console

let connectionString = <connection_string>

[<EntryPoint>]
let main argv =

    printfn "Welcome to Subscription.Console"

    let topic,subscription = "Topic.courier-accepted","Subscription.all-messages"
    let subscriber = Subscriber(connectionString, topic, subscription)

    async { do! subscriber.Listen()
          } |> Async.RunSynchronously

    Console.ReadKey() |> ignore

    async { do! subscriber.CloseAsync()
          } |> Async.RunSynchronously

    0 // return an integer exit code 

以下代码发布了我的订阅者应该收到(但没有)的消息:
[<Fact>]
let ``Publish courier-accepted to servicebus``() =

    async {

        // Setup
        let  client    = TopicClient(sbConnectionstring, "Topic.courier-accepted")
        let! requestId = requestId()

        let updated = requestId |> modifyRequestId someCourierResponse
        let json    = JsonConvert.SerializeObject(updated)
        let message = Message(Encoding.UTF8.GetBytes(json))

        message.Label <- sprintf "request-id(%s)" (requestId.ToString())

        // Test
        do! client.SendAsync(message) |> Async.AwaitTask

        // Teardown
        do! client.CloseAsync()       |> Async.AwaitTask
    }

笔记:

上面代码的有趣之处在于,当我有一个 Azure 函数在 ServiceBusTrigger 设置为相同的主题和订阅名称的情况下运行时,每次运行测试时都会触发该 Azure 函数。
  • 我没有收到任何异常消息
  • 我的 Subscriber 实例永远不会触发 exceptionReceivedHandler 函数
  • 我在 Azure 仪表板上没有观察到 servicebus 资源的任何用户错误

  • 使用不同的主题名称成功

    如果我将主题名称更改为“courier-requested”,那么订阅者实例会收到消息:
    [<Fact>]
    let ``Publish courier-requested to servicebus topic``() =
    
        // Setup
        let client    = TopicClient(sbConnectionstring, "Topic.courier-requested")
        let message   = Message(Encoding.UTF8.GetBytes(JsonFor.courierRequest))
        message.Label <- sprintf "courier-id(%s)" "b965f552-31a4-4644-a9c6-d86dd45314c4"
    
        // Test
        async {
    
            do! client.SendAsync(message) |> Async.AwaitTask
            do! client.CloseAsync()       |> Async.AwaitTask
        }
    

    这是主题名称调整的订阅:
    [<EntryPoint>]
    let main argv =
    
        printfn "Welcome to Subscription.Console"
    
        let topic,subscription = "Topic.courier-requested","Subscription.all-messages"
        let subscriber = Subscriber(connectionString, topic, subscription)
    
        async { do! subscriber.Listen()
              } |> Async.RunSynchronously
    
        Console.ReadKey() |> ignore
    
        async { do! subscriber.CloseAsync()
              } |> Async.RunSynchronously
    
        0 // return an integer exit code
    

    以下是我的 Azure 门户中的两个主题:
    enter image description here

    在 Portal 中点击主题有不同的结果:

    我注意到我必须点击“ express 接受”两次才能查看它的订阅。但是,我可以单击“ express 请求”一次并立即查看其订阅。

    最佳答案

    如果我正确理解您尝试删除有问题的主题并重新创建它,那么这听起来像是 Azure 中的一个小问题。您不应该得到上述需要单击两次的行为。有时我在 Azure 中创建了一些东西,在他们的基础结构下游某处存在问题,支持请求是解决问题的唯一方法。

    关于azureservicebus - 为什么 RegisterMessageHandler 不适用于特定主题名称?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60596963/

    相关文章:

    c# - 如果网络关闭,MessageReceiver.RegisterMessageHandler 会连续抛出异常

    Azure 服务总线主题请求与消息

    azure - 在启用 Azure ServiceBus session 的订阅中,为什么我会在多个订阅者实例上收到具有相同 session ID 的消息

    azure - Azure 服务总线中 SQL 筛选器和相关筛选器的主要区别是什么

    c# - 在 Azure 服务总线 SendAsync 方法上捕获异常时遇到问题

    azure - 测试 Azure 死信服务总线队列

    Azure 服务总线 http 与 websocket

    Azure 服务总线主题订阅引发有关 System.DBNull 的 FilterException

    node.js - 服务总线错误: The messaging entity