spring-kafka (non-integration) consumer not consuming messages

Tags: spring, apache-kafka, spring-kafka

I am integrating my application with spring-kafka (not spring-integration-kafka). Here is the project's Spring documentation: http://docs.spring.io/spring-kafka/docs/1.0.1.RELEASE/reference/htmlsingle

My producer works perfectly, but the consumer is not consuming any messages. Any pointers?

Here is my configuration:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.messaging.handler.annotation.Payload;

// Message is the application's own JSON payload class (import omitted here)

@Configuration
@EnableKafka
public class MyConfig {

    @Value("${kafka.broker.list}") // comma-separated list of host:port pairs
    private String kafkaBrokerList;
    private String kafkaBrokerList;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, Message>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(12);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setIdleEventInterval(60000L);
        factory.setAutoStartup(Boolean.TRUE);
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, Message> consumerFactory() {
        JsonDeserializer<Message> messageJsonDeserializer = new JsonDeserializer<>(Message.class);
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new IntegerDeserializer(), messageJsonDeserializer);
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerList);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 60000);
        props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 10000);
        return props;
    }

    @KafkaListener(topics = "myTopic", containerFactory = "kafkaListenerContainerFactory")
    public void listen(@Payload Message message) {
        System.out.println(message);
    }

}

** Edit: more information **

Thanks for the reply, Gary. I do not see any exceptions in the log. I also tried a KafkaTemplate with a similar configuration and I am able to publish messages to the queue, but for the consumer, no luck. I am changing the code to use a String instead of my Message object. Here is part of the log:

2016-07-11 09:31:43.314 INFO [RMI TCP Connection(2)-127.0.0.1] o.a.k.c.c.ConsumerConfig [AbstractConfig.java:165] ConsumerConfig values: 
    metric.reporters = []  
    metadata.max.age.ms = 300000  
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer  
    group.id = 
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]  
    reconnect.backoff.ms = 50  
    sasl.kerberos.ticket.renew.window.factor = 0.8  
    max.partition.fetch.bytes = 1048576  
    bootstrap.servers = [app1.qa:9092, app1.qa:9093, app2.qa:9092, app2.qa:9093, app3.qa:9092, app3.qa:9093]  
    retry.backoff.ms = 10000  
    sasl.kerberos.kinit.cmd = /usr/bin/kinit  
    sasl.kerberos.service.name = null  
    sasl.kerberos.ticket.renew.jitter = 0.05  
    ssl.keystore.type = JKS  
    ssl.trustmanager.algorithm = PKIX  
    enable.auto.commit = true  
    ssl.key.password = null  
    fetch.max.wait.ms = 500  
    sasl.kerberos.min.time.before.relogin = 60000  
    connections.max.idle.ms = 60000  
    ssl.truststore.password = null  
    session.timeout.ms = 15000  
    metrics.num.samples = 2  
    client.id =   
    ssl.endpoint.identification.algorithm = null  
    key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer  
    ssl.protocol = TLS  
    check.crcs = true  
    request.timeout.ms = 40000  
    ssl.provider = null  
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]  
    ssl.keystore.location = null  
    heartbeat.interval.ms = 3000  
    auto.commit.interval.ms = 10000  
    receive.buffer.bytes = 32768  
    ssl.cipher.suites = null  
    ssl.truststore.type = JKS  
    security.protocol = PLAINTEXT  
    ssl.truststore.location = null  
    ssl.keystore.password = null  
    ssl.keymanager.algorithm = SunX509  
    metrics.sample.window.ms = 30000  
    fetch.min.bytes = 1  
    send.buffer.bytes = 131072  
    auto.offset.reset = latest  

I also do see the following in the log:

2016-07-11 09:31:53.515 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-10] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[]  
2016-07-11 09:31:53.515 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-11] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[]  
2016-07-11 09:31:53.516 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-3] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[]  
2016-07-11 09:31:53.516 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-12] o.s.k.l.KafkaMessageListenerContainer [AbstractMessageListenerContainer.java:224] partitions revoked:[]  
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-8] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead.  
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-3] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead.  
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-10] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead.  
2016-07-11 09:31:53.578 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-12] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java:529] Marking the coordinator 2147483639 dead.  

Best Answer

The documentation referenced above says:

Although Serializer/Deserializer API is pretty simple and flexible from the low-level Kafka Consumer and Producer perspective, it is not enough on the Messaging level, where KafkaTemplate and @KafkaListener are present. To easy convert to/from org.springframework.messaging.Message, Spring for Apache Kafka provides MessageConverter abstraction with the MessagingMessageConverter implementation and its StringJsonMessageConverter customization.

But in your case you combine that MessageConverter:

        factory.setMessageConverter(new StringJsonMessageConverter());

with a custom Deserializer:

        JsonDeserializer<Message> messageJsonDeserializer = new JsonDeserializer<>(Message.class);

The simplest solution would be to use a StringDeserializer instead:

https://kafka.apache.org/090/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html
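
In other words, do the JSON work on one level only. Below is a minimal sketch of that fix, reusing consumerConfigs() and the bean names from the question (the only extra import is org.apache.kafka.common.serialization.StringDeserializer):

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        // The Kafka client now delivers the raw JSON as a plain String...
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new IntegerDeserializer(), new StringDeserializer());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // ...and StringJsonMessageConverter turns that String into the listener's
        // @Payload parameter type on the messaging level
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }

The @KafkaListener method can stay exactly as posted; the converter resolves the target type (Message) from its @Payload parameter.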

Regarding the Marking the coordinator XXX dead. log messages shown above: that error is not related to the spring-kafka project; it means the problem lies in your Kafka setup. In my case we ran into it when a Kafka node could not reach ZooKeeper. To sort it out, I suggest installing Kafka and ZooKeeper locally and verifying that producing and consuming work with kafka-console-producer and kafka-console-consumer, for example:

https://www.cloudera.com/documentation/kafka/latest/topics/kafka_command_line.html
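
If a quick check from Java is more convenient than the shell tools, a throwaway plain-Kafka consumer (no Spring involved) does the same job as kafka-console-consumer. This is only a sketch: the broker address, topic, and group id are placeholder values, not taken from the question:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SmokeTestConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // local test broker (placeholder)
        props.put("group.id", "smoke-test");              // any non-empty consumer group
        props.put("auto.offset.reset", "earliest");       // read the topic from the beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("myTopic")); // placeholder topic
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}

If this loop prints the records that kafka-console-producer sends, the broker side is healthy and the problem is in the Spring wiring.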

Then, as the next stage, you can check your sample spring-kafka application against that same local installation.

The original question, spring-kafka (non-integration) consumer not consuming messages, is on Stack Overflow: https://stackoverflow.com/questions/38274667/
