java - 将自定义 Java 对象发送到 Kafka 主题

标签 java serialization apache-kafka

我有自己的自定义 Java 对象,希望利用 JVM 的内置序列化将其发送到 Kafka 主题,但序列化失败并出现以下错误

org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.spring.kafka.Payload to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer

有效载荷.java

public class Payload implements Serializable {

    private static final long serialVersionUID = 123L;

    private String name="vinod";

    private int anInt = 5;

    private Double aDouble = new Double("5.0");

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAnInt() {
        return anInt;
    }

    public void setAnInt(int anInt) {
        this.anInt = anInt;
    }

    public Double getaDouble() {
        return aDouble;
    }

    public void setaDouble(Double aDouble) {
        this.aDouble = aDouble;
    }

}

在创建生产者期间,我设置了以下属性

<entry key="key.serializer"
                       value="org.apache.kafka.common.serialization.ByteArraySerializer" />
                <entry key="value.serializer"
                       value="org.apache.kafka.common.serialization.ByteArraySerializer" />

我的发送调用如下

kafkaProducer.send(new ProducerRecord<String, Payload>("test", new Payload()));

通过生产者将自定义 java 对象发送到 kafka 主题的正确方法是什么?

最佳答案

我们有下面列出的 2 个选项

  1. 如果我们打算将自定义 java 对象发送给生产者,我们需要创建一个实现 org.apache.kafka.common.serialization.Serializer 的序列化程序,并在创建生产者期间传递该序列化程序类

代码引用如下

public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer {

    public void configure(Map map, boolean b) {

    }

    public byte[] serialize(String s, Object o) {
       
       try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(o);
            oos.close();
            byte[] b = baos.toByteArray();
            return b;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void close() {

    }
}

并相应地设置值序列化器

<entry key="value.serializer"
                       value="com.spring.kafka.PayloadSerializer" />
  1. 无需创建自定义序列化程序类。使用现有的 ByteArraySerializer,但在发送过程中遵循流程

Java Object -> String (Preferrably JSON represenation instead of toString)->byteArray

关于java - 将自定义 Java 对象发送到 Kafka 主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41141924/

相关文章:

java - 当您订阅 kafka 上的事件时,我可以向订阅添加参数吗

java - 使用类添加最终值。 java 。

场景的 Javafx 快照不显示值和系列

java - java.io.Serializable 类的意义是什么?

hadoop - Kafka我的制片人作品主题是Par:0,Lead:1,Rep:1,Isr:1 BUT NOT Par:0,Lead:2,Rep:2,Isr:2

java - Spring Boot Kafka Consumer不消费,Kafka Listener不触发

java - 如何在 Cloud Firestore 中正确构建本地化内容?

java - Micronaut 多级 bean 验证

python - 如何将 networkx (python) 节点序列化为 json 格式?

java - 使用 Jackson 从 JSON 映射 BasicNameValuePair