java - How to write a Flink protobuf byte array to Kafka

Tags: java apache-kafka apache-flink

I'm new to Flink. All I want to do is put my protobuf POJO into Kafka as a byte array. So my FlinkKafkaProducer looks like this:

FlinkKafkaProducer<String> flinkKafkaProducer = createStringProducer(outputTopic, address);
        stringInputStream
                .map(//here returns byte[])
                .addSink(flinkKafkaProducer);

public static FlinkKafkaProducer<String> createStringProducer(String topic, String kafkaAddress) {
        return new FlinkKafkaProducer<>(kafkaAddress, topic, new SimpleStringSchema());
    }

Right now it works fine, but my output is a String. I tried replacing new SimpleStringSchema() with TypeInformationSerializationSchema() to change the output, but I can't figure out how to wire it up correctly and couldn't find any tutorial. Can anyone help?
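(For reference: if the stream elements are already byte[], one option, not from the original post, is a pass-through SerializationSchema<byte[]> that hands the bytes to Kafka unchanged. A minimal sketch:)

```java
import org.apache.flink.api.common.serialization.SerializationSchema;

// Pass-through schema: the stream elements are already byte[],
// so serialization is just the identity function.
public class PassThroughSchema implements SerializationSchema<byte[]> {
    @Override
    public byte[] serialize(byte[] element) {
        return element;
    }
}
```

It would be used in place of SimpleStringSchema, with the producer typed as FlinkKafkaProducer<byte[]>. The accepted answer below takes a different route, serializing the protobuf object itself.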

Best answer

So I finally figured out how to write protobuf to a Kafka producer as a byte array. The problem was serialization: for POJOs, Flink falls back to the Kryo library for custom serialization, and the best way to handle protobuf types is to register ProtobufSerializer.class for them. In this example I read String messages from Kafka and write byte arrays. Gradle dependencies:

    implementation(group: 'com.twitter', name: 'chill-protobuf', version: '0.7.6') {
        exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
    }
    implementation 'com.google.protobuf:protobuf-java:3.11.0'

Registration:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.getConfig().registerTypeWithKryoSerializer(MyProtobuf.class, ProtobufSerializer.class);

The Kafka serializer class:

@Data
public class MyProtoKafkaSerializer implements KafkaSerializationSchema<MyProto> {
    private final String topic;
    private final byte[] key;

    // The key is optional; without one, Kafka spreads records across partitions.
    public MyProtoKafkaSerializer(String topic) {
        this(topic, null);
    }

    public MyProtoKafkaSerializer(String topic, byte[] key) {
        this.topic = topic;
        this.key = key;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(MyProto element, Long timestamp) {
        // Protobuf messages serialize themselves to the wire format.
        return new ProducerRecord<>(topic, key, element.toByteArray());
    }
}
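For completeness, the consumer side can turn the same bytes back into a protobuf object. A minimal sketch, assuming a generated MyProto class with the standard parseFrom(byte[]) method, using Flink's AbstractDeserializationSchema:

```java
import java.io.IOException;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

// Counterpart to the serializer above: parses raw Kafka bytes back into MyProto.
// Assumes MyProto is a protobuf-generated class (parseFrom is generated code).
public class MyProtoDeserializer extends AbstractDeserializationSchema<MyProto> {
    @Override
    public MyProto deserialize(byte[] message) throws IOException {
        return MyProto.parseFrom(message);
    }
}
```

It would be passed to a FlinkKafkaConsumer<MyProto> in place of SimpleStringSchema.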

The job:

  public static FlinkKafkaProducer<MyProto> createProtoProducer(String topic, String kafkaAddress) {
        MyProtoKafkaSerializer myProtoKafkaSerializer = new MyProtoKafkaSerializer(topic);
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        // Note: group.id is a consumer setting and is not needed on the producer.
        return new FlinkKafkaProducer<>(topic, myProtoKafkaSerializer, props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
    }
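If stronger delivery guarantees are needed, the same constructor accepts Semantic.EXACTLY_ONCE. One caveat (a sketch, not from the original answer): Kafka brokers cap transactions at transaction.max.timeout.ms (15 minutes by default), while Flink's producer defaults its transaction timeout to 1 hour, so the producer's transaction.timeout.ms should be lowered:

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
// Must not exceed the broker's transaction.max.timeout.ms (15 min by default).
props.setProperty("transaction.timeout.ms", "900000");
FlinkKafkaProducer<MyProto> exactlyOnceProducer = new FlinkKafkaProducer<>(
        topic,
        new MyProtoKafkaSerializer(topic),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
```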

 public static FlinkKafkaConsumer<String> createProtoConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        props.setProperty("group.id", kafkaGroup);
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    }

        FlinkKafkaConsumer<String> flinkKafkaConsumer = createProtoConsumerForTopic(inputTopic, address, kafkaGroup);
        DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);
        FlinkKafkaProducer<MyProto> flinkKafkaProducer = createProtoProducer(outputTopic, address);
        stringInputStream
                .map(hashtagMapFunction) // a MapFunction<String, MyProto>
                .addSink(flinkKafkaProducer);

        environment.execute("My test job");

Sources:

  1. https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/custom_serializers.html#register-a-custom-serializer-for-your-flink-program
  2. https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#protobuf-via-kryo

Regarding "java - How to write a Flink protobuf byte array to Kafka", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/63951160/
