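java - Problem reading S3 resources with Spring Batch ItemReaders

Tags: java amazon-s3 spring-batch spring-cloud spring-cloud-aws

I have a Spring Batch job that reads a bunch of files from an S3 bucket, processes them, and sends the results to a database, all in a multithreaded configuration. The application.properties file contains the following: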

cloud.aws.credentials.accessKey=accessKey 
cloud.aws.credentials.secretKey=secret
cloud.aws.region.static=us-east-1
cloud.aws.credentials.instanceProfile=true 
cloud.aws.stack.auto=false

My item reader:

@Bean
ItemReader<DataRecord> itemReader() {
    FlatFileItemReader<DataRecord> flatFileItemReader = new FlatFileItemReader<>();
    flatFileItemReader.setLinesToSkip(0);
    flatFileItemReader.setLineMapper(new DataRecord.DataRecordLineMapper());
    flatFileItemReader.setSaveState(false);

    MultiResourceItemReader<DataRecord> multiResourceItemReader = new MultiResourceItemReader<>();
    multiResourceItemReader.setDelegate(flatFileItemReader);
    multiResourceItemReader.setResources(loadS3Resources(null, null));
    multiResourceItemReader.setSaveState(false);

    SynchronizedItemStreamReader<DataRecord> itemStreamReader = new SynchronizedItemStreamReader<>();
    itemStreamReader.setDelegate(multiResourceItemReader);
    return itemStreamReader;
}

And my task executor:

@Bean
TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
    return threadPoolTaskExecutor;
}

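The job has a single step that reads from the files, processes them, and writes to the database. With this configuration the resources load, the job starts, and the step processes roughly the first 240k lines of the first resource (there are 7 resources with 1.2M lines each). Then I get the following exception: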

org.springframework.batch.item.file.NonTransientFlatFileException: Unable to read from resource: [Amazon s3 resource [bucket='my-bucket' and object='output/part-r-00000']]
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:220) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:88) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.readFromDelegate(MultiResourceItemReader.java:140) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.readNextItem(MultiResourceItemReader.java:119) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.file.MultiResourceItemReader.read(MultiResourceItemReader.java:108) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.item.support.SynchronizedItemStreamReader.read(SynchronizedItemStreamReader.java:55) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:91) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:157) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:116) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:110) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133) ~[spring-tx-4.3.9.RELEASE.jar!/:4.3.9.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:271) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81) ~[spring-batch-core-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_65]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_65]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_65]
Caused by: javax.net.ssl.SSLException: SSL peer shut down incorrectly
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:596) ~[na:1.8.0_65]
at sun.security.ssl.InputRecord.read(InputRecord.java:532) ~[na:1.8.0_65]
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973) ~[na:1.8.0_65]
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930) ~[na:1.8.0_65]
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) ~[na:1.8.0_65]
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) ~[httpcore-4.4.6.jar!/:4.4.6]
at org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:198) ~[httpcore-4.4.6.jar!/:4.4.6]
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176) ~[httpcore-4.4.6.jar!/:4.4.6]
at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135) ~[httpclient-4.5.3.jar!/:4.5.3]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:117) ~[aws-java-sdk-s3-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72) ~[aws-java-sdk-core-1.11.125.jar!/:na]
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) ~[na:1.8.0_65]
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) ~[na:1.8.0_65]
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) ~[na:1.8.0_65]
at java.io.InputStreamReader.read(InputStreamReader.java:184) ~[na:1.8.0_65]
at java.io.BufferedReader.fill(BufferedReader.java:161) ~[na:1.8.0_65]
at java.io.BufferedReader.readLine(BufferedReader.java:324) ~[na:1.8.0_65]
at java.io.BufferedReader.readLine(BufferedReader.java:389) ~[na:1.8.0_65]
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:201) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar!/:3.0.7.RELEASE]
... 23 common frames omitted

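I would like to know whether there is a simple way to fix this. At the moment I am considering making local copies of the files and reading from those, but I wonder whether some configuration could avoid this exception.

Thanks!

Best answer

My guess is that one thread closes the SFTP session while another thread is still processing.

It is better to use a MultiResourcePartitioner to create one partition per resource (file), and then let the reader pick up each file individually as its own partition. With that configuration you also no longer need the MultiResourceItemReader (you can go straight to the delegate).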
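As a rough illustration of the partitioning idea above, here is a minimal Java configuration sketch, not the answerer's own code. It makes several assumptions: it swaps MultiResourcePartitioner for a small hand-written Partitioner so that each S3 location travels as a plain `s3://` string; the class name `PartitionedS3JobConfig`, the context key `location`, the step names, the chunk size, and the second object key are hypothetical; `DataRecord` and `DataRecord.DataRecordLineMapper` come from the question; an `ItemWriter<DataRecord>` bean is assumed to exist; and the `ResourceLoader` is assumed to resolve `s3://` locations through spring-cloud-aws.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.task.TaskExecutor;

@Configuration
public class PartitionedS3JobConfig {

    // One partition per file: each ExecutionContext carries a single location.
    @Bean
    Partitioner s3Partitioner() {
        // Hypothetical locations; in the question they come from loadS3Resources(...).
        List<String> locations = Arrays.asList(
                "s3://my-bucket/output/part-r-00000",
                "s3://my-bucket/output/part-r-00001");
        return gridSize -> {
            Map<String, ExecutionContext> partitions = new HashMap<>();
            for (int i = 0; i < locations.size(); i++) {
                ExecutionContext context = new ExecutionContext();
                context.putString("location", locations.get(i));
                partitions.put("partition" + i, context);
            }
            return partitions;
        };
    }

    // Step-scoped: every partition gets its own reader instance and its own
    // S3 stream, so no stream is shared between threads.
    @Bean
    @StepScope
    FlatFileItemReader<DataRecord> partitionedReader(
            @Value("#{stepExecutionContext['location']}") String location,
            ResourceLoader resourceLoader) {
        FlatFileItemReader<DataRecord> reader = new FlatFileItemReader<>();
        // Assumes spring-cloud-aws lets the ResourceLoader resolve s3:// locations.
        reader.setResource(resourceLoader.getResource(location));
        reader.setLineMapper(new DataRecord.DataRecordLineMapper());
        return reader;
    }

    // Worker step: runs once per partition, single-threaded within the partition.
    @Bean
    Step workerStep(StepBuilderFactory steps,
                    FlatFileItemReader<DataRecord> partitionedReader,
                    ItemWriter<DataRecord> writer) {
        return steps.get("workerStep")
                .<DataRecord, DataRecord>chunk(1000) // chunk size is an arbitrary choice
                .reader(partitionedReader)
                .writer(writer)
                .build();
    }

    // Master step: fans the partitions out over the task executor.
    @Bean
    Step masterStep(StepBuilderFactory steps,
                    Partitioner s3Partitioner,
                    @Qualifier("workerStep") Step workerStep,
                    TaskExecutor taskExecutor) {
        return steps.get("masterStep")
                .partitioner("workerStep", s3Partitioner)
                .step(workerStep)
                .gridSize(Runtime.getRuntime().availableProcessors())
                .taskExecutor(taskExecutor)
                .build();
    }
}
```

In this layout the job would run `masterStep` instead of the original multithreaded step. Since each partition reads its file on one thread, the SynchronizedItemStreamReader wrapper from the question should no longer be necessary; the item processor is omitted here for brevity.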

See the example here: https://github.com/spring-projects/spring-batch/blob/master/spring-batch-samples/src/main/resources/jobs/partitionFileJob.xml

See also: How to apply partitioned count for MultiResourceItemReader?

Regarding java - Problem reading S3 resources with Spring Batch ItemReaders, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45360253/
