java - KafkaConsumer 0.10 Java API 错误信息 : No current assignment for partition

标签 java kafka-consumer-api

我正在使用 KafkaConsumer 0.10 Java api。我想从特定分区和特定偏移量中使用。我查了一下,发现有一个seek方法,但是它抛出了一个异常。有人有类似的用例或解决方案吗?

代码:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);

异常

java.lang.IllegalStateException: No current assignment for partition mytopic-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
    at xx.xxx.xxx.Test.main(Test.java:182)

最佳答案

在您可以seek()之前,您首先需要subscribe()一个主题 assign() 将主题划分给消费者。还要记住,subscribe()assign() 是惰性的——因此,您还需要对 poll() 进行“虚拟调用” ,然后才能使用 seek()

Note: as of Kafka 2.0, the new poll(Duration timeout) is async and it's not guaranteed that you have a complete assignment when poll returns. Thus, you might need to check your assignment before using seek() and also poll again to refresh the assignment. (Cf. KIP-266 for details)

如果你使用subscribe(),你使用组管理:因此,你可以使用相同的group.id启动多个消费者,并且主题的所有分区都将被自动平均分配给组内的所有消费者(每个分区将分配给组中的单个消费者)。

如果要读取特定分区,需要通过assign()手动赋值。这使您可以执行任何您想要的任务。

顺便说一句:KafkaConsumer 有一个非常长的详细类 JavaDoc,包括示例。值得一读。

关于java - KafkaConsumer 0.10 Java API 错误信息 : No current assignment for partition,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41008610/

相关文章:

go - 我如何知道在 Kafka 中使用哪个分区?

apache-kafka - Apache Kafka 如何与多个代理和单个代理一起工作

spring-boot - Kafka 无法处理所有消息 - Java Spring Boot

java - 如何为不同屏幕尺寸的 android studio 自动调整 imageview 的大小?

java - 设置 Eclipse 以使用 Java EE

Spring Integration kafka : org. apache.kafka.common.config.ConfigException 运行消费者时

node.js - Kafka-node突然从偏移0开始消费

java - JOptionPane showInputDialog 位置

java - 如何使用 :list behave with scope ="prototype"?

java - 使用 JNI 读取带偏移量的内存