java - Spring Integration Inbound-Channel-Adapter: reading a large file line by line

Tags: java spring spring-batch spring-integration

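I'm currently using Spring Integration 4.1.0 with Spring 4.1.2. I need to be able to read a file line by line and use each line read as a message. Basically I want to allow "replay" for one of our message sources, but the messages are not saved in individual files; they are all saved in a single file. I have no transactional requirements for this use case. My requirements are similar to this posting, except that the file resides on the same server as the one the JVM is running on: spring integration - read a remote file line by line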

As I see it, I have the following options:

1. Use int-file:inbound-channel-adapter to read the file, then "split" that file so that one message becomes multiple messages. Sample config file:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

        <int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
        <int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
        <int:channel id="channel1"/>
        <int:splitter input-channel="channel2" output-channel="nullChannel"/>
        <int:channel id="channel2"/>
    </beans>

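The problem is that the file is very large, and with the technique above the whole file is first read into memory and then split, so the JVM runs out of heap space. The steps actually required are: read a line and convert it to a message, send the message, remove the message from memory, repeat.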

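2. Use int-file:tail-inbound-channel-adapter with end="false" (which basically means read from the beginning of the file). Start and stop this adapter as needed for each file (changing the file name before each start). Sample config file: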

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    
        <int-file:tail-inbound-channel-adapter id="apache"
            channel="exchangeSpringQueueChannel"
            task-executor="exchangeFileReplayTaskExecutor"
            file="C:\p2-test.txt"
            delay="1"
            end="false"
            reopen="true"
            file-delay="10000" />
    
        <int:channel id="exchangeSpringQueueChannel" />
        <task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" />
    </beans>
    
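3. Have Spring Integration call into Spring Batch and use an ItemReader to process the file. This certainly allows more fine-grained control over the whole process, but it takes a fair amount of work to set up the job repository and so on (and I don't care about job history, so I would either tell the job not to record status and/or use the in-memory MapJobRepository).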

4. Create my own FileLineByLineInboundChannelAdapter by extending MessageProducerSupport. Much of the code can be borrowed from ApacheCommonsFileTailingMessageProducer (also see http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter). Below is a sample, but it needs some work to move the reading into its own Thread so that I honor the stop() command while reading line by line.

    package com.xxx.exchgateway.common.util.springintegration;

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import org.apache.commons.io.IOUtils;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.integration.core.MessageSource;
    import org.springframework.integration.endpoint.MessageProducerSupport;
    import org.springframework.integration.file.FileHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.util.Assert;

    /**
     * A lot of the logic for this class came from {@code ApacheCommonsFileTailingMessageProducer}.
     * See http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter
     */
    public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {
        private volatile File file;

        /**
         * The name of the file you wish to tail.
         * @param file The absolute path of the file.
         */
        public void setFile(File file) {
            Assert.notNull(file, "'file' cannot be null");
            this.file = file;
        }

        protected File getFile() {
            if (this.file == null) {
                throw new IllegalStateException("No 'file' has been provided");
            }
            return this.file;
        }

        @Override
        public String getComponentType() {
            return "file:line-by-line-inbound-channel-adapter";
        }

        private void readFile() {
            BufferedReader br = null;
            try {
                br = new BufferedReader(new InputStreamReader(new FileInputStream(getFile())));

                String strLine;

                // Read the file line by line; check isRunning() before each read so that a manual stop()
                // (for example via JMX) is honored without discarding a line that was already read
                while (isRunning() && (strLine = br.readLine()) != null) {
                    send(strLine);
                }
            } catch (FileNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally {
                // Closing the reader also closes the underlying stream, even when reading failed
                IOUtils.closeQuietly(br);
            }
        }

        @Override
        protected void doStart() {
            super.doStart();

            // TODO this needs to be moved into its own thread since isRunning() will return "false" until this method has completed
            // and we want to honor the stop() command while we read line-by-line
            readFile();
        }

        protected void send(String line) {
            Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build();
            super.sendMessage(message);
        }

        @Override
        public Message<String> receive() {
            // TODO Auto-generated method stub
            return null;
        }
    }
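
The TODO in doStart() above (moving the read into its own thread so that stop() is honored between lines) could be approached along the following lines. This is only a Spring-free sketch, assuming Java 8 or later; StoppableLineReader and all of its methods are hypothetical names, not part of Spring Integration, and how the flag here maps onto MessageProducerSupport's own isRunning() lifecycle is left open.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

/**
 * Hypothetical helper: reads a file line by line on its own thread and hands
 * each line to a callback, checking a stop flag before every read.
 */
class StoppableLineReader {
    private final Path file;
    private final Consumer<String> lineHandler;
    private volatile boolean running;
    private Thread worker;

    StoppableLineReader(Path file, Consumer<String> lineHandler) {
        this.file = file;
        this.lineHandler = lineHandler;
    }

    /** Returns immediately; the file is read on a background thread. */
    public synchronized void start() {
        if (worker != null && worker.isAlive()) {
            return; // a previous read is still in progress
        }
        running = true; // set before the thread starts, so the loop sees it
        worker = new Thread(this::readFile, "line-reader");
        worker.start();
    }

    /** Asks the reader to stop before it reads the next line. */
    public void stop() {
        running = false;
    }

    public boolean isRunning() {
        return running;
    }

    /** Blocks until the background thread has finished. */
    public void awaitCompletion() throws InterruptedException {
        Thread t;
        synchronized (this) {
            t = worker;
        }
        if (t != null) {
            t.join();
        }
    }

    private void readFile() {
        // Only one line is held at a time; try-with-resources closes the file on any exit path.
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while (running && (line = reader.readLine()) != null) {
                lineHandler.accept(line);
            }
        } catch (IOException e) {
            // Ends the worker thread; surfaces through its uncaught-exception handler
            throw new UncheckedIOException(e);
        } finally {
            running = false;
        }
    }
}
```

In an adapter like the one above, the callback would be the place to build and send the message for each line, and stop() would take effect before the next line is read.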

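It seems to me that my use case is not outside the realm of typical things people might want to do, so I'm surprised that I can't find an out-of-the-box solution. I've searched quite a bit and looked at a lot of examples, and unfortunately I have yet to find something that suits my needs.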

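I'm assuming that I've missed something obvious that the framework already offers (though perhaps this falls into the blurry line between Spring Integration and Spring Batch). Can someone let me know whether I'm totally off base with my ideas, whether there is a simple solution that I've missed, or offer alternative suggestions?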

Best answer

Spring Integration 4.x has a nice new feature: a splitter can return an Iterator as the source of its messages:

Spring Integration Reference

Starting with version 4.1, the AbstractMessageSplitter supports the Iterator type for the value to split.

This makes it possible to emit messages from an Iterator rather than reading the whole file into memory.

Here is a simple example of a Spring context that splits CSV files into one message per line:

    <int-file:inbound-channel-adapter
            directory="${inputFileDirectory:/tmp}"
            channel="inputFiles"/>

    <int:channel id="inputFiles">
        <int:dispatcher task-executor="executor"/>
    </int:channel>

    <int:splitter
        input-channel="inputFiles"
        output-channel="output">
        <bean
            class="FileSplitter"
            p:commentPrefix="${commentPrefix:#}" />
    </int:splitter>

    <task:executor
        id="executor"
        pool-size="${poolSize:8}"
        queue-capacity="${queueCapacity:0}"
        rejection-policy="CALLER_RUNS" />

    <int:channel id="output"/>

Here is the splitter implementation:

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Iterator;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.integration.splitter.AbstractMessageSplitter;
    import org.springframework.integration.transformer.MessageTransformationException;
    import org.springframework.messaging.Message;
    import org.springframework.util.Assert;

    public class FileSplitter extends AbstractMessageSplitter {
        private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);

        // Lines starting with this prefix (after trimming) are skipped
        private String commentPrefix = "#";

        @Override
        protected Object splitMessage(Message<?> message) {
            if(log.isDebugEnabled()) {
                log.debug(message.toString());
            }
            try {

                Object payload = message.getPayload();
                Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");

                // Return a lazy Iterator so the file is never loaded into memory as a whole
                return new BufferedReaderFileIterator((File) payload);
            }
            catch (IOException e) {
                String msg = "Unable to transform file: " + e.getMessage();
                log.error(msg, e);
                throw new MessageTransformationException(msg, e);
            }
        }

        public void setCommentPrefix(String commentPrefix) {
            this.commentPrefix = commentPrefix;
        }

        public class BufferedReaderFileIterator implements Iterator<String> {

            private File file;
            private BufferedReader bufferedReader;
            // The line that the next call to next() will return; null at end of file
            private String line;

            public BufferedReaderFileIterator(File file) throws IOException {
                this.file = file;
                this.bufferedReader = new BufferedReader(new FileReader(file));
                readNextLine();
            }

            @Override
            public boolean hasNext() {
                return line != null;
            }

            @Override
            public String next() {
                try {
                    String res = this.line;
                    readNextLine();
                    return res;
                }
                catch (IOException e) {
                    log.error("Error reading file", e);
                    throw new RuntimeException(e);
                }
            }

            // Reads ahead to the next non-comment line and closes the file at the end
            void readNextLine() throws IOException {
                do {
                    line = bufferedReader.readLine();
                }
                while(line != null && line.trim().startsWith(commentPrefix));

                if(log.isTraceEnabled()) {
                    log.trace("Read next line: {}", line);
                }

                if(line == null) {
                    close();
                }
            }

            void close() throws IOException {
                bufferedReader.close();
                // Note: the source file is deleted once it has been read to the end
                file.delete();
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }

        }

    }

Note the Iterator object returned from the splitMessage() handler method.
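
To try out the read-ahead and comment-skipping logic without a Spring context, the same idea can be reduced to a stand-alone iterator. The sketch below assumes Java 8 or later; LineIterator is a hypothetical name, it takes any BufferedReader rather than a File, and unlike the inner class above it does not delete anything when it reaches the end.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical stand-alone iterator: yields non-comment lines one at a time. */
class LineIterator implements Iterator<String> {
    private final BufferedReader reader;
    private final String commentPrefix;
    private String next; // the line the next call to next() will return; null at end of input

    LineIterator(BufferedReader reader, String commentPrefix) {
        this.reader = reader;
        this.commentPrefix = commentPrefix;
        advance();
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    @Override
    public String next() {
        if (next == null) {
            throw new NoSuchElementException();
        }
        String current = next;
        advance();
        return current;
    }

    // Reads ahead to the next non-comment line; closes the reader at end of input.
    private void advance() {
        try {
            do {
                next = reader.readLine();
            } while (next != null && next.trim().startsWith(commentPrefix));
            if (next == null) {
                reader.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Whatever consumes the iterator only ever holds the current line plus the one read ahead, which is why handing an Iterator to the splitter avoids the heap problem described in the question.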

Regarding "java - Spring Integration Inbound-Channel-Adapter: reading a large file line by line", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/27064737/
