我想了解基于 CQRS 的系统中命令处理程序、聚合、存储库和事件存储之间关系的一些细节。
到目前为止我所理解的:
到现在为止还挺好。
现在有一些我还没有得到的问题:
最佳答案
以下内容基于我自己的经验以及我对 Lokad.CQRS、NCQRS 等各种框架的实验。我确信有多种方法可以处理这个问题。我会发布对我最有意义的内容。
1.聚合创建:
每次命令处理程序需要聚合时,它都会使用存储库。存储库从事件存储中检索相应的事件列表并调用重载的构造函数,注入(inject)事件
var stream = eventStore.LoadStream(id)
var User = new User(stream)
如果聚合之前不存在,则流将为空,并且新创建的对象将处于其原始状态。您可能希望确保在此状态下仅允许少数命令使聚合生效,例如User.Create()
.2.新事件的存储
命令处理发生在工作单元内。在命令执行期间,每个生成的事件都将添加到聚合内的列表中 (
User.Changes
)。执行完成后,更改将附加到事件存储中。在下面的示例中,这发生在以下行中:store.AppendToStream(cmd.UserId, stream.Version, user.Changes)
3. 事件顺序想象一下,如果两个后续
CustomerMoved
事件以错误的顺序重播。一个例子
我将尝试用一段伪代码来说明(我故意将存储库问题留在命令处理程序中以显示幕后会发生什么):
申请服务:
UserCommandHandler
Handle(CreateUser cmd)
stream = store.LoadStream(cmd.UserId)
user = new User(stream.Events)
user.Create(cmd.UserName, ...)
store.AppendToStream(cmd.UserId, stream.Version, user.Changes)
Handle(BlockUser cmd)
stream = store.LoadStream(cmd.UserId)
user = new User(stream.Events)
user.Block(string reason)
store.AppendToStream(cmd.UserId, stream.Version, user.Changes)
聚合:User
created = false
blocked = false
Changes = new List<Event>
ctor(eventStream)
isNewEvent = false
foreach (event in eventStream)
this.Apply(event, isNewEvent)
Create(userName, ...)
if (this.created) throw "User already exists"
isNewEvent = true
this.Apply(new UserCreated(...), isNewEvent)
Block(reason)
if (!this.created) throw "No such user"
if (this.blocked) throw "User is already blocked"
isNewEvent = true
this.Apply(new UserBlocked(...), isNewEvent)
Apply(userCreatedEvent, isNewEvent)
this.created = true
if (isNewEvent) this.Changes.Add(userCreatedEvent)
Apply(userBlockedEvent, isNewEvent)
this.blocked = true
if (isNewEvent) this.Changes.Add(userBlockedEvent)
更新:作为旁注:Yves 的回答让我想起了几年前 Udi Dahan 的一篇有趣的文章:
关于events - CQRS 中的命令处理程序、聚合、存储库和事件存储之间的关系,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12362641/