java - Spring Batch - Understanding the behaviour between chunk size and ItemReadListener

Tags: java spring iteration listener spring-batch

Situation

I have set up a simple reading job in Spring Batch using Java configuration, and I am trying to write a simple listener. The listener should display the time (in seconds) it took to read a certain number of records.

The bean looks like this:

@Bean
public SimpleItemReaderListener listener(){
    SimpleItemReaderListener listener = new SimpleItemReaderListener<>();           
    listener.setLogInterval(50000);
    return listener;
}

Depending on the configured log interval, a message is displayed that looks like this:

14:42:11,445  INFO main SimpleItemReaderListener:45 - Read records [0] to [50.000] in average 1,30 seconds
14:42:14,453  INFO main SimpleItemReaderListener:45 - Read records [50.000] to [100.000] in average 2,47 seconds
14:42:15,489  INFO main SimpleItemReaderListener:45 - Read records [100.000] to [150.000] in average 1,03 seconds
14:42:16,448  INFO main SimpleItemReaderListener:45 - Read records [150.000] to [200.000] in average 0,44 seconds 

Exactly what I want, perfect. However, when I change the chunk size in the batch configuration from 100.000 to 1.000, the logging changes, and I don't know what causes the difference...

14:51:24,893  INFO main SimpleItemReaderListener:45 - Read records [0] to [50.000] in average 0,90 seconds
14:51:50,657  INFO main SimpleItemReaderListener:45 - Read records [50.000] to [100.000] in average 0,57 seconds
14:52:16,392  INFO main SimpleItemReaderListener:45 - Read records [100.000] to [150.000] in average 0,59 seconds
14:52:42,125  INFO main SimpleItemReaderListener:45 - Read records [150.000] to [200.000] in average 0,61 seconds

I was under the impression that the beforeRead and afterRead methods of ItemReadListener are executed for each individual item, so I expected the reported time per 50.000 records to be more consistent with the slf4j timestamps in the log (i.e. roughly 26 seconds per 50.000).

Question

Which part of my listener causes this unwanted behaviour when I change the chunk size?

To reproduce

My ItemReadListener implementation looks like this:

public class SimpleItemReaderListener<Item> implements ItemReadListener<Item>{

    private static final Logger LOG = LoggerFactory.getLogger(SimpleItemReaderListener.class);
    private static final double NANO_TO_SECOND_DIVIDER_NUMBER = 1_000_000_000.0;    
    private static final String PATTERN = ",###";   

    private int startCount = 0;
    private int logInterval = 50000;
    private int currentCount;
    private int totalCount;
    private long timeElapsed;
    private long startTime;
    private DecimalFormat decimalFormat = new DecimalFormat(PATTERN);

    @Override
    public void beforeRead() {
        startTime = System.nanoTime();              
    }

    @Override
    public void afterRead(Item item) {
        updateTimeElapsed();        
        if (currentCount == logInterval) {          
            displayMessage();
            updateStartCount();
            resetCount();
        } else {
            increaseCount();
        }       
    }

    private void updateTimeElapsed() {
        timeElapsed += System.nanoTime() - startTime;
    }

    private void displayMessage() {
        LOG.info(String.format("Read records [%s] to [%s] in average %.2f seconds", 
                decimalFormat.format(startCount), 
                decimalFormat.format(totalCount), 
                timeElapsed / NANO_TO_SECOND_DIVIDER_NUMBER));      
    }

    private void updateStartCount() {
        startCount += currentCount;
    }

    private void resetCount() {
        currentCount = 0;
        timeElapsed = 0;
    }

    private void increaseCount() {
        currentCount++;
        totalCount++;
    }

    @Override
    public void onReadError(Exception arg0) {
        // NO-OP
    }

    public void setLogInterval(int logInterval){
        this.logInterval = logInterval;
    }
}

The complete batch configuration class:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {   

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job importUserJob() {
        return jobBuilderFactory.get("importUserJob")            
                .flow(validateInput())
                .end()
                .build();
    }    

    @Bean
    public Step validateInput() {
        return stepBuilderFactory.get("validateInput")
                .chunk(1000)
                .reader(reader())               
                .listener(listener())
                .writer(writer())
                .build();
    }

    @Bean 
    public HeaderTokenizer tokenizeHeader(){
        HeaderTokenizer tokenizer = new HeaderTokenizer();
        //optional setting, custom delimiter is set to ','
        //tokenizer.setDelimiter(",");
        return tokenizer;
    }

    @Bean
    public SimpleItemReaderListener listener(){
        SimpleItemReaderListener listener = new SimpleItemReaderListener<>();
        //optional setting, log interval is set to 50.000, increase for less verbose logging
        listener.setLogInterval(50000);
        return listener;
    }

    @Bean   
    public FlatFileItemReader reader() {
        FlatFileItemReader reader = new FlatFileItemReader();        
        reader.setLinesToSkip(1);        
        reader.setSkippedLinesCallback(tokenizeHeader());
        reader.setResource(new ClassPathResource("majestic_million.csv"));
        reader.setLineMapper(new DefaultLineMapper() {{
            setLineTokenizer(tokenizeHeader());
            setFieldSetMapper(new PassThroughFieldSetMapper());
        }});
        return reader;
    }    

    @Bean
    public DummyItemWriter writer(){
        DummyItemWriter writer = new DummyItemWriter();
        return writer;
    }
}

Alternatively, use the Spring Boot sample from http://projects.spring.io/spring-batch/ and add the SimpleItemReaderListener bean.

Best Answer

With a smaller chunk size, your application spends more time outside the reader, because processing and writing a chunk happens more often. Your timing code only measures the time spent inside the reader (between beforeRead and afterRead), whereas the timestamps printed by the logging framework reflect wall-clock time, i.e. the total time spent, including processing and writing.
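The effect can be illustrated with a standalone sketch (not from the original post; the class, method names, and sleep durations below are hypothetical stand-ins for real read and write work): summing beforeRead-to-afterRead deltas, as the listener does, captures only reader time, while the log timestamps advance with total wall-clock time, which includes the per-chunk overhead.

```java
import java.util.concurrent.TimeUnit;

public class ReaderTimingDemo {

    /**
     * Simulates 'chunks' chunks of 'itemsPerChunk' reads each and returns
     * {readerOnlyNanos, wallClockNanos}. The reader-only figure mimics what
     * the listener accumulates; the wall-clock figure mimics log timestamps.
     */
    static long[] measure(int chunks, int itemsPerChunk) throws InterruptedException {
        long readerTime = 0;
        long wallStart = System.nanoTime();
        for (int c = 0; c < chunks; c++) {
            for (int i = 0; i < itemsPerChunk; i++) {
                long start = System.nanoTime();          // like beforeRead()
                TimeUnit.MILLISECONDS.sleep(1);          // simulated read work
                readerTime += System.nanoTime() - start; // like afterRead()
            }
            // Simulated processing + writing of one chunk: this runs once per
            // chunk, so a smaller chunk size means it runs more often.
            TimeUnit.MILLISECONDS.sleep(20);
        }
        return new long[] { readerTime, System.nanoTime() - wallStart };
    }

    public static void main(String[] args) throws InterruptedException {
        long[] t = measure(5, 10);
        System.out.printf("reader-only: %d ms, wall-clock: %d ms%n",
                TimeUnit.NANOSECONDS.toMillis(t[0]),
                TimeUnit.NANOSECONDS.toMillis(t[1]));
    }
}
```

The reader-only total stays roughly constant regardless of chunk size, while the wall-clock total grows with the number of chunks. To measure wall-clock time per interval in a real job, one option would be to compare a single running `System.nanoTime()` across afterRead calls instead of summing per-read deltas; another would be a `ChunkListener` timed around whole chunks.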

Regarding "java - Spring Batch - Understanding the behaviour between chunk size and ItemReadListener", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/36940165/
