java - Flink BucketingSink crashes with NoClassDefFoundError: Lorg/apache/hadoop/fs/FileSystem

Tags: java scala apache-flink

Flink versions tried: 1.4.0, 1.4.1, 1.4.2

When I try to run this simple Flink application:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink

val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

env.fromElements("a", "b", "c")
  .addSink(new BucketingSink[String]("file:///Users/joshlemer/projects/my-project/target/output"))

I get the following runtime exception:

Exception in thread "main" java.lang.NoClassDefFoundError: Lorg/apache/hadoop/fs/FileSystem;
    at java.lang.Class.getDeclaredFields0(Native Method)
    at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
    at java.lang.Class.getDeclaredFields(Class.java:1916)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:72)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1550)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:184)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1134)
    at org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1036)
    at com.company.project.Job$.run(Job.scala:52)
    at com.company.project.Job$.main(Job.scala:28)
    at com.company.project.Job.main(Job.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.FileSystem
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)  

This happens even though I can write text files just fine with dataStream.writeAsText(...).

My build.sbt is also fairly typical:

val flinkVersion = "1.4.2"

val flinkDependencies =
  Seq(
    "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
    "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion,
    "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion,
    "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
    "org.apache.flink" %% "flink-test-utils" % flinkVersion % "test",
    "org.apache.flink" % "flink-test-utils-junit" % flinkVersion % "test"
  )

Plus the additional idea.sbt that Flink recommends for IntelliJ users:

lazy val mainRunner = project.in(file("mainRunner")).dependsOn(RootProject(file("."))).settings(
  // we set all provided dependencies to none, so that they are included in the classpath of mainRunner
  libraryDependencies := (libraryDependencies in RootProject(file("."))).value.map{
    module =>
      if (module.configurations.equals(Some("provided"))) {
        module.copy(configurations = None)
      } else {
        module
      }
  }
)

This is what I use to run the application (mainRunner is set as the application's classpath).

I'm very confused about why this is happening, and in particular, why does the package name start with "Lorg" rather than "org"?

Thanks!

Best Answer

From the 1.4 release notes:

Starting with version 1.4, Flink can run without any Hadoop dependencies present in the Classpath. Along with simply running without Hadoop, this enables Flink to dynamically use whatever Hadoop version is available in the classpath.

You could, for example, download the Hadoop-free release of Flink but use that to run on any supported version of YARN, and Flink would dynamically use the Hadoop dependencies from YARN.

This also means that in cases where you used connectors to HDFS, such as the BucketingSink or RollingSink, you now have to ensure that you either use a Flink distribution with bundled Hadoop dependencies or make sure to include Hadoop dependencies when building a jar file for your application.
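For an sbt build like the one above, one way to do the latter is to add the Hadoop client libraries to the dependency list yourself. This is a sketch, not the asker's actual fix; the hadoop-client artifact and the 2.8.3 version below are assumptions, and should be matched to the Hadoop version of the cluster you deploy to:

```scala
// Sketch: adds Hadoop to the application classpath so that
// org.apache.hadoop.fs.FileSystem (needed by BucketingSink) resolves at runtime.
// The version "2.8.3" is an assumption -- use whatever your cluster runs.
val flinkDependencies =
  Seq(
    "org.apache.flink"  %% "flink-streaming-scala"      % flinkVersion % "provided",
    "org.apache.flink"  %% "flink-connector-filesystem" % flinkVersion,
    "org.apache.hadoop" %  "hadoop-client"              % "2.8.3"
  )
```

As a side note on the "Lorg" puzzle in the question: the JVM encodes object types in field descriptors as L<class-name>; (for example, classOf[Array[String]].getName yields "[Ljava.lang.String;"), and since the error is thrown while reflecting over declared fields (Class.getDeclaredFields0), the missing class is reported in that descriptor form.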

Regarding "java - Flink BucketingSink crashes with NoClassDefFoundError: Lorg/apache/hadoop/fs/FileSystem", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49635713/
