Mulesoft 与使用 CDC 的 Salesforce Streaming API

标签 mule esb cometd data-stream change-data-capture

我正在开发 Mule API 流来测试 Salesforce 事件流。我已设置连接器并订阅了流媒体 channel 。

当我创建/更新/删除联系人记录时,这工作得很好,事件通过,我通过将它们添加到另一个数据库来处理它们。

enter image description here

我对 replayId 功能有点困惑。使用当前设置,我可以关闭 Mule 应用程序,在组织中创建联系人,然后当我使应用程序重新上线时,它会通过从中断处添加数据来恢复。完美。

但是,我试图模拟如果 mule 应用程序在处理事件时崩溃会发生什么。

我运行了一些 APEX 来创建 100 条随机联系人记录。当我看到它在我的应用程序中记录第一个流程时,我就终止了 mule 应用程序。我的假设是,当我恢复应用程序时,它会知道它在哪里停止,就好像它在创建联系人之前处于离线状态一样,就像之前的测试一样。

我注意到它只处理在我关闭应用程序之前通过的少数联系人。

看来事件在流输入中传入的速度可能非常快,以至于它已经到达流中的最后一个 replayId。但是,由于这些记录尚未添加到我的外部数据库中,因此我正在丢失这些记录。流做了它应该做的事情,但由于应用程序仍在处理批量工作,我的 100 条记录没有像 replayId 反射(reflect)的那样被提交。

如何解决这个问题,以便在应用程序崩溃之前存在大量数据流时不会丢失数据?我记得对于 Kafka,一旦将 id 插入数据库,您就必须能够commit id,以便它知道您正式处理的最后一个 ID。 Mule 中是否有这样一个概念,我可以告诉它我在哪里正式停止并致力于数据库?

最佳答案

协议(protocol) (CometD) 级别的可靠性意味着许多属性。其中最主要的是订户已收到消息的事务性 ACK(确认)。 CometD 支持 ACK 作为扩展。 Salesforce 的 CometD 实现不支持 ACK。即使确实如此,您仍然会拥有 issues ...但风险的频率/损失可能会更低。

就您的情况而言,您必须设计一个解决方案,该解决方案相当于查找和重播未提交到目标数据库的事件。您可以在 Mule 中使用自定义代码或接线适配器来完成此操作。不保证连续事件的重播 ID 值是连续的,但它们将被排序。重播 ID 为 100 的事件 A 将紧随其后的是重播 ID 为 200 的事件 B。

您需要在数据库中存储重播 ID 值。然后,您可以在重新订阅时(订阅者失败后)使用它从 SF 检索数据库中丢失的事件。仅当故障窗口足够小时,这才有效。对于标准平台事件许可证,Salesforce 事件保留窗口目前为 24 小时。更高级别的许可证允许更长时间的保留。

根据数据量、事件频率和其他过程参数,您可以使用 Heroku Connect 开箱即用地获取所有这些信息。 。它确实意味着 Heroku 上的 Postgres DB + HC 的许可成本和运营成本,但我们大多数处于类似情况的客户都认为这是值得的。

关于Mulesoft 与使用 CDC 的 Salesforce Streaming API,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63826451/

相关文章:

jboss - 请 Talend Open Studio ESB 用户反馈(相对于 jbossesb/mule/servicemix)

c# - nservicebus启动错误

java - 使用 websocket spring 初始化进行长轮询回退

java - Cometd如何动态创建和订阅 channel ?

esb - Mule 功能测试 - 完全糊涂

java - 如何在 Mule Dataweave 转换中使用正则表达式替换特殊字符?

ssl - 如何为独立的 Mule 应用程序启用 TLSv1.1

http - 在 Mule 中使用 Jersey 实现文件上传时出现不支持的媒体类型 (415) 错误

java - 骡子 ESB : xml to maps transformer produces empty hashmaps

tomcat - Tomcat 上的 Cometd