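I'm new to both Spring Cloud Stream and Kafka. When I send a string in the payload, I get the error below from the Kafka producer. Any help or insight would be much appreciated. I have tried the byte array serializer/deserializer, and also JSON instead of plain text.
Error message: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Stack trace: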
2019-10-25 16:13:40.762 ERROR 4628 --- [ XNIO-1 task-1] o.z.problem.spring.common.AdviceTraits : Internal Server Error
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@ebad77c]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:186)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at com.ll.kafkaservice.greeting.GreetingsService.sendGreeting(GreetingsService.java:30)
at com.ll.kafkaservice.greeting.GreetingsController.greetings(GreetingsController.java:29)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
....
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:894)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:470)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:407)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:242)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at com.ll.kafkaservice.greeting.GreetingsService.sendGreeting(GreetingsService.java:30)
at com.ll.kafkaservice.greeting.GreetingsController.greetings(GreetingsController.java:29)
Here are the Spring Cloud Stream settings:
cloud:
  stream:
    bindings:
      greetings-in:
        destination: greetings
        #content-type: application/json
        content-type: text/plain
      greetings-out:
        destination: greetings
        #content-type: application/json
        content-type: text/plain
Producer settings:
2019-10-25 16:12:28.346 INFO 4628 --- [ restartedMain] com.ll.kafkaservice.KafkaServiceApp : Started KafkaServiceApp in 22.248 seconds (JVM running for 23.136)
2019-10-25 16:12:28.366 DEBUG 4628 --- [ restartedMain] c.l.k.aop.logging.LoggingAspect : Enter: com.ll.kafkaservice.service.KafkaServiceKafkaProducer.init() with argument[s] = []
2019-10-25 16:12:28.377 INFO 4628 --- [ restartedMain] c.l.k.service.KafkaServiceKafkaProducer : Kafka producer initializing...
2019-10-25 16:12:28.378 INFO 4628 --- [ restartedMain] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2019-10-25 16:12:28.399 INFO 4628 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.0
Here is the object that contains the message:
package com.ll.kafkaservice.messaging;

import java.io.Serializable;

public class Greeting implements Serializable {

    private static final long serialVersionUID = 1L;

    private String message;

    public Greeting() {
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String toString() {
        StringBuffer sbuffer = new StringBuffer();
        sbuffer.append("{");
        sbuffer.append("message:");
        sbuffer.append(message);
        sbuffer.append("}");
        return sbuffer.toString();
    }
}
Defining the streams:
package com.ll.kafkaservice.greeting;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface GreetingsStreams {

    String INPUT = "greetings-in";
    String OUTPUT = "greetings-out";

    @Input(INPUT)
    SubscribableChannel inboundGreetings();

    @Output(OUTPUT)
    MessageChannel outboundGreetings();
}
Binding the streams:
package com.ll.kafkaservice.config;

import org.springframework.cloud.stream.annotation.EnableBinding;
import com.ll.kafkaservice.greeting.GreetingsStreams;

@EnableBinding(GreetingsStreams.class)
public class StreamsConfiguration {
}
Producing/sending the message:
package com.ll.kafkaservice.greeting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;
import com.ll.kafkaservice.messaging.Greeting;

@Service
public class GreetingsService {

    private final Logger log = LoggerFactory.getLogger(GreetingsService.class);

    private final GreetingsStreams greetingsStreams;
    private MessageChannel messageChannel;

    public GreetingsService(GreetingsStreams greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }

    public void sendGreeting(final Greeting greeting) {
        messageChannel = greetingsStreams.outboundGreetings();
        log.info("Before send {}", greeting.toString());
        messageChannel.send(MessageBuilder
                // Sends a string to payload not the object
                .withPayload(greeting.getMessage())
                // Note: tried this with and without the header
                //.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
                .build());
    }
}
Consuming/receiving the message:
package com.ll.kafkaservice.greeting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.ll.kafkaservice.messaging.Greeting;

@Component
public class GreetingsListener {

    private final Logger log = LoggerFactory.getLogger(GreetingsListener.class);

    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload Greeting greetings) {
        log.info("Received greetings: {}", greetings.getMessage());
    }

    //@StreamListener(GreetingsStreams.INPUT)
    //public void handleGreetings(String greetings) {
    //    log.info("Received greetings: {}", greetings);
    //}
}
Best answer
By default, the SCSt framework converts the payload to byte[] and uses ByteArraySerializer.
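The ClassCastException in the question's stack trace ([B cannot be cast to java.lang.String) is what you would expect when a payload that has already been converted to byte[] is handed to a serializer that expects a String. The sketch below reproduces that mismatch using only the JDK. The Serializer, StringSerializer, and ByteArraySerializer types here are simplified stand-ins written for illustration, not the real Kafka classes, and trySend is a hypothetical helper.

```java
import java.nio.charset.StandardCharsets;

public class SerializerMismatchDemo {

    // Simplified stand-in for a value serializer; NOT the real Kafka interface.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in for a String serializer: it expects the value to be a String.
    static class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Stand-in for a byte[] serializer: it passes the bytes through unchanged.
    static class ByteArraySerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(String topic, byte[] data) {
            return data;
        }
    }

    // Hypothetical helper mimicking a producer that only learns the serializer
    // type from configuration, so the value type is not checked at compile time.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String trySend(Serializer serializer, Object value) {
        try {
            byte[] bytes = serializer.serialize("greetings", value);
            return "sent " + bytes.length + " bytes";
        } catch (ClassCastException e) {
            return "failed: ClassCastException";
        }
    }

    public static void main(String[] args) {
        // The framework has already turned the String payload into byte[].
        byte[] converted = "hello".getBytes(StandardCharsets.UTF_8);

        // A String serializer cannot accept byte[].
        System.out.println(trySend(new StringSerializer(), converted));
        // prints "failed: ClassCastException"

        // A byte[] serializer accepts the converted payload.
        System.out.println(trySend(new ByteArraySerializer(), converted));
        // prints "sent 5 bytes"
    }
}
```

Either the serializer has to match what the framework produces (byte[]), or the framework's own conversion has to be switched off so the configured serializer sees the original payload type.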
Since you have configured the binding to use a custom serializer, you must set useNativeEncoding to true. See Producer Properties.
useNativeEncoding
When set to true, the outbound message is serialized directly by the client library, which must be configured correspondingly (for example, setting an appropriate Kafka producer value serializer). When this configuration is being used, the outbound message marshalling is not based on the contentType of the binding. When native encoding is used, it is the responsibility of the consumer to use an appropriate decoder (for example, the Kafka consumer value de-serializer) to deserialize the inbound message. Also, when native encoding and decoding is used, the headerMode=embeddedHeaders property is ignored and headers are not embedded in the message. See the consumer property useNativeDecoding.
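As a rough illustration of the setting described above, the following sketch builds the relevant properties for the greetings-out binding as a plain map. It assumes the usual Spring Cloud Stream prefixes (spring.cloud.stream.bindings.<binding>.producer for common producer properties and spring.cloud.stream.kafka.bindings.<binding>.producer.configuration for Kafka client properties). The NativeEncodingProperties class and forBinding method are hypothetical names used only for this example, so check the property keys against the documentation for your version.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NativeEncodingProperties {

    // Hypothetical helper: collects the properties that turn on native encoding
    // for one output binding and name the Kafka value serializer to use.
    static Map<String, Object> forBinding(String bindingName, String valueSerializerClass) {
        Map<String, Object> props = new LinkedHashMap<>();
        // Let the Kafka client serialize the payload instead of the framework.
        props.put("spring.cloud.stream.bindings." + bindingName
                + ".producer.useNativeEncoding", true);
        // Kafka producer property passed through by the Kafka binder (assumed prefix).
        props.put("spring.cloud.stream.kafka.bindings." + bindingName
                + ".producer.configuration.value.serializer", valueSerializerClass);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = forBinding(
                "greetings-out",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Print in key=value form, as the entries would appear in a properties file.
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The same two keys can equally be written in the application's YAML or properties file next to the existing bindings section; the map form is used here only to keep the example self-contained.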
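However, if you want to send a POJO, you need to use a JsonSerializer rather than just a string serializer.
Is there some reason you are not relying on the framework to do the conversion for you?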
Source: "Spring Cloud Stream & Kafka: Kafka Producer serialization error" (spring-boot), Stack Overflow: https://stackoverflow.com/questions/58565814/