mule - 批量处理一条记录

标签 mule

Mule 批处理不处理单个记录。我们有一个场景,我们将获得包含 1 到 20k 条记录的加密文件。下面是我的 Mule Flow。当我们传递单个记录文件时,进程失败并提示“必须是接口(interface) java.lang.Iterable 类型”。关于如何在收到一个记录文件时使流程正常工作的任何建议。 注意:流程适用于具有多条记录的文件。

Exception stack is:

1.对象“java.util.LinkedHashMap”的类型不正确。它必须是“{interface java.lang.Iterable,interface java.util.Iterator,interface org.mule.routing.MessageSequence,interface java.util.Collection}”类型(java.lang.IllegalArgumentException) org.mule.util.collection.EventToMessageSequenceSplittingStrategy:64 (null) 2. 对象“java.util.LinkedHashMap”的类型不正确。它必须是“{interface java.lang.Iterable,interface java.util.Iterator,interface org.mule.routing.MessageSequence,interface java.util.Collection}”类型 (java.lang.IllegalArgumentException) (com.mulesoft.模块.batch.exception.BatchException) com.mulesoft.module.batch.engine.DefaultBatchEngine:366 (null)


根异常堆栈跟踪: java.lang.IllegalArgumentException:对象“java.util.LinkedHashMap”的类型不正确。它必须是“{interface java.lang.Iterable,interface java.util.Iterator,interface org.mule.routing.MessageSequence,interface java.util.Collection}”类型

<context:property-placeholder location="dev.properties"/>
<encryption:config name="Encryption" defaultEncrypter="PGP_ENCRYPTER" doc:name="Encryption">
<encryption:pgp-encrypter-config publicKeyRingFileName="keys/dev_pgp_wd_ecc_public.key.gpg" secretKeyRingFileName="keys/dev_pgp_wd_ecc_private.key.gpg" secretAliasId="${key.AliasId}" secretPassphrase="${key.Passphrase}" principal="${key.principal}"/>
</encryption:config>

<amqp:connector name="AMQP_Connector" validateConnections="true" host="${amqp.host}" doc:name="AMQP Connector" virtualHost="${amqp.virtualhost}" password="${amqp.password}" port="${amqp.port}" username="${amqp.user}"/>
<http:request-config name="HTTP_Request_PI" host="${pi.endpoint}" port="${pi.port}" doc:name="HTTP Request Configuration" basePath="${pi.path}" responseTimeout="30000">
<http:basic-authentication username="${pi.username}" password="${pi.password}"/>
</http:request-config>
<smtp:connector name="SMTP_Alert" contentType="text/html" validateConnections="true" doc:name="SMTP"/>
<sftp:connector name="SFTP_Inbound_Connector" validateConnections="true" autoDelete="true" pollingFrequency="120000" doc:name="SFTP"/>
<batch:job name="TestBatch" max-failed-records="-1">
<batch:input>
    <sftp:inbound-endpoint connector-ref="SFTP_Inbound_Connector" host="${sftp.host}" port="${sftp.port}" path="/test/incoming/lt" user="${sftp.user}" password="${sftp.password}" responseTimeout="10000" doc:name="SFTP"/>

    <encryption:decrypt config-ref="Encryption" using="PGP_ENCRYPTER" doc:name="Encryption"/>
    <object-to-string-transformer doc:name="Object to String"/>
    <logger message="Decrypted" level="INFO" doc:name="Logger"/>
    <splitter expression="#[xpath3('/wd:Report_Data/wd:Report_Entry', payload, 'NODESET')]" doc:name="Splitter"/>
    <mulexml:dom-to-xml-transformer doc:name="DOM to XML"/>
    <dw:transform-message doc:name="Transform Message" metadata:id="b7cbbbbb-d58b-439b-8684-5a6d0345d48c">
        <dw:input-payload doc:sample="empty.xml"/>
        <dw:set-payload></dw:set-payload>
    </dw:transform-message>
</batch:input>
<batch:process-records>
    <batch:step name="Batch_Step1">
        <json:object-to-json-transformer doc:name="Object to JSON"/>

            <amqp:outbound-endpoint exchangeName="${amqp.exchangeName}" queueName="${amqp.queueName}" responseTimeout="10000" encoding="UTF-8"  connector-ref="AMQP_Connector" doc:name="AMQP"/>

    </batch:step>

</batch:process-records>
<batch:on-complete>
    <logger message="#[flowVars.totalRecords.totalRecords]" level="INFO" doc:name="Logger"/>
    <logger message="#[flowVars.failedReocrds.failedReocrds]" level="INFO" doc:name="Logger"/>            
    <set-payload value="${mail.html}" doc:name="Set Payload" mimeType="text/html"/>
    <smtp:outbound-endpoint host="${mail.host}" port="${mail.port}" user="${mail.user}" password="${mail.password}" connector-ref="SMTP_Alert" to="${mail.receiver}" from="${mail.from}" subject="${IntegrationName}" responseTimeout="10000" doc:name="SMTP"/>
</batch:on-complete>

最佳答案

我在拆分器之后包含了集合聚合器,现在批处理适用于一个记录场景。

<collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>

关于mule - 批量处理一条记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34389686/

相关文章:

Mule 中的加密转换器

java - cxf运行时异常未知客户端异常

java - 使用 mule esb 中的 servlet 从项目获取 java 转换器中的文件路径

email - 骡子 ESB : how to filter emails based on subject or sender?

mule - 我在哪里可以下载 MuleSoft 社区版?

unit-testing - 在 Mule 中的流单元测试中模拟过滤器

java - SQL异常仅发生在三台服务器之一上

java - XSLT 转换不起作用

java - Mule Dataweave 日期时间从完整日期时间转换为短日期时间

java - Streaming 如何为 Mule 数据库连接器工作?