java - Spring Boot批量读取/验证不同格式的多个csv文件

标签 java spring spring-boot spring-batch

我正在评估一个特定项目的 Spring Batch,经过大量的网络搜索后,我无法找到满足我的要求的 Spring Batch 解决方案。

我想知道 Spring Batch 是否能够在单个作业中读取由不同格式组成的多个 CSV 文件?例如,假设 Person.csv 和 Address.csv,两者都由不同的格式组成,但相互依赖

我需要读取、处理数据更正(即 toUpperCase 等)并验证每条记录。

如果出现验证错误,我需要将错误记录到某种对象数组中,稍后在验证完成后可以将其通过电子邮件发送给最终用户进行更正。

验证两个文件中的所有数据并且两个文件中均未发生验证错误后,继续执行批处理写入器。如果这两个文件中的任何一个发生任何错误,我需要停止整个作业。如果发生错误时写入器已经开始写入数据库,则无论相对文件中是否存在错误,都需要回滚整个作业。

如果这两个 CSV 文件中的任何一个存在任何类型的验证错误,我都无法插入其中任何一个。必须将错误通知最终用户。这些错误将用于在重新处理文件之前进行任何必要的更正。

SpringBoot 2 中的 Spring 批处理能够实现这种行为吗?

示例

人物.csv

BatchId, personId, firstName, lastName

地址.csv

BatchId, personId, address1

在上面的例子中,两个文件之间的关系是batchId和personId。如果这两个文件中的任何一个存在任何类型的验证错误,我必须使整个批处理失败。我想完成对这两个文件的验证,以便我可以响应所有错误,但只是不写入数据库。

最佳答案

I'm wondering if spring batch is capable of reading multiple CSV files made up of different formats in a single job?

是的,您可以有一个包含多个步骤的作业,每个步骤处理给定类型的文件。重点是如何设计工作。您可以应用的一种技术是使用暂存表。批处理作业可以创建临时暂存表,在其中加载所需的所有数据,然后在完成后删除它们。

在您的情况下,您可以通过两个步骤将每个文件加载到特定的暂存表中。每个步骤都可以应用特定于每个文件的验证逻辑。如果这些步骤之一失败,您的作业就会失败。临时表可以有一个用于无效记录的标记列(这对于报告很有用)。

完成这两个准备步骤后,您可以在另一个步骤中从两个临时表中读取数据,并对连接的数据应用交叉验证规则(例如从两个表中选择并通过 BatchId 连接)和 PersonId)。如果此步骤失败,则作业失败。否则,您可以在适当的地方写入数据。

此技术的优点是在整个作业期间数据在暂存表中可用。因此,每当验证步骤失败时,您都可以使用流程将失败的步骤重定向到“报告步骤”(读取无效数据并发送报告),然后使作业失败。这是一个您可以使用的独立示例:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class FlowJobSample {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public Step personLoadingStep() {
        return steps.get("personLoadingStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("personLoadingStep");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step addressLoadingStep() {
        return steps.get("addressLoadingStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("addressLoadingStep");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step crossValidationStep() {
        return steps.get("crossValidationStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("crossValidationStep");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Step reportingStep() {
        return steps.get("reportingStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("reportingStep");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(personLoadingStep()).on("INVALID").to(reportingStep())
                    .from(personLoadingStep()).on("*").to(addressLoadingStep())
                    .from(addressLoadingStep()).on("INVALID").to(reportingStep())
                    .from(addressLoadingStep()).on("*").to(crossValidationStep())
                    .from(crossValidationStep()).on("INVALID").to(reportingStep())
                    .from(crossValidationStep()).on("*").end()
                    .from(reportingStep()).on("*").fail()
                    .build()
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(FlowJobSample.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

要使其中一个步骤失败,请将退出状态设置为 INVALID,例如:

@Bean
public Step personLoadingStep() {
    return steps.get("personLoadingStep")
            .tasklet((contribution, chunkContext) -> {
                System.out.println("personLoadingStep");
                chunkContext.getStepContext().getStepExecution().setExitStatus(new ExitStatus("INVALID"));
                return RepeatStatus.FINISHED;
            })
            .build();
}

我希望这会有所帮助。

关于java - Spring Boot批量读取/验证不同格式的多个csv文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52068954/

相关文章:

hibernate - 多环境配置的maven应用无法部署到tomcat上

java - GL.createCapability 抛出 NullPointerException

java - 使用 Java 转换 XML 有哪些不同的方法?

java - 不同 Java 版本的应用程序在 Tomcat 6 服务器中不工作

java - 在删除之前首先检查数据库是否存在实体是有意义的,例如使用 Spring

java - 当授权 header 存在时,Spring Security 忽略路径过滤器仍然执行

spring - 如何将 OAuth2 session 存储到数据库中并在 Spring Boot 服务器之间共享

java - 是否可以从外部闪存驱动器运行 java (jdk-windows)、Eclipse (indigo) 和 Android-sdk 用于 android 应用程序开发

java - Spring继承特性

maven - 是否有任何与 Java 13 兼容的 SonarQube 版本