java - KafkaConsumer 在轮询时进入无限期等待状态

标签 java apache-kafka spark-streaming kafka-consumer-api

我正在尝试使用 KafkaConsumer API 轮询 kafka 主题。但即使我们通过轮询超时,它也会进入无限期的等待状态,并且不会兑现。

从线程转储中,它显示可运行状态,我进行了多个线程转储,主线程始终保持在同一位置,我相信它不会退出等待。

"main" #1 prio=5 os_prio=0 tid=0x00007f42a800f000 nid=0x59 runnable [0x00007f42b0782000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x00000006c02e2088> (a sun.nio.ch.Util$2)
        - locked <0x00000006c02e2078> (a java.util.Collections$UnmodifiableSet)
        - locked <0x00000006c02e1f60> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.kafka.common.network.Selector.select(Selector.java:425)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:126)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:186)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:857)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
        at org.test.TestReceiver(TestReceiver:100)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

这是代码片段......仅打印第一个日志。

LOG.info("Going to wait {}ms for ConsumerRecords", POLLING_TIMEOUT_MILLIS);
ConsumerRecords<String, String> records = consumer.poll(POLLING_TIMEOUT_MILLIS); 
LOG.info("Received {} ConsumerRecords to process.", (records != null ? records.count() : null));

库版本...
kafka_2.11:jar:0.9.0.0
kafka-clients:jar:0.9.0.0

最佳答案

KafkaConsumer#poll() 如果需要刷新其元数据但无法连接到集群,则可能会阻塞。

通过 KIP-266 寻址:https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886

关于java - KafkaConsumer 在轮询时进入无限期等待状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50392782/

相关文章:

java - Spring RequestMapping 正则表达式排除字符串

java - System.out 被声明为 static Final 并用 null 初始化?

java - 删除笔记数组后如何重新加载笔记数组?

apache-kafka - 如何将 kafkacat 有效负载打印输出转换为二进制

java - 发送到主题时抛出 TimeoutException

apache-kafka - BigQuery 以 Kafka 作为源

java - 使用 Java 混音器混合两个音频流

java - 使用 Spark 从 Azure Blob 读取数据

apache-spark - Cassandra 中频繁截断的问题和 24 小时 ttl 创建大型墓碑

scala - 将 Spark-kafka InputDStream 转换为 Array[Bytes]