javascript - RxJs flatMapLatest/switchMap 取消回调。 onCancel() 在哪里?

标签 javascript reactive-programming rxjs observable

我有 2 个嵌套的 Observable Streams,它们执行 HTTP 请求。现在我想显示加载指示器,但无法使其正常工作。

var pageStream = Rx.createObservableFunction(_self, 'nextPage')
        .startWith(1)
        .do(function(pageNumber) {
            pendingRequests++;
        })
        .concatMap(function(pageNumber) {
            return MyHTTPService.getPage(pageNumber);
        })
        .do(function(response) {
            pendingRequests--;
        });

Rx.createObservableFunction(_self, 'search')
        .flatMapLatest(function(e) {
            return pageStream;
        })
        .subscribe();



search();
nextPage(2);
nextPage(3);
search();

这将触发 pendingRequests++ 4 次,但 pendingRequests-- 只会触发一次,因为 flatMapLatest 会在前 3 个 HTTP 之前取消内部 observable响应到达。

我找不到类似 onCancel 回调的任何东西。我还尝试了 onCompletedonError,但它们也不会被 flatMapLatest 触发。

还有其他方法可以实现吗?

谢谢!

编辑:所需的加载指示器行为

  1. 示例:单个 search() 调用。

    • search() -> 开始加载指示器
    • 当 search() 响应返回时 -> 禁用加载指示器
  2. 示例:search()nextPage() 调用。 (在 search() 响应返回之前调用了 nextPage()。)

    • search() -> 开始加载指示器
    • nextPage() -> 指示器已经启动,但此处无需执行任何操作
    • 两个响应到达后停止加载指示器
  3. 示例:search()search()(search() 调用相互覆盖,尽管第一个调用的响应可以被忽略)

    • search() -> 开始加载指示器
    • search() -> 指示器已经启动,但这里无需执行任何操作
    • second search() 的响应到达时停止加载指示器
  4. 示例:search()nextPage()search()(再说一遍:因为第二次search(),前面search()和nextPage()的响应可以忽略)

    • search() -> 开始加载指示器
    • nextPage() -> 指示器已经启动,但此处无需执行任何操作
    • search() -> 指示器已经启动,但这里无需执行任何操作
    • second search() 的响应到达时停止加载指示器
  5. 示例:search()nextPage()。但是这次 nextPage() 是在 search() 响应返回后调用的。

    • search() -> 开始加载指示器
    • 停止加载指示器,因为 search() 响应到达
    • nextPage() -> 开始加载指示器
    • 停止加载指示器,因为 nextPage() 响应到达

我尝试使用pendingRequests 计数器,因为我可以同时有多个相关 请求(例如:search()、nextPage()、nextPage ())。然后,当然我想在所有相关请求完成后禁用加载指示器。

调用search()、search()时,第一个search()是无关紧要的。这同样适用于 search()、nextPage()、search()。在这两种情况下,只有一个事件的相关请求(最后一个 search())。

最佳答案

使用 switchMap 又名 flatMapLatest,您希望在新的外部项到达时尽快 trim 当前内部流的执行。这无疑是一个很好的设计决定,否则它会带来很多困惑并允许一些诡异的 Action 。如果你真的想做一些事情 onCancel 你总是可以使用自定义 unsubscribe 回调创建你自己的可观察对象。但我仍然建议不要将 unsubscribe 与外部上下文的状态更改结合起来。理想情况下,unsubscribe 只会清理内部使用的资源。

然而,您的特殊情况可以在不访问 onCancel 或类似方法的情况下得到解决。关键观察是 - 如果我正确理解您的用例 - 在 search 上所有以前/未决的操作可能会被忽略。因此,不用担心递减计数器,我们可以简单地从 1 开始计数。

关于片段的一些评论:

  • 使用 BehaviorSubject 来计算未决请求 - 因为它已准备好与其他流组合;
  • 检查了您在问题中发布的所有案例,它们都有效;
  • 添加了一些模糊测试来证明正确性;
  • 不确定当 search 仍未决时是否允许 nextPage - 但似乎只是使用 concatMapTo 的问题vs merge;
  • 仅使用标准的Rx 运算符。

PLNKR

console.clear();

const searchSub = new Rx.Subject(); // trigger search 
const nextPageSub = new Rx.Subject(); // triger nextPage
const pendingSub = new Rx.BehaviorSubject(); // counts number of pending requests

const randDurationFactory = min => max => () => Math.random() * (max - min) + min;
const randDuration = randDurationFactory(250)(750);
const addToPending = n => () => pendingSub.next(pendingSub.value + n);
const inc = addToPending(1);
const dec = addToPending(-1);

const fakeSearch = (x) => Rx.Observable.of(x)
  .do(() => console.log(`SEARCH-START: ${x}`))
  .flatMap(() => 
    Rx.Observable.timer(randDuration())
    .do(() => console.log(`SEARCH-SUCCESS: ${x}`)))

const fakeNextPage = (x) => Rx.Observable.of(x)
  .do(() => console.log(`NEXT-PAGE-START: ${x}`))
  .flatMap(() =>
    Rx.Observable.timer(randDuration())
    .do(() => console.log(`NEXT-PAGE-SUCCESS: ${x}`)))

// subscribes
searchSub
  .do(() => console.warn('NEW_SEARCH'))
  .do(() => pendingSub.next(1)) // new search -- ingore current state
  .switchMap(
    (x) => fakeSearch(x)
    .do(dec) // search ended
    .concatMapTo(nextPageSub // if you wanted to block nextPage when search still pending
      // .merge(nextPageSub // if you wanted to allow nextPage when search still pending
      .do(inc) // nexpage started
      .flatMap(fakeNextPage) // optionally switchMap
      .do(dec) // nextpage ended
    )
  ).subscribe();

pendingSub
  .filter(x => x !== undefined) // behavior-value initially not defined
  .subscribe(n => console.log('PENDING-REQUESTS', n))

// TEST
const test = () => {
    searchSub.next('s1');
    nextPageSub.next('p1');
    nextPageSub.next('p2');

    setTimeout(() => searchSub.next('s2'), 200)
  }
// test();

// FUZZY-TEST
const COUNTER_MAX = 50;
const randInterval = randDurationFactory(10)(350);
let counter = 0;
const fuzzyTest = () => {
  if (counter % 10 === 0) {
    searchSub.next('s' + counter++)
  }
  nextPageSub.next('p' + counter++);
  if (counter < COUNTER_MAX) setTimeout(fuzzyTest, randInterval());
}

fuzzyTest()
<script src="https://npmcdn.com/rxjs@5.0.0-beta.11/bundles/Rx.umd.js"></script>

关于javascript - RxJs flatMapLatest/switchMap 取消回调。 onCancel() 在哪里?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39236361/

相关文章:

rxjs ofObjectChanges 已过时

javascript - div 不随动态内容增长

javascript - 网站 : Make printable version with footnotes?

c# - .Net Reactive Extensions Framework (Rx) 是否考虑拓扑顺序?

Java react 器-链Mono <Void>与另一个产生Mono <Object>的异步任务

angular - 验证数组中的对象也存在于可观察数组中

javascript - sibling 之间的图像轻松交流

javascript - Indesign 脚本 - 如何获取线程文本框架中的第一段

python - 如何在 Python 中使用响应式(Reactive)扩展 (Rx) LINQ?

angular - 在 Angular 2 中重新订阅以前取消订阅的 Rx.Subject