scala - 仅在所有事件持久化后更新 actor 状态

标签 scala akka akka-persistence

在持久化actor的接收方法中,我收到了一堆我想要持久化的事件,只有在所有事件都被持久化之后,我才会再次更新我的状态。我该怎么做?

def receive: Receive = {
  ...
  case NewEvents(events) =>
    persist(events) { singleEvent =>
      // Update state using this single event
    }
    // After every events are persisted, do one more thing
}

请注意,persist() 调用不会阻塞,因此我不能将我的代码紧随其后。


更新:为什么我需要这个

这些新事件来自外部网络服务。我的持久性 actor 需要在其状态中存储最后一个事件 id,当它收到命令时,它将用于后续的 ws 调用。问题是这些命令可能同时出现,所以我需要某种锁定系统:

  • 收到 ws 调用命令:存储下一个命令,直到这个命令完成(即,总而言之,一个 bool 值)
  • 收到来自 ws 的响应:存储它们,更新状态并保存最后一个 id,对存储中的所有命令执行另一个单个 ws 调用(我将命令发送者保存到能够在完成后全部响应)否则不要再隐藏命令。

最佳答案

我还没有尝试过 defer,我最初的解决方案是给自己发送一个 PersistEventsDone 消息。它之所以有效,是因为 persist 方法将存储所有传入消息,直到执行所有事件处理程序。如果进程中出现另一个命令,它是在 PersistEventsDone 之前还是之后并不重要:

def receive: Receive = {
  ...
  case PersistEventsDone =>
    ...
  case NewEvents(events) =>
    persist(events) { singleEvent =>
      // Update state using this single event
    }
    self ! PersistEventsDone
}

defer 在我的例子中有点奇怪,因为它需要一个我不需要的事件。但它看起来仍然比我的解决方案更自然。

关于scala - 仅在所有事件持久化后更新 actor 状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27549494/

相关文章:

Scala - 如何定义引用自身的结构类型?

scala - Untyped vs TypedActors - 为什么要使用 untyped?

java-8 - 等待响应时何时选择使用 Actor Inbox 还是 Futures?

akka - Akka 持久性 Actor 的用例是什么?

scala - Scala中带有接收器的函数类型

scala - 首次运行时无法下载 SBT(Scala 的构建工具)的依赖项

akka - Akka 中受监督的 actor 应该直接接收消息还是通过其监督者接收消息?

scala - 匹配 Akka 中的值类

java - Akka 持久化 cassandra 插件在启动时抛出 NoSuchMethodError

scala - 如何使用 Akka Persistence 保存流式数据