csv - 如何在 Mule 中读取巨大的 CSV 文件

标签 csv mule mule-studio

我正在使用 Mule Studio 3.4.0 社区版。
我有一个关于如何解析通过 File Endpoint 传入的大型 CSV 文件的大问题。场景是我有 3 个 CSV 文件,我会将文件的内容放入数据库中。
但是当我尝试加载一个巨大的文件(大约 144MB)时,我得到了“OutOfMemory”异常。我认为作为将我的大 CSV 划分/拆分为较小尺寸的 CSV 的解决方案(我不知道这个解决方案是否是最好的)或尝试找到一种处理 CSV 的方法而不会引发异常。

<file:connector name="File" autoDelete="true" streaming="true" validateConnections="true" doc:name="File"/>

<flow name="CsvToFile" doc:name="CsvToFile">
        <file:inbound-endpoint path="src/main/resources/inbox" moveToDirectory="src/main/resources/processed"  responseTimeout="10000" doc:name="CSV" connector-ref="File">
            <file:filename-wildcard-filter pattern="*.csv" caseSensitive="true"/>
        </file:inbound-endpoint>
        <component class="it.aizoon.grpBuyer.AddMessageProperty" doc:name="Add Message Property"/>
        <choice doc:name="Choice">
            <when expression="INVOCATION:nome_file=azienda" evaluator="header">
                <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/companies-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Azienda"/>
                <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertAziende" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Azienda">
                    <jdbc-ee:query key="InsertAziende" value="INSERT INTO aw006_azienda VALUES (#[map-payload:AW006_ID], #[map-payload:AW006_ID_CLIENTE], #[map-payload:AW006_RAGIONE_SOCIALE])"/>
                </jdbc-ee:outbound-endpoint>
            </when>
            <when expression="INVOCATION:nome_file=servizi" evaluator="header">
                <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/services-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Servizi"/>
                <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertServizi" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Servizi">
                    <jdbc-ee:query key="InsertServizi" value="INSERT INTO ctrl_aemd_unb_servizi VALUES (#[map-payload:CTRL_ID_TIPO_OPERAZIONE], #[map-payload:CTRL_DESCRIZIONE], #[map-payload:CTRL_COD_SERVIZIO])"/>
                </jdbc-ee:outbound-endpoint>
            </when>
            <when expression="INVOCATION:nome_file=richiesta" evaluator="header">
                <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/requests-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Richiesta"/>
                <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertRichieste" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Richiesta">
                    <jdbc-ee:query key="InsertRichieste" value="INSERT INTO ctrl_aemd_unb_richiesta VALUES (#[map-payload:CTRL_ID_CONTROLLER], #[map-payload:CTRL_NUM_RICH_VENDITORE], #[map-payload:CTRL_VENDITORE], #[map-payload:CTRL_CANALE_VENDITORE], #[map-payload:CTRL_CODICE_SERVIZIO], #[map-payload:CTRL_STATO_AVANZ_SERVIZIO], #[map-payload:CTRL_DATA_INSERIMENTO])"/>
                </jdbc-ee:outbound-endpoint>
            </when>
        </choice>   
    </flow>

拜托,我不知道如何解决这个问题。
在此先感谢您的任何帮助

最佳答案

正如 SteveS 所说,csv-to-maps-transformer可能会尝试在处理之前将整个文件加载到内存中。您可以尝试做的是将 csv 文件分成更小的部分并将这些部分发送到 VM要单独处理。
首先,创建一个组件来实现这第一步:

public class CSVReader implements Callable{
    @Override
    public Object onCall(MuleEventContext eventContext) throws Exception {

        InputStream fileStream = (InputStream) eventContext.getMessage().getPayload();
        DataInputStream ds = new DataInputStream(fileStream);
        BufferedReader br = new BufferedReader(new InputStreamReader(ds));

        MuleClient muleClient = eventContext.getMuleContext().getClient();

        String line;
        while ((line = br.readLine()) != null) {
            muleClient.dispatch("vm://in", line, null);
        }

        fileStream.close();
        return null;
    }
}

然后,将您的主要流程一分为二
<file:connector name="File" 
    workDirectory="yourWorkDirPath" autoDelete="false" streaming="true"/>

<flow name="CsvToFile" doc:name="Split and dispatch">
    <file:inbound-endpoint path="inboxPath"
        moveToDirectory="processedPath" pollingFrequency="60000"
        doc:name="CSV" connector-ref="File">
        <file:filename-wildcard-filter pattern="*.csv"
            caseSensitive="true" />
    </file:inbound-endpoint>
    <component class="it.aizoon.grpBuyer.AddMessageProperty" doc:name="Add Message Property" />
    <component class="com.dgonza.CSVReader" doc:name="Split the file and dispatch every line to VM" />
</flow>

<flow name="storeInDatabase" doc:name="receive lines and store in database">
    <vm:inbound-endpoint exchange-pattern="one-way"
        path="in" doc:name="VM" />
    <Choice>
        .
        .
        Your JDBC Stuff
        .
        .
    <Choice />
</flow>

维护您当前的file-connector配置以启用流。使用此解决方案,可以处理 csv 数据,而无需先将整个文件加载到内存中。
HTH

关于csv - 如何在 Mule 中读取巨大的 CSV 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16395665/

相关文章:

linux - 在 bash (Linux) 中从一个 csv 中查找另一个 csv 中的值(如 vlookup)

java - 获取属性中的 session 变量

http - Mule - HTTP Post - 超过超时

java - 如何处理在Mule中扩展AbstractMessageTransformer的Java类中的FileNotFoundException?

mule - 如何在 Munit 中附加文件以进行功能测试用例 - Mule ESB

csv - 如何在 pig 中使用CSVLoader或CSVExcelStorage用双引号上载csv数据

python - 使用 python 中的列表查找和替换 csv 字符串

c# - 从 CSV 读取数据到屏幕输出

ssl - 连接到外部 https 网络服务时出错

mule - JSON对象中的关键存在(Mule 4)