java - 如何使用 aws-java-sdk 从 S3 中逐 block 读取文件

标签 java amazon-web-services amazon-s3 io aws-java-sdk

我正在尝试从 S3 将大文件读取成 block ,而不切割任何行以进行并行处理。

让我通过例子来解释一下: S3上有1G大小的文件。我想将此文件分成 64 MB 的 block 。这很容易,我可以这样做:

S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));

InputStream stream = s3object.getObjectContent();

byte[] content = new byte[64*1024*1024];

while (stream.read(content)  != -1) {

//process content here 

}

但是 block 的问题是它可能有 100 个完整的行和一个不完整的行。但我无法处理不完整的行并且不想丢弃它。

有什么办法可以处理这种情况吗?表示所有卡盘都没有偏线。

最佳答案

我通常的方法( InputStream -> BufferedReader.lines() -> 批量行 -> CompletableFuture )在这里不起作用,因为底层 S3ObjectInputStream对于大文件最终会超时。

所以我创建了一个新类 S3InputStream ,它不关心它打开多长时间,并使用短暂的 AWS SDK 调用按需读取字节 block 。您提供byte[]将被重复使用。 new byte[1 << 24] (16Mb) 看起来效果很好。

package org.harrison;

import java.io.IOException;
import java.io.InputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;

/**
 * An {@link InputStream} for S3 files that does not care how big the file is.
 *
 * @author stephen harrison
 */
public class S3InputStream extends InputStream {
    private static class LazyHolder {
        private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient();
    }

    private final String bucket;
    private final String file;
    private final byte[] buffer;
    private long lastByteOffset;

    private long offset = 0;
    private int next = 0;
    private int length = 0;

    public S3InputStream(final String bucket, final String file, final byte[] buffer) {
        this.bucket = bucket;
        this.file = file;
        this.buffer = buffer;
        this.lastByteOffset = LazyHolder.S3.getObjectMetadata(bucket, file).getContentLength() - 1;
    }

    @Override
    public int read() throws IOException {
        if (next >= length) {
            fill();

            if (length <= 0) {
                return -1;
            }

            next = 0;
        }

        if (next >= length) {
            return -1;
        }

        return buffer[this.next++];
    }

    public void fill() throws IOException {
        if (offset >= lastByteOffset) {
            length = -1;
        } else {
            try (final InputStream inputStream = s3Object()) {
                length = 0;
                int b;

                while ((b = inputStream.read()) != -1) {
                    buffer[length++] = (byte) b;
                }

                if (length > 0) {
                    offset += length;
                }
            }
        }
    }

    private InputStream s3Object() {
        final GetObjectRequest request = new GetObjectRequest(bucket, file).withRange(offset,
                offset + buffer.length - 1);

        return LazyHolder.S3.getObject(request).getObjectContent();
    }
}

关于java - 如何使用 aws-java-sdk 从 S3 中逐 block 读取文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44389194/

相关文章:

javascript - POST 要求每个请求只上传一个文件

python - Boto S3 偶尔会抛出 httplib.IncompleteRead

java - 如果文件大小大于 X mb,则停止记录

java - 继承AppCompatActivity和ListActivity

java - 是否可以将 Process stdout InputStream 读取到 NIO ByteBuffer 中?

amazon-web-services - 如何允许 EC2 实例从操作系统访问其自己的标签

java - 需要正则表达式来对此进行模式化

python - 如何使用 python 在 aws s3 预签名的 url 中隐藏我的访问 key

amazon-web-services - AWS : What is the relationship between S3 bucket policies and user policies?

amazon-web-services - 删除 AWS S3 存储桶时权限不足,无法列出存储桶 "XXXX"的对象