java - Spring Batch repeatedly invokes the processor for an item

Tags: java spring spring-batch batch-processing bean-io

We are using Spring Batch to ETL a pipe-delimited file into a database. Each record in the file has many fields and is identified by its ClaimNumber:

ClaimNumber|AdjustmentVersion|.....
0038017282|3|....
0071517729|3|....
0081517745|3|....

The batch step uses the usual read-process-write flow:

<step id="stagingDataDump" next="gatherStats">
            <tasklet>
                <!-- <chunk reader="genericBatchItemReader" writer="genericBatchItemWriter" -->
                <chunk reader="genericBatchItemReader" writer="compositeWriter" processor="validationProcessor"
                    commit-interval="1000" skip-limit="100000" > 
                    <skippable-exception-classes>
                        <batch:include  class="org.springframework.batch.item.file.FlatFileParseException" />
                        <batch:include  class="org.beanio.BeanIOException" />
                    </skippable-exception-classes>
                </chunk>
                <listeners>
                    <listener ref="genericItemSkipListener"/>
                </listeners>
            </tasklet>
        </step>

The reader is a BeanIOFlatFileItemReader:

<bean id="genericBatchItemReader"   class="org.beanio.spring.BeanIOFlatFileItemReader"  scope="step"
        p:streamMapping="classpath:beanio-mapping.xml" 
        p:streamName="#{jobParameters[feedProcessorLauncherImpl.BEANIO_STREAM_MAPPING]}" 
        p:resource="file://#{jobParameters[feedProcessorLauncherImpl.RESOURCE_FILE_NAME_UNENCRYPTED]}" 
        p:errorHandler-ref="beanIoRecordErrorHandler"/>

The processor stage encapsulates item validation:

<util:map id="handlerRegistryContents">
        <entry key="#{T(org.fuwt.iws.claims.service.filemanagement.ClaimsEnums$ContentSubType).MEDI}" value-ref="medicalClaimsValidator"/>
        <entry key="#{T(org.fuwt.iws.claims.service.filemanagement.ClaimsEnums$ContentSubType).LAB}" value-ref="labClaimsValidator"/>
        <entry key="#{T(org.fuwt.iws.claims.service.filemanagement.ClaimsEnums$ContentSubType).RXPD}" value-ref="pharmaClaimsValidator"/>
</util:map>

<bean id="validationProcessor"  class="org.fuwt.iws.claims.validation.springbatch.ValidationProcessor"  scope="step">

    <property name="handlerRegistry" ref="handlerRegistryContents"/>

</bean> 

The writer is a composite:

    <bean id="genericBatchItemWriter"   class="org.fuwt.iws.claims.springbatch.GenericBatchItemWriter"  scope="step"
        p:metadataId="#{jobParameters[feedProcessorLauncherImpl.METADATA_ID]}"/>

    <bean id="softValidationsItemWriter"    class="org.fuwt.iws.claims.springbatch.SoftValidationsItemWriter"   scope="step"
        p:metadataId="#{jobParameters[feedProcessorLauncherImpl.METADATA_ID]}"/>

<bean id="compositeWriter" class="org.springframework.batch.item.support.CompositeItemWriter" scope="step">
        <property name="delegates">
            <list>
                <!-- Order here is significant as ID's, which are generated by the first writer - genericBatchItemWriter - need to be passed around -->
                <ref bean="genericBatchItemWriter"/>
                <ref bean="softValidationsItemWriter"/>
            </list>
        </property>
    </bean> 

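During the process/validation step above, ValidationProcessor determines the type of the record and, based on that, instantiates the appropriate composite validator (MedicalClaimsValidator in this case). That validator is configured with all the individual validations for the record type (Composite pattern), such as HCPCSCodeLength, which appears in the logs below.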

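As each item (claim) passes through this validation infrastructure, any errors found are accumulated in the item's errors field, a Map<String, Collection<String>> in which the failures of a particular validation are keyed by that validation's name and described in the map's values.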
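As a rough illustration of the shape of that errors map, here is a minimal sketch. The Claim class and its addError method are hypothetical stand-ins, not the real item class; only the Map<String, Collection<String>> type and the HCPCSCode key come from the setup described above.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

public class ErrorsMapSketch {

    /** Hypothetical stand-in for a claim item that carries its own errors map. */
    static class Claim {
        final String claimNumber;
        final Map<String, Collection<String>> errors = new LinkedHashMap<>();

        Claim(String claimNumber) {
            this.claimNumber = claimNumber;
        }

        /** Appends a failure message under the name of the validation that produced it. */
        void addError(String validationName, String message) {
            errors.computeIfAbsent(validationName, k -> new ArrayList<>()).add(message);
        }
    }

    public static void main(String[] args) {
        Claim claim = new Claim("0038017282");
        claim.addError("HCPCSCode", "supplied HCPCS code is blank or null");
        // Running the same validation again on the same item appends a duplicate message.
        claim.addError("HCPCSCode", "supplied HCPCS code is blank or null");
        System.out.println(claim.errors);
    }
}
```

With a map like this, a validation that runs more than once on the same item shows up as a repeated message under the same key.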

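This brings me to the erroneous behavior we are seeing:

After carefully inspecting the log files from a load of the test file above, which contains 3 claim records, we found the following (odd!?) behavior: each record is validated as many times as its ordinal position in the file. That is, the first record (claim) is validated once, leaving one message in its errors map; the second record is validated twice, and its errors map now contains the message repeated twice; the third record is validated 3 times, leaving the message repeated 3 times.

The records in the file are practically identical in their invalidity, so the expected result is that each record ends up with the same set of errors.

The actual result is that the error values keep growing with each subsequent record: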

First record:

INFO  2016-06-23 10:16:24,214 [main] org.fuwt.iws.claims.validation.springbatch.medical.MedicalClaimsValidator: Service date to: Thu Dec 10 00:00:00 EST 2015
INFO  2016-06-23 10:16:24,216 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0038017282
INFO  2016-06-23 10:16:24,223 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]} 

Second record:

INFO  2016-06-23 10:16:24,227 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0071517729
INFO  2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}
INFO  2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0071517729
INFO  2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value, supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}

Third record:

INFO  2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0081517745
INFO  2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}
INFO  2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0081517745
INFO  2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value, supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}
INFO  2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0081517745
INFO  2016-06-23 10:16:24,229 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value, supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value, supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}

Versions used:

spring-batch-core: 2.2.0.RELEASE
beanio: 2.1.0

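Question:

What is causing Spring Batch to make these repeated calls to the processor? Is this normal Spring Batch behavior, and how can I stop it and get the desired behavior described above?

Update:

This validation component exhibits the erroneous behavior: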

@Component("medicalClaimsValidator")
public class MedicalClaimsValidator implements ClaimValidation {

    private final static Logger logger = LoggerFactory.getLogger(MedicalClaimsValidator.class);

    @Autowired private AbstractMedicalClaimValidation  HCPCSCodeLength;


    List<ClaimValidation> medicalClaimValidations = new ArrayList<>();

    @Override
    public boolean supports(Class<?> clazz) {
        return QualcareMedicalClaimWeeklyNDT.class.equals(clazz);
    }

    @Override

    public Map<String, Collection<String>> validate(Object item,  MessageSource messageSource) {

        logger.info("\nSoft-validating the bean...");
        QualcareMedicalClaimWeeklyNDT medicalClaim = (QualcareMedicalClaimWeeklyNDT)item;

        logger.info("Claim #: {}", medicalClaim.getClaimNumber());
        logger.info("Service date from: {}", medicalClaim.getServiceDateFrom());
        logger.info("Service date to: {}", medicalClaim.getServiceDateTo());

        //TODO: A candidate for externalization into a config file once we have all the known rules
        //medicalClaimValidations.add(new ServiceDateFromGreaterThanTo());
        //medicalClaimValidations.add(new ProcedureCodeLength());
        medicalClaimValidations.add(HCPCSCodeLength/*new HCPCSCodeLength()*/);
        //medicalClaimValidations.add(new TypeOfBillPresenseAndLengthForInstitutionalClaims());
        //medicalClaimValidations.add(new DischargeStatusPresenseAndLengthForInpatientClaims());
        //medicalClaimValidations.add(new DiagnosisCodeFormat());


        for(ClaimValidation validation:medicalClaimValidations) {
            logger.info("validation type: {}",validation.getClass());
             validation.validate(medicalClaim, messageSource);
        }

        return medicalClaim.getErrors();
    }

}

The following workaround hides the erroneous behavior:

@Component("medicalClaimsValidator")
public class MedicalClaimsValidator implements ClaimValidation {

    private final static Logger logger = LoggerFactory.getLogger(MedicalClaimsValidator.class);

    @Autowired @Qualifier("HCPCSCodeLength")private AbstractMedicalClaimValidation  HCPCSCodeLength;
    @Autowired @Qualifier("serviceDateFromGreaterThanTo")private AbstractMedicalClaimValidation  serviceDateFromGreaterThanTo;
    @Autowired @Qualifier("procedureCodeLength")private AbstractMedicalClaimValidation  procedureCodeLength;
    @Autowired @Qualifier("typeOfBillPresenseAndLengthForInstitutionalClaims")private AbstractMedicalClaimValidation  typeOfBillPresenseAndLengthForInstitutionalClaims;
    @Autowired @Qualifier("dischargeStatusPresenseAndLengthForInpatientClaims")private AbstractMedicalClaimValidation  dischargeStatusPresenseAndLengthForInpatientClaims;
    @Autowired @Qualifier("diagnosisCodeFormat")private AbstractMedicalClaimValidation  diagnosisCodeFormat;


    List<ValidationProcessTuple> medicalClaimValidations = new ArrayList<>();

    @Override
    public boolean supports(Class<?> clazz) {
        return QualcareMedicalClaimWeeklyNDT.class.equals(clazz);
    }

    @Override

    public Map<String, Collection<String>> validate(Object item,  MessageSource messageSource) {

        logger.info("\nSoft-validating the bean...");
        QualcareMedicalClaimWeeklyNDT medicalClaim = (QualcareMedicalClaimWeeklyNDT)item;

        logger.info("Claim #: {}", medicalClaim.getClaimNumber());
        logger.info("Service date from: {}", medicalClaim.getServiceDateFrom());
        logger.info("Service date to: {}", medicalClaim.getServiceDateTo());

        //TODO: A candidate for externalization into a config file once we have all the known rules
        medicalClaimValidations.add(new ValidationProcessTuple(serviceDateFromGreaterThanTo, false));
        medicalClaimValidations.add(new ValidationProcessTuple(procedureCodeLength, false));
        medicalClaimValidations.add(new ValidationProcessTuple(HCPCSCodeLength, false));
        medicalClaimValidations.add(new ValidationProcessTuple(typeOfBillPresenseAndLengthForInstitutionalClaims, false));
        medicalClaimValidations.add(new ValidationProcessTuple(dischargeStatusPresenseAndLengthForInpatientClaims, false));
        medicalClaimValidations.add(new ValidationProcessTuple(diagnosisCodeFormat, false));

        for (ValidationProcessTuple tuple : medicalClaimValidations) {
            if (!tuple.processed) {//to counteract the erroneous behavior whereby validation calls get repeated as many times as there are records
                tuple.validation.validate(item, messageSource);
                tuple.processed = true;
            }
        }

        return medicalClaim.getErrors();
    }

}

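I am still clueless as to why this behavior occurs; any explanation would certainly be welcome.

Best answer

In the MedicalClaimsValidator class, why do you add to this list inside the validate() method? I don't see the reason for it. The list is an instance field, so every time a new line is processed another set of tuples is appended to it and the list keeps growing. You can define the validation rules outside the method, in an init method or in the constructor.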

   //TODO: A candidate for externalization into a config file once we have all the known rules
    medicalClaimValidations.add(new ValidationProcessTuple(serviceDateFromGreaterThanTo, false));
    medicalClaimValidations.add(new ValidationProcessTuple(procedureCodeLength, false));
    medicalClaimValidations.add(new ValidationProcessTuple(HCPCSCodeLength, false));
    medicalClaimValidations.add(new ValidationProcessTuple(typeOfBillPresenseAndLengthForInstitutionalClaims, false));
    medicalClaimValidations.add(new ValidationProcessTuple(dischargeStatusPresenseAndLengthForInpatientClaims, false));
    medicalClaimValidations.add(new ValidationProcessTuple(diagnosisCodeFormat, false));
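A minimal sketch of the effect described in this answer, assuming a validator instance that is shared across all items (Spring's `@Component` beans are singletons by default). GrowingValidator, FixedValidator, and errorsOnLastItem are hypothetical stand-ins, not the real ClaimValidation types; each rule is reduced to a lambda that appends one message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ValidatorListSketch {

    /** Mirrors the question: the rule list is a field, but it is refilled on every call. */
    static class GrowingValidator {
        private final List<Consumer<List<String>>> rules = new ArrayList<>();

        void validate(List<String> errors) {
            rules.add(e -> e.add("HCPCS code invalid")); // one more copy per item processed
            for (Consumer<List<String>> rule : rules) {
                rule.accept(errors);
            }
        }
    }

    /** Registers the rules once, at construction time, so each item sees each rule once. */
    static class FixedValidator {
        private final List<Consumer<List<String>>> rules = new ArrayList<>();

        FixedValidator() {
            rules.add(e -> e.add("HCPCS code invalid"));
        }

        void validate(List<String> errors) {
            for (Consumer<List<String>> rule : rules) {
                rule.accept(errors);
            }
        }
    }

    /** Validates `count` fresh items in turn and returns the error count of the last one. */
    static int errorsOnLastItem(Consumer<List<String>> validator, int count) {
        List<String> errors = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            errors = new ArrayList<>(); // a fresh item with an empty error list
            validator.accept(errors);
        }
        return errors.size();
    }

    public static void main(String[] args) {
        System.out.println(errorsOnLastItem(new GrowingValidator()::validate, 3)); // prints 3
        System.out.println(errorsOnLastItem(new FixedValidator()::validate, 3));   // prints 1
    }
}
```

In the original Spring component the rules are injected through `@Autowired` fields, which are populated only after the constructor has run. A `@PostConstruct` method (or constructor injection) is therefore probably a safer place to fill the list than a plain constructor.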

Regarding "java - Spring Batch repeatedly invokes the processor for an item", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/37996187/
