我使用一个线程轮询数据,另一个线程处理数据。 由于处理需要更长的时间,所以当数据变得越来越大时我想限制数据大小。我的计划是轮询(超时:0毫秒)然后它将返回空记录。然而,事实是,大约每 10 次,poll(0) 方法就会获取已满的记录。
我的问题是,
为什么 poll(0) 有时会取回完整记录?尽管大多数时候不会。
有什么办法可以限制投票吗?
PS:我尝试了consumer.pause()方法。但由于消费者随机获取分区。我不知道要暂停哪些分区。我如何知道消费者实例占用了哪些分区?
最佳答案
consumer.assignment() 会将分配的分区集返回给您的消费者。 max.poll.records 设置确定要返回的 ConsumerRecord 的最大数量,默认为 500。我猜如果线程在向 kafka 发出请求后但在选择响应之前暂停,则 poll(0) 只会在 super 繁忙的系统上返回记录。
关于java - 卡夫卡消费者民意调查(超时: 0) but actually fetch the records back,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43816362/