hadoop - Storm HDFS bolt not working

Tags: hadoop hdfs apache-storm hortonworks-data-platform

So I've just started looking at Storm and am trying to get my head around it. I'm attempting to connect to a Kafka topic, read the data, and write it to an HDFS bolt.
I initially created it without shuffleGrouping("stormspout"), and the Storm UI showed that the spout was consuming data from the topic, but nothing was written to the bolt (apart from the empty files it created on HDFS). I then added shuffleGrouping("stormspout"); and now the bolt appears to be getting an error. I'd be very grateful if anyone could help.

Thanks,
Colman

Error

2015-04-13 00:02:58 s.k.PartitionManager [INFO] Read partition information from: /storm/partition_0 --> null
2015-04-13 00:02:58 s.k.PartitionManager [INFO] No partition information found, using configuration to determine offset
2015-04-13 00:02:58 s.k.PartitionManager [INFO] Last commit offset from zookeeper: 0
2015-04-13 00:02:58 s.k.PartitionManager [INFO] Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
2015-04-13 00:02:58 s.k.PartitionManager [INFO] Starting Kafka 192.168.134.137:0 from offset 0
2015-04-13 00:02:58 s.k.ZkCoordinator [INFO] Task [1/1] Finished refreshing
2015-04-13 00:02:58 b.s.d.task [INFO] Emitting: stormspout default [colmanblah]
2015-04-13 00:02:58 b.s.d.executor [INFO] TRANSFERING tuple TASK: 2 TUPLE: source: stormspout:3, stream: default, id: {462820364856350458=5573117062061876630}, [colmanblah]
2015-04-13 00:02:58 b.s.d.task [INFO] Emitting: stormspout __ack_init [462820364856350458 5573117062061876630 3]
2015-04-13 00:02:58 b.s.d.executor [INFO] TRANSFERING tuple TASK: 1 TUPLE: source: stormspout:3, stream: __ack_init, id: {}, [462820364856350458 5573117062061876630 3]
2015-04-13 00:02:58 b.s.d.executor [INFO] Processing received message FOR 1 TUPLE: source: stormspout:3, stream: __ack_init, id: {}, [462820364856350458 5573117062061876630 3]
2015-04-13 00:02:58 b.s.d.executor [INFO] BOLT ack TASK: 1 TIME: TUPLE: source: stormspout:3, stream: __ack_init, id: {}, [462820364856350458 5573117062061876630 3]
2015-04-13 00:02:58 b.s.d.executor [INFO] Execute done TUPLE source: stormspout:3, stream: __ack_init, id: {}, [462820364856350458 5573117062061876630 3] TASK: 1 DELTA:
2015-04-13 00:02:59 b.s.d.executor [INFO] Prepared bolt stormbolt:(2)
2015-04-13 00:02:59 b.s.d.executor [INFO] Processing received message FOR 2 TUPLE: source: stormspout:3, stream: default, id: {462820364856350458=5573117062061876630}, [colmanblah]



2015-04-13 00:02:59 b.s.util [ERROR] Async loop died!
            java.lang.RuntimeException: java.lang.NullPointerException
                    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$fn__5697$fn__5710$fn__5761.invoke(executor.clj:794) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.util$async_loop$fn__452.invoke(util.clj:465) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
                    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
            Caused by: java.lang.NullPointerException: null
                    at org.apache.storm.hdfs.bolt.HdfsBolt.execute(HdfsBolt.java:92) ~[storm-hdfs-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$fn__5697$tuple_action_fn__5699.invoke(executor.clj:659) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$mk_task_receiver$fn__5620.invoke(executor.clj:415) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.disruptor$clojure_handler$reify__1741.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    ... 6 common frames omitted
            2015-04-08 04:26:39 b.s.d.executor [ERROR]
            java.lang.RuntimeException: java.lang.NullPointerException
                    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$fn__5697$fn__5710$fn__5761.invoke(executor.clj:794) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.util$async_loop$fn__452.invoke(util.clj:465) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
                    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
            Caused by: java.lang.NullPointerException: null
                    at org.apache.storm.hdfs.bolt.HdfsBolt.execute(HdfsBolt.java:92) ~[storm-hdfs-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$fn__5697$tuple_action_fn__5699.invoke(executor.clj:659) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.daemon.executor$mk_task_receiver$fn__5620.invoke(executor.clj:415) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.disruptor$clojure_handler$reify__1741.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]
                    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.9.3.2.2.0.0-2041.jar:0.9.3.2.2.0.0-2041]

Code:
    TopologyBuilder builder = new TopologyBuilder();    
    Config config = new Config();
    //config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 7000);
    config.setNumWorkers(1);
    config.setDebug(true);  
    //LocalCluster cluster = new LocalCluster();


    //zookeeper
    BrokerHosts brokerHosts = new ZkHosts("192.168.134.137:2181", "/brokers");      

    //spout
    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "myTopic", "/kafkastorm", "KafkaSpout");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    spoutConfig.forceFromStart = true;
    builder.setSpout("stormspout", new KafkaSpout(spoutConfig),4);

    //bolt
    SyncPolicy syncPolicy = new CountSyncPolicy(10); //Synchronize data buffer with the filesystem every 10 tuples
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB); // Rotate data files when they reach five MB
    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/stormstuff"); // Use default, Storm-generated file names
    builder.setBolt("stormbolt", new HdfsBolt()
                                 .withFsUrl("hdfs://192.168.134.137:8020")//54310
                                 .withSyncPolicy(syncPolicy)
                                 .withRotationPolicy(rotationPolicy)
                                 .withFileNameFormat(fileNameFormat),2
                    ).shuffleGrouping("stormspout");        


    //cluster.submitTopology("ColmansStormTopology", config, builder.createTopology());     

    try {
        StormSubmitter.submitTopologyWithProgressBar("ColmansStormTopology", config, builder.createTopology());

    } catch (AlreadyAliveException e) {
        e.printStackTrace();
    } catch (InvalidTopologyException e) {
        e.printStackTrace();
    }

pom.xml dependencies
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.3</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hdfs</artifactId>
            <version>0.9.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-simple</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

Best Answer

First of all, try to emit the values from the execute method. If you are emitting from different worker threads, then have all the worker threads feed their data into a LinkedBlockingQueue, and allow only a single thread to emit values from that LinkedBlockingQueue, as in the sketch below.
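A minimal sketch of that pattern, assuming the Storm 0.9.x backtype.storm API; QueueingEmitBolt and its "line" output field are illustrative names, not part of the original topology:

    import java.util.Map;
    import java.util.concurrent.LinkedBlockingQueue;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class QueueingEmitBolt extends BaseRichBolt {
        private OutputCollector collector;
        private LinkedBlockingQueue<Tuple> queue;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.queue = new LinkedBlockingQueue<Tuple>();
            // A single daemon thread drains the queue, so only one thread
            // ever calls emit()/ack() on the collector.
            Thread emitter = new Thread(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            Tuple input = queue.take();
                            QueueingEmitBolt.this.collector.emit(input, new Values(input.getString(0)));
                            QueueingEmitBolt.this.collector.ack(input);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            emitter.setDaemon(true);
            emitter.start();
        }

        @Override
        public void execute(Tuple input) {
            // execute() may be entered by several worker threads; they only
            // enqueue and never touch the collector directly.
            queue.offer(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }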

Secondly, try setting Config.setMaxSpoutPending to some value and running the code again; if the scenario still persists, try reducing that value.

Reference: Config.TOPOLOGY_MAX_SPOUT_PENDING sets the maximum number of spout tuples that can be pending on a single spout task at once (pending means the tuple has not been acked or failed yet). It is highly recommended that you set this config to prevent queue explosion.
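For example, applied to the Config from the question's code (500 is an arbitrary illustrative value, not a tuned recommendation):

    Config config = new Config();
    config.setNumWorkers(1);
    config.setDebug(true);
    // Throttle the KafkaSpout: at most 500 tuples may be un-acked per
    // spout task before the spout stops emitting. Reduce this further
    // if the bolt still fails under load.
    config.setMaxSpoutPending(500);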

Regarding hadoop - Storm HDFS bolt not working, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/29505978/
