java - Storm KafkaSpout stops consuming messages from Kafka Topic

Tags: java apache-kafka apache-storm apache-zookeeper

My problem is that the Storm KafkaSpout stops consuming messages from a Kafka topic after a while. With debug logging enabled in Storm, I get log output like this:

2016-07-05 03:58:26.097 o.a.s.d.task [INFO] Emitting: packet_spout __metrics [#object[org.apache.storm.metric.api.IMetricsConsumer$TaskInfo 0x2c35b34f "org.apache.storm.metric.api.IMetricsConsumer$TaskInfo@2c35b34f"] [#object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x798f1e35 "[__ack-count = {default=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x230867ec "[__sendqueue = {sojourn_time_ms=0.0, write_pos=5411461, read_pos=5411461, overflow=0, arrival_rate_secs=0.0, capacity=1024, population=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x7cdec8eb "[__complete-latency = {default=0.0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x658fc59 "[__skipped-max-spout = 0]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x3c1f3a50 "[__receive = {sojourn_time_ms=4790.5, write_pos=2468305, read_pos=2468304, overflow=0, arrival_rate_secs=0.20874647740319383, capacity=1024, population=1}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x262d7906 "[__skipped-inactive = 0]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x73648c7e "[kafkaPartition = {Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPICallCount=0, Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPILatencyMax=null, Partition{host=slave103:9092, topic=packet, partition=12}/lostMessageCount=0, Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPILatencyMean=null, Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPIMessageCount=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x4e43df61 "[kafkaOffset = {packet/totalLatestCompletedOffset=154305947, packet/partition_12/spoutLag=82472754, packet/totalEarliestTimeOffset=233919465, packet/partition_12/earliestTimeOffset=233919465, packet/partition_12/latestEmittedOffset=154307691, packet/partition_12/latestTimeOffset=236778701, 
packet/totalLatestEmittedOffset=154307691, packet/partition_12/latestCompletedOffset=154305947, packet/totalLatestTimeOffset=236778701, packet/totalSpoutLag=82472754}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x49fe816b "[__transfer-count = {__ack_init=0, default=0, __metrics=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x63e2bdc0 "[__fail-count = {}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x3b17bb7b "[__skipped-throttle = 1086120]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x1315a68c "[__emit-count = {__ack_init=0, default=0, __metrics=0}]"]]]

2016-07-05 03:58:55.042 o.a.s.d.executor [INFO] Processing received message FOR -2 TUPLE: source: __system:-1, stream: __tick, id: {}, [30]

2016-07-05 03:59:25.042 o.a.s.d.executor [INFO] Processing received message FOR -2 TUPLE: source: __system:-1, stream: __tick, id: {}, [30]

2016-07-05 03:59:25.946 o.a.s.d.executor [INFO] Processing received message FOR -2 TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
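The kafkaOffset data point in the first log entry can be checked by hand. In this entry, spoutLag (82472754) equals latestTimeOffset minus latestCompletedOffset (236778701 - 154305947). Read at face value, latestEmittedOffset (154307691) is also below earliestTimeOffset (233919465), the earliest offset reported for partition 12. The Java sketch below recomputes both numbers from the log text. The class and method names are illustrative and not part of Storm.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class KafkaOffsetCheck {
    // Matches per-partition entries such as "packet/partition_12/spoutLag=82472754".
    private static final Pattern ENTRY =
            Pattern.compile("partition_(\\d+)/(\\w+)=(\\d+)");

    /** Collects the numeric kafkaOffset metrics reported for one partition. */
    static Map<String, Long> metricsFor(String kafkaOffset, int partition) {
        Map<String, Long> metrics = new HashMap<>();
        Matcher m = ENTRY.matcher(kafkaOffset);
        while (m.find()) {
            if (Integer.parseInt(m.group(1)) == partition) {
                metrics.put(m.group(2), Long.parseLong(m.group(3)));
            }
        }
        return metrics;
    }

    public static void main(String[] args) {
        // Per-partition values copied from the kafkaOffset data point in the log.
        String kafkaOffset = "packet/partition_12/spoutLag=82472754, "
                + "packet/partition_12/earliestTimeOffset=233919465, "
                + "packet/partition_12/latestEmittedOffset=154307691, "
                + "packet/partition_12/latestTimeOffset=236778701, "
                + "packet/partition_12/latestCompletedOffset=154305947";
        Map<String, Long> p = metricsFor(kafkaOffset, 12);

        long lag = p.get("latestTimeOffset") - p.get("latestCompletedOffset");
        long gap = p.get("earliestTimeOffset") - p.get("latestEmittedOffset");
        System.out.println("recomputed lag = " + lag);   // 82472754, same as spoutLag
        System.out.println("emitted offset is " + gap + " below earliest"); // 79611774
    }
}
```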

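My test topology is very simple: one KafkaSpout and one Counter bolt. When the topology works normally, the value between FOR and TUPLE is positive; when the topology stops consuming messages, the value becomes negative. So I'm curious: what causes "Processing received message FOR -2 TUPLE", and how can I fix it?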
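The check described in the question (is the number between FOR and TUPLE positive or negative?) can be automated when scanning a worker log. The minimal Java sketch below only extracts that number; it does not explain what a negative value means. The class and method names are hypothetical. Note that the FOR -2 lines quoted above all come from source __system:-1 on the __tick and __metrics_tick streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ForTupleScanner {
    // Captures the (possibly negative) integer between "FOR" and "TUPLE".
    private static final Pattern FOR_TUPLE =
            Pattern.compile("Processing received message FOR (-?\\d+) TUPLE");

    /** Returns the id between FOR and TUPLE, or null if the line is not such a message. */
    static Integer forId(String line) {
        Matcher m = FOR_TUPLE.matcher(line);
        return m.find() ? Integer.valueOf(m.group(1)) : null;
    }

    /** Keeps only the lines whose FOR id is negative. */
    static List<String> negativeIdLines(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            Integer id = forId(line);
            if (id != null && id < 0) {
                out.add(line);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "2016-07-05 03:58:55.042 o.a.s.d.executor [INFO] Processing received message FOR -2 TUPLE: source: __system:-1, stream: __tick, id: {}, [30]",
                "2016-07-05 03:58:26.097 o.a.s.d.task [INFO] Emitting: packet_spout __metrics ...");
        // Prints only the first line, whose FOR id is -2.
        for (String line : negativeIdLines(lines)) {
            System.out.println(line);
        }
    }
}
```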

By the way, my test environment is:

OS: Red Hat Enterprise Linux Server release 7.0 (Maipo)
Kafka: 0.10.0.0
Storm: 1.0.1

Best answer

With help from the Storm mailing list, I was able to tune the KafkaSpout and fix the problem. The following settings work for me:

config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2048);
config.put(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false);
config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
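Storm's Config class extends HashMap<String, Object>, so the four settings above are ordinary key/value pairs. The sketch below builds the same map with the key strings behind those constants (as defined in Storm 1.x; verify against your version's Config) so that it runs without storm-core. It also checks that the executor buffer sizes are powers of two, which the Storm 1.x Config javadoc states is required for these Disruptor queues. Class and method names are illustrative. For reference, the metrics entry in the question shows capacity=1024 for both __sendqueue and __receive, so 16384 is a 16× increase.

```java
import java.util.HashMap;
import java.util.Map;

class SpoutTuning {
    // Key strings behind the Config constants used in the answer (Storm 1.x).
    static final String MAX_SPOUT_PENDING = "topology.max.spout.pending";
    static final String BACKPRESSURE_ENABLE = "topology.backpressure.enable";
    static final String RECEIVE_BUFFER_SIZE = "topology.executor.receive.buffer.size";
    static final String SEND_BUFFER_SIZE = "topology.executor.send.buffer.size";

    /** Same values as the answer; a Storm Config would accept these puts unchanged. */
    static Map<String, Object> answerSettings() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(MAX_SPOUT_PENDING, 2048);
        conf.put(BACKPRESSURE_ENABLE, false);
        conf.put(RECEIVE_BUFFER_SIZE, 16384);
        conf.put(SEND_BUFFER_SIZE, 16384);
        return conf;
    }

    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    /** Fails fast if an executor buffer size is set but is not a power of two. */
    static void checkBufferSizes(Map<String, Object> conf) {
        for (String key : new String[] {RECEIVE_BUFFER_SIZE, SEND_BUFFER_SIZE}) {
            Object value = conf.get(key);
            if (value instanceof Integer && !isPowerOfTwo((Integer) value)) {
                throw new IllegalArgumentException(key + " must be a power of two, got " + value);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> conf = answerSettings();
        checkBufferSizes(conf); // 16384 = 2^14, so this passes
        System.out.println(conf);
    }
}
```

With storm-core on the classpath, the same values go into new Config() exactly as in the answer, and that Config is what gets passed to StormSubmitter.submitTopology(name, conf, topology).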

I tested by sending bursts of 20k-50k messages with a 1-second pause between bursts. Each message is 2048 bytes.
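The test workload described above can be reproduced with a small generator. At 2048 bytes per message, one burst carries between 40,960,000 and 102,400,000 bytes (about 39 to 98 MiB). This sketch keeps the transport pluggable through a Consumer<byte[]>. In a real test, that sink would wrap a Kafka producer send, which is deliberately left out here. All names are hypothetical.

```java
import java.util.Random;
import java.util.function.Consumer;

class BurstLoad {
    /**
     * Sends `bursts` batches whose sizes are drawn uniformly from [minBatch, maxBatch],
     * each message `messageBytes` long, sleeping `pauseMillis` between batches.
     * Returns the number of messages handed to the sink.
     */
    static long run(int bursts, int minBatch, int maxBatch, int messageBytes,
                    long pauseMillis, Consumer<byte[]> sink, Random rng) {
        if (minBatch < 0 || maxBatch < minBatch) {
            throw new IllegalArgumentException("need 0 <= minBatch <= maxBatch");
        }
        long sent = 0;
        for (int b = 0; b < bursts; b++) {
            int batch = minBatch + rng.nextInt(maxBatch - minBatch + 1);
            for (int i = 0; i < batch; i++) {
                // Payload content does not matter for a throughput test; zeros are enough.
                sink.accept(new byte[messageBytes]);
            }
            sent += batch;
            if (b < bursts - 1) {
                try {
                    Thread.sleep(pauseMillis); // pause between bursts
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag, stop early
                    break;
                }
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        long[] bytes = {0};
        // Stand-in sink that only counts bytes; a real test would send to Kafka here.
        long sent = run(3, 20_000, 50_000, 2048, 1000, p -> bytes[0] += p.length, new Random(42));
        System.out.println(sent + " messages, " + bytes[0] + " bytes");
    }
}
```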

I'm running a 3-node cluster; my topology has 4 spouts, and the topic has 64 partitions.
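This sketch assumes each of the 4 spouts is one KafkaSpout task. With 64 partitions and an even spread, each task then reads 16 partitions. The code uses a simple round-robin assignment to show this. It is an illustration, not necessarily the exact algorithm your KafkaSpout version uses. Because topology.max.spout.pending applies per spout task, the answer's value of 2048 allows up to 4 × 2048 = 8192 tuples in flight across the topology. Names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionSpread {
    /** Round-robin: task k reads partitions k, k + tasks, k + 2 * tasks, ... */
    static List<Integer> partitionsForTask(int taskIndex, int totalTasks, int numPartitions) {
        if (totalTasks <= 0 || taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException("bad task index or task count");
        }
        List<Integer> mine = new ArrayList<>();
        for (int p = taskIndex; p < numPartitions; p += totalTasks) {
            mine.add(p);
        }
        return mine;
    }

    public static void main(String[] args) {
        int spoutTasks = 4, partitions = 64, maxSpoutPending = 2048;
        for (int t = 0; t < spoutTasks; t++) {
            // Each task prints 16 partitions.
            System.out.println("task " + t + " -> "
                    + partitionsForTask(t, spoutTasks, partitions).size() + " partitions");
        }
        // topology.max.spout.pending is enforced per spout task: 4 * 2048 = 8192.
        System.out.println("max pending tuples across spouts = " + spoutTasks * maxSpoutPending);
    }
}
```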

After 200M messages it is still working....

Regarding java - Storm KafkaSpout stops consuming messages from Kafka Topic, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/38198961/
