java - 当经纪人关闭时,卡夫卡生产者正在丢失消息

标签 java apache-kafka kafka-producer-api

假设以下场景:

我在本地启动了zookeeper和一个kafka代理,并创建了“测试”主题,如kafka快速入门中所述:https://kafka.apache.org/quickstart

然后,我运行一个简单的 java 程序,每秒向“测试”主题生成一条消息。一段时间后,我关闭了本地的 kafka 代理,并看到生产者继续生成消息,它不会抛出任何异常。最后,我再次启动 kafka 代理,生产者能够重新连接到代理并继续生成消息,但是,在 kafka 代理停机期间生成的所有消息都会丢失。当生产者检测到健​​康的 kafka 代理时,不会重播它们。

如何防止这种情况发生?我希望 kafka 生产者在检测到 kafka 代理重新上线时重播这些消息。这是我的生产者配置:

props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("linger.ms", 0);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

最佳答案

Kafka Producer库内置了重试机制,但默认情况下它是关闭的。将 retries Producer 配置更改为大于 0(默认值)的值以将其打开。您还应该尝试使用 retry.backoff.msrequest.timetout.ms 来自定义 Producer 重试。

启用重试的 Kafka Producer 配置示例:

retries=2147483647         //Integer.MAX_VALUE 
retry.backoff.ms=1000
request.timeout.ms=305000  //5 minutes
max.block.ms=2147483647    //Integer.MAX_VALUE 

您可以在Apache Kafka documentation中找到有关这些属性的更多信息。 .

关于java - 当经纪人关闭时,卡夫卡生产者正在丢失消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49681645/

相关文章:

java - 验证用户名唯一获取错误

java - 在 spring-kafka 中禁用连接和消费者创建

java - Docker swarm : org. apache.kafka.common.errors.TimeoutException:WAITING节点分配超时

c++ - 从 cppkafka 推送到 kafka 主题时获取消费者的空值?

java - JPA : weird Cascade delete behaviour, 一对多。使用两个单独的 DAO 时发生 FK 违规

java - 如何忽略 JSON 属性的底层结构并将其存储为字符串?

java - 可以反编译Java程序

apache-kafka - 通过不同的线程使用Kafka Producer

apache-kafka - Apache kafka exactly once 实现不发送消息

java - Kafka Streams 在生成主题时不会将偏移量增加 1