javascript - Observable 确定订阅者功能是否已完成

标签 javascript typescript promise rxjs observable

确定订阅者是否已完成执行或更好地返回某些内容并在上游捕获它的最佳方法是什么?例如:

    this._subscriptions.push(this._client
        .getCommandStream(this._command) // Returns an IObservable from a Subject stream
        .subscribe(msg => {
            // Do some processing maybe some promise stuff
            http.request(url).then(
                // some more stuff
            );
        });

确定订阅已完成的最佳信息。我已经实现如下:

    this._subscriptions.push(this._client
        .getCommandStream(this._command)
        .subscribe(msg => {
            // Do some processing maybe some promise stuff
            http.request(url).then(re => {
                // some more stuff
                msg.done()
            }).catch(err => msg.done(err));
        });

即向传入的对象添加了一个 done 方法以确定是否已完成。问题是我必须在每个 promise 或 catch block 中调用 done 并发现它有点太详尽了。是否有更清洁、更自动化的方式来执行此操作?

我认为我给出的例子还不够好。此实现使用 RX 构建内部消息传递总线。 get 命令流实际上返回一个只读 channel (作为一个 Observable)来获取命令并处理它们。现在处理可能是一个 http 请求,后面跟着许多其他的东西,或者只是一个 if 语句。

this._client
.getCommandStream(this._command) // Returns an IObservable from a Subject stream
  .subscribe(msg => {
      // Do some processing maybe some promise stuff
      http.request(url).then({
          // some more stuff

          }).then({
            // Here I wanna do some file io
            if(x) {
                file.read('path', (content) => {
                    msg.reply(content);
                    msg.done();
                });
            } else {
            // Or maybe not do a file io or maybe even do some image processing
                msg.reply("pong");
                msg.done()
            }
            });
      });

我觉得这是 Observable 模式的一个很好的用法,因为这正是输入的一系列命令,并且该逻辑想要对它们采取行动。问题是注意 msg.done() 被到处调用。我想知道限制调用的最佳方法是什么,并知道整个过程何时完成。另一种选择是将其全部包装在一个 Promise 中,但是 resolvemsg.done() 之间有什么区别?

最佳答案

实际上,不推荐在 subscribe() 中发出另一个异步请求,因为它只会让事情变得更复杂,并且以这种方式使用 Rx 不会帮助你让你的代码更容易理解。

由于您需要向返回 PROmise 的远程服务发出请求,您可以将其合并到链中:

this._subscriptions.push(this._client
    .getCommandStream(this._command)
    .concatMap(msg  => http.request(url))
    .subscribe(...)

此外,subscribe 的第三个参数是一个回调,在源 Observable 完成时调用。

您还可以在处理链时添加自己的拆卸逻辑。这在 subscribe(...) 中的 complete 回调被调用后调用:

const subscription = this._subscriptions.push(this._client
    ...
    .subscribe(...)

subscription.add(() => doWhatever())

顺便说一句,这等同于使用 finally() 运算符。

关于javascript - Observable 确定订阅者功能是否已完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43722663/

相关文章:

Javascript css marginTop 过渡不触发动画

javascript - 如何在客户端与 Netflix Cadmium 视频播放器交互?

TypeScript:常量或字符串之一的接口(interface)或类型

Angular 从 BehaviorSubject 退订为 Observable

javascript - Node Promises Bluebird Collections API 和 Spread() 问题

javascript - Promise.all()被拒绝后的值,显示['' PromiseStatus''] : resolved if catch block is present

c++ - C++ 中的延迟/ promise 模式

php - Fusion Tables API : How do I make a PUT request to update a style via Ruby, JavaScript 或其他语言?

javascript - 如何使用javascript在一个句子中搜索多个单词

typescript - 在 typescript 中扩展枚举