java - Kafka 数据丢失,在生产者中

标签 java apache-kafka kafka-consumer-api kafka-producer-api

我一直在尝试配置一个Kafka Broker、一个主题、一个生产者、一个消费者。 当生产者生产 时,如果 Broker 宕机,就会发生数据丢失, 例如:

In Buffer:
Datum 1 - published
Datum 2 - published
.
. ---->(Broker goes down for a while and reconnects...)
.
Datum 4 - published
Datum 5 - published

为生产者配置的属性有:

bootstrap.servers=localhost:9092
acks=all
retries=1
batch.size=16384
linger.ms=2
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
producer.type=sync
buffer.size=102400
reconnect.interval=30000
request.required.acks=1

数据大小小于配置的缓冲区大小。 帮助我知道我哪里出了问题......!

最佳答案

不确定你到底在做什么。我假设您在代理关闭时尝试写入 Kafka 的消息不会被 Kafka 确认。如果消息没有被ack,则表明消息没有写入Kafka,生产者需要重新尝试写入消息。

最简单的方法是相应地设置配置参数 retriesretry.backoff.ms

在应用程序级别,您还可以在 send(..., Callback) 中注册一个 Callback 以获取有关成功/失败的信息。如果失败,您可以再次调用send()来重试发送。

关于java - Kafka 数据丢失,在生产者中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38616723/

相关文章:

使用 Xamarin 注册 Google Cloud Messaging 时出现 Java.Lang.LinkageError

java - 验证月份输入 01 和 1?

apache-kafka - 卡夫卡消费者 : Want to read same message again if not committed previous messages offset and auto commit is disabled

hadoop - 基于时间的桶记录(kafka-hdfs-connector)

java - 替换字符串中正则表达式匹配的空白区域

java - Java中在新行中打印数字的每个数字

rest - Kafka REST 代理 API 有什么好处?

java - 如何从 shell 脚本覆盖 Log4j 值?

java - 执行 kafka-console-consumer.sh 时,zookeeper 不是可识别的选项

apache-kafka - 了解卡夫卡检查点