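I am currently using Spring Integration 4.1.0 with Spring 4.1.2. I need to read a file line by line and use each line as a message. Basically, I want to allow "replay" for one of our message sources, but the messages are not saved in individual files; they are all saved in a single file. I have no transactional requirements for this use case. My requirements are similar to this post, except that the file resides on the same server as the one running the JVM: spring integration - read a remote file line by line
As I see it, I have the following options:
1. Use int-file:inbound-channel-adapter to read the file, and then "split" that file so that one message becomes multiple messages.
Sample configuration file: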
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
<int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
<int:channel id="channel1"/>
<int:splitter input-channel="channel2" output-channel="nullChannel"/>
<int:channel id="channel2"/>
</beans>
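The problem is that the file is very large. With the technique above, the entire file is first read into memory and only then split, so the JVM runs out of heap space. The steps actually required are: read a line and convert it to a message, send the message, remove the message from memory, repeat.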
2. Use int-file:tail-inbound-channel-adapter with end="false" (which basically means reading from the beginning of the file). Start and stop this adapter as needed for each file (changing the file name before each start).
Sample configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int-file:tail-inbound-channel-adapter id="apache" channel="exchangeSpringQueueChannel" task-executor="exchangeFileReplayTaskExecutor" file="C:\p2-test.txt" delay="1" end="false" reopen="true" file-delay="10000" />
<int:channel id="exchangeSpringQueueChannel" />
<task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" />
</beans>
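3. Have Spring Integration call into Spring Batch and use an ItemReader to process the file. This certainly allows more fine-grained control over the whole process, but it requires a fair amount of work to set up the job repository and so on (and I don't care about job history, so I would either tell the job not to record status and/or use the in-memory MapJobRepository).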
4. Create my own FileLineByLineInboundChannelAdapter by extending MessageProducerSupport.
Much of the code can be borrowed from ApacheCommonsFileTailingMessageProducer (see also http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter). Below is a sample, but it needs some work to move the reading into its own Thread so that I honor the stop() command while reading line by line.
package com.xxx.exchgateway.common.util.springintegration;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.commons.io.IOUtils;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/**
 * A lot of the logic for this class came from
 * {@link org.springframework.integration.file.tail.ApacheCommonsFileTailingMessageProducer}.
 * See <a href="http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter">this forum thread</a>.
 */
public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {

    private volatile File file;

    /**
     * The name of the file you wish to tail.
     * @param file The absolute path of the file.
     */
    public void setFile(File file) {
        // Check the argument itself; the original call only checked the message string.
        Assert.notNull(file, "'file' cannot be null");
        this.file = file;
    }

    protected File getFile() {
        if (this.file == null) {
            throw new IllegalStateException("No 'file' has been provided");
        }
        return this.file;
    }

    @Override
    public String getComponentType() {
        return "file:line-by-line-inbound-channel-adapter";
    }

    private void readFile() {
        FileInputStream fstream;
        try {
            fstream = new FileInputStream(getFile());
            BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
            String strLine;
            // Read File Line By Line, make sure we honor if someone manually sets the isRunning=false (via clicking the stop() method in JMX)
            while ((strLine = br.readLine()) != null && isRunning()) {
                send(strLine);
            }
            //Close the input stream
            IOUtils.closeQuietly(br);
            IOUtils.closeQuietly(fstream);
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    @Override
    protected void doStart() {
        super.doStart();
        // TODO this needs to be moved into its own thread since isRunning() will return "false" until this method has completed
        // and we want to honor the stop() command while we read line-by-line
        readFile();
    }

    protected void send(String line) {
        Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build();
        super.sendMessage(message);
    }

    @Override
    public Message<String> receive() {
        // TODO Auto-generated method stub
        return null;
    }
}
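The TODO in doStart() above (moving the read loop into its own thread so that stop() is honored) could be sketched roughly as follows. This is plain Java with no Spring dependency; StoppableLineReader, its sink callback, and awaitCompletion() are hypothetical names introduced only for illustration. In a real adapter the loop would presumably be launched from doStart() on a TaskExecutor and would call sendMessage() instead of the callback.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

/** Hypothetical helper (not a Spring Integration class): reads a file line by line on a worker thread. */
final class StoppableLineReader {

    private final Path file;
    private final Consumer<String> sink;   // stands in for sendMessage(...) in a real adapter
    private volatile boolean running;      // volatile so the worker sees stop() promptly
    private Thread worker;

    StoppableLineReader(Path file, Consumer<String> sink) {
        this.file = file;
        this.sink = sink;
    }

    /** Returns immediately; the file is read on a separate thread. */
    synchronized void start() {
        if (worker != null && worker.isAlive()) {
            return; // already reading
        }
        running = true;
        worker = new Thread(this::readLoop, "line-reader");
        worker.start();
    }

    /** Asks the worker to finish after the line it is currently delivering. */
    void stop() {
        running = false;
    }

    /** Blocks until the worker has reached end of file or has been stopped. */
    void awaitCompletion() throws InterruptedException {
        Thread t;
        synchronized (this) {
            t = worker;
        }
        if (t != null) {
            t.join();
        }
    }

    private void readLoop() {
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            // Check the flag BEFORE reading, so no line is read and then silently dropped.
            while (running && (line = reader.readLine()) != null) {
                sink.accept(line);
            }
        } catch (IOException e) {
            // Propagates to the worker thread's uncaught-exception handler.
            throw new UncheckedIOException(e);
        } finally {
            running = false;
        }
    }
}
```

Because start() returns right away, the caller's lifecycle method can complete and a later stop() takes effect between two lines. Only one line is held in memory at a time.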
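It seems to me that my use case is not outside the realm of typical things people might want to do, so I am surprised that I cannot find an out-of-the-box solution. I have searched quite a bit and looked at many examples, but unfortunately I have yet to find something that suits my needs.
I assume I may have missed something obvious that the framework already offers (although perhaps this falls into the blurry line between Spring Integration and Spring Batch). Can someone let me know whether I am totally off base with my ideas, whether there is a simple solution I have missed, or offer alternative suggestions?
Best answer
Spring Integration 4.x has a nice new feature for using an Iterator as the source of messages:
Starting with version 4.1, the AbstractMessageSplitter supports the Iterator type for the value to split.
This makes it possible to emit messages from an Iterator rather than reading the whole file into memory.
Here is a simple example of a Spring context that splits CSV files into one message per line: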
<int-file:inbound-channel-adapter
    directory="${inputFileDirectory:/tmp}"
    channel="inputFiles"/>
<int:channel id="inputFiles">
    <int:dispatcher task-executor="executor"/>
</int:channel>
<int:splitter
    input-channel="inputFiles"
    output-channel="output">
    <bean
        class="FileSplitter"
        p:commentPrefix="${commentPrefix:#}" />
</int:splitter>
<task:executor
    id="executor"
    pool-size="${poolSize:8}"
    queue-capacity="${queueCapacity:0}"
    rejection-policy="CALLER_RUNS" />
<int:channel id="output"/>
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

public class FileSplitter extends AbstractMessageSplitter {

    private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);

    private String commentPrefix = "#";

    @Override
    public Object splitMessage(Message<?> message) {
        if (log.isDebugEnabled()) {
            log.debug(message.toString());
        }
        try {
            Object payload = message.getPayload();
            Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");
            // Return a lazy iterator; the splitter emits one message per element.
            return new BufferedReaderFileIterator((File) payload);
        }
        catch (IOException e) {
            String msg = "Unable to transform file: " + e.getMessage();
            log.error(msg);
            throw new MessageTransformationException(msg, e);
        }
    }

    public void setCommentPrefix(String commentPrefix) {
        this.commentPrefix = commentPrefix;
    }

    public class BufferedReaderFileIterator implements Iterator<String> {

        private File file;
        private BufferedReader bufferedReader;
        private String line;

        public BufferedReaderFileIterator(File file) throws IOException {
            this.file = file;
            this.bufferedReader = new BufferedReader(new FileReader(file));
            readNextLine();
        }

        @Override
        public boolean hasNext() {
            return line != null;
        }

        @Override
        public String next() {
            try {
                String res = this.line;
                readNextLine();
                return res;
            }
            catch (IOException e) {
                log.error("Error reading file", e);
                throw new RuntimeException(e);
            }
        }

        // Reads ahead one line, skipping lines that start with the comment prefix.
        void readNextLine() throws IOException {
            do {
                line = bufferedReader.readLine();
            }
            while (line != null && line.trim().startsWith(commentPrefix));
            if (log.isTraceEnabled()) {
                log.trace("Read next line: {}", line);
            }
            if (line == null) {
                close();
            }
        }

        // Closes the reader and deletes the file once it has been fully consumed.
        void close() throws IOException {
            bufferedReader.close();
            file.delete();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
}
Note the Iterator object returned from the splitMessage() handler method.
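As a side note, assuming Java 8 or later is available, a lazy Iterator of the same kind can be obtained from Files.lines with less hand-written code. The sketch below is plain Java with no Spring dependency; LazyLines and its iterator(...) method are hypothetical names used only for illustration, and, unlike the class above, it does not delete the file when done.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.stream.Stream;

/** Hypothetical helper: a lazy, comment-skipping line iterator built on Files.lines. */
final class LazyLines {

    private LazyLines() {
    }

    /** Lines are read on demand; the underlying stream is closed once the iterator is exhausted. */
    static Iterator<String> iterator(Path file, String commentPrefix) throws IOException {
        Stream<String> lines = Files.lines(file, StandardCharsets.UTF_8);
        Iterator<String> delegate = lines
                .filter(l -> !l.trim().startsWith(commentPrefix))
                .iterator();

        return new Iterator<String>() {
            private boolean closed;

            @Override
            public boolean hasNext() {
                if (closed) {
                    return false;
                }
                boolean more = delegate.hasNext();
                if (!more) {
                    lines.close(); // release the file handle at end of file
                    closed = true;
                }
                return more;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return delegate.next();
            }
        };
    }
}
```

A splitMessage() implementation could return such an iterator for a File payload via file.toPath(). If the iterator is abandoned before it is exhausted, the file handle stays open, so a caller that may stop early would need its own cleanup.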
Source: the Stack Overflow question "java - Spring Integration inbound-channel-adapter: reading a large file line by line", https://stackoverflow.com/questions/27064737/