java - 如何为 Kafka 2.2 实现 FlinkKafkaProducer 序列化器

标签 java apache-kafka apache-flink

我一直在致力于更新 Flink 处理器(Flink 版本 1.9),该处理器从 Kafka 读取数据,然后写入 Kafka。我们已经编写了这个处理器来运行 Kafka 0.10.2 集群,现在我们已经部署了一个运行版本 2.2 的新 Kafka 集群。因此,我开始更新处理器以使用最新的 FlinkKafkaConsumer 和 FlinkKafkaProducer (按照 Flink 文档的建议)。然而我在卡夫卡生产者方面遇到了一些问题。我无法使用已弃用的构造函数来序列化数据(这并不奇怪),并且我无法在网上找到任何有关如何实现序列化器的实现或示例(所有示例都使用较旧的 Kafka 连接器)

当前实现(针对Kafka 0.10.2)如下

FlinkKafkaProducer010<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer010<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                (FlinkKafkaPartitioner) null
        );

当尝试实现以下 FlinkKafkaProducer 时

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                null
        );

我收到以下错误:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:525)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:483)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:357)
    at com.ebs.flink.sessionprocessor.SessionProcessor.main(SessionProcessor.java:122)

我一直没能弄清楚为什么。 FlinkKafkaProducer 的构造函数也已弃用,当我尝试实现未弃用的构造函数时,我无法弄清楚如何序列化数据。 以下是它的外观:

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
                        return null;
                    }
                },
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );

但是我不明白如何实现 KafkaSerializationSchema,而且我在网上或 Flink 文档中没有找到这方面的示例。

是否有人有任何实现此操作的经验,或者有任何关于 FlinkProducer 在该步骤中出现 NullPointerException 的原因的提示?

最佳答案

如果您只是将字符串发送到 Kafka:

public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String>{

    private String topic;   

    public ProducerStringSerializationSchema(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8));
    }

}

发送 Java 对象:

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;


    public class ObjSerializationSchema implements KafkaSerializationSchema<MyPojo>{

        private String topic;   
        private ObjectMapper mapper;

        public ObjSerializationSchema(String topic) {
            super();
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(MyPojo obj, Long timestamp) {
            byte[] b = null;
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
             try {
                b= mapper.writeValueAsBytes(obj);
            } catch (JsonProcessingException e) {
                // TODO 
            }
            return new ProducerRecord<byte[], byte[]>(topic, b);
        }

    }

在你的代码中

.addSink(new FlinkKafkaProducer<>(producerTopic, new ObjSerializationSchema(producerTopic), 
                        params.getProperties(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

关于java - 如何为 Kafka 2.2 实现 FlinkKafkaProducer 序列化器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58644549/

相关文章:

logging - Flink 日志不显示

java - Hibernate 在用户单击时加载父级的子级?

java - 更新到 Android 4.4 KitKat 后无法找到 Movies 文件夹

java - 如何检查二维文件

java - 哪种实现迭代语句的内存效率最高?

apache-kafka - kafka 在哪​​里存储主题的分区?

hdfs - KAFKA生产者可以读取日志文件吗?

apache-kafka - Kafka Streams 加入不相关的流

apache-flink - 从 IDE 运行 flink 时如何设置 presto.s3.xxx 属性?

apache-flink - Apache Flink 1.14.0 - 无法通过 Java 中的 SQL DDL 使用 python UDF