Mule 流程设计以处理从远程位置读取的大文件

标签 mule sftp

我有一个包含以下步骤的流程:

1) 从源SFTP 服务器中选择一个文件

2) 拷贝到本地

3) 使用本地存储中的副本处理文件

4) 将处理后的文件(将被转换)放入目标 SFTP 服务器

5) 将源 SFTP 中的文件移动到源 SFTP 服务器上的不同文件夹中(我找不到执行此操作的方法,因此我从临时位置复制回 SFTP processed 文件夹)

这似乎是一个标准的工作流程,但是我找不到任何关于如何在 Mule 中具体实现它的建议。

Flow image

我当前的实现描述如下:

<file:connector name="tempFile" workDirectory="${temp.file.location}/work"
    workFileNamePattern="#[message.inboundProperties.originalFilename]"
    autoDelete="true" streaming="false" validateConnections="true"
    doc:name="File" />

<sftp:connector name="InputSFTP" validateConnections="true" keepFileOnError="true" doc:name="SFTP" >
    <reconnect frequency="${reconnectfrequency}" count="5"/>
</sftp:connector>

<sftp:connector name="DestinationSFTP" validateConnections="true" pollingFrequency="30000" doc:name="SFTP">
    <reconnect frequency="${reconnectfrequency}" count="5"/>
</sftp:connector>
<smtp:gmail-connector name="Gmail" contentType="text/plain" validateConnections="true" doc:name="Gmail"/>


<flow name="DownloadFTPFileIntoLocalFlow" processingStrategy="synchronous" tracking:enable-default-events="true">
    <sftp:inbound-endpoint connector-ref="InputSFTP" host="${source.host}" port="22" path="${source.path}" user="${source.username}" 
    password="${source.password}" responseTimeout="90000" pollingFrequency="120000" sizeCheckWaitTime="1000" doc:name="InputSFTP" autoDelete="true">
        <file:filename-regex-filter pattern="[Z].*\.csv" caseSensitive="false" />
    </sftp:inbound-endpoint>
    <file:outbound-endpoint path="${temp.file.location}" responseTimeout="10000" doc:name="Templocation" outputPattern="#[message.inboundProperties.originalFilename]" connector-ref="tempFile" />
    <exception-strategy ref="Default_Exception_Strategy" doc:name="Reference Exception Strategy"/>
</flow>
<flow name="ProcessCSVFlow" processingStrategy="synchronous" tracking:enable-default-events="true">
    <file:inbound-endpoint path="${temp.file.location}" connector-ref="tempFile" pollingFrequency="180000" fileAge="10000" responseTimeout="10000" doc:name="TempFileLocation"/>
    <transformer ref="enrichWithHeaderAndEndOfFileTransformer" doc:name="headerAndEOFEnricher" />
    <set-variable variableName="outputfilename" value="#['Mercury'+server.dateTime.year+server.dateTime.month+server.dateTime.dayOfMonth+server.dateTime.hours +server.dateTime.minutes+server.dateTime.seconds+'.csv']" doc:name="outputfilename"/>
    <sftp:outbound-endpoint exchange-pattern="one-way" connector-ref="DestinationSFTP" host="${destination.host}" port="22" responseTimeout="10000" doc:name="DestinationSFTP" 
    outputPattern="#[outputfilename]" path="${destination.path}" user="${destination.username}" password="${destination.password}"/>
    <gzip-compress-transformer/>
    <sftp:outbound-endpoint exchange-pattern="one-way" connector-ref="InputSFTP" host="${source.host}" port="22" responseTimeout="10000" doc:name="SourceArchiveSFTP" 
    outputPattern="#[outputfilename].gzip" path="Archive" user="${source.username}" password="${source.password}"/>
    <set-payload value="Hello world" doc:name="Set Payload"/>
    <smtp:outbound-endpoint host="${smtp.host}" port="${smtp.port}" user="${smtp.from.address}" password="${smtp.from.password}" 
                            to="${smtp.to.address}" from="${smtp.from.address}" subject="${mail.success.subject}" responseTimeout="10000" 
                            doc:name="SuccessEmail" connector-ref="Gmail"/>
    <logger message="Process completed successfully" level="INFO" doc:name="Logger"/>
    <tracking:transaction id="#[server.dateTime]"/>
    <exception-strategy ref="Default_Exception_Strategy" doc:name="Reference Exception Strategy"/>

</flow>
<catch-exception-strategy name="Default_Exception_Strategy">
    <logger message="Exception has occured Payload is #[payload] and Message is #[message]" level="ERROR" doc:name="Logger"/>
    <!-- <smtp:outbound-endpoint host="localhost" responseTimeout="10000" doc:name="Failure Email"/> -->
</catch-exception-strategy>

最佳答案

您是否尝试过在 SFTP 连接器上启用 autoDelete="true"以强制删除?

此外,是否无法执行流程 1:SFTP 输入 -> 转换 -> 文件输出,流程 2:文件输入 -> SFTP 输出?

HTH

关于Mule 流程设计以处理从远程位置读取的大文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29784308/

相关文章:

mule - Mule FileMessageReceiver 问题 - WARN 问题

soap - mule esb web 服务使用者 SOAP - 错误 : Prefix not specified

java - 如何使用 Java 压缩 SFTP 或 FTP 服务器中的文件?

file - Mule:JUnit 测试不适用于文件端点

unit-testing - Mule Munit 测试用例不适用于 APIKit 路由器流

mule - 在 Dataweave 中将 TEXT 文件转换为 JSON

正则表达式使用文件名 shell 脚本过滤文件

powershell - 使用PowerShell中的WinSCP .NET程序集从FTP/SFTP服务器上的文件夹集中下载文件和电子邮件结果

Linux:通过 ssh 连接自动传输 sftp 文件

sftp - lftp镜像错误返回码