java - Storm topology not created in Storm UI

Tags: java hadoop apache-storm apache-storm-topology

When the Storm job is submitted to a Hadoop cluster to write to HDFS with HdfsBolt, the topology is not created in the Storm UI. The error points at code inside the storm-hdfs package (org.apache.storm.hdfs.bolt.AbstractHdfsBolt.cleanup(AbstractHdfsBolt.java:261) ~[f083f1dc515311e9868bcf07babd3298.jar:?]).

Error:

42608 [Thread-20-bolt-executor[3 3]] INFO  o.a.s.util - Async loop interrupted!
42608 [Thread-19-disruptor-executor[3 3]-send-queue] INFO  o.a.s.util - Async loop interrupted!
42608 [SLOT_1024] INFO  o.a.s.d.executor - Shut down executor bolt:[3 3]
42608 [SLOT_1024] INFO  o.a.s.d.executor - Shutting down executor __acker:[1 1]
42608 [Thread-22-__acker-executor[1 1]] INFO  o.a.s.util - Async loop interrupted!
42608 [Thread-21-disruptor-executor[1 1]-send-queue] INFO  o.a.s.util - Async loop interrupted!
42608 [SLOT_1024] INFO  o.a.s.d.executor - Shut down executor __acker:[1 1]
42608 [SLOT_1024] INFO  o.a.s.d.executor - Shutting down executor __system:[-1 -1]
42608 [Thread-24-__system-executor[-1 -1]] INFO  o.a.s.util - Async loop interrupted!
42608 [Thread-23-disruptor-executor[-1 -1]-send-queue] INFO  o.a.s.util - Async loop interrupted!
42609 [SLOT_1024] INFO  o.a.s.d.executor - Shut down executor __system:[-1 -1]
42609 [SLOT_1024] INFO  o.a.s.d.executor - Shutting down executor kafka_spout:[5 5]
42609 [Thread-25-disruptor-executor[5 5]-send-queue] INFO  o.a.s.util - Async loop interrupted!
42609 [Thread-26-kafka_spout-executor[5 5]] INFO  o.a.s.util - Async loop interrupted!
42611 [SLOT_1024] INFO  o.a.s.d.executor - Shut down executor kafka_spout:[5 5]
42611 [SLOT_1024] INFO  o.a.s.d.executor - Shutting down executor forwardToKafka:[4 4]
42611 [Thread-28-forwardToKafka-executor[4 4]] INFO  o.a.s.util - Async loop interrupted!
42611 [Thread-27-disruptor-executor[4 4]-send-queue] INFO  o.a.s.util - Async loop interrupted!
42612 [SLOT_1024] ERROR o.a.s.d.s.Slot - Error when processing event
java.lang.NullPointerException: null
    at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.cleanup(AbstractHdfsBolt.java:261) ~[f083f1dc515311e9868bcf07babd3298.jar:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_112]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_112]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_112]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
    at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.executor$fn__9739.invoke(executor.clj:878) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.executor$mk_executor$reify__9530.shutdown(executor.clj:437) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_112]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_112]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_112]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
    at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.worker$fn__10165$exec_fn__1369__auto__$reify__10167$shutdown_STAR___10187.invoke(worker.clj:684) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at org.apache.storm.daemon.worker$fn__10165$exec_fn__1369__auto__$reify$reify__10213.shutdown(worker.clj:724) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at org.apache.storm.ProcessSimulator.killProcess(ProcessSimulator.java:67) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at org.apache.storm.daemon.supervisor.LocalContainer.kill(LocalContainer.java:69) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at org.apache.storm.daemon.supervisor.Slot.killContainerForChangedAssignment(Slot.java:311) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at org.apache.storm.daemon.supervisor.Slot.handleRunning(Slot.java:527) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at org.apache.storm.daemon.supervisor.Slot.stateMachineStep(Slot.java:265) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:752) [storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
42612 [SLOT_1024] ERROR o.a.s.u.Utils - Halting process: Error when processing an event
java.lang.RuntimeException: Halting process: Error when processing an event
    at org.apache.storm.utils.Utils.exitProcess(Utils.java:1814) [storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:796) [storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]

Below is the Java code used. This is the main topology file. Data is consumed from Kafka and sent to HDFS through HdfsBolt. Some data does get stored in HDFS, but none of the worker nodes do any work, and the topology is not created in the Storm UI.

Java:

package hdpstrm.hdpstrm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;

import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.Builder;
import org.apache.storm.topology.TopologyBuilder;

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import hdpstrm.hdpstrm.printBolt;

public class MyMain {

    private static HdfsBolt createHdfsBolt() {
        // use "|" instead of "," for the field delimiter
        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");

        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);

        // rotate files when they reach 5 MB
        FileRotationPolicy rotationPolicy =
                new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

        DefaultFileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/user/march26")
                .withPrefix("upcse")     // files start with this prefix
                .withExtension(".csv");  // and end with this extension

        return new HdfsBolt()
                .withFsUrl("hdfs://localhost:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);
    }

    public static void main(String[] args) throws Exception {

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(1);
        config.put("ALLFILE", (Object) "/home/kdx/out2.txt");

        TopologyBuilder tp = new TopologyBuilder();
        String kafka_bootstrap = "localhost:6667";
        String kafka_topic = args[0];
        Builder<String, String> kafka_config =
                KafkaSpoutConfig.builder(kafka_bootstrap, kafka_topic).setGroupId("group_id");
        KafkaSpout<String, String> kafka_spout = new KafkaSpout<>(kafka_config.build());

        tp.setSpout("kafka_spout", kafka_spout, 1);
        tp.setBolt("bolt", new printBolt()).shuffleGrouping("kafka_spout");
        tp.setBolt("forwardToKafka", createHdfsBolt(), 1).shuffleGrouping("bolt");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafkaTopology", config, tp.createTopology());

        // wait for 40 seconds
        Thread.sleep(40000);

        // stop the local topology
        cluster.shutdown();
        System.out.println(" ******** TERMINATED THE LOCAL CLUSTER *********");

        StormSubmitter.submitTopologyWithProgressBar("MyMain", config, tp.createTopology());
    }
}

The expected result is that the topology is created in the Storm UI and that all worker nodes participate when the Storm jar is run.

Best answer

The error is due to a bug in storm-hdfs.

The line where you hit the error is https://github.com/apache/storm/blob/v1.2.1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java#L261

That variable is only initialized when the rotation policy is a TimedRotationPolicy, and yours is not.

You can file a bug report at https://issues.apache.org/jira. PRs are also welcome at https://github.com/apache/storm.
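Until the bug is fixed, one way to sidestep the NPE in cleanup() is to use a TimedRotationPolicy instead of FileSizeRotationPolicy, since that is the policy for which the field dereferenced at line 261 gets initialized. A minimal sketch of the substitution in createHdfsBolt() (the 5-minute interval is an arbitrary choice, not from the original code):

```java
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;

// Rotate output files on a timer rather than by size. With a
// TimedRotationPolicy, AbstractHdfsBolt initializes the timer that
// cleanup() later touches, so shutdown no longer hits the NPE.
FileRotationPolicy rotationPolicy =
        new TimedRotationPolicy(5.0f, TimedRotationPolicy.TimeUnit.MINUTES);
```

The trade-off is that files then rotate on elapsed time rather than at the 5 MB threshold the original policy enforced.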

Regarding "java - Storm topology not created in Storm UI", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/55410281/
