Google Pub/Sub 消息排序不起作用(或将延迟增加到 10 秒以上)?

标签 go publish-subscribe google-cloud-pubsub

我正在尝试制作一个简化示例来演示如何使用 Google Pub/Sub 的消息排序功能 (https://cloud.google.com/pubsub/docs/ordering)。从这些文档中,为订阅启用消息排序后,

After the message ordering property is set, the Pub/Sub service delivers messages with the same ordering key in the order that the Pub/Sub service receives the messages. For example, if a publisher sends two messages with the same ordering key, the Pub/Sub service delivers the oldest message first.


我用它来编写以下示例:
package main

import (
    "context"
    "log"
    "time"

    "cloud.google.com/go/pubsub"
    uuid "github.com/satori/go.uuid"
)

func main() {
    client, err := pubsub.NewClient(context.Background(), "my-project")
    if err != nil {
        log.Fatalf("NewClient: %v", err)
    }

    topicID := "test-topic-" + uuid.NewV4().String()
    topic, err := client.CreateTopic(context.Background(), topicID)
    if err != nil {
        log.Fatalf("CreateTopic: %v", err)
    }
    defer topic.Delete(context.Background())

    subID := "test-subscription-" + uuid.NewV4().String()
    sub, err := client.CreateSubscription(context.Background(), subID, pubsub.SubscriptionConfig{
        Topic:                 topic,
        EnableMessageOrdering: true,
    })
    if err != nil {
        log.Fatalf("CreateSubscription: %v", err)
    }
    defer sub.Delete(context.Background())

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    messageReceived := make(chan struct{})
    go sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        log.Printf("Received message with ordering key %s: %s", msg.OrderingKey, msg.Data)
        msg.Ack()
        messageReceived <- struct{}{}
    })

    topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang1!"), OrderingKey: "foobar"})
    topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang2!"), OrderingKey: "foobar"})

    for i := 0; i < 2; i++ {
        select {
        case <-messageReceived:
        case <-time.After(10 * time.Second):
            log.Fatal("Expected to receive a message, but timed out after 10 seconds.")
        }
    }
}
首先,我尝试了没有指定 OrderingKey: "foobar" 的程序。在 topic.Publish()来电。这导致以下输出:
> go run main.go
2020/08/10 21:40:34 Received message with ordering key : Dang2!
2020/08/10 21:40:34 Received message with ordering key : Dang1!
换句话说,消息的接收顺序与它们发布的顺序不同,这在我的用例中是不可取的,我想通过指定 OrderingKey 来防止。
但是,只要我添加了 OrderingKey s 在发布调用中,程序在等待接收 Pub/Sub 消息 10 秒后超时:
> go run main.go
2020/08/10 21:44:36 Expected to receive a message, but timed out after 10 seconds.
exit status 1
我希望现在首先收到消息 Dang1!后跟 Dang2! ,但我没有收到任何消息。知道为什么这没有发生吗?

最佳答案

发布失败并出现以下错误:Failed to publish: Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering .
如果您更改发布调用以检查错误,您可以看到这一点:

res1 := topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang1!"), OrderingKey: "foobar"})
res2 := topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Dang2!"), OrderingKey: "foobar"})

_, err = res1.Get(ctx)
if err != nil {
    fmt.Printf("Failed to publish: %v", err)
    return
}

_, err = res2.Get(ctx)
if err != nil {
    fmt.Printf("Failed to publish: %v", err)
    return
}
要修复它,请添加一行以启用主题的消息排序。您的主题创建如下:
topic, err := client.CreateTopic(context.Background(), topicID)
if err != nil {
    log.Fatalf("CreateTopic: %v", err)
}
topic.EnableMessageOrdering = true
defer topic.Delete(context.Background())

关于Google Pub/Sub 消息排序不起作用(或将延迟增加到 10 秒以上)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63351729/

相关文章:

javascript - 用于事件驱动架构的 jQuery 插件?

javascript - Node : How to handle event listening between objects?

java - Google Appengine 请求 pub sub 时出错

java - 尝试在本地运行 PubSub 模拟器时出错

go - 关闭持久连接的正确方法是什么?

Golang 包导入 - 找不到包

ios - 为 iOS 发布订阅?

apache-kafka - 是否可以为 Google Pub/Sub 主题定义架构,例如在 Kafka 中使用 AVRO?

go - 中午在Golang跑代码

dictionary - 使用附加属性作为键从结构创建映射或对象