假设以下场景:
我在本地启动了zookeeper和一个kafka代理,并创建了“测试”主题,如kafka快速入门中所述:https://kafka.apache.org/quickstart
然后,我运行一个简单的 java 程序,每秒向“测试”主题生成一条消息。一段时间后,我关闭了本地的 kafka 代理,并看到生产者继续生成消息,它不会抛出任何异常。最后,我再次启动 kafka 代理,生产者能够重新连接到代理并继续生成消息,但是,在 kafka 代理停机期间生成的所有消息都会丢失。当生产者检测到健康的 kafka 代理时,不会重播它们。
如何防止这种情况发生?我希望 kafka 生产者在检测到 kafka 代理重新上线时重播这些消息。这是我的生产者配置:
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("linger.ms", 0);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
最佳答案
Kafka Producer库内置了重试机制,但默认情况下它是关闭的。将 retries
Producer 配置更改为大于 0(默认值)的值以将其打开。您还应该尝试使用 retry.backoff.ms
和 request.timetout.ms
来自定义 Producer 重试。
启用重试的 Kafka Producer 配置示例:
retries=2147483647 //Integer.MAX_VALUE
retry.backoff.ms=1000
request.timeout.ms=305000 //5 minutes
max.block.ms=2147483647 //Integer.MAX_VALUE
您可以在Apache Kafka documentation中找到有关这些属性的更多信息。 .
关于java - 当经纪人关闭时,卡夫卡生产者正在丢失消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49681645/