我正在开发的 Flink 消费者应用程序读取多个 Kafka 主题。不同主题中发布的消息遵循相同的架构(格式为 Avro)。对于架构管理,我使用 Confluence 架构注册表。
我一直在 KafkaSource 使用以下代码片段,它工作得很好。
KafkaSource<MyObject> source = KafkaSource.<MyObject>builder()
.setBootstrapServers(BOOTSTRAP_SERVERS)
.setTopics(TOPIC-1, TOPIC-2)
.setGroupId(GROUP_ID)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(ConfluentRegistryAvroDeserializationSchema.forSpecific(MyObject.class, SCHEMA_REGISTRY_URL))
.build();
现在,我想确定我处理的每条消息的主题名称。由于当前的反序列化器是 ValueOnly,我开始研究 setDeserializer() 方法,我认为该方法可以让我访问整个 ConsumerRecord 对象,并且可以从中获取主题名称。
但是,我无法弄清楚如何使用该实现。我应该实现自己的解串器吗?如果是这样,架构注册表如何适应该实现?
最佳答案
您可以将 setDeserializer
方法与 KafkaRecordDeserializationSchema
结合使用,如下所示:
public class KafkaUsageRecordDeserializationSchema
implements KafkaRecordDeserializationSchema<UsageRecord> {
private static final long serialVersionUID = 1L;
private transient ObjectMapper objectMapper;
@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
KafkaRecordDeserializationSchema.super.open(context);
objectMapper = JsonMapper.builder().build();
}
@Override
public void deserialize(
ConsumerRecord<byte[], byte[]> consumerRecord,
Collector<UsageRecord> collector) throws IOException {
collector.collect(objectMapper.readValue(consumerRecord.value(), UsageRecord.class));
}
@Override
public TypeInformation<UsageRecord> getProducedType() {
return TypeInformation.of(UsageRecord.class);
}
}
然后您可以使用 ConsumerRecord
访问主题和其他元数据。
关于apache-kafka - 使用 Flink 自定义 avro 消息反序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72006018/