apache-kafka - 使用 Flink 自定义 avro 消息反序列化

标签 apache-kafka apache-flink avro flink-streaming confluent-schema-registry

我正在开发的 Flink 消费者应用程序读取多个 Kafka 主题。不同主题中发布的消息遵循相同的架构(格式为 Avro)。对于架构管理,我使用 Confluence 架构注册表。

我一直在 KafkaSource 使用以下代码片段,它工作得很好。

KafkaSource<MyObject> source = KafkaSource.<MyObject>builder()
                .setBootstrapServers(BOOTSTRAP_SERVERS)
                .setTopics(TOPIC-1, TOPIC-2)
                .setGroupId(GROUP_ID)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(ConfluentRegistryAvroDeserializationSchema.forSpecific(MyObject.class, SCHEMA_REGISTRY_URL))
                .build();

现在,我想确定我处理的每条消息的主题名称。由于当前的反序列化器是 ValueOnly,我开始研究 setDeserializer() 方法,我认为该方法可以让我访问整个 ConsumerRecord 对象,并且可以从中获取主题名称。

但是,我无法弄清楚如何使用该实现。我应该实现自己的解串器吗?如果是这样,架构注册表如何适应该实现?

最佳答案

您可以将 setDeserializer 方法与 KafkaRecordDeserializationSchema 结合使用,如下所示:

public class KafkaUsageRecordDeserializationSchema
        implements KafkaRecordDeserializationSchema<UsageRecord> {

    private static final long serialVersionUID = 1L;

    private transient ObjectMapper objectMapper;

    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        KafkaRecordDeserializationSchema.super.open(context);
        objectMapper = JsonMapper.builder().build();
    }

    @Override
    public void deserialize(
            ConsumerRecord<byte[], byte[]> consumerRecord,
            Collector<UsageRecord> collector) throws IOException {

        collector.collect(objectMapper.readValue(consumerRecord.value(), UsageRecord.class));
    }

    @Override
    public TypeInformation<UsageRecord> getProducedType() {
        return TypeInformation.of(UsageRecord.class);
    }
}

然后您可以使用 ConsumerRecord 访问主题和其他元数据。

关于apache-kafka - 使用 Flink 自定义 avro 消息反序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72006018/

相关文章:

java - 如何从kafka主题中按键获取消息

java - Apache Kafka 获取特定主题的消费者列表

java - 大源数据的flink检查点

hadoop - 在 Amazon EMR 上配置 Flink Rest API

apache-flink - 将 subtask_id 映射到 Flink 中的 TaskManager

java - Avro:ReflectDatumWriter 不输出架构信息

java - 如何使用 apache avro 生成无模式的 avro 文件?

apache-kafka - Kafka 消费者 100% cpu 使用率

scala - ERROR Uncaught throwable from user code : java. lang.IllegalStateException in spark Streaming

java - 如何使用 avro IDL 为枚举指定默认值?