我正在尝试创建一个命令处理程序,将命令视为多个子命令。每个子命令都会生成一个事件(然后该事件应该更新聚合的状态)。每个子命令的处理依赖于最新的聚合状态(来自前一个子命令)。
例如,考虑以下聚合:
package axon.poc
import org.axonframework.commandhandling.CommandHandler
import org.axonframework.eventsourcing.EventSourcingHandler
import org.axonframework.modelling.command.AggregateIdentifier
import org.axonframework.modelling.command.AggregateLifecycle
import org.axonframework.spring.stereotype.Aggregate
import org.slf4j.LoggerFactory
import java.util.UUID
@Aggregate
class Aggregate() {
companion object {
private val logger = LoggerFactory.getLogger(Aggregate::class.java)
}
@AggregateIdentifier
internal var aggregateId: UUID? = null
private var value: Int = 0
@CommandHandler
constructor(command: Command): this() {
logger.info("generating create event")
var applyMore = AggregateLifecycle.apply(CreatedEvent(command.aggregateId))
for (i in 0 until command.value) {
applyMore = applyMore.andThenApply {
logger.info("generating update event: ${value+1}")
UpdatedEvent(command.aggregateId, value+1)
}
}
logger.info("completed command handler")
}
@EventSourcingHandler
fun on(event: CreatedEvent) {
logger.info("event sourcing handler: $event")
this.aggregateId = event.aggregateId
this.value = 0
}
@EventSourcingHandler
fun on(event: UpdatedEvent) {
logger.info("event sourcing handler: $event")
this.value = event.value
}
}
当此代码处理 Command(value = 2)
时,它会生成
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating create event
[main] INFO org.axonframework.spring.stereotype.Aggregate - completed command handler
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: CreatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a)
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating update event: 1
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating update event: 1
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: UpdatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a, value=1)
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: UpdatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a, value=1)
在执行 applyMore
之前正在处理第一个事件 (CreatedEvent)。但是,即使 applyMore
已链接,UpdatedEvent
也不会由事件源处理程序处理,直到两者都生成为止。
我期待(并希望):
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating create event
[main] INFO org.axonframework.spring.stereotype.Aggregate - completed command handler
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: CreatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a)
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating update event: 1
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: UpdatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a, value=1)
[main] INFO org.axonframework.spring.stereotype.Aggregate - generating update event: 2
[main] INFO org.axonframework.spring.stereotype.Aggregate - event sourcing handler: UpdatedEvent(aggregateId=65a7a461-61bb-451f-b2d9-8460994eeb1a, value=2)
这是 Axon 的错误吗?或者我是否误解了应该如何使用它?如何原子地处理多个“命令”? IE。全部通过或全部失败。
最佳答案
TLDR;这是一个错误。
您遇到了一种仅发生在构造函数中的非常具体的情况。从 Axon 的角度来看,挑战在于您需要一个实例来应用事件。但是,该实例仅在构造函数完成后才可用。
andThenApply
函数正是为此目的而提供的(并且您正确使用了它)。但是,在您的情况下,代码(错误地)评估得太早了。我必须在本地运行您的代码并进行调试,以找出到底发生了什么。
根本原因是 AnnotatedAggregate
的 andThenApply
实现调用 apply
,而不是 publish
。前者将看到它当前正在执行延迟的任务,并安排在这些任务结束时实际发布。下一个任务执行相同的操作。因此,这两个事件最终都会先创建,然后在创建后发布。
您是否有兴趣将此问题作为问题提交到 Axon issue tracker 中? ?这样,积分就会到达它们所属的地方。
关于轴突框架: only first event is applied during command handler,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58425444/