java - Kafka 0.10.2 消费者获得大量重复项

标签 java multithreading apache-kafka kafka-consumer-api

我有一个相当简单的 Kafka 设置 - 1 个生产者、1 个主题、10 个分区、10 个 KafkaConsumers 都具有相同的组 ID,全部运行在一台机器上。当我处理一个文件时,生产者快速创建 3269 条消息,消费者高兴地开始消费这些消息。一切都运行良好一段时间,但在某个时刻,消费者开始消费重复项——大量重复项。事实上,看起来他们只是再次开始消耗消息队列。如果我让它运行很长时间,数据库将开始接收相同的数据条目 6 次或更多次。在对日志进行一些测试后,看起来消费者正在重新使用具有相同唯一消息名称的相同消息。

据我所知,没有发生重新平衡。消费者不会消亡或增加。同样是 10 个消费者,一遍又一遍地消费相同的 3269 条消息,直到我终止该进程。如果我放任不管,消费者将写入数十万条记录,从而大量增加真正应该进入数据库的数据量。

我对 Kafka 相当陌生,但我有点不明白为什么会发生这种情况。我知道 Kafka 不能保证一次性处理,而且我可以接受这里那里的一些重复。我有代码可以防止再次保留相同的记录。但是,我不确定为什么消费者会一遍又一遍地重新消费队列。我知道 Kafka 消息在被消费后不会被删除,但如果所有消费者都在同一个组中,那么偏移量应该可以防止这种情况,对吧?我对偏移量的工作原理有所了解,但据我所知,如果没有重新平衡,它们不应该被重置,对吧?据我所知,这些消息并没有超时。有没有办法让我的消费者一次性消费队列中的所有内容,然后等待更多消息,而无需永远重新消费相同的内容?

以下是我传递给生产者和消费者的属性:

Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("group.id", "MyGroup");
        props.put("num.partitions", 10);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        MyIngester ingester = new MyIngester(args[0], props);

最佳答案

对我来说,这似乎是确认收据的问题。 尝试以下属性

    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "100");

关于java - Kafka 0.10.2 消费者获得大量重复项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43383392/

相关文章:

java - 当 i 给出输入 n 个元素时,显示输出 n 个元素?

java - 如何在java日历中设置一年的第一个月

cocoa - 核心数据错误与异常第 3 部分

C++ 线程 : can lock_guard own a mutex that owned by a unique_lock?

apache-kafka - 在不同的服务器上运行 Kafka 和 Kafka-connect?

java - 如何在命令行上获取 Windows Pid 的 cpu% 使用率?

java - 如何右键单击特定项目打开首选项页面?

c++ - 条件变量会解锁它们的互斥量吗?

apache-kafka - 在 Kafka 连接源连接器中使用消息键

java - 使用 Google Dataflow 在批处理模式下使用 KafkaIO 进行消费