我的问题是 Storm KafkaSpout 在一段时间后停止使用来自 Kafka 主题的消息。当在 Storm 中启用调试时,我得到这样的日志文件:
2016-07-05 03:58:26.097 o.a.s.d.task [INFO] Emitting: packet_spout __metrics [#object[org.apache.storm.metric.api.IMetricsConsumer$TaskInfo 0x2c35b34f "org.apache.storm.metric.api.IMetricsConsumer$TaskInfo@2c35b34f"] [#object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x798f1e35 "[__ack-count = {default=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x230867ec "[__sendqueue = {sojourn_time_ms=0.0, write_pos=5411461, read_pos=5411461, overflow=0, arrival_rate_secs=0.0, capacity=1024, population=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x7cdec8eb "[__complete-latency = {default=0.0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x658fc59 "[__skipped-max-spout = 0]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x3c1f3a50 "[__receive = {sojourn_time_ms=4790.5, write_pos=2468305, read_pos=2468304, overflow=0, arrival_rate_secs=0.20874647740319383, capacity=1024, population=1}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x262d7906 "[__skipped-inactive = 0]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x73648c7e "[kafkaPartition = {Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPICallCount=0, Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPILatencyMax=null, Partition{host=slave103:9092, topic=packet, partition=12}/lostMessageCount=0, Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPILatencyMean=null, Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPIMessageCount=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x4e43df61 "[kafkaOffset = {packet/totalLatestCompletedOffset=154305947, packet/partition_12/spoutLag=82472754, packet/totalEarliestTimeOffset=233919465, packet/partition_12/earliestTimeOffset=233919465, packet/partition_12/latestEmittedOffset=154307691, packet/partition_12/latestTimeOffset=236778701, packet/totalLatestEmittedOffset=154307691, packet/partition_12/latestCompletedOffset=154305947, packet/totalLatestTimeOffset=236778701, packet/totalSpoutLag=82472754}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x49fe816b "[__transfer-count = {__ack_init=0, default=0, __metrics=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x63e2bdc0 "[__fail-count = {}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x3b17bb7b "[__skipped-throttle = 1086120]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x1315a68c "[__emit-count = {__ack_init=0, default=0, __metrics=0}]"]]]
2016-07-05 03:58:55.042 o.a.s.d.executor [INFO] Processing received message FOR -2 TUPLE: source: __system:-1, stream: __tick, id: {}, [30]
2016-07-05 03:59:25.042 o.a.s.d.executor [INFO] Processing received message FOR -2 TUPLE: source: __system:-1, stream: __tick, id: {}, [30]
2016-07-05 03:59:25.946 o.a.s.d.executor [INFO] Processing received message FOR -2 TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]
我的测试拓扑非常简单,一个 KafkaSpout 和另一个 Counter Bolt。当拓扑正常工作时,FOR 和TUPLE 之间的值为正数;当拓扑停止消费消息时,该值变为负数。所以我很好奇是什么导致了 Processing received message FOR -2 TUPLE 的问题,以及如何解决这个问题?
对了,我的实验环境是:
OS: Red Hat Enterprise Linux Server release 7.0 (Maipo)
Kafka: 0.10.0.0
Storm: 1.0.1
最佳答案
在 stom 邮件列表的帮助下,我能够调整 KafkaSpout 并解决问题。以下设置对我有用。
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2048);
config.put(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false);
config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
我通过发送 20k-50k 批处理并在突发之间暂停 1 秒来进行测试。每条消息为 2048 字节。
我正在运行 3 节点集群,我的拓扑有 4 个 spout,主题有 64 个分区。
在 200M 消息后它仍然有效....
关于java - Storm KafkaSpout 停止消费来自 Kafka Topic 的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38198961/