go - 如何使用 Benthos 读取和解码来自 Kafka 的 AVRO 消息及其关联的 kafka key ?

标签 go apache-kafka confluent-schema-registry benthos

我正在使用 Benthos 从 Kafka 读取 AVRO 编码的消息,该消息的 kafka_key 元数据字段设置为还包含 AVRO 编码的有效负载。这些 AVRO 编码有效负载的模式存储在模式注册表中,Benthos 有一个 schema_registry_decode 处理器用于解码它们。我希望为每个包含两个字段的 Kafka 消息生成一条输出 JSON 消息,一个名为 content 包含解码的 AVRO 消息,另一个名为 metadata 包含各种 metadata fields由 Benthos 收集,包括解码后的 kafka_key 有效负载。

最佳答案

事实证明,可以使用 branch 来实现这一目标像这样的处理器:

input:
  kafka:
    addresses:
      - localhost:9092
    consumer_group: benthos_consumer_group
    topics:
      - benthos_input

pipeline:
  processors:
    # Decode the message
    - schema_registry_decode:
        url: http://localhost:8081

    # Populate output content field
    - bloblang: |
        root.content = this

    # Decode kafka_key metadata payload and populate output metadata field
    - branch:
        request_map: |
          root = meta("kafka_key")

        processors:
          - schema_registry_decode:
              url: http://localhost:8081

        result_map: |
          root.metadata = meta()
          root.metadata.kafka_key = this

output:
  stdout: {}

关于go - 如何使用 Benthos 读取和解码来自 Kafka 的 AVRO 消息及其关联的 kafka key ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71087902/

相关文章:

go - Golang sha256哈希无法满足okta代码挑战

go - 使用 -ldflags -H=windowsgui 编译 golang 应用程序时将输出打印到命令窗口

Golang 冲突的命令行标志

java - 卡夫卡流: Invalid topology: StateStore is not added yet

java - Confluence Kafka Streams - 找不到类 io.confluence.connect.avro.ConnectDefault

Go:再次将类型为 uuid.UUID (satori) 的 reflect.Value 转换回 uuid.UUID

hadoop - Confluent HDFS 连接器

postgresql - Debezium Kafka 连接。十进制模式错误

apache-kafka - Spring Embedded Kafka + Mock Schema Registry : State Store ChangeLog Schema not registered

json - 如何通过 Kafka Console Consumer 和 Producer 使用 JSON 模式使用和生成消息