Akka 的 Java 聚合器

标签 java akka aggregation integration-patterns

我正在尝试实现 Java Aggregator对于 Akka ,since it doesn't look like Java API 支持它们(为什么不!?)

这是迄今为止我最好的尝试:

// Groovy pseudo-code
abstract class Aggregator<T> extends UntypedActor {
    ActorRef recipient
    Set<T> aggregation
    // TODO: Timer timer (?)

    abstract boolean isAggregated()

    @Override
    void onReceive(Object message) {
        aggregation << message as T

        if(isAggregated()) {
            recipient.tell(new Aggregation(aggregation))    // again, pseudo-code
            aggregation.clear()
            // TODO: timer.reset()
        }
    }
}

缺少的是某种Timer 构造,如果Aggregator 尚未聚合,它会在60 秒后超时。超时时,它应该抛出某种异常。在聚合时,应该重置计时器。有什么想法可以做到这一点吗?

最佳答案

您正在寻找的是ReceiveTimeout。 Akka 提供了一个功能,当特定参与者在预定义的时间内没有收到任何内容时,就会超时。

在 Java 中,你可以在 actor 中执行类似的操作:

getContext().setReceiveTimeout(Duration.create("1 second"));

当此触发器时,它会向参与者发送一条 ReceiveTimeout 类型的消息,然后您可以决定要执行的操作(异常、日志记录、重置...)。

您可以在“接收超时”部分下找到更多信息:http://doc.akka.io/docs/akka/snapshot/java/untyped-actors.html

另一方面,github 上有一些开源库可以做这些事情。看看https://github.com/sksamuel/akka-patterns了解更多示例。

关于Akka 的 Java 聚合器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30934731/

相关文章:

java - 如何指定这些VM参数?

java - 在线程上使用 getState() 方法 - Java

java - 无法在 thymeleaf 中 Autowiring TemplateEngine?

java - Akka:如何找到集群中的当前节点?

java - 使用 mongodb 查找,同时使用 java 中的吗啡迭代列表

javax.faces.el.E​​valuationException : java. lang.NullPointerException

java - 理解 akka hello-world 示例

scala - 如何在 Actor 中包装 REST API 客户端

python - 通过 pd.TimeGrouper ('D' 按天聚合每小时时间序列;问题@时间戳00 :00:00 (hour 24)

java - 关联、组合和聚合 - 用 java 实现