scala - 为什么 Akka Persisence Query Read Journal 无法检索我的事件?

标签 scala akka akka-persistence

我在理解 Akka Persistence Query 时遇到问题,尤其是 eventsByTag 方法,因为它的行为与我预期的不同。

在我的主类中,我调用一个类来开始监听使用特定标记保存的任何事件。

class CassandraJournal(implicit val system: ActorSystem) {

 def engageStreaming = {
   val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
   implicit val mat = ActorMaterializer()

   readJournal.eventsByTag("account", Offset.noOffset)
     .runForeach { event => println(event) }
   }
}

每当我启动服务器并且事件存储为空并且我保留第一个事件(通过调用 Akka HTTP 中内置的 http 服务)时,该事件确实会被打印。但是,当我重新启动服务器并且事件存储中已有事件时,新的持久事件将不会被打印。

这有解释吗?我很难弄清楚为什么会发生这种情况。

编辑

我使用的事件存储是 Cassandra。这是 PersistentActor(我没有使用事件适配器来标记事件,只是将它们包装在 Tagged() 周围)

class Account(id: UUID) extends PersistentActor {

  override def receiveRecover: Receive = {
    case createCheckingsAccount: CreateCheckingsAccount =>
      println("Creating checkings account")
  }

  override def receiveCommand: Receive = {
    case createCheckingsAccount: CreateCheckingsAccount =>
      persist(Tagged(CheckingsAccountCreated(id), Set("account"))) { event =>
        val checkingsAccountCreatedEvent = event.payload.asInstanceOf[CheckingsAccountCreated]
        sender ! CreateCheckingsAccountResponse(checkingsAccountCreatedEvent.id.toString)
      }

  }

  def updateState(evt: Event): Unit = {
  }

  override def persistenceId: String = s"account-$id"
}

最佳答案

如果 receiveRecover 没有执行必要的状态恢复工作,持久性将无法正常工作。我建议在 receiveRecover 中放置一些基本的状态恢复逻辑,并让您的 updateState 方法也涵盖标记的事件案例。

我在一个应用程序中使用了 eventsByTag ,其状态恢复逻辑类似于以下内容,并且在重新启动和恢复时都运行良好。

def updateState(e: Any): Unit = e match {
  case evt: Event =>
    state = state.updated(evt)
  case Tagged(evt: Event, _) =>
    state = state.updated(evt)
}

...

override def receiveRecover: Receive = {
  case evt: Event => updateState(evt)
  case taggedEvt: Tagged => updateState(taggedEvt)
}

关于scala - 为什么 Akka Persisence Query Read Journal 无法检索我的事件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43171143/

相关文章:

scala - 验证 Akka Persistence 命令的最佳实践

scala - 将一对列表减少到键映射及其聚合计数的惯用方法?

python - 访问 WrappedArray 元素

scala - Scala 中的匿名递归函数

java - 为什么在 IDEA 中创建的默认 Play 应用程序在启动时崩溃,并将 Akka 2.3.3 声明为依赖项?

unit-testing - 生成的 akka 持久性 actor 测试事件

scala - Play框架重启时如何重新加载akka调度程序

scala - 如何捕获重复键值违规的光滑 postgres 异常

java - Akka:测试主管推荐

scala - 值(value) !不是 Actor 的成员