java - SFTP Spring 集成的文件轮询和过滤问题

标签 java spring spring-batch spring-integration

我正在尝试通过 SFTP Spring Integration 提取远程文件并将其转储到本地驱动器中,然后启动 Spring Batch 作业(步骤 1 读取、处理和写入),然后执行步骤 2,其中 Tasklet 存档将本地文件下载到本地其他位置并删除下载的文件。

假设正在下载的文件是“data.service”、“data.maintenance”和“data.transaction”。我只处理“data.service”,但所有 3 个都已存档并删除。

我希望上述步骤在特定的时间间隔内发生,假设一组新文件将在远程位置被覆盖。

Bean定义

<bean id="acceptAllFileListFilter" class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/>
<bean id="acceptOnceFileListFilter" class="org.springframework.integration.file.filters.AcceptOnceFileListFilter"/>

session

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
        channel="inboundFileChannel"
        session-factory="sftpSessionFactory"
        local-directory="file:${inbound.local.directory}"
        remote-directory="${inbound.remote.directory}"
        auto-create-local-directory="true"
        delete-remote-files="false"
        preserve-timestamp="true"
        filename-pattern="*.*"
        local-filter="acceptAllFileListFilter" >
    <int:poller max-messages-per-poll="-1" fixed-rate="60000" />
</int-sftp:inbound-channel-adapter>

现在发生的事情是...... 上面的代码触发了这个序列并在无限循环中运行,而不从远程获取新文件。详情如下:

我从一个空的本地文件夹开始,远程服务器拥有全部 3 个文件 [启动程序]

  1. Spring Integration 程序将所有 3 个文件下载到本地位置
  2. Spring Batch Job Step-1 触发 Tasklet(Step-2)归档
  3. 本地文件 Tasklet(Step-2)删除 Spring 本地文件
  4. 集成程序将所有 3 个文件下载到本地位置(为什么???)
  5. 程序不断地循环运行Step-1到Step-5
  6. 新文件被删除到远程服务器上,同样的(1)到(5)没有触发作业

尝试如下替换,但程序只运行一次,即从(1)到(5),什么也没有发生

        local-filter="acceptOnceFileListFilter" >

预期

  1. Spring Integration 程序将所有 3 个文件下载到本地位置
  2. Spring Batch Job Step-1 被触发
  3. Tasklet(第 2 步)归档本地文件
  4. Tasklet(第 2 步)删除本地文件
  5. Spring Integration 程序等待将新文件拖放到远程位置(每 60000 毫秒轮询一次)
  6. 在远程服务器上放置新文件
  7. 从 (1) 到 (6) 开始

根据不同的建议 post我有一个独特的工作参数如下:

@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
    jobParametersBuilder.addString(fileParameterName, message.getPayload().getAbsolutePath());
    jobParametersBuilder.addLong("currentTime", new Long(System.currentTimeMillis()));
    return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}

尝试了不同的过滤器组合,但没有一个达到我的目的。任何样本都会有很大帮助,自上周以来一直在努力解决这个问题。

这是我根据建议所做的更改:

添加复合过滤器如下,但是将传递给 SftpPersistentAcceptOnceFileListFilter 的参数是什么?我在互联网上看不到任何示例。

<bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter"/>
            <bean class="org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter">
            </bean>                     
        </list>
    </constructor-arg>
</bean>

包含在适配器中如下:

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
        channel="inboundFileChannel"
        session-factory="sftpSessionFactory"
        local-directory="file:${inbound.local.directory}"
        remote-directory="${inbound.remote.directory}"
        auto-create-local-directory="true"
        delete-remote-files="false"  
        filter="compositeFilter">
    <int:poller max-messages-per-poll="-1" fixed-rate="600000" />
</int-sftp:inbound-channel-adapter>

更新 - 添加持久过滤器后的更改:

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
        channel="inboundFileChannel"
        session-factory="sftpSessionFactory"
        local-directory="file:${inbound.local.directory}"
        remote-directory="file:${inbound.remote.directory}"
        auto-create-local-directory="false"
        delete-remote-files="false"  
        filter="compositeFilter"
        >
    <int:poller max-messages-per-poll="-1" fixed-rate="60000" />
</int-sftp:inbound-channel-adapter>

<bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
                <constructor-arg name="store" ref="metadataStore"/>
                <constructor-arg value="*.*"/>
            </bean>
            <bean class="org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter">
                <constructor-arg name="store" ref="metadataStore"/>
                <constructor-arg value="*.*"/>
            </bean>
        </list>
    </constructor-arg>
</bean>

<bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
    <property name="port" value="7379"/>
</bean>

<bean name="metadataStore" class="org.springframework.integration.redis.metadata.RedisMetadataStore">
    <constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

这给了我如下异常(exception)。我尝试了suggestions在 Spring 论坛中,但我没有遇到连接问题。

2014-07-17 22:27:14,139 [task-scheduler-1] INFO com.jcraft.jsch - Authentication succeeded (keyboard-interactive). 2014-07-17 22:27:14,192 [task-scheduler-1] INFO com.jcraft.jsch - Disconnecting from localhost port 22 2014-07-17 22:27:14,195 [task-scheduler-1] DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'errorChannel' 2014-07-17 22:27:14,197 [task-scheduler-1] DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'errorChannel', message: [Payload MessagingException content=org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory][Headers={id=84cccecf-20ec-f67b-3f74-a938bf9abf3d, timestamp=1405661234197}] 2014-07-17 22:27:14,197 [task-scheduler-1] DEBUG org.springframework.integration.handler.LoggingHandler - (inner bean)#22 received message: [Payload MessagingException content=org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory][Headers={id=84cccecf-20ec-f67b-3f74-a938bf9abf3d, timestamp=1405661234197}] 2014-07-17 22:27:14,199 [task-scheduler-1] ERROR org.springframework.integration.handler.LoggingHandler - org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:193) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.receive(AbstractInboundFileSynchronizingMessageSource.java:167) at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:124) at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:187) at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:52) at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146) at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:143) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:278) at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:272) at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) at java.lang.Thread.run(Thread.java:695) Caused by: org.springframework.messaging.MessagingException: Failed to execute on session at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:311) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:167) ... 21 more Caused by: java.lang.ClassCastException: com.jcraft.jsch.ChannelSftp$LsEntry cannot be cast to java.io.File at org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter.fileName(FileSystemPersistentAcceptOnceFileListFilter.java:28) at org.springframework.integration.file.filters.AbstractPersistentAcceptOnceFileListFilter.buildKey(AbstractPersistentAcceptOnceFileListFilter.java:88) at org.springframework.integration.file.filters.AbstractPersistentAcceptOnceFileListFilter.accept(AbstractPersistentAcceptOnceFileListFilter.java:48) at org.springframework.integration.file.filters.AbstractFileListFilter.filterFiles(AbstractFileListFilter.java:40) at org.springframework.integration.file.filters.CompositeFileListFilter.filterFiles(CompositeFileListFilter.java:97) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.filterFiles(AbstractInboundFileSynchronizer.java:157) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer$1.doInSession(AbstractInboundFileSynchronizer.java:173) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer$1.doInSession(AbstractInboundFileSynchronizer.java:167) at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:302) ... 22 more

2014-07-17 22:27:14,199 [task-scheduler-1] DEBUG org.springframework.integration.channel.PublishSubscribeChannel - postSend (sent=true) on channel 'errorChannel', message: [Payload MessagingException content=org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory][Headers={id=84cccecf-20ec-f67b-3f74-a938bf9abf3d, timestamp=1405661234197}]

更新帖子删除 FileSystemPersistentAcceptOnceFileListFilter 过滤器,留下“SftpPersistentAcceptOnceFileListFilter”

2014-07-18 09:29:21,942 [task-scheduler-3] INFO com.jcraft.jsch - Authentications that can continue: publickey,keyboard-interactive,password 2014-07-18 09:29:21,942 [task-scheduler-3] INFO com.jcraft.jsch - Next authentication method: publickey 2014-07-18 09:29:21,942 [task-scheduler-3] INFO com.jcraft.jsch - Authentications that can continue: keyboard-interactive,password 2014-07-18 09:29:21,942 [task-scheduler-3] INFO com.jcraft.jsch - Next authentication method: keyboard-interactive 2014-07-18 09:29:22,022 [task-scheduler-3] INFO com.jcraft.jsch - Authentication succeeded (keyboard-interactive). 2014-07-18 09:29:22,067 [task-scheduler-3] DEBUG org.springframework.data.redis.core.RedisConnectionUtils - Opening RedisConnection 2014-07-18 09:29:22,068 [task-scheduler-3] INFO com.jcraft.jsch - Disconnecting from localhost port 22 2014-07-18 09:29:22,068 [task-scheduler-3] DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'errorChannel', message: [Payload MessagingException content=org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory][Headers={id=55304e38-6f82-8350-e763-0f5bcb507788, timestamp=1405700962068}] 2014-07-18 09:29:22,068 [Connect thread localhost session] INFO com.jcraft.jsch - Caught an exception, leaving main loop due to Socket closed 2014-07-18 09:29:22,068 [task-scheduler-3] DEBUG org.springframework.integration.handler.LoggingHandler - (inner bean)#22 received message: [Payload MessagingException content=org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory][Headers={id=55304e38-6f82-8350-e763-0f5bcb507788, timestamp=1405700962068}] 2014-07-18 09:29:22,069 [task-scheduler-3] ERROR org.springframework.integration.handler.LoggingHandler - org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:193) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.receive(AbstractInboundFileSynchronizingMessageSource.java:167) at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:124) at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:187) at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:52) at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146) at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:143) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:278) at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:272) at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) at java.lang.Thread.run(Thread.java:695) Caused by: org.springframework.messaging.MessagingException: Failed to execute on session at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:311) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:167) ... 21 more Caused by: org.springframework.data.redis.RedisConnectionFailureException: Cannot get Jedis connection; nested exception is redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.fetchJedisConnector(JedisConnectionFactory.java:97) at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.getConnection(JedisConnectionFactory.java:143) at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.getConnection(JedisConnectionFactory.java:41) at org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:128) at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:91) at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:78) at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:177) at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:152) at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:84) at org.springframework.data.redis.core.DefaultHashOperations.putIfAbsent(DefaultHashOperations.java:179) at org.springframework.data.redis.core.DefaultBoundHashOperations.putIfAbsent(DefaultBoundHashOperations.java:91) at org.springframework.data.redis.support.collections.RedisProperties.putIfAbsent(RedisProperties.java:228) at org.springframework.integration.redis.metadata.RedisMetadataStore.putIfAbsent(RedisMetadataStore.java:139) at org.springframework.integration.file.filters.AbstractPersistentAcceptOnceFileListFilter.accept(AbstractPersistentAcceptOnceFileListFilter.java:51) at org.springframework.integration.file.filters.AbstractFileListFilter.filterFiles(AbstractFileListFilter.java:40) at org.springframework.integration.file.filters.CompositeFileListFilter.filterFiles(CompositeFileListFilter.java:97) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.filterFiles(AbstractInboundFileSynchronizer.java:157) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer$1.doInSession(AbstractInboundFileSynchronizer.java:173) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer$1.doInSession(AbstractInboundFileSynchronizer.java:167) at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:302) ... 22 more Caused by: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool at redis.clients.util.Pool.getResource(Pool.java:42) at redis.clients.jedis.JedisPool.getResource(JedisPool.java:84) at redis.clients.jedis.JedisPool.getResource(JedisPool.java:10) at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.fetchJedisConnector(JedisConnectionFactory.java:90) ... 41 more Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused at redis.clients.jedis.Connection.connect(Connection.java:150) at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:71) at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1783) at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:65) at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:819) at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:429) at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:360) at redis.clients.util.Pool.getResource(Pool.java:40) ... 44 more Caused by: java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:382) at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:241) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:228) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:431) at java.net.Socket.connect(Socket.java:527) at redis.clients.jedis.Connection.connect(Connection.java:144) ... 51 more

2014-07-18 09:29:22,069 [task-scheduler-3] DEBUG org.springframework.integration.channel.PublishSubscribeChannel - postSend (sent=true) on channel 'errorChannel', message: [Payload MessagingException content=org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory][Headers={id=55304e38-6f82-8350-e763-0f5bcb507788, timestamp=1405700962068}]

最佳答案

Gist Here

希望是不言自明的;它只是在控制台上打印文件名,然后将其删除。

关于java - SFTP Spring 集成的文件轮询和过滤问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24790805/

相关文章:

java - Arraylist 添加对象两次

java - Appium - 查找元素不一致

spring - 将 bean 连接到实现 IInvokedMethodListener 的 TestNG 监听器

java - 使用 Spring MVC 和 AngularJS 的方法 DELETE

java - Spring 批处理 : Remove stack trace of run time exception from exit description

java - 告诉 jaxb 不要删除下划线

java - 如何在JFrame显示后才读取Java中的文件?

Java Spring - 如何处理缺少所需的请求参数

java - 使用 StepExecutionContext/JobExecutionContext 共享大值 Hashmap 的后果

java - 运行应用程序后,Xml 文件不包含任何数据