rxjs - 可观察和获取分页数据?

标签 rxjs

我需要创建一个可观察的对象,我可以从中“提取”数据,以使用可分页 api。我每个请求只能获取 100 个项目,我希望能够使用 observable 作为生成器函数(我可以调用 .next() 来发出获取接下来 100 个项目的请求。 不幸的是,我无法找到一种方法来使用 Rx 来做到这一点。我想可以使用受控可观察对象或主题。你们能给我举个例子吗?

这是我到目前为止所得到的:

function list(entityType, viewName, fetchAll = false) {
    var skip = 0,
        total = 0;

    const subject = new Rx.Subject(),
        response$ = subject
        .takeWhile(() => skip <= total)
        .startWith(skip)
        .flatMap((skip) => fetchPagePromise(skip)),

        next = () => subject.onNext(skip);

    if (fetchAll) {
        Rx.Observable.timer(100, 100).subscribe(() => next());
    }

    return {
        data$: response$.map(response => response),
        next: fetchAll === true ? undefined : next
    };

    function fetchPagePromise() {
        let limit = 100,
            obj = {
                viewName, limit, skip
            },
            qs = objectToQueryString(obj);

        return $http.get(`${apiBase}/api/data/${entityType}${qs}`).then((res) => {
            total = res.data.Total;
            skip += limit;

            return res.data.Rows;
        });
    }
}

这有点像发电机。它返回一个 Observable 和 next 处理程序。每当调用 next 时,它都会从 api 中提取接下来的 100 个项目并将其推送到 Observable 中。另外,如果传递了第三个参数fetchAll,那么它将继续获取数据,直到没有更多数据为止。令我害怕的是,函数的闭包中有 2 个变异变量 - skiptotal,而且我不知道在异步/不可预测的环境中像这样管理它们是否可以.

最佳答案

您通常想要避免的事情之一是尝试将 Rx 变成普通的旧事件发射器。通常,当您尝试触发 Observables 时,它是一个指示器手动传递 Subjects观察者界面。

您应该问自己,我的数据来自哪里?什么叫next() ,什么叫这个,等等。在了解了足够多的这些之后,您通常会发现这会导致您找到可以用 Observable 包裹的东西。直接而不是显式调用 next() 。另外,我认为fetchAll标志确实应该保留在外部。您只是通过传入一个标志将接口(interface)转变为 void 方法,从而使接口(interface)变得困惑。

所以我建议像这样重构:

Rx.Observable.prototype.lazyRequest = function(entityType, viewName, limit = 100) {
  var source = this;
  return Rx.Observable.create(obs => {
    var response = source
                     //Skip is really just the (limit * index) 
                     .map((x, i) => i * limit)
                     .flatMap((skip) => {
                               let obj = {viewName, skip, limit},
                                   qs = objectToQueryString(obj);

                               //Handle promises implicitly
                               return $http.get(`${apiBase}/api/data/${entityType}${qs}`);
                              },
                              //Return this with our skip information
                              (skip, res) => {skip, res})
                     //Publish it so the stream get shared.
                     .publish();

    //This will emit once once you are out of data
    var stop = response.first(x => x.skip >= x.res.data.Total);

    return new CompositeDisposable(
      //Complete this stream when stop emits
      response.takeUntil(stop)
              //Downstream only cares about the data rows
              .map(x => x.res.data.Rows)
              .subscribe(obs),
      //Hook everything up
      response.connect());

  });

}

然后你可以像这样使用它:

//An example of a "starting point", a button click
//Update the rows every time a new event comes through
Rx.Observable.fromEvent($button, 'click')
             .startWith(0) //Inject some data into the pipeline
             .lazyRequest(entityType, viewName)
             .subscribe(/*Do something with the returned rows*/);

//Get all of the rows, will keep hitting the endpoint until it completes
Rx.Observable.interval(100)
             .lazyRequest(entityType, viewName)
             //Gather all the values into an array and emit that.
             .toArray()
             .subscribe();

关于rxjs - 可观察和获取分页数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33090996/

相关文章:

angular - 如何在 rxjs/angular 中导入 .refCount()

angular - RxJS5 随着时间的推移发出数组项并永远重复

angular - 使用原始数组本身生成可观察数组

angular - 如何使用forkJoin处理Observable数组的错误?

javascript - 将数组中的管道与 Angular Observable 组合起来

javascript - 为什么 switchMap 中的间隔停止发出值

Angular : how to debounce an Observable?

RxJS:是否有一个运算符像 mergeScan 一样工作,但只是在外部流发出时取消订阅内部流

angular - NgRx - 在 ngOnInit 中多次分派(dispatch)操作

angular - rxjs 在 combineLatest 中调用 combineLatest