Flink versions tried: 1.4.0, 1.4.1, 1.4.2
I get a runtime exception when I try to run this simple Flink application:
val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

env
  .fromElements("a", "b", "c")
  .addSink(new BucketingSink[String]("file:///Users/joshlemer/projects/my-project/target/output"))
The exception is:
Exception in thread "main" java.lang.NoClassDefFoundError: Lorg/apache/hadoop/fs/FileSystem;
at java.lang.Class.getDeclaredFields0(Native Method)
at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
at java.lang.Class.getDeclaredFields(Class.java:1916)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:72)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1550)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:184)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1134)
at org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1036)
at com.company.project.Job$.run(Job.scala:52)
at com.company.project.Job$.main(Job.scala:28)
at com.company.project.Job.main(Job.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.FileSystem
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
This happens even though I can write to text files just fine with dataStream.writeAsText(...).
My build.sbt is also very typical:
val flinkVersion = "1.4.2"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala"                 % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala"       % flinkVersion % "provided",
  "org.apache.flink" %% "flink-statebackend-rocksdb"  % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka-0.11"  % flinkVersion,
  "org.apache.flink" %% "flink-connector-filesystem"  % flinkVersion,
  "org.apache.flink" %% "flink-test-utils"            % flinkVersion % "test",
  "org.apache.flink" %  "flink-test-utils-junit"      % flinkVersion % "test"
)
Plus the extra idea.sbt that Flink recommends for IntelliJ users:
lazy val mainRunner = project.in(file("mainRunner")).dependsOn(RootProject(file("."))).settings(
  // we set all provided dependencies to none, so that they are included in the classpath of mainRunner
  libraryDependencies := (libraryDependencies in RootProject(file("."))).value.map { module =>
    if (module.configurations.equals(Some("provided"))) {
      module.copy(configurations = None)
    } else {
      module
    }
  }
)
This is what I use to run the application (with mainRunner set as the classpath in the run configuration).
I'm confused about why this happens, and in particular why the class name begins with "Lorg" rather than "org"?
Thanks!
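(An aside on the "Lorg" part of the question: that prefix is not part of the package name. `Lorg/apache/hadoop/fs/FileSystem;` is the JVM's internal *field descriptor* notation, in which an object type `T` is written `LT;` with `/` as the package separator. `getDeclaredFields0` resolves field types from their descriptors, so an unresolvable field type is reported in that form. A minimal sketch showing the same notation, which the reflection API also exposes through array class names:)

```scala
object DescriptorDemo extends App {
  // Array classes are named using JVM descriptor syntax: "[" for the array
  // dimension, then "L<fully.qualified.Name>;" for the element type.
  println(classOf[Array[String]].getName) // prints "[Ljava.lang.String;"
}
```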
Best Answer
Starting with version 1.4, Flink can run without any Hadoop dependencies present in the classpath. Along with simply running without Hadoop, this enables Flink to dynamically use whatever Hadoop version is available in the classpath.
You could, for example, download the Hadoop-free release of Flink but use that to run on any supported version of YARN, and Flink would dynamically use the Hadoop dependencies from YARN.
This also means that in cases where you used connectors to HDFS, such as the BucketingSink or RollingSink, you now have to ensure that you either use a Flink distribution with bundled Hadoop dependencies or make sure to include Hadoop dependencies when building a jar file for your application.
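With the sbt setup from the question, one way to satisfy that last requirement is to add the Hadoop client libraries (which contain org.apache.hadoop.fs.FileSystem) as a dependency. This is a sketch only: the Hadoop version shown is an assumption and should match the Hadoop version of the target cluster.

```scala
// build.sbt (sketch) — bundle Hadoop's FileSystem classes with the application
// so BucketingSink can link against org.apache.hadoop.fs.FileSystem at runtime.
// "2.8.3" is an assumed version; pick the one matching your cluster.
val hadoopVersion = "2.8.3"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % hadoopVersion
```

Alternatively, run against a Flink distribution that bundles Hadoop, in which case no change to build.sbt is needed.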
For "java - Flink BucketingSink crashes with NoClassDefFoundError: Lorg/apache/hadoop/fs/FileSystem", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/49635713/