java - 从 S3 并行读取多个文件(Spark、Java)

标签 java apache-spark amazon-s3

我看到了一些关于此的讨论,但不太理解正确的解决方案: 我想将几百个文件从 S3 加载到 RDD 中。这是我现在的做法:

ObjectListing objectListing = s3.listObjects(new ListObjectsRequest().
                withBucketName(...).
                withPrefix(...));
List<String> keys = new LinkedList<>();
objectListing.getObjectSummaries().forEach(summery -> keys.add(summery.getKey())); // repeat while objectListing.isTruncated()

JavaRDD<String> events = sc.parallelize(keys).flatMap(new ReadFromS3Function(clusterProps));

ReadFromS3Function 使用 AmazonS3 客户端进行实际读取:

    public Iterator<String> call(String s) throws Exception {
        AmazonS3 s3Client = getAmazonS3Client(properties);
        S3Object object = s3Client.getObject(new GetObjectRequest(...));
        InputStream is = object.getObjectContent();
        List<String> lines = new LinkedList<>();
        String str;
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
            if (is != null) {
                while ((str = reader.readLine()) != null) {
                    lines.add(str);
                }
            } else {
                ...
            }
        } finally {
            ...
        }
        return lines.iterator();

我从我在 Scala 中看到的相同问题的答案中“翻译”了这个。我认为也可以将整个路径列表传递给 sc.textFile(...),但我不确定哪种方法是最佳做法。

最佳答案

潜在的问题是在 s3 中列出对象真的很慢,而且它看起来像目录树的方式会降低性能,无论什么时候做树状结构(就像路径的通配符模式一样)。

帖子中的代码正在执行所有子列表,这提供了更好的性能,它本质上是 Hadoop 2.8 和 s3a listFiles(path, recursive) 附带的内容,请参阅 HADOOP-13208 .

获得该 list 后,您将获得对象路径的字符串,然后您可以将其映射到 s3a/s3n 路径,以便 spark 作为文本文件输入进行处理,然后您可以将工作应用于

val files = keys.map(key -> s"s3a://$bucket/$key").mkString(",")
sc.textFile(files).map(...)

根据要求,这里是使用的 java 代码。

String prefix = "s3a://" + properties.get("s3.source.bucket") + "/";
objectListing.getObjectSummaries().forEach(summary -> keys.add(prefix+summary.getKey())); 
// repeat while objectListing truncated 
JavaRDD<String> events = sc.textFile(String.join(",", keys))

请注意,我将 s3n 切换为 s3a,因为如果您的 CP 上有 hadoop-awsamazon-sdk JAR,s3a 连接器就是您的连接器应该使用。它更好,而且它是由人们(我)针对 Spark 工作负载进行维护和测试的那个。参见 The history of Hadoop's S3 connectors .

关于java - 从 S3 并行读取多个文件(Spark、Java),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41062705/

相关文章:

java - 使用 JAVA 将 Oracle 表数据插入语句

java - LWJGL 相机绕点旋转?

python - "unbound method textFile() must be called with SparkContext instance as first argument (got str instance instead)"

javascript - 将 blob 直接上传到 s3 时出现问题

java - 使 javafx 应用程序在一段时间内消失

java - 将 Java Appp 部署到 App Engine 时出错?无法获取系统 Java 编译器。请使用 JDK,而不是 JRE?

python - Pyspark 'NoneType'对象没有属性 '_jvm'错误

pandas - 使用 ArrayType 列将 UDF 重写为 pandas udf

amazon-s3 - 让 Cognito 用户管理对 "own"S3 文件夹的访问

amazon-s3 - Airflow 不会将日志写入 s3