我需要在晚上执行一个作业,它将获取 kafka 队列中的所有消息并使用它们执行一个进程。我能够收到消息,但 kafka 流正在等待更多消息,我无法继续我的流程。我有以下代码:
...
private ConsumerConnector consumerConnector;
private final static String TOPIC = "test";
public MessageStreamConsumer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", "localhost:2181");
properties.put("group.id", "test-group");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
}
public List<String> getMessages() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(TOPIC, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
List<String> messages = new ArrayList<>();
while (it.hasNext())
messages.add(new String(it.next().message()));
return messages;
}
代码能够获取消息,但是当它处理最后一条消息时,它停留在行中:
while (it.hasNext())
问题是,我怎样才能从 kafka 获取所有消息,停止流并继续我的其他任务。
希望你能帮帮我
谢谢
最佳答案
好像kafka stream不支持从头开始消费。
您可以创建一个原生的 kafka 消费者并将 auto.offset.reset
设置为 earliest,然后它将从头开始消费消息。
关于java - 获取队列中的所有kafka消息并停止在java中流式传输,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38662312/