scala - 配置后端状态以使用 hdfs 时出错

标签 scala hadoop apache-flink

我正在尝试将后端状态设置为 hdfs

val stateUri = "hdfs/path_to_dir"
val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)
env.setStateBackend(backend)

我正在运行具有以下依赖项的 flink 1.7.0(我尝试了所有组合):

   "org.apache.flink"    %% "flink-connector-filesystem"         % flinkV
"org.apache.flink"    % "flink-hadoop-fs"                     % flinkV
"org.apache.hadoop"   % "hadoop-hdfs"                         % hadoopVersion
"org.apache.hadoop"   % "hadoop-common"                       % hadoopVersion

但是在运行 jar 时出现此错误:

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
    ... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
    at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
    ... 23 more

任何帮助将不胜感激

最佳答案

为了访问 hdfs:// 路径,只要你有 flink,就没有必要将 flink-hadoop-fs 与你的工作捆绑在一起-shaded-hadoop2-uber-1.8-SNAPSHOT.jar 位于 Flink 安装的 lib 文件夹中。

如果您的 lib 文件夹中没有此依赖项,那么我建议使用 flink-fs-hadoop-shaded 作为依赖项,因为它也会重新定位Hadoop 依赖项。

此外,重要的是,此依赖项也包含在生成的作业 jar 中。因此,请确保您使用 sbt-assembly 插件创建了一个 uber-jar。

关于scala - 配置后端状态以使用 hdfs 时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53851407/

相关文章:

java - 为什么 Apache Flink 从数据流中删除事件?

apache-flink - 如何在 apache kafka 连接器中实现 exactly once 语义

scala - 启动scala脚本的两种方式,哪种方式更可取?

hadoop - Hadoop 如何管理负载均衡

hadoop-yarn - 启用 HA 的 YARN 上的 Flink 在尝试恢复时使所有 RM 崩溃

hadoop - 无法更改HDFS上文件夹的权限。没有打印任何内容作为其原因。

shell - Linux Mint上的Hadoop HBase脚本产生奇怪的错误

scala - sbt 自动插件 - 禁用它们,但只针对一个子项目

java - Spark Structured Streaming 自动将时间戳转换为本地时间

json - 这是从 S3 : Spark 读取 Json 文件的最快方法