java - 卡夫卡1.0流媒体API : message consumption from partitions get delayed

标签 java apache-kafka apache-kafka-streams

最近,我将我们的流应用程序从 Spark-streaming 2.1 切换为使用 kafka-streaming 新 API (1.0) 和 kafka 代理服务器 0.11.0.0

我已经实现了自己的Processor类,并且在process方法中,我只是打印了消息内容。

我有一个由 3 台机器组成的 kafka 集群,我挂接的主题有 300 个分区。

我在一台具有 32 GB RAM 和 8 个内核的机器上运行了具有 100 个线程的流应用程序。

我的问题是,在某些情况下,我在消息到达kafka主题/分区后收到消息,而在其他情况下,我在消息到达主题后10-15分钟收到消息,不知道为什么!

我使用下面的命令行来跟踪流应用程序的 group.id 的 kafka 主题上的延迟。

./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --new-consumer --describe --group kf_streaming_gp_id

但不幸的是,它并不能始终给出准确的结果,甚至根本不能给出结果,有人知道为什么吗?

流媒体应用程序是否有我错过的东西,以便我可以在到达分区后一致地读取消息? 任何消费者属性都可以解决此类问题。

我的 kafka-streaming 应用程序结构如下:

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kf_streaming_gp_id");
        config.put(StreamsConfig.CLIENT_ID_CONFIG, "kf_streaming_gp_id");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, DocumentSerde.class);
        config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimeExtractor.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 100);

        KStream<String, Document> topicStreams = builder.stream(sourceTopic);

        topicStreams.process(() -> new DocumentProcessor(appName, environment, dimensions, vector, sinkTopic));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();

最佳答案

我找出了我的案例中的问题所在。

事实证明,有一些线程被困在执行高CPU密集型工作,这导致阻止其他线程消费消息,这就是为什么我看到这样的突发,当我停止这个CPU密集型逻辑时,一切都非常快,消息一旦到达kafka主题就进入流作业。

关于java - 卡夫卡1.0流媒体API : message consumption from partitions get delayed,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48789973/

相关文章:

java - 正则表达式: Finding String and digits

apache-kafka - Kafka Streams 线程模型在同一实例和 JVM 上具有多个流

apache-kafka-streams - Kafka 流 DSL : aggregate, 丰富并发送

apache-kafka - 如何修复与组协调器相关的 kafka 流问题不可用或无效,将尝试重新发现

java - Repaint() 没有在 Java while 循环中被调用

java - 无法将 StringTokenizer 传递给多个方法

java - java.util.Random 中的种子

apache-kafka - Zookeeper-Kafka 和一致性哈希

java - 卡夫卡生产者: how to handle "java.net.ConnectException: Connection refused"

Java Maven 项目构建失败