java - 集成模式 : how to sync processing message received from multiple systems

标签 java design-patterns apache-camel spring-integration integration-patterns

我正在构建一个系统,该系统将通过消息代理(目前为 JMS)从不同系统接收消息。来自所有发送方系统的所有消息都有一个 deviceId,消息的接收没有顺序。 例如,系统 A 可以发送 deviceId=1 的消息,系统 b 可以发送 deviceId=2 的消息。

我的目标是不开始处理关于同一个 deviceId 的消息,除非我从具有相同 deviceId 的所有发件人那里收到所有消息。

例如,如果我有 3 个系统 A、B 和 C 向我的系统发送消息:

System A sends messageA1 with deviceId=1
System B sends messageB1 with deviceId=1
System C sends messageC1 with deviceId=3
System C sends messageC2 with deviceId=1 <--- here I should start processing of messageA1, messageB1 and messageC2 because they are having the same deviceID 1.

是否应该通过在我的系统中使用某种同步机制、消息代理或像 spring-integration/apache camel 这样的集成框架来解决这个问题?

最佳答案

聚合器的类似解决方案(@Artem Bilan 提到的)也可以在 Camel 中使用自定义 AggregationStrategy 实现并通过使用 Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP 控制聚合器完成属性(property)。

以下内容可能是一个很好的起点。 ( You can find the sample project with tests here )

路线:

from("direct:start")
    .log(LoggingLevel.INFO, "Received ${headers.system}${headers.deviceId}")
    .aggregate(header("deviceId"), new SignalAggregationStrategy(3))
    .log(LoggingLevel.INFO, "Signaled body: ${body}")
    .to("direct:result");

信号聚合策略.java

public class SignalAggregationStrategy extends GroupedExchangeAggregationStrategy implements Predicate {

    private int numberOfSystems;

    public SignalAggregationStrategy(int numberOfSystems) {
        this.numberOfSystems = numberOfSystems;
    }

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Exchange exchange = super.aggregate(oldExchange, newExchange);

        List<Exchange> aggregatedExchanges = exchange.getProperty("CamelGroupedExchange", List.class);

        // Complete aggregation if we have "numberOfSystems" (currently 3) different messages (where "system" headers are different)
        // https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#completing-current-group-decided-from-the-aggregationstrategy
        if (numberOfSystems == aggregatedExchanges.stream().map(e -> e.getIn().getHeader("system", String.class)).distinct().count()) {
            exchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
        }

        return exchange;
    }

    @Override
    public boolean matches(Exchange exchange) {
        // make it infinite (4th bullet point @ https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#about-completion)
        return false;
    }
}

希望对您有所帮助!

关于java - 集成模式 : how to sync processing message received from multiple systems,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46254511/

相关文章:

java - 如何保存和重用 Eclipse 设置?

java - 如何使用 Spring Roo 创建基础域对象?

java - 命令模式与反射

java - 如何使用camel "direct-vm"在两个camel上下文之间进行通信?

java - 使用 Camel Bean validator 组件

java - 使用 netsh 从 Java 访问 Wi-Fi 网络接口(interface)详细信息

java - 如何将随机整数按数字从最小到最大排序?

design-patterns - 在 MVVM 模式中,模型和 View 什么时候直接相互通信?

design-patterns - 使用 CQRS 在单个命令处理程序中保留多个内容

java - Camel 上下文更改未反射(reflect)在 hawtio 中