scala - Reading a file from a Google storage bucket in Spark

Tags: scala hadoop apache-spark google-cloud-storage

I am trying to read a file from a Google storage bucket. I can read it through spark-shell by including the GCS connector jar when launching the shell, but submitting the job via spark-submit throws the following error:

Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.base.Splitter.splitToList(Ljava/lang/CharSequence;)Ljava/util/List;
        at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase$ParentTimestampUpdateIncludePredicate.create(GoogleHadoopFileSystemBase.java:780)
        at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.createOptionsBuilderFromConfig(GoogleHadoopFileSystemBase.java:2130)
        at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1822)
        at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:1003)
        at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:966)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2689)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2723)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2705)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:407)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:172)
        at org.apache.hadoop.mapred.JobConf.getWorkingDirectory(JobConf.java:656)
        at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:440)
        at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:413)
        at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$33.apply(SparkContext.scala:1015)
        at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$33.apply(SparkContext.scala:1015)
        at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
        at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
        at scala.Option.map(Option.scala:145)
        at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:176)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:195)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
        at com.google.reader.GoogleRead$.main(GoogleRead.scala:41)
        at com.google.reader.GoogleRead.main(GoogleRead.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:752)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
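
The call that fails at GoogleRead.scala:41 is essentially a count() on data read from a gs:// path. A minimal sketch of that kind of read (the bucket and object names are placeholders, not the actual GoogleRead.scala) looks like this:

import org.apache.spark.{SparkConf, SparkContext}

object GoogleRead {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("GoogleFileReader"))
    // gs:// paths are resolved by the gcs-connector's Hadoop FileSystem implementation
    val lines = sc.textFile("gs://<your-bucket>/<your-file>")
    println(lines.count()) // the count() is where the NoSuchMethodError surfaces
    sc.stop()
  }
}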

I tried excluding the Guava dependency, but I still get the error above. Below is the build.sbt file I am using:

import sbt.ExclusionRule
name := "GoogleFileReader"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.1" exclude("com.google.guava", "guava")
libraryDependencies += "com.google.cloud.bigdataoss" % "gcs-connector" % "1.6.0-hadoop2" 

assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
  case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
  case PathList("org", "apache", xs @ _*) => MergeStrategy.last
  case PathList("com", "google", xs @ _*) => MergeStrategy.last
  case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
  case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
  case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
  case "about.html" => MergeStrategy.rename
  case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
  case "META-INF/mailcap" => MergeStrategy.last
  case "META-INF/mimetypes.default" => MergeStrategy.last
  case "plugin.properties" => MergeStrategy.last
  case "log4j.properties" => MergeStrategy.last
  case "pom.properties" => MergeStrategy.last
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

Best Answer

The Guava library actually used when running on the cluster is an older one that Hadoop depends on, provided when the driver/executor is launched. It comes first on the classpath, so the newer version your code relies on is never picked up; hence the NoSuchMethodError.
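
To confirm which Guava jar actually wins on the classpath at runtime, a quick diagnostic (a hypothetical check, not part of the original answer) is to print where Splitter was loaded from, e.g. just before the gs:// read:

// Prints the jar that actually provides com.google.common.base.Splitter at runtime
println(classOf[com.google.common.base.Splitter].getProtectionDomain.getCodeSource.getLocation)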

You can try this solution (http://jhz.name/2016/01/10/spark-classpath.html), which basically tells Spark to check the user classpath first and the Hadoop classpath after it. It uses the following Spark configuration parameters:

spark.driver.userClassPathFirst=true
spark.executor.userClassPathFirst=true
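
For example, they can be passed on the spark-submit command line (the assembly jar path here is an assumption based on what sbt-assembly would produce from the build.sbt above; the main class is the one from the stack trace):

spark-submit \
  --class com.google.reader.GoogleRead \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true \
  target/scala-2.10/GoogleFileReader-assembly-1.0.jar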

Most likely you will then run into other kinds of trouble, as I did (java.lang.LinkageError, because of the class loaders involved in that setup).

The solution that worked for me was to shade the Guava version my code uses, so that it no longer conflicts with the one Hadoop depends on. See this reply: https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/8GCd8iQPy3c

It consists of adding this shade rule to build.sbt:

assemblyShadeRules in assembly := Seq(  
  ShadeRule.rename("com.google.**" -> "shaded.@1").inAll 
)

Make sure you also update the merge strategy for the (now shaded) com.google classes:

case PathList("shaded", xs @ _*) => MergeStrategy.last

A similar question on Stack Overflow: https://stackoverflow.com/questions/41506574/
