java - Spark 从 Blob 读取文本文件

标签 java apache-spark blob azure-hdinsight

编写了一段代码以通过 Spark 读取文本文件...在本地工作正常...但在 HDInsight 中运行时生成错误 -> 从 Blob 读取文本文件

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 5, wn1-hchdin.bpqkkmavxs0ehkfnaruw4ed03d.dx.internal.cloudapp.net, executor 2): java.lang.AbstractMethodError: com.journaldev.sparkdemo.WordCounter$$Lambda$17/790636414.call(Ljava/lang/Object;)Ljava/util/Iterator; at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125) at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:927) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:927) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

这是我的代码

    JavaSparkContext ct = new JavaSparkContext();
        Configuration config = ct.hadoopConfiguration();
        config.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
        config.set("org.apache.hadoop.fs.azure.SimpleKeyProvider", "<<key>>");

        JavaRDD<String> inputFile = ct.textFile("wasb://<<container-nam>>@<<account>>.blob.core.windows.net/directory/file.txt");

        JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));

        wordsFromFile.foreach(cc ->{System.out.println("p :"+cc.toString());});

最佳答案

对于本地运行的Spark,官方有一个blog介绍如何从 Spark 访问 Azure Blob 存储。关键是您需要在 core-site.xml 文件中将 Azure 存储帐户配置为与 HDFS 兼容的存储,并将两个 jar 文件 hadoop-azure 和 azure-storage 添加到类路径中,以便通过协议(protocol) wasb[s] 访问 HDFS。可以引用官方tutorial了解带有 wasb 的 HDFS 兼容存储,以及 blog有关 HDInsight 配置的更多详细信息。

对于在Azure上运行的Spark,区别只是仅使用wasb访问HDFS,其他准备工作已经由Azure在使用Spark创建HDInsight集群时完成。列出文件的方法是listFiles或wholeTextFiles SparkContext 的。

希望有帮助。

关于java - Spark 从 Blob 读取文本文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56836700/

相关文章:

scala - 你如何在 sbt 中链接 spark 和 kafka?

java - Azure Blob 存储 Java SDK 12 从 Blob 名称中提取文件名

mysql - 在cakephp中下载Blob文件

java - Selenium FirefoxDriver 如何使用 Crontab 执行自动化测试

java - 用 Java 解析 XML

apache-spark - 如何在spark查询中不硬编码任何列名的情况下检查一行的所有列是否为空?

java - 无法通过 MongoDB 连接器使用 Spark SQL 查询 MongoDB

javascript - 哪个ArrayBufferView

java - 安卓应用注册验证

java - Swift 解码和编码字符串