我能够使用 apache kafka 提交偏移量类并能够使用 ConsumerConnector 进行提交。我查看了具有与“auto.commit.enable”属性相同的“autoCommitEnable”消费者选项的 apache camel-kafka 组件。 现在 Camel Java DSL 中是否有任何属性或方法,在使用消息后我们可以手动提交偏移量(通过提供的方法或 URL 中的消费者选项) 或 我们必须再次使用 Kafka Consumer API 来提交消费者偏移量?
最佳答案
您可以使用 KafkaManualCommit 提交
public void process(Exchange exchange) {
KafkaManualCommit manual =
exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commitSync();
}
关于java - 使用 apache camel 的 camel-kafka 组件手动提交消费者偏移量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32346126/