spring-boot - Spring Cloud Stream & Kafka : Kafka Producer serialization error

Tags: spring-boot spring-cloud-stream kafka-producer-api

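I am new to both Spring Cloud Stream and Kafka. When I send a String in the payload, I get the following error from the Kafka producer. Any help or insight is greatly appreciated. I have also tried a byte-array serializer/deserializer, and JSON instead of plain text.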

Error message: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

Error stack trace:

2019-10-25 16:13:40.762 ERROR 4628 --- [  XNIO-1 task-1] o.z.problem.spring.common.AdviceTraits   : Internal Server Error

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@ebad77c]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:186)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at com.ll.kafkaservice.greeting.GreetingsService.sendGreeting(GreetingsService.java:30)
        at com.ll.kafkaservice.greeting.GreetingsController.greetings(GreetingsController.java:29)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
		....
		
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
        at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
        at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:894)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:470)
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:407)
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:242)
        at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at com.ll.kafkaservice.greeting.GreetingsService.sendGreeting(GreetingsService.java:30)
        at com.ll.kafkaservice.greeting.GreetingsController.greetings(GreetingsController.java:29)

The Spring Cloud Stream settings are listed here

  cloud:
    stream:
      bindings:
          greetings-in:
              destination: greetings
              #content-type: application/json
              content-type: text/plain
          greetings-out:
              destination: greetings
              #content-type: application/json
              content-type: text/plain

Producer settings

2019-10-25 16:12:28.346  INFO 4628 --- [  restartedMain] com.ll.kafkaservice.KafkaServiceApp      : Started KafkaServiceApp in 22.248 seconds (JVM running for 23.136)
2019-10-25 16:12:28.366 DEBUG 4628 --- [  restartedMain] c.l.k.aop.logging.LoggingAspect          : Enter: com.ll.kafkaservice.service.KafkaServiceKafkaProducer.init() with argument[s] = []
2019-10-25 16:12:28.377  INFO 4628 --- [  restartedMain] c.l.k.service.KafkaServiceKafkaProducer  : Kafka producer initializing...
2019-10-25 16:12:28.378  INFO 4628 --- [  restartedMain] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
        acks = 1
        batch.size = 16384
        bootstrap.servers = [localhost:9092]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id =
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2019-10-25 16:12:28.399  INFO 4628 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.0

The object that contains the message is listed here

package com.ll.kafkaservice.messaging;

import java.io.Serializable;

public class Greeting  implements Serializable {
	private static final long serialVersionUID = 1L;
	
	private String message;

	
	public Greeting() {
	}

	
	public String getMessage() {
		return message;
	}

	public void setMessage(String message) {
		this.message = message;
	}
	
	@Override
	public String toString() {
		StringBuilder sbuffer = new StringBuilder();
		
		sbuffer.append("{");
		sbuffer.append("message:");
		sbuffer.append(message);
		sbuffer.append("}");
		
		return sbuffer.toString();
	}
}

Defining the streams

package com.ll.kafkaservice.greeting;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;


public interface GreetingsStreams {
    String INPUT = "greetings-in";
    String OUTPUT = "greetings-out";
    
    @Input(INPUT)
    SubscribableChannel inboundGreetings();
    
    @Output(OUTPUT)
    MessageChannel outboundGreetings();
}

Binding the streams

package com.ll.kafkaservice.config;

import org.springframework.cloud.stream.annotation.EnableBinding;
import com.ll.kafkaservice.greeting.GreetingsStreams;


@EnableBinding(GreetingsStreams.class)
public class StreamsConfiguration {

}

Producing/sending the message

package com.ll.kafkaservice.greeting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

import com.ll.kafkaservice.messaging.Greeting;


@Service
public class GreetingsService {
    private final Logger log = LoggerFactory.getLogger(GreetingsService.class);
    
    private final GreetingsStreams greetingsStreams;
    
    private MessageChannel messageChannel;
    
    
    public GreetingsService(GreetingsStreams greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }
    
    public void sendGreeting(final Greeting greeting) {
        messageChannel = greetingsStreams.outboundGreetings();
        log.info("Before send {}", greeting.toString());
        messageChannel.send(MessageBuilder
        		// Sends a string to payload not the object
                .withPayload(greeting.getMessage())
                // Note:  tried this with and without the header
                //.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
                .build());
    }
}

Consuming/receiving the message

package com.ll.kafkaservice.greeting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import com.ll.kafkaservice.messaging.Greeting;



@Component
public class GreetingsListener {
    private final Logger log = LoggerFactory.getLogger(GreetingsListener.class);

    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload Greeting greetings) {
        log.info("Received greetings: {}", greetings.getMessage());
    }
    
    //@StreamListener(GreetingsStreams.INPUT)
    //public void handleGreetings(String greetings) {
    //    log.info("Received greetings: {}", greetings);
    //}
}

Best answer

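By default, the Spring Cloud Stream (SCSt) framework converts the payload to byte[] itself and expects the Kafka producer to use the ByteArraySerializer.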
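The cast failure at the bottom of the stack trace can be reproduced in plain Java without a broker. The sketch below assumes only what the trace shows: the payload has already been converted to byte[] by the time it reaches a value serializer that expects a String. `ValueSerializer` and `StringValueSerializer` are hypothetical stand-ins written for this illustration, not Kafka's real `Serializer` and `StringSerializer` classes.

```java
import java.nio.charset.StandardCharsets;

public class SerializerMismatchDemo {

    // Hypothetical stand-in for Kafka's Serializer<T> interface.
    interface ValueSerializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Hypothetical stand-in for StringSerializer: it only accepts String values.
    static class StringValueSerializer implements ValueSerializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mimics a producer that calls its configured serializer without
    // compile-time type checks, so a mismatch only surfaces at runtime.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String trySend(ValueSerializer serializer, Object payload) {
        try {
            serializer.serialize("greetings", payload);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        ValueSerializer<String> serializer = new StringValueSerializer();

        // What the serializer receives after the framework's own conversion.
        byte[] converted = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(trySend(serializer, converted)); // ClassCastException

        // What the serializer would receive if the String were passed through untouched.
        System.out.println(trySend(serializer, "hello"));   // ok
    }
}
```

The String serializer itself is not broken; it simply never sees a String while the framework's own conversion is active.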

Since you have configured the binding to use a custom serializer, you must set useNativeEncoding to true. See Producer Properties.

useNativeEncoding

When set to true, the outbound message is serialized directly by the client library, which must be configured correspondingly (for example, setting an appropriate Kafka producer value serializer). When this configuration is being used, the outbound message marshalling is not based on the contentType of the binding. When native encoding is used, it is the responsibility of the consumer to use an appropriate decoder (for example, the Kafka consumer value de-serializer) to deserialize the inbound message. Also, when native encoding and decoding is used, the headerMode=embeddedHeaders property is ignored and headers are not embedded in the message. See the consumer property useNativeDecoding.
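As a rough sketch, the setting could be applied to the question's outbound binding as shown below. The binding name `greetings-out` and the destination come from the question's configuration; placing the serializer under the Kafka binder's per-binding `producer.configuration` map is an assumption about where the custom serializer should live in this project, since the question does not show where `value.serializer` was set.

```yaml
spring:
  cloud:
    stream:
      bindings:
        greetings-out:
          destination: greetings
          producer:
            # Skip the framework's byte[] conversion and hand the payload
            # to the Kafka client's value serializer unchanged.
            useNativeEncoding: true
      kafka:
        bindings:
          greetings-out:
            producer:
              configuration:
                # Assumption: the custom serializer is declared per binding here.
                value.serializer: org.apache.kafka.common.serialization.StringSerializer
```

With native encoding enabled, the `content-type` set on the binding no longer drives outbound serialization, as the quoted documentation notes.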

However, if you want to send a POJO, you need to use a JsonSerializer rather than just a String serializer.

Is there some reason you are not relying on the framework to do the conversion for you?

Source: the original question on Stack Overflow, https://stackoverflow.com/questions/58565814/
