amazon-s3 - 尝试使用 Apache Beam 读取/写入时出现 "No filesystem found for scheme s3"

标签 amazon-s3 kotlin apache-beam

我第一次开始在一个项目中使用 Apache Beam,我想做的是从 AWS 上的 EMR 集群读取和从 S3 写入 Parquet 文件。

但是,每次我尝试执行我的代码时,我只得到:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:459)
at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:119)
at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:140)
at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:152)
at org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn.process(FileIO.java:636)

该文档没有提供任何示例,因此我不知道是否必须在代码中的任何位置初始化某些内容。

我试图检查 Beam 源代码,但据我了解,FileSystems 类应该注册所有文件系统模块,并且我的 pom.xml 包含 Amazon Web Services Beam 模块(它又带来了 AWS S3 模块)。

我现在正在做的唯一初始化 block 是:
val options = PipelineOptionsFactory.create()
options.runner = SparkRunner::class.java
val pipeline = Pipeline.create(options)
...
val runner = SparkRunner.fromOptions(options)
runner.run(pipeline).waitUntilFinish()

Spark 开始正常运行,直到出现异常。

有什么建议吗?

最佳答案

我相信您需要为代表 Apache Beam 作业选项的 AWS 凭证创建一个自定义类。

BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKey, secretKey);
YourCustomOptionsClass options = PipelineOptionsFactory.create().as(YourCustomOptionsClass.class);
options.as(AwsOptions.class).setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCreds));
options.as(AwsOptions.class).setAwsRegion(region);
options.setRunner(DataflowRunner.class);
options.setProject(projectId);

options.set... (All other options you need)

在我的代码中 YourCustomOptionClass 正在实现 S3Options 和 DataflowPipelineOptions

要了解有关创建自定义选项的更多信息,请查看 apache beam 文档
https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options

其他可能有帮助的完整示例:

https://github.com/asaharland/beam-pipeline-examples/tree/master/src/main/java/com/harland/example/batch

关于amazon-s3 - 尝试使用 Apache Beam 读取/写入时出现 "No filesystem found for scheme s3",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52041043/

相关文章:

reactjs - 如何使用 spring boot 显示来自 AWS s3 的图像并使用react?

ios - 将图像从 Parse 移至 S3 AWS

java - Kotlin:如何从 java 类调用 setter 和 getter?

android-studio - super 新手 - Android Studio 模拟器抛出 INSTALL_PARSE_FAILED_NO_CERTIFICATES 错误

java - 我们可以将avro文件写入动态创建的GCS存储桶(基于tenantID)吗?

amazon-s3 - 使用 s3cmd 覆盖文件

ios - 文件上传到适用于 iOS 的 Amazon S3 的进度条?

java - 如何使用 Apache Beam (KafkaIO) 反序列化 avro 数据

android - 撰写 UI 测试 - 如何断言文本颜色?

google-cloud-dataflow - com.google.cloud.spanner.SpannerException : DEADLINE_EXCEEDED