我需要多次查询设备。每个查询都需要异步,并且设备不支持同时进行并发查询。 而且一旦查询过,就不能立即再次查询。至少需要暂停 1 秒才能正常工作。
我的两个查询由 saveClock()
和 saveConfig()
执行,返回一个 Promise,并且都按预期返回 undefined 来解析。
在下面的代码中,为什么删除 take()
会阻止调用 toArray()
?
这里发生了什么,是否有更好的方法来实现相同的行为?
export const saveEpic = (action$, store) =>
action$.ofType(SAVE)
.map(action => {
// access store and create object data
// ...
return data;
})
.mergeMap(data =>
Rx.Observable.from([
Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)),
Rx.Observable.timer(1000),
Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config)),
Rx.Observable.of(data.id)
])
)
.concatAll()
.take(4)
.toArray()
// [undefined, 0, undefined, "id"]
.map(x => { type: COMPLETED, id: x[3] });
最佳答案
我看到了一些事情:
您的最终 .map()
缺少括号,其当前形式是语法错误,但细微的更改可能会意外使其成为 labeled statement而不是返回一个对象。因为在当前形式下,这是一个语法错误,我想这只是这篇文章中的一个错误,而不是您的代码中的错误(甚至无法运行),但请仔细检查!
// before
.map(x => { type: COMPLETED, id: x[3] });
// after
.map(x => ({ type: COMPLETED, id: x[3] }));
修复后,该示例确实使用一个简单的 redux-observable 测试用例运行:http://jsbin.com/hunale/edit?js,output因此,如果我没有做任何与您不同的事情,那么问题似乎出在未提供的代码中。请随意添加额外的见解,或者更好的是,在 JSBin/git 存储库中为我们重现它。
你没有提到但非常值得注意的一件事是,在 redux-observable 中,你的史诗通常是长期存在的“流程管理器”。这个史诗实际上只会处理这些保存之一,然后完成(),这可能不是您真正想要的?用户是否可以在每次应用程序启动时只保存一次内容?似乎不太可能。
相反,您需要通过将此逻辑封装在 mergeMap
中来保持您的史诗返回的顶级流处于事件状态并监听 future 的操作。 take(4)
并传递 data.id
然后变得无关紧要:
const saveEpic = (action$, store) =>
action$.ofType(SAVE)
.mergeMap(data =>
Rx.Observable.from([
Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)),
Rx.Observable.timer(1000),
Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config))
])
.concatAll()
.toArray()
.map(() => ({ type: COMPLETED, id: data.id }))
);
Ben Lesh 在他最近的 AngularConnect 演讲中描述了这种流的分离,在错误的背景下,但它仍然适用:https://youtu.be/3LKMwkuK0ZE?t=20m (别担心,这不是 Angular 特有的!)
接下来,我想分享一些主动提出的重构建议,这些建议可能会让您的生活更轻松,但当然这是固执己见的,所以请随意忽略:
我会进行重构,以更准确地直观地反射(reflect)事件的顺序,并降低复杂性:
const saveEpic = (action$, store) =>
action$.ofType(SAVE)
.mergeMap(data =>
Rx.Observable.from(saveClock(data.id, data.clock))
.delay(1000)
.mergeMap(() => saveConfig(data.id, data.config))
.map(() => ({ type: COMPLETED, id: data.id }))
);
这里我们使用 saveClock
返回的 Promise,延迟其输出 1000 毫秒,mergeMapping 将结果调用 saveConfig()
,它也返回一个 Promise将会被消耗。然后最终将结果映射到我们的 COMPLETE
操作。
最后,请记住,如果您的史诗确实保持活力并且生命周期很长,则该史诗中没有任何内容可以阻止它在其他请求仍处于状态时接收多个保存请求-航类或尚未用尽请求之间所需的 1000 毫秒延迟。也就是说,如果任何请求之间确实需要 1000 毫秒的间隔,那么您的史诗本身并不能完全阻止您的 UI 代码破坏该间隔。在这种情况下,您可能需要考虑添加更复杂的缓冲 backpressure机制,例如将 .zip()
运算符与 BehaviorSubject
结合使用。
http://jsbin.com/waqipol/edit?js,output
const saveEpic = (action$, store) => {
// used to control how many we want to take,
// the rest will be buffered by .zip()
const requestCount$ = new Rx.BehaviorSubject(1)
.mergeMap(count => new Array(count));
return action$.ofType(SAVE)
.zip(requestCount$, action => action)
.mergeMap(data =>
Rx.Observable.from(saveClock(data.id, data.clock))
.delay(1000)
.mergeMap(() => saveConfig(data.id, data.config))
.map(() => ({ type: COMPLETED, id: data.id }))
// we're ready to take the next one, when available
.do(() => requestCount$.next(1))
);
};
这使得在我们仍在处理现有请求时传入的保存请求被缓冲,并且我们一次只获取其中一个。请记住,这是一个无界缓冲区——这意味着待处理操作队列的增长速度可能比缓冲区刷新的速度无限快。这是不可避免的,除非您采用有损背压策略,例如丢弃重叠的请求等。
如果您有其他史诗,它们有重叠的要求,不能每秒发送请求超过一次,您将需要创建某种单一的主管来为所有史诗提供这种保证。
这看起来可能非常复杂,但讽刺的是,这在 RxJS 中比使用传统的命令式代码要容易得多。最难的部分实际上是了解模式。
关于javascript - 为什么执行串联后不调用complete?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40491285/