我需要创建一个可观察的对象,我可以从中“提取”数据,以使用可分页 api。我每个请求只能获取 100 个项目,我希望能够使用 observable 作为生成器函数(我可以调用 .next()
来发出获取接下来 100 个项目的请求。
不幸的是,我无法找到一种方法来使用 Rx 来做到这一点。我想可以使用受控可观察对象或主题。你们能给我举个例子吗?
这是我到目前为止所得到的:
function list(entityType, viewName, fetchAll = false) {
var skip = 0,
total = 0;
const subject = new Rx.Subject(),
response$ = subject
.takeWhile(() => skip <= total)
.startWith(skip)
.flatMap((skip) => fetchPagePromise(skip)),
next = () => subject.onNext(skip);
if (fetchAll) {
Rx.Observable.timer(100, 100).subscribe(() => next());
}
return {
data$: response$.map(response => response),
next: fetchAll === true ? undefined : next
};
function fetchPagePromise() {
let limit = 100,
obj = {
viewName, limit, skip
},
qs = objectToQueryString(obj);
return $http.get(`${apiBase}/api/data/${entityType}${qs}`).then((res) => {
total = res.data.Total;
skip += limit;
return res.data.Rows;
});
}
}
这有点像发电机。它返回一个 Observable 和 next 处理程序。每当调用 next
时,它都会从 api 中提取接下来的 100 个项目并将其推送到 Observable 中。另外,如果传递了第三个参数fetchAll
,那么它将继续获取数据,直到没有更多数据为止。令我害怕的是,函数的闭包中有 2 个变异变量 - skip
和 total
,而且我不知道在异步/不可预测的环境中像这样管理它们是否可以.
最佳答案
您通常想要避免的事情之一是尝试将 Rx 变成普通的旧事件发射器。通常,当您尝试触发 Observables
时,它是一个指示器手动传递 Subjects
观察者界面。
您应该问自己,我的数据来自哪里?什么叫next()
,什么叫这个,等等。在了解了足够多的这些之后,您通常会发现这会导致您找到可以用 Observable
包裹的东西。直接而不是显式调用 next()
。另外,我认为fetchAll
标志确实应该保留在外部。您只是通过传入一个标志将接口(interface)转变为 void 方法,从而使接口(interface)变得困惑。
所以我建议像这样重构:
Rx.Observable.prototype.lazyRequest = function(entityType, viewName, limit = 100) {
var source = this;
return Rx.Observable.create(obs => {
var response = source
//Skip is really just the (limit * index)
.map((x, i) => i * limit)
.flatMap((skip) => {
let obj = {viewName, skip, limit},
qs = objectToQueryString(obj);
//Handle promises implicitly
return $http.get(`${apiBase}/api/data/${entityType}${qs}`);
},
//Return this with our skip information
(skip, res) => {skip, res})
//Publish it so the stream get shared.
.publish();
//This will emit once once you are out of data
var stop = response.first(x => x.skip >= x.res.data.Total);
return new CompositeDisposable(
//Complete this stream when stop emits
response.takeUntil(stop)
//Downstream only cares about the data rows
.map(x => x.res.data.Rows)
.subscribe(obs),
//Hook everything up
response.connect());
});
}
然后你可以像这样使用它:
//An example of a "starting point", a button click
//Update the rows every time a new event comes through
Rx.Observable.fromEvent($button, 'click')
.startWith(0) //Inject some data into the pipeline
.lazyRequest(entityType, viewName)
.subscribe(/*Do something with the returned rows*/);
//Get all of the rows, will keep hitting the endpoint until it completes
Rx.Observable.interval(100)
.lazyRequest(entityType, viewName)
//Gather all the values into an array and emit that.
.toArray()
.subscribe();
关于rxjs - 可观察和获取分页数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33090996/