java - 使用Spring Boot配置Kafka的问题

标签 java spring-boot apache-kafka spring-kafka

我们的团队一直在尝试使用 Kafka 来解决问题。自从我们开始开发应用程序以来,这个问题就一直存在。

一开始这些问题很快就得到了解决。 “失败了?重新启动服务器即可”。由于我们希望开始向公众分发我们的应用程序,因此这个“解决方案”不再可行。

我们面临的问题基本上有两个:

消费者停止工作

这是一个经常出现的问题。突然间,一些消费者就停下来了。消息成功发送到Kafka,我们甚至可以使用Kafka Tool看到实际的消息,但消费者就是不工作。

循环消息

这恰恰相反。有时发送消息后,消费者会继续消费该消息,直到我们重新启动服务器。

我们尝试将 Kafka 直接配置到服务器中,但我们意识到由于某种原因 Kafka 会忽略这些配置并直接从 Spring Boot 获取配置。

我们的配置如下所示:

消费者:

    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_CONSUMER_GROUP);
    properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5000000);
    properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10800000);

制作人:

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.KAFKA_HOST);
    properties.put(ProducerConfig.RETRIES_CONFIG, 0);
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5000000);
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
    properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10800000);

您可以看到极端的超时和大小值,这是因为我们认为问题与消息的大小或服务器的超时有关。我们甚至重新设计了所有应用程序流量,以便我们可以发送更小的消息,这时我们意识到这不是根本问题。

任何帮助将不胜感激。

卡夫卡版本0.10

Spring Boot 版本 1.5.7 Dalston.SR4

我们正在使用 spring-cloud-starter-stream-kafka 依赖项。

我们检查了日志,但没有发现可识别的错误。事实上,只有信息消息,但没有一条说有用的内容。

最佳答案

不幸的是,Boot 1.5 引入了一个非常旧的 spring-kafka 版本 (1.1.x),该版本不再受支持。 Boot 对于依赖版本控制有严格的规则。如 the Spring for Apache Kafka project page 中所述:

All users with brokers >= 0.10.x.x are recommended to use spring-kafka version 1.3.x or higher due to its simpler threading model thanks to KIP-62.

当前的 1.3.x 版本是 1.3.5 。尝试升级到该版本和 0.11 kafka-clients jar。

在 1.1.x 中,存在复杂的逻辑,需要缓慢的监听器来暂停/恢复消费者以避免代理重新平衡。虽然我自己没有看到过,但我看到过一些报告称消费者在暂停后无法正确恢复。

感谢 KIP-62,不再需要此逻辑,因为心跳是在后台发送的。但是,您确实需要确保 max.poll.interval.ms 足够大以支持处理 max.poll.records。

如果可以的话,升级到 boot 2.0.4 会更好,它会引入最新的 spring-kafka 2.1.7。

关于java - 使用Spring Boot配置Kafka的问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51407112/

相关文章:

java - 在 Java 中使用 JsonPath 解析 JSON

spring-boot - oauth2 身份验证成功后获取 404 和匿名 token

java - LDAP - Spring Boot |获取 javax.naming.CommunicationException |连接被拒绝

java - 如何调试错误 : java. lang.AssertionError:JSON 路径 "$.name"处没有值

apache-spark - 将 Apache Spark 结果发布到另一个应用程序/Kafka

java - 如何在 Selenium Webdriver 中使用相同的 session 打开浏览器?

java - 动态传递布局 ID

python-3.x - 在 jupyter notebook 中将自定义 jars 添加到 pyspark

Kafka Producer 的 java 客户端示例,发送方法不接受 KeyedMessage

java - MySQL 日期时间 <-> Java 日期