scala - Apache Flink: How to sink stream to Google Cloud Storage File System

Tags: scala google-cloud-platform google-cloud-storage apache-flink

I am trying to write a data stream to a file in the Google Cloud Storage file system, as follows (using Flink 1.8 and Scala 2.11):

data.addSink(new BucketingSink[(String, Int)]("gs://url-try/try.txt")) 

But I get the following error:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

Caused by: java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:379)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1227)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376)
    ... 8 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Cannot support file system for 'gs' via Hadoop, because Hadoop is not in the classpath, or some classes are missing from the classpath.
    at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:179)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
    ... 11 more
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hdfs/HdfsConfiguration
    at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:85)
    ... 12 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.HdfsConfiguration
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 13 more

I have seen several questions related to this, and I have also done the following:

Environment variable:
- FLINK_CONF_DIR

File flink-conf.yaml:
- fs.hdfs.hadoopconf: src/main/resources/core-site.xml

core-site.xml:

    <property>
        <name>fs.gs.impl</name>
        <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
        <description>The FileSystem for gs: (GCS) uris.</description>
    </property>
    <property>
        <name>fs.AbstractFileSystem.gs.impl</name>
        <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
        <description>The AbstractFileSystem for gs: (GCS) uris.</description>
    </property>

These are my pom dependencies:

  <dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/commons-lang/commons-lang -->
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector -->
        <dependency>
            <groupId>com.google.cloud.bigdataoss</groupId>
            <artifactId>gcs-connector</artifactId>
            <version>hadoop3-1.9.16</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-fs -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-fs</artifactId>
            <version>1.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-filesystem -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-storage</artifactId>
            <version>1.35.0</version>
        </dependency>

    </dependencies>

Any help would be appreciated.

Best answer

Based on the stack trace you posted, I see that you are running into a problem when trying to write to a GCS bucket using Flink and Scala.
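
The innermost cause in that stack trace is a ClassNotFoundException for org.apache.hadoop.hdfs.HdfsConfiguration, so a quick way to narrow things down is to check which of the expected classes are actually visible on the job's classpath. The following is only a diagnostic sketch, not a fix: the object name ClasspathCheck and the helper isLoadable are made up for illustration, and the two class names are copied from the stack trace and the core-site.xml in the question. It assumes it is run with the same classpath as the failing job (for example from the same Maven module).

```scala
object ClasspathCheck {
  // Class names taken from the question: the first is the one the stack trace
  // reports as missing, the second is the fs.gs.impl value from core-site.xml.
  val required: Seq[String] = Seq(
    "org.apache.hadoop.hdfs.HdfsConfiguration",
    "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
  )

  /** Returns true if the class can be loaded by the class loader that loaded this object.
    * The class is not initialized, so no static initializers run.
    */
  def isLoadable(className: String): Boolean =
    try {
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      // ClassNotFoundException: the class itself is absent.
      // LinkageError (e.g. NoClassDefFoundError): a class it depends on is absent or incompatible.
      case _: ClassNotFoundException | _: LinkageError => false
    }

  def main(args: Array[String]): Unit =
    required.foreach { name =>
      val status = if (isLoadable(name)) "found  " else "MISSING"
      println(s"$status $name")
    }
}
```

If either class is reported as MISSING, the dependency set is the place to look. The pom in the question mixes hadoop-core 1.2.1 with hadoop-hdfs 3.2.0 and a hadoop3 build of gcs-connector, so checking that the Hadoop artifacts come from one consistent version line may be a reasonable first step.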

A similar question has already been resolved in another post; please take a look at it.

If you have any further questions, feel free to come back.

Source: "scala - Apache Flink: How to sink stream to Google Cloud Storage File System", a similar question on Stack Overflow: https://stackoverflow.com/questions/55795428/
