javascript - 如何将一个 RXJ 流用于 2 件不同的事情

标签 javascript typescript rxjs

我的应用程序中有 2 项服务。一个通过网络加载 youTube 评论线程和评论的 youTube 服务,以及一个管理批量评论加载的评论服务。评论服务特定于我的应用,youTube 服务与应用无关。

我有一个加载评论线程的函数getCommentThreadsForChannel。主要实现是在 youTube 服务中,但这是在评论服务上调用的,评论服务基本上只是调用从 youTube 服务返回可观察的结果。

就我调用它的 Controller 而言,这只是一个可观察的注释线程序列。但是,在我的 commentService 中,我想在本地存储这些线程。每当我获得 100 个以上的线程时,我希望将其批处理为存储所有线程,这样我就不会处理每个新数据位的列表。我想出了这段代码:

    getCommentThreadsForChannel(): Rx.Observable<ICommentThread> {
        var threadStream: Rx.Observable<ICommentThread> =
            this.youTubeService.getCommentThreadsForChannel();

        threadStream
            .bufferWithCount(100)
            .scan( ( allItems, currentItem ) => {
                currentItem.forEach(thread => {
                    allItems.push(thread);
                });

                console.log( `Save items to local storage: ${allItems.length}` )

                return allItems;
            }, []  );

        return threadStream;
    }

我认为这里用于批处理线程并将所有线程累积到一个数组中的逻辑很好,但从未调用此代码。我认为这是因为我根本没有订阅该线程。

我不想在这里订阅,因为这会订阅底层流,然后我会有 2 个订阅,所有数据将加载两次(有很多数据 - 加载全部数据大约需要一分钟)一次调用 100 个线程超过 30 次)。

我基本上想要一个 do ,它不会影响传递到 Controller 的流,但我想使用 RXjs 的缓冲和累积逻辑。

我认为我需要以某种方式共享或发布流,但我之前使用这些运算符几乎没有成功,并且不知道如何在不添加第二个订阅的情况下做到这一点。

如何共享一个流并以两种不同的方式使用它而无需订阅两次?我可以创建某种被动流,仅当它所基于的可观察对象订阅时才订阅吗?

最佳答案

我最终解决了这个问题(在 @MonkeyMagiic 的帮助下)。

我必须共享流,以便我可以对数据做两件不同的事情,分别缓冲它和每个值。问题是这两个流都必须订阅,但我不想订阅服务 - 这应该在 Controller 中完成。

解决方案是再次组合 2 个流并忽略缓冲区中的值:

var intervalStream = Rx.Observable.interval(250)
    .take(8)
    .do( function(value){console.log( "source: " + value );} )
    .shareReplay(1);

var bufferStream = intervalStream.bufferWithCount(3)
    .do( function(values){
      console.log( "do something with buffered values: " + values );
    } )
    .flatMap( function(values){ return Rx.Observable.empty(); } );

var mergeStream = intervalStream.merge( bufferStream );

mergeStream.subscribe(
  function( value ){ console.log( "value received by controller: " + value ); },
  function( error ){ console.log( "error: " + error ); },
  function(){ console.log( "onComplete" ); }
);

输出:

"source: 0"
"value received by controller: 0"
"source: 1"
"value received by controller: 1"
"source: 2"
"value received by controller: 2"
"do something with buffered values: 0,1,2"
"source: 3"
"value received by controller: 3"
"source: 4"
"value received by controller: 4"
"source: 5"
"value received by controller: 5"
"do something with buffered values: 3,4,5"
"source: 6"
"value received by controller: 6"
"source: 7"
"value received by controller: 7"
"do something with buffered values: 6,7"
"onComplete"

JSBin

关于javascript - 如何将一个 RXJ 流用于 2 件不同的事情,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35367991/

相关文章:

angular - 如何首先监听表单的变化?

javascript - Angular 中的重复功能失败

angular - RxJS 和 Angular 6 - 可观察对象、主题和基本的获取和添加数组操作

rxjs - 过滤 BehaviorSubject

javascript - 停止某些链接(包含特定字符串的 href)工作

父级上的javascript重定向

typescript - 如何修改JSDoc block 注释代码

rxjs - 如何将 Fetch API 响应转换为 RxJS Observable?

javascript - React 中的深度复制

javascript - 如何将同一行中的两个单词算作 1