java - 在池中的 Akka actor 实例之间共享可变数据

标签 java concurrency akka actor

我在互联网上找到的大多数 Akka Material (包括许多 SO 问题)都指出,我们应该在处理它们的 Actor 中维护相关数据。不鼓励使用锁定机制

我正在开发一个基于 Akka Actor 的 Java 服务,该服务维护大量动态数据。以前我使用写时复制机制进行数据更新,但这可能会由于预期的更新率而导致下一版本中的性能问题(特别是高 GC Activity )。

假设我有一个角色 (StockManagerActor) 来管理股票信息。股票价格经常被读取和更新。 (我更喜欢让一个单独的参与者接收更新并提交它们,另一个参与者在需要时读取股票价格。但我不能这样做,因为这将使可变的股票数据在不同类型的参与者之间共享) 因此StockManagerActor 处理两种类型的消息。 UpdateStockMessageGetStockMessage。当我们想到该 Actor 的单个实例正在运行时,一切似乎都很好,因为 Actor 之间没有共享数据。

我担心如果系统中只运行一个StockManagerActor,当股市高度活跃时,它的收件箱可能会迅速增长。因此,我希望有一个 StockManagerActor 池来同时处理消息。但在这种情况下,不同的 Actor 实例将同时执行更新/获取操作。 当 updateStock() 阻塞(在单独的调度程序中)和非阻塞时,对于这种场景有什么好的设计?

StockManagerActor extends UntypedActor{
    StockStroe stockStore;
    // Only StockManagerActors use the StockStore which is
    // Initially populated from outside of actor system. Methods are not
    // thread safe

    public StockManagerActor(){
        stockStore = StockStore.getInstanceFor(this);
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if(message instanceof UpdateStockMessage){
            UpdateStockMessage updateMessage = (UpdateStockMessage)message;
            stockStore.updateStock(updateMessage)
        }else if(message instanceof GetStockMessage){
            GetStockMessage getMessage = (GetStockMessage)message;
            Stock stock = stockStore.getStock(getMessage.getSymbol());
            // stock here is immutable
            generateStockMessageAndSend(stock, getSender());
        }else{
            unhandled(message);
        }
    }

    //... More code
}

最佳答案

解决方案:每个库存一个 Actor - 根据需要创建。

StockActor - 负责维护/更新一只股票的数据。 - 对于每种库存,应在需要时创建一个 StockActor,并在不需要/完成时终止。

StockManagerActor 父 Actor 在需要时创建 StockActor 并监视它们。提供其他统计数据,例如一次运行的 child Actor 数量。

Akka 分片 Akka 分片和集群可以让您无缝地为每个股票创建一个 Actor,并在不需要时钝化它们。 Akka 负责分片功能。所以我们不用担心跑货 Actor 的意外死亡。

如果您不想在崩溃期间丢失 actor 状态,也可以使用 Akka Persistence

这里有一些链接。 Persistence Sharding

关于java - 在池中的 Akka actor 实例之间共享可变数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45087313/

相关文章:

java - 在 getter 中添加到 ArrayList 时出现 ConcurrentModificationException

scala - 处理 apache-camel 中的连接失败

scala - 在 Scala 中,为什么没有 `Future.onComplete` 的实现?

java - 在 Android EditText 中使用样式(粗体、斜体)编写文本

java - 如何创建分布式计算系统?

java - 尝试从 Android 应用程序将录制的视频上传到服务器时出现 FileNotFoundException

go - 并发示例的值不正确

java - Spring 启动 : Conditional on database type

django - cherrypy 如何处理用户线程?

java - 如何将基于 Actor 的源与 Akka Graph 结合使用?