java - 如何在 Vertx Kafka 客户端中使用自定义序列化器?

标签 java apache-kafka vert.x

我有以下 kafka 生产者属性。

value.serializer=MyEventSerializer
value.deserializer=MyEventDeserializer
default.value.serde=MyEventSerde

我已经在Vertx site中经历了序列化器 并使用创建了生产者

KafkaProducer<String, MyEvent> producer = KafkaProducer.create(vertx, configProperties, String.class, MyEvent.class);

但我收到以下错误:

SEVERE: Unknown class for built-in 
serializer. Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes
java.lang.IllegalArgumentException: Unknown class for built-in serializer. Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes

有没有办法在 Vertx kafka 客户端中拥有自定义序列化器?

最佳答案

我必须手动执行 KafkaProducer.create() 的操作。

Serializer<String> keySerializer = VertxSerdes.serdeFrom(String.class).serializer();
Serializer<MyEvent> valueSerializer = new MyEventSerializer();
KafkaWriteStream<String, MyEvent> stream = new KafkaWriteStreamImpl(vertx.getOrCreateContext(), new org.apache.kafka.clients.producer.KafkaProducer(configProperties, keySerializer, valueSerializer));
KafkaProducer<String,MyEvent> producer=(new KafkaProducerImpl(stream)).registerCloseHook();

然后使用..写入记录

KafkaProducerRecord producerRecord= KafkaProducerRecord.create(topicName,key,value);

        producer.write(producerRecord, done -> {
            if (done.succeeded()) {
                // TODO if succeeded
            } else {
                // TODO if failed
            }

         });

关于java - 如何在 Vertx Kafka 客户端中使用自定义序列化器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54575690/

相关文章:

JAVA 正则表达式 : Match until the specific character

java - 将 SQL Azure 与 Android 连接

apache-kafka - Kafka 滚动重启主动 Controller 最后的性能优势

apache-kafka - Kafka模式注册表在同一主题中不兼容

java - vert.x 响应式(Reactive) mySQL 客户端查询总是给出空结果

java - 如何捕获 Marathon 中的弹出对话框?

mysql - 如何使用开源 kafka connect 从 Aurora 连接 MSK

kotlin - 让我们用vert.x加密

java - 未找到注释处理器 'io.vertx.serviceproxy.ServiceProxyProcessor'

java - JNA 从 C/C++ 接收一个包含字符串的结构