apache-spark - Spark 2.x + Tika: java.lang.NoSuchMethodError: org.apache.commons.compress.archivers.ArchiveStreamFactory.detect

Tags: apache-spark apache-tika cloudera-cdh

I am trying to resolve a spark-submit classpath problem at runtime for an Apache Tika (> v1.14) parsing job. The issue appears to involve the spark-submit classpath versus the contents of my uber-jar.

Platform: CDH 5.15 (Spark 2.3 added per the CDH docs) and CDH 6 (Spark 2.2 bundled with CDH 6)

What I have tried / reviewed:

(Cloudera) Where does spark-submit look for Jar files?

(stackoverflow) resolving-dependency-problems-in-apache-spark

(stackoverflow) Apache Tika ArchiveStreamFactory.detect error

Highlights:

  • Java 8 / Scala 2.11
  • I am building an uber-jar and invoking it via spark-submit
  • I have tried adding the --jars option to the spark-submit call (see further down in this post)
  • I have tried adding --conf spark.driver.userClassPathFirst=true and
    --conf spark.executor.userClassPathFirst=true to the spark-submit call (see further down in this post):

  • If I include the --conf flag in the spark-submit call, the result is:
    $ spark-submit --master local[*] --class com.example.App --conf spark.executor.userClassPathFirst=true ./target/uber-tikaTest-1.19.jar
    
    18/09/25 13:35:55 ERROR util.Utils: Exception encountered
    java.lang.NullPointerException
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:72)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1307)
        at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:312)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    

    Below the following runtime error message are these files:
  • the build-and-run.sh script (which calls spark-submit; comments about the
    options tried are included)
  • the sample application
  • the pom.xml
  • the mvn dependency tree output (showing that the "missing"
    commons-compress library is included in the uber-jar)

  • Runtime error:
    18/09/25 11:47:39 ERROR executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
    java.lang.NoSuchMethodError: org.apache.commons.compress.archivers.ArchiveStreamFactory.detect(Ljava/io/InputStream;)Ljava/lang/String;
        at org.apache.tika.parser.pkg.ZipContainerDetector.detectArchiveFormat(ZipContainerDetector.java:160)
        at org.apache.tika.parser.pkg.ZipContainerDetector.detect(ZipContainerDetector.java:104)
        at org.apache.tika.detect.CompositeDetector.detect(CompositeDetector.java:84)
        at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:116)
        at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:159)
        at com.example.App$.tikaAutoDetectParser(App.scala:55)
        at com.example.App$$anonfun$1.apply(App.scala:69)
        at com.example.App$$anonfun$1.apply(App.scala:69)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1799)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    18/09/25 11:47:39 ERROR executor.Executor: Exception in task 5.0 in stage 0.0 (TID 5)
    java.lang.NoSuchMethodError: org.apache.commons.compress.archivers.ArchiveStreamFactory.detect(Ljava/io/InputStream;)Ljava/lang/String;
        at org.apache.tika.parser.pkg.ZipContainerDetector.detectArchiveFormat(ZipContainerDetector.java:160)
        at org.apache.tika.parser.pkg.ZipContainerDetector.detect(ZipContainerDetector.java:104)
        at org.apache.tika.detect.CompositeDetector.detect(CompositeDetector.java:84)
        at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:116)
        at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:159)
        at com.example.App$.tikaAutoDetectParser(App.scala:55)
        at com.example.App$$anonfun$1.apply(App.scala:69)
        at com.example.App$$anonfun$1.apply(App.scala:69)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1799)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    

    build-and-run.sh:

    Notes:
  • I have tried adding the --conf flags for userClassPathFirst to both the
    local master and the yarn invocations below,
  • and used the --jars flag to specify the uber-jar that mvn builds from the
    pom.xml (provided further down in this post)

  • build-and-run.sh
    mvn compile
    
    if true
    then
    spark-submit --master local[*] --class com.example.App ./target/uber-tikaTest-1.19.jar
    fi
    
    # tried the using the userClass flags for driver and executor for above and below calls to spark-submit
    # --conf spark.driver.userClassPathFirst=true \
    # --conf spark.executor.userClassPathFirst=true \
    
    if false 
    then
    spark-submit --class com.example.App \
     --master yarn \
     --packages org.apache.commons:commons-compress:1.18 \
     --jars ./target/uber-tikaTest-1.19.jar \
     --num-executors 2 \
     --executor-memory 1024m \
     --executor-cores 2 \
     --driver-memory 2048m \
     --driver-cores 1 \
     ./target/uber-tikaTest-1.19.jar
    fi
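
    To double-check that the uber-jar really contains the newer commons-compress classes (and not just the dependency declaration), the jar can be inspected directly. A minimal sketch, assuming the jar path produced by the build above:

    # list the ArchiveStreamFactory entry packed into the uber-jar
    unzip -l ./target/uber-tikaTest-1.19.jar | grep 'org/apache/commons/compress/archivers/ArchiveStreamFactory.class'

    # extract the class and confirm the detect(InputStream) method is present
    unzip -o ./target/uber-tikaTest-1.19.jar 'org/apache/commons/compress/archivers/ArchiveStreamFactory.class' -d /tmp/uberjar-check
    javap -p /tmp/uberjar-check/org/apache/commons/compress/archivers/ArchiveStreamFactory.class | grep detect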
    

    Sample application:
    package com.example
    ////////// Tika Imports
    import org.apache.tika.metadata.Metadata
    import org.apache.tika.parser.AutoDetectParser
    import org.apache.tika.sax.BodyContentHandler
    ////////// Java HTTP Imports 
    import java.net.URL;
    import java.net.HttpURLConnection
    import scala.collection.JavaConverters._
    import scala.collection.mutable._
    ////////// Spark Imports 
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.sql.{Row,SparkSession}
    
    
    object App {
      case class InputStreamData(sourceURL: String, headerFields: Map[String, List[String]], inputStream: java.io.InputStream)

      def openUrlStream(sourceURL: String, apiKey: String): (InputStreamData) = {
        try {
          val url = new URL(sourceURL)
          val urlConnection = url.openConnection().asInstanceOf[HttpURLConnection]
          urlConnection.setInstanceFollowRedirects(true)
          val headerFields = urlConnection.getHeaderFields()
          val input = urlConnection.getInputStream()
          InputStreamData(sourceURL, headerFields.asScala.map(x => (x._1, x._2.asScala.toList)), input)
        }
        catch {
          case e: Exception => {
            println("**********************************************************************************************")
            println("PARSEURL: INVALID URL: " + sourceURL)
            println(e.toString())
            println("**********************************************************************************************")

            InputStreamData(sourceURL, Map("ERROR" -> List("ERROR")), null)
          }
        }
      }

      def tikaAutoDetectParser(inputStream: java.io.InputStream): String = {
        var parser = new AutoDetectParser();
        var handler = new BodyContentHandler(-1);
        var metadata = new Metadata();
        parser.parse(inputStream, handler, metadata);
        return handler.toString()
      }

      def main(args: Array[String]) {
        var sparkConf = new SparkConf().setAppName("tika-1.19-test")
        val sc = new SparkContext(sparkConf)
        val spark = SparkSession.builder.config(sparkConf).getOrCreate()
        println("HELLO!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        var urls = List("http://www.pdf995.com/samples/pdf.pdf", "https://www.amd.com/en", "http://jeroen.github.io/images/testocr.png")

        var rdd = sc.parallelize(urls)
        var parsed = rdd.map(x => tikaAutoDetectParser(openUrlStream(x, "").inputStream))
        println(parsed.count)
      }
    }
    

    pom.xml (builds the uber-jar):
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.example</groupId>
      <artifactId>tikaTest</artifactId>
      <version>1.19</version>
      <name>${project.artifactId}</name>
      <description>Testing tika 1.19 with CDH 6 and 5.x, Spark 2.x, Scala 2.11.x</description>
      <inceptionYear>2018</inceptionYear>
      <licenses>
        <license>
          <name>My License</name>
          <url>http://....</url>
          <distribution>repo</distribution>
        </license>
      </licenses>
    
    
     <repositories>
        <repository>
          <id>cloudera</id>
          <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
      </repositories>
    
    <profiles>
        <profile>
            <id>scala-2.11.12</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <properties>
                <scalaVersion>2.11.12</scalaVersion>
                <scalaBinaryVersion>2.11.12</scalaBinaryVersion>
            </properties>
            <dependencies>
                <!-- ************************************************************************** -->
                <!-- GOOD DEPENDENCIES +++++++++++++++++++++++++++++++++++++ -->
                <!-- ************************************************************************** -->
    
                <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
                <dependency>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-compress</artifactId>
                    <version>1.18</version>
                </dependency>
    
                <!-- *************** CDH flavored dependencies ***********************************************-->
                <!-- https://www.cloudera.com/documentation/spark2/latest/topics/spark2_packaging.html#versions -->
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-core_2.11</artifactId>
                    <version>2.2.0.cloudera3</version>
                    <!-- have tried scope provided / compile -->
                    <!--<scope>provided</scope>-->
                </dependency>
                <dependency>
                        <groupId>org.apache.spark</groupId>
                        <artifactId>spark-sql_2.11</artifactId>
                        <version>2.2.0.cloudera3</version>
                        <!-- have tried scope provided / compile -->
                        <!--<scope>provided</scope>-->
                    </dependency>
    
                    <!-- https://mvnrepository.com/artifact/org.apache.tika/tika-core -->
                    <dependency>
                        <groupId>org.apache.tika</groupId>
                        <artifactId>tika-core</artifactId>
                        <version>1.19</version>
                    </dependency>
    
                    <!-- https://mvnrepository.com/artifact/org.apache.tika/tika-parsers -->
                    <dependency>
                        <groupId>org.apache.tika</groupId>
                        <artifactId>tika-parsers</artifactId>
                        <version>1.19</version>
                    </dependency>
    
                    <!-- https://mvnrepository.com/artifact/javax.ws.rs/javax.ws.rs-api -->
                    <dependency>
                        <groupId>javax.ws.rs</groupId>
                        <artifactId>javax.ws.rs-api</artifactId>
                        <version>2.1.1</version>
                    </dependency>
    
                <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
                    <dependency>
                        <groupId>org.scala-lang</groupId>
                        <artifactId>scala-library</artifactId>
                        <version>2.11.12</version>
                    </dependency>
    
    
                <!-- **************************************************************************************************************************
                **************************** alternative dependencies that have been tried and yield same Tika error***************************
                *******************************************************************************************************************************-->
                <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
                    <!--
                    <dependency>
                        <groupId>org.apache.spark</groupId>
                        <artifactId>spark-core_2.11</artifactId>
                        <version>2.2.0</version>
                    </dependency>
                    -->
    
                <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
                    <!--
                    <dependency>
                        <groupId>org.apache.spark</groupId>
                        <artifactId>spark-sql_2.11</artifactId>
                        <version>2.2.0</version>
                    </dependency>
                    -->
    
                </dependencies>
            </profile>
        </profiles>
    
    
        <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <args>
                            <!-- work-around for https://issues.scala-lang.org/browse/SI-8358 -->
                            <arg>-nobootcp</arg>
                        </args>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.1.1</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <finalName>uber-${project.artifactId}-${project.version}</finalName>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    mvn dependency tree:

    Notes:

    $ mvn dependency:tree -Ddetail=true | grep compress
    [INFO] +- org.apache.commons:commons-compress:jar:1.18:compile
    [INFO] | +- com.ning:compress-lzf:jar:1.0.3:compile
    

    $ mvn dependency:tree -Ddetail=true | grep commons
    [INFO] +- org.apache.commons:commons-compress:jar:1.18:compile
    [INFO] | | | \- commons-collections:commons-collections:jar:3.2.2:compile
    [INFO] | | | +- commons-cli:commons-cli:jar:1.2:compile
    [INFO] | | | +- commons-httpclient:commons-httpclient:jar:3.1:compile
    [INFO] | | | +- commons-configuration:commons-configuration:jar:1.6:compile
    [INFO] | | | | +- commons-digester:commons-digester:jar:1.8:compile
    [INFO] | | | | | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
    [INFO] | | | | \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
    [INFO] | +- org.apache.commons:commons-lang3:jar:3.5:compile
    [INFO] | +- org.apache.commons:commons-math3:jar:3.4.1:compile
    [INFO] | +- commons-net:commons-net:jar:2.2:compile
    [INFO] | +- org.apache.commons:commons-crypto:jar:1.0.0:compile
    [INFO] | | +- org.codehaus.janino:commons-compiler:jar:3.0.8:compile
    [INFO] | | \- commons-lang:commons-lang:jar:2.6:compile
    [INFO] | +- commons-codec:commons-codec:jar:1.11:compile
    [INFO] | | \- org.apache.commons:commons-collections4:jar:4.2:compile
    [INFO] | +- org.apache.commons:commons-exec:jar:1.3:compile
    [INFO] | +- commons-io:commons-io:jar:2.6:compile
    [INFO] | +- org.apache.commons:commons-csv:jar:1.5:compile
    

    Best answer

    What is the root cause of the exception?

    It is the result of a dependency conflict.

    Why can't I resolve it via my pom.xml?

    Because it is not an internal conflict inside your jar file; the conflict is with Apache Spark.

    What exactly is the problem?
    The Spark 2.x distribution ships an older version of commons-compress, while the Tika library depends on version 1.18 of the commons-compress library.
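
    This can be verified on the cluster by listing the jars that ship with the Spark installation (a sketch; the exact directories vary by install, and the CDH parcel path below is only an assumption):

    # vanilla Spark layout
    ls "$SPARK_HOME"/jars | grep commons-compress

    # CDH parcel layout (assumed path; adjust for your parcel and version)
    ls /opt/cloudera/parcels/SPARK2/lib/spark2/jars | grep commons-compress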

    Solution

    Use the --driver-class-path argument of your spark-shell or spark-submit invocation to point at the correct version of the commons-compress library.

    spark-submit \
        --driver-class-path ~/.m2/repository/org/apache/commons/commons-compress/1.18/commons-compress-1.18.jar \
        --class {your.main.class} \
        ....
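
    Since the NoSuchMethodError in the question is raised inside executor tasks, the driver classpath alone may not be enough; the newer jar usually has to be visible to the executors as well. A sketch of one way to do that (the paths are placeholders, and the extraClassPath entry must exist on the worker nodes or be shipped via --jars):

    spark-submit \
        --driver-class-path /path/to/commons-compress-1.18.jar \
        --conf spark.executor.extraClassPath=/path/to/commons-compress-1.18.jar \
        --jars /path/to/commons-compress-1.18.jar \
        --class com.example.App \
        ./target/uber-tikaTest-1.19.jar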
    

    I was facing the same problem myself.
    I found the answer in this post: enter link description here
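
    Another option, since the pom.xml above already uses the maven-shade-plugin, is to relocate commons-compress inside the uber-jar so that Tika's calls can never resolve to the older classes on Spark's classpath. A sketch of the extra shade configuration (the shaded package prefix is arbitrary):

    <configuration>
        <relocations>
            <relocation>
                <pattern>org.apache.commons.compress</pattern>
                <shadedPattern>com.example.shaded.org.apache.commons.compress</shadedPattern>
            </relocation>
        </relocations>
    </configuration>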

    Regarding apache-spark - Spark 2.x + Tika: java.lang.NoSuchMethodError: org.apache.commons.compress.archivers.ArchiveStreamFactory.detect, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/52505752/
