我正在使用 Benthos 从 Kafka 读取 AVRO 编码的消息,该消息的 kafka_key
元数据字段设置为还包含 AVRO 编码的有效负载。这些 AVRO 编码有效负载的模式存储在模式注册表中,Benthos 有一个 schema_registry_decode
处理器用于解码它们。我希望为每个包含两个字段的 Kafka 消息生成一条输出 JSON 消息,一个名为 content
包含解码的 AVRO 消息,另一个名为 metadata
包含各种 metadata fields由 Benthos 收集,包括解码后的 kafka_key
有效负载。
最佳答案
事实证明,可以使用 branch
来实现这一目标像这样的处理器:
input:
kafka:
addresses:
- localhost:9092
consumer_group: benthos_consumer_group
topics:
- benthos_input
pipeline:
processors:
# Decode the message
- schema_registry_decode:
url: http://localhost:8081
# Populate output content field
- bloblang: |
root.content = this
# Decode kafka_key metadata payload and populate output metadata field
- branch:
request_map: |
root = meta("kafka_key")
processors:
- schema_registry_decode:
url: http://localhost:8081
result_map: |
root.metadata = meta()
root.metadata.kafka_key = this
output:
stdout: {}
关于go - 如何使用 Benthos 读取和解码来自 Kafka 的 AVRO 消息及其关联的 kafka key ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71087902/