java - 使用Spring Boot配置Kafka的问题

标签 java spring-boot apache-kafka spring-kafka

我们的团队一直在尝试使用 Kafka 来解决问题。自从我们开始开发应用程序以来,这个问题就一直存在。

一开始这些问题很快就得到了解决。 “失败了?重新启动服务器即可”。由于我们希望开始向公众分发我们的应用程序,因此这个“解决方案”不再可行。

我们面临的问题基本上有两个:

消费者停止工作

这是一个经常出现的问题。突然间,一些消费者就停下来了。消息成功发送到Kafka,我们甚至可以使用Kafka Tool看到实际的消息,但消费者就是不工作。

循环消息

这恰恰相反。有时发送消息后,消费者会继续消费该消息,直到我们重新启动服务器。

我们尝试将 Kafka 直接配置到服务器中,但我们意识到由于某种原因 Kafka 会忽略这些配置并直接从 Spring Boot 获取配置。

我们的配置如下所示:

消费者:

    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_CONSUMER_GROUP);
    properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5000000);
    properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10800000);

制作人:

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.KAFKA_HOST);
    properties.put(ProducerConfig.RETRIES_CONFIG, 0);
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5000000);
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
    properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10800000);

您可以看到极端的超时和大小值,这是因为我们认为问题与消息的大小或服务器的超时有关。我们甚至重新设计了所有应用程序流量,以便我们可以发送更小的消息,这时我们意识到这不是根本问题。

任何帮助将不胜感激。

卡夫卡版本0.10

Spring Boot 版本 1.5.7 Dalston.SR4

我们正在使用 spring-cloud-starter-stream-kafka 依赖项。

我们检查了日志,但没有发现可识别的错误。事实上,只有信息消息,但没有一条说有用的内容。

最佳答案

不幸的是,Boot 1.5 引入了一个非常旧的 spring-kafka 版本 (1.1.x),该版本不再受支持。 Boot 对于依赖版本控制有严格的规则。如 the Spring for Apache Kafka project page 中所述:

All users with brokers >= 0.10.x.x are recommended to use spring-kafka version 1.3.x or higher due to its simpler threading model thanks to KIP-62.

当前的 1.3.x 版本是 1.3.5 。尝试升级到该版本和 0.11 kafka-clients jar。

在 1.1.x 中,存在复杂的逻辑,需要缓慢的监听器来暂停/恢复消费者以避免代理重新平衡。虽然我自己没有看到过,但我看到过一些报告称消费者在暂停后无法正确恢复。

感谢 KIP-62,不再需要此逻辑,因为心跳是在后台发送的。但是,您确实需要确保 max.poll.interval.ms 足够大以支持处理 max.poll.records。

如果可以的话,升级到 boot 2.0.4 会更好,它会引入最新的 spring-kafka 2.1.7。

关于java - 使用Spring Boot配置Kafka的问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51407112/

相关文章:

java - 如何将网站中的文本转换为字符串

java - 用户友好的应用程序实例之间的数据迁移

java - 如何向最终用户显示 API 的错误消息?

python - Kafka SSL with Python : kafka. errors.NoBrokersAvailable: NoBrokersAvailable

java - 用排序后的字符串值填充空堆栈

java - 谁能告诉我如何在此下拉菜单上训练 Selenium WebDriver?

java - 触摸事件行为异常

java - 带有 JWT token 的 Spring Security 在每个请求上加载 spring UserDetails 对象

apache-spark - 星火流异常 : java. util.NoSuchElementException : None. 得到

apache-kafka - 为什么我无法连接到 Kafka/Zookeeper? (在 Docker 中)