java - 获取队列中的所有kafka消息并停止在java中流式传输

标签 java apache-kafka kafka-consumer-api

我需要在晚上执行一个作业,它将获取 kafka 队列中的所有消息并使用它们执行一个进程。我能够收到消息,但 kafka 流正在等待更多消息,我无法继续我的流程。我有以下代码:

...
private ConsumerConnector consumerConnector;
private final static String TOPIC = "test";

public MessageStreamConsumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "test-group");
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    }
public List<String> getMessages() {
                Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                topicCountMap.put(TOPIC, new Integer(1));
                Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
                        .createMessageStreams(topicCountMap);
                KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                List<String> messages = new ArrayList<>();
                while (it.hasNext())
                    messages.add(new String(it.next().message()));
                return messages;
            }

代码能够获取消息,但是当它处理最后一条消息时,它停留在行中:

 while (it.hasNext())

问题是,我怎样才能从 kafka 获取所有消息,停止流并继续我的其他任务。

希望你能帮帮我

谢谢

最佳答案

好像kafka stream不支持从头开始消费。
您可以创建一个原生的 kafka 消费者并将 auto.offset.reset 设置为 earliest,然后它将从头开始消费消息。

关于java - 获取队列中的所有kafka消息并停止在java中流式传输,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38662312/

相关文章:

java - Java中没有循环如何解决 "repeatSeparator"问题?

kubernetes - K8s Confluent Controlcenter pod 从服务中丢失并且没有错误

apache-kafka - Kafka 上的多线程 在 Spring Reactor Kafka 中发送

kafka-consumer-api - kafka消费者迭代器是如何工作的

java - Java clip.open无限期挂起

java - java中的动态属性查找

java - 使用 SQL Server 2005/2008 的最新 jdbc 驱动程序时,准备好的语句、 View 和存储过程的性能比较如何?

apache-kafka - Kafka 在有状态处理中验证消息

apache-kafka - 卡夫卡消费者从一开始就不消费

apache-kafka - Kafka - 处理消费者缓慢的最佳实践。如何实现更多的并行性?