node.js - 在 Node.js 中调度 CQRS 消息

标签 node.js rxjs cqrs mediator mediatr

我想为 Node 应用程序执行 CQRS。

我不是 Node 人员,我来自 .NET,它有一个名为 MediatR 的出色库,它将命令/查询分派(dispatch)到可用于解耦请求和处理程序的中介器。因此它允许非常简单和优雅的 CQRS。

在 Node 世界中,我发现了许多库/博客,但它们也总是包含事件溯源。我对 ES 不感兴趣。

我可以很好地对命令和查询进行建模,但是然后呢?它们需要以解耦的方式分派(dispatch)到某个地方,以避免困惑。

根据我目前对 Node 平台的了解,一个可能的解决方案是使用观察者模式(通过 RxJs 库),这样 Controller 就可以将消息(即 CQRS 请求)分派(dispatch)给观察者,然后观察者为订阅者(即请求处理程序)发布相应的事件。这在类似 DDD 的设计中解耦了 Controller 和服务。虽然我不确定如何将结果传递回 Controller 。

其他人也是这样做的吗? Node中有更好的方法吗?

最佳答案

TL:DR:您不需要一些花哨的框架来应用 CQRS 架构,尤其是当您只进行进程内通信时。原生EventEmitterevents 模块就足够了。如果你想要进程间通信servicebus做得非常好。要查看实现示例(以下长版本答案),您可以深入研究此存储库的代码:simple node cqrs

让我们举一个非常简单的聊天应用程序的示例,如果聊天未关闭,您可以在其中发送消息,以及喜欢/不喜欢消息。

我们的主要聚合(或概念上的聚合根)是 Chat (writeModel/domain/chat.js):

const Chat = ({ id, isClosed } = {}) =>
  Object.freeze({
    id,
    isClosed,
  });

然后,我们有一个 Message 聚合 (writeModel/domain/message.js):

const Message = ({ id, chatId, userId, content, sentAt, messageLikes = [] } = {}) =>
  Object.freeze({
    id,
    chatId,
    userId,
    content,
    sentAt,
    messageLikes,
  });

发送消息的行为可以是(writeModel/domain/chat.js):

const invariant = require('invariant');
const { Message } = require('./message');

const Chat = ({ id, isClosed } = {}) =>
  Object.freeze({
    id,
    isClosed,
  });

const sendMessage = ({ chatState, messageId, userId, content, sentAt }) => {
  invariant(!chatState.isClosed, "can't post in a closed chat");
  return Message({ id: messageId, chatId: chatState.id, userId, content, sentAt });
};

我们现在需要命令(writeModel/domain/commands.js):

const commands = {
  types: {
    SEND_MESSAGE: '[chat] send a message',
  },
  sendMessage({ chatId, userId, content, sentAt }) {
    return Object.freeze({
      type: commands.types.SEND_MESSAGE,
      payload: {
        chatId,
        userId,
        content,
        sentAt,
      },
    });
  },
};

module.exports = {
  commands,
};

由于我们使用的是 JavaScript,因此没有接口(interface)来提供抽象,因此我们使用高阶函数 (writeModel/domain/getChatOfId.js):

const { Chat } = require('./message');

const getChatOfId = (getChatOfId = async id => Chat({ id })) => async id => {
  try {
    const chatState = await getChatOfId(id);
    if (typeof chatState === 'undefined') {
      throw chatState;
    }
    return chatState;
  } catch (e) {
    throw new Error(`chat with id ${id} was not found`);
  }
};

module.exports = {
  getChatOfId,
};

(writeModel/domain/saveMessage.js):

const { Message } = require('./message');

const saveMessage = (saveMessage = async (messageState = Message()) => {}) => saveMessage;

module.exports = {
  saveMessage,
};

我们现在需要实现我们的commandHandlers(应用程序服务层):

(writeModel/commandHandlers/handleSendMessage.js)

const { sendMessage } = require('../domain/chat');

const handleSendMessage = ({
  getChatOfId,
  getNextMessageId,
  saveMessage,
}) => async sendMessageCommandPayload => {
  const { chatId, userId, content, sentAt } = sendMessageCommandPayload;
  const chat = await getChatOfId(chatId);
  return saveMessage(
    sendMessage({
      chatState: chat,
      messageId: getNextMessageId(),
      userId,
      content,
      sentAt,
    }),
  );
};

module.exports = {
  handleSendMessage,
};

由于我们在 javascript 中没有接口(interface),因此我们使用高阶函数通过在运行时注入(inject)依赖项来应用依赖倒置原则。

然后我们可以实现写入模型的组合根:(`writeModel/index.js):

const { handleSendMessage } = require('./commandHandlers/handleSendMessage');
const { commands } = require('./domain/commands');

const SimpleNodeCQRSwriteModel = ({
  dispatchCommand,
  handleCommand,
  getChatOfId,
  getNextMessageId,
  saveMessage,
}) => {
  handleCommand(
    commands.types.SEND_MESSAGE,
    handleSendMessage({ getChatOfId, getNextMessageId, saveMessage }),
  );
};

module.exports = {
  SimpleNodeCQRSwriteModel,
};

您的命令命令处理程序没有绑定(bind)在一起,然后您可以在运行时提供这些函数的实现,例如使用内存数据库和 Node EventEmitter(writeModel/infrastruct/inMemory/index.js):

const uuid = require('uuid/v1');
const { saveMessage } = require('../../domain/saveMessage');
const { getChatOfId } = require('../../domain/getChatOfId');
const { getNextMessageId } = require('../../domain/getNextMessageId');

const InMemoryRepository = (initialDbState = { chats: {}, messages: {}, users: {} }) => {
  const listeners = [];

  const db = {
    ...initialDbState,
  };

  const addOnDbUpdatedListener = onDbUpdated => listeners.push(onDbUpdated);

  const updateDb = updater => {
    updater();
    listeners.map(listener => listener(db));
  };

  const saveMessageInMemory = saveMessage(async messageState => {
    updateDb(() => (db.messages[messageState.id] = messageState));
  });

  const getChatOfIdFromMemory = getChatOfId(async id => db.chats[id]);

  const getNextMessageUuid = getNextMessageId(uuid);

  return {
    addOnDbUpdatedListener,
    saveMessage: saveMessageInMemory,
    getChatOfId: getChatOfIdFromMemory,
    getNextMessageId: getNextMessageUuid,
  };
};

module.exports = {
  InMemoryRepository,
};

我们的 TestWriteModel 将它们结合在一起:

const EventEmitter = require('events');
const { SimpleNodeCQRSwriteModel } = require('../writeModel');
const { InMemoryRepository } = require('../writeModel/infrastructure/inMemory');

const TestWriteModel = () => {
  const { saveMessage, getChatOfId, getNextMessageId } = InMemoryRepository();
  const commandEmitter = new EventEmitter();
  const dispatchCommand = command => commandEmitter.emit(command.type, command.payload);
  const handleCommand = (commandType, commandHandler) => {
    commandEmitter.on(commandType, commandHandler);
  };
  return SimpleNodeCQRSwriteModel({
    dispatchCommand,
    handleCommand,
    getChatOfId,
    getNextMessageId,
    saveMessage,
  });
};

您可以深入了解此存储库中的代码(使用非常简单的读取模型):simple node cqrs

关于node.js - 在 Node.js 中调度 CQRS 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53815827/

相关文章:

javascript - 无法连接到 ejs 文件上的套接字、引用和 io 未定义错误

javascript - Canvas 后 child 的位置是 chrome 和 safari 之间的区别

cqrs - 如何在 CQRS 应用程序中实现数据网格

cqrs - 带有 membus 和 ioc 容器的 SetHandlerInterface() 的多种类型

c# - 如何在更改领域事件结构后检索历史事件

node.js - Webpack 配置不起作用,路径必须是绝对路径

node.js - Azure CDN 签名 url

javascript - 如何正确使用forkJoin

angular - 如何在不调用 next 的情况下设置新的 BehaviorSubject 值?

javascript - 什么被认为是测试返回 http observables 的方法的正确方法?