java - Spring Batch 并行处理多次执行一个步骤

标签 java spring spring-boot spring-batch

我正在并行执行 spring 批处理作业,并使用 SimpleAsyncTaskExecutor 进行并行处理,默认节流限制(默认为 4)。 项目读取器正在从文本文件中读取行,然后进行处理。 但实际情况是文本文件中的一行正在被 4 个不同的线程处理,使其执行单个 block 4 次。

下面是我的batch.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <import resource="classpath*:/META-INF/spring/batch/override/**/*.xml" />
    <bean id="businessReader" class="com.rbsgbm.rates.eodtasks.batch.reader.BusinessItemReader"/>
    <bean id="businessProcessor" class="com.rbsgbm.rates.eodtasks.batch.processor.BusinessItemProcessor" />
    <bean id="businessWriter" class="com.rbsgbm.rates.eodtasks.batch.writer.BusinessItemWriter" />
    <bean id="deskReader" class="com.rbsgbm.rates.eodtasks.batch.reader.DeskItemReader"/>
    <bean id="deskProcessor" class="com.rbsgbm.rates.eodtasks.batch.processor.DeskItemProcessor" />
    <bean id="deskWriter" class="com.rbsgbm.rates.eodtasks.batch.writer.DeskItemWriter" />
    <bean class="com.rbsgbm.rates.eodtasks.batch.Tasklet.TradeSnapTasklet" id="tradeSnapTasklet"/>
    <bean class="com.rbsgbm.rates.eodtasks.batch.Tasklet.FoundryExtractTasklet" id="foundryExtractTasklet"/>
    <bean id="simpleFireTasklet"
        class="com.rbsgbm.rates.eodtasks.batch.Tasklet.SimpleFireTasklet" />

    <bean id="mdxMarketDataSnapTasklet"
        class="com.rbsgbm.rates.eodtasks.batch.Tasklet.MdxMarketDataSnapTasklet" />

    <bean id="stepListener" class="org.springframework.batch.core.listener.StepExecutionListenerSupport" />
    <bean id="restartJobListener" class="com.rbsgbm.rates.eodtasks.batch.listener.RestartListener"/>
    <bean id="failedStepListener" class="com.rbsgbm.rates.eodtasks.batch.listener.FailedStepStepExecutionListener"/>
    <bean id="taskExecutor"
        class="org.springframework.core.task.SimpleAsyncTaskExecutor">
    </bean>

    <job id="simpleDojJob"  xmlns="http://www.springframework.org/schema/batch">
        <step id="processBusiness" next="simpleFireTask">
            <tasklet>
                <chunk reader="businessReader" processor="businessProcessor"
                    writer="businessWriter" commit-interval="1" />
            </tasklet>

        </step>

        <step id="simpleFireTask" next="foundryTask">
            <tasklet task-executor="taskExecutor">
                <chunk reader="deskReader" processor="deskProcessor"
                    writer="deskWriter" commit-interval="1" />
            </tasklet>

        </step>

        <step id="foundryTask">
            <tasklet ref="foundryExtractTasklet"/>
            <listeners>
                    <listener ref="stepListener"/>
                    <listener ref="restartJobListener"/>
                    <listener ref="failedStepListener"/>
            </listeners>    
        </step>
    </job>
</beans>

最佳答案

如果你想拥有线程安全的读取器和写入器,你必须以这种方式实现它们。

默认情况下,每个线程都可能在同一时刻访问读取器或写入器的同一实例。如果您的阅读器和编写器没有为此实现,它将无法正确处理它。

确保它们线程安全的最简单的方法是将读取器和写入器方法分别标记为同步。

如果您无法更改 Reader/Writer 的代码,只需实现一个简单的 Wrapper 并委托(delegate)给您的 Reader/Writer:

public class SynchronizedItemReader<T> implements ItemReader<T>
{
    private ItemReader<T> delegate;
    public void setDelegate(ItemReader<T> delegate) {this.delegate = delegate};

    public synchronized T read() {
        return delegate.read();
    }
}

但请注意:如果您还实现 ItemStream 来跟踪编写器已成功提交的内容(因此能够在该位置重新启动),您还需要对其进行管理,因为 block 可能会相互超越。

关于java - Spring Batch 并行处理多次执行一个步骤,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33564210/

相关文章:

java - return this 在这段代码中做了什么?

java - SOAPMessage.writeTo() 仅转义特殊字符

spring - 使用 Tomcat gradle 插件部署 Spring 应用程序

java - 使用 maven 程序集插件在 META-INF/spring.factories 中找不到自动配置类

Java BlockingQueue 会导致线程不必要地等待。

java - 根据服务名称和参数缓存 Hessian 服务结果

java - 如何在 Spring 中注销特定用户?以编程方式

java - 当响应代码为 401 时,ClientHttpResponse.getBody() 抛出 ResourceAccessException

java - 未检测到自定义 SpringVaadinServlet

java.lang.IllegalStateException :Current user principal is not of type when i try to integrate keycloak with spring boot web + spring security project