接收连接和消息请求并发出消息和连接状态更新的 Websocket 史诗

标签 websocket rxjs redux-observable

我希望创建一个 redux-observable 史诗,它可以独立于我的应用程序的其余部分。它需要:

  • 监听 { type: "SOCKET_TRY_CONNECT"} 的传入操作,这也可能会忽略连接时的任何其他 SOCKET_TRY_CONNECT 事件。另外监听要发送的消息,可能是 { type: "SOCKET_MESSAGE_SEND", data }
  • 发出传出操作{ type: "SOCKET_CONNECTED"}, { type: "SOCKET_DISCONNECTED", error } and { type: "SOCKET_MESSAGE_RECEIVE", data }

史诗需要监听传入的套接字连接请求,建立套接字连接,然后在连接建立或丢失时输出状态更新。它还需要能够发送和接收消息,然后可以在别处处理这些消息。

我最接近这个的是 this 问题中提供的答案:

const somethingEpic = action$ =>
  action$.ofType('START_SOCKET_OR_WHATEVER')
    .switchMap(action =>
      Observable.webSocket('ws://localhost:8081')
        .map(response => ({ type: 'RECEIVED_MESSAGE', paylod: response }))
    );

但是我不确定如何扩展它以另外发出连接建立和断开事件,以及另外接受要发送到服务器的消息。

最佳答案

一般来说,听起来您想要这样的东西:

(注意,这是未经测试的代码,但应该非常接近可运行)

const somethingEpic = action$ =>
  action$.ofType('START_SOCKET_OR_WHATEVER')
    .switchMap(action => {
      // Subjects are a combination of an Observer *and* an Observable
      // so webSocket can call openObserver$.next(event) and
      // anyone who is subscribing to openObserver$ will receive it
      // because Subjects are "hot"
      const openObserver$ = new Subject();
      const openObserver$ = new Subject();

      // Listen for our open/close events and transform them
      // to redux actions. We could also include values from
      // the events like event.reason, etc if we wanted
      const open$ = openObserver$.map((event) => ({
        type: 'SOCKET_CONNECTED'
      }));
      const close$ = openObserver$.map((event) => ({
        type: 'SOCKET_DISCONNECTED'
      }));

      // webSocket has an overload signature that accepts this object
      const options = {
        url: 'ws://localhost:8081',
        openObserver: openObserver$,
        closeObserver: openObserver$
      };
      const msg$ = Observable.webSocket(options)
        .map(response => ({ type: 'RECEIVED_MESSAGE', payload: response }))
        .catch(e => Observable.of({
          type: 'SOCKET_ERROR',
          payload: e.message
        }))

      // We're merging them all together because we want to listen for
      // and emit actions from all three. For good measure I also included
      // a generic .takeUntil() to demonstrate the most obvious way to stop
      // the websocket (as well as the open/close, which we shouldn't forget!)
      // Also notice how I'm listening for both the STOP_SOCKET_OR_WHATEVER
      // or also a SOCKET_ERROR because we want to stop subscribing
      // to open$/close$ if there is an error.  
      return Observable.merge(open$, close$, msg$)
        .takeUntil(action$.ofType('STOP_SOCKET_OR_WHATEVER', 'SOCKET_ERROR'));
    });

如果这个史诗需要一次支持多个套接字,您将需要想出某种方法来唯一标识特定连接,并修改代码以基于此过滤信号。例如

.takeUntil(
  action$.ofType('STOP_SOCKET_OR_WHATEVER', 'SOCKET_ERROR')
    .filter(action => action.someHowHaveId === someHowHaveId)
);

关于接收连接和消息请求并发出消息和连接状态更新的 Websocket 史诗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57700859/

相关文章:

ajax - 在 WebSockets 之前,实时浏览器聊天是如何实现的?

jetty - 为什么将 WebSocket 实现划分为子接口(interface)是个好主意?

typescript - 使用条件 rxjs6 重构嵌套订阅

javascript - 如何正确使用 redux-observable 和 promise?

angular - redux-observable 不会在 catchError 之后执行

Websockets 与响应式(Reactive)套接字

java - 确保基于 Spring 消息传递的 websocket 服务的安全

reactjs - React 组件订阅太晚了。如何避免失去第一场比赛?

javascript - Rx - 根据内容拆分 Observable(分组依据直到更改)

ajax - 使用 Rxjs 并发 Ajax 请求