我在理解 Akka Persistence Query 时遇到问题,尤其是 eventsByTag 方法,因为它的行为与我预期的不同。
在我的主类中,我调用一个类来开始监听使用特定标记保存的任何事件。
class CassandraJournal(implicit val system: ActorSystem) {
def engageStreaming = {
val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
implicit val mat = ActorMaterializer()
readJournal.eventsByTag("account", Offset.noOffset)
.runForeach { event => println(event) }
}
}
每当我启动服务器并且事件存储为空并且我保留第一个事件(通过调用 Akka HTTP 中内置的 http 服务)时,该事件确实会被打印。但是,当我重新启动服务器并且事件存储中已有事件时,新的持久事件将不会被打印。
这有解释吗?我很难弄清楚为什么会发生这种情况。
编辑
我使用的事件存储是 Cassandra。这是 PersistentActor(我没有使用事件适配器来标记事件,只是将它们包装在 Tagged() 周围)
class Account(id: UUID) extends PersistentActor {
override def receiveRecover: Receive = {
case createCheckingsAccount: CreateCheckingsAccount =>
println("Creating checkings account")
}
override def receiveCommand: Receive = {
case createCheckingsAccount: CreateCheckingsAccount =>
persist(Tagged(CheckingsAccountCreated(id), Set("account"))) { event =>
val checkingsAccountCreatedEvent = event.payload.asInstanceOf[CheckingsAccountCreated]
sender ! CreateCheckingsAccountResponse(checkingsAccountCreatedEvent.id.toString)
}
}
def updateState(evt: Event): Unit = {
}
override def persistenceId: String = s"account-$id"
}
最佳答案
如果 receiveRecover
没有执行必要的状态恢复工作,持久性将无法正常工作。我建议在 receiveRecover
中放置一些基本的状态恢复逻辑,并让您的 updateState
方法也涵盖标记的事件案例。
我在一个应用程序中使用了 eventsByTag
,其状态恢复逻辑类似于以下内容,并且在重新启动和恢复时都运行良好。
def updateState(e: Any): Unit = e match {
case evt: Event =>
state = state.updated(evt)
case Tagged(evt: Event, _) =>
state = state.updated(evt)
}
...
override def receiveRecover: Receive = {
case evt: Event => updateState(evt)
case taggedEvt: Tagged => updateState(taggedEvt)
}
关于scala - 为什么 Akka Persisence Query Read Journal 无法检索我的事件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43171143/