java - Kafka 如何防止通过代理丢失数据

标签 java apache-kafka kafka-consumer-api kafka-producer-api

我使用的是 Kafka 0.10.2.0。我有 3 个经纪人,我正在做一些故障转移测试。有时,当其中一个卡夫卡经纪人不礼貌地关闭时,我会丢失数据。 卡夫卡经纪人配置:

zookeeper.connection.timeout.ms=6000
num.partitions=50
min.insync.replicas=2
unclean.leader.election.enable=false
group.max.session.timeout.ms=10000
group.min.session.timeout.ms=1000

消费者配置:

props.put(ConsumerConfig.GROUP_ID_CONFIG, getTopicName() + "group");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, getClientId());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 3000);

生产者配置:

props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.CLIENT_ID_CONFIG, getClientId());
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 800);
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 800);

我该怎么做才能防止通过 kafka 代理丢失数据?

最佳答案

关于java - Kafka 如何防止通过代理丢失数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42927142/

相关文章:

java - 如何使用大写字段名称将字符串序列化为对象?

java - Netbeans 8.0.2 组织导入,可能存在错误吗?

java - Kafka设置从主题读取的最大消息数

postgresql - 如何使用 debezium 从 Postgres 流式传输更改

java - 如何获取 kafka 主题分区的最后/结束偏移量?

java - 如何在Java8中过滤列表?

java - 具有执行程序或多线程环境的 Apache DBCP

scala - 过滤 kafka 消息时 Spark 作业失败

java - 在 Kafka Java 消费者客户端上,有没有办法监控健康状态而不是简单的无数据?

java - Apache 卡夫卡 : Processor generates multiple output per input