我正在尝试开发一个类似 RxJS 的方案:一个基于事件的“可观察”对象。到目前为止,一切都很好 !
当我尝试在默认的类似 RxJS 的推送模式之外添加“拉模式”到方案中时,问题就出现了。
我试图了解代码的哪一部分似乎耗尽了堆栈。
这是一个运行良好的代码片段:
const { Channel } = require('./channel');
var c = new Channel();
c.onClosed( (e) => {
console.log(e);
console.log("Done");
});
c.listen( (e) => {
console.log(e);
});
let i = 0;
for (i=0; i<1000000; i++) {
c.send(++i);
}
c.close(i);
这是触发异常的代码片段:
const { Channel } = require('./channel');
var sc = new Channel();
let i = 0;
sc.onClosed( (e) => {
console.log(e);
console.log("Done");
});
sc.sendAll( () => {
if (i<100000) {
sc.send(++i);
console.log(i);
}
sc.close(i);
});
sc.listen( (e) => {
console.log(e);
sc.next();
});
sc.send(1);
这是库的相关部分:
const EventEmitter = require('events');
class Channel {
id: string;
emitter: any;
isClosed: boolean;
constructor() {
this.emitter = new EventEmitter();
this.isClosed = false;
}
listen = (callback) => {
this.emitter.on("data", callback);
}
send = (value) => {
this.emitter.emit("data", value);
}
next = () => {
this.emitter.emit("received");
}
sendAll = (callback) => {
this.emitter.on("received", callback);
}
close = (value) => {
this.isClosed = true;
this.emitter.emit("closed", value);
}
onClosed = (callback) => {
this.isClosed = true;
this.emitter.on("closed", callback);
}
}
module.exports = {
Channel: Channel
}
如果迭代次数(“ channel ”上的“send()”次数超过 10.000 次调用),第二个代码段将触发 RangeError 异常
最佳答案
简短回答:sc.sendAll() 和 sc.listen() 之间的来回调用创建了一种两阶段递归函数调用:
function f() {
g();
}
function g() {
f();
}
f();
因为在这个方案中,每个新的事件监听器都会递归地生成一个新的回调(在回调中,等等......)。
更准确地说:事件对于实现“推”(“发送”)方案(参见 RxJS 或 Kefir)效果很好,但它们不能用于“拉”(“发送并等待确认”)方案(至少不是最初问题中的预期)。
从这个意义上说,RxJS observables 或 Node 事件(更一般的 JavaScript 响应式(Reactive)数据结构)与迭代器不同:observable 必须“按原样”(而不是“按需”)处理传入的数据流。从这个意义上说,最初帖子提出的“ channel ”结构也不同于 Golang channel ,因为它没有“阻塞”模式。
结论:可以使用 Node 事件在 JavaScript 中实现类似 golang 的“ channel ”对象,但不能使用相同的方案使其“阻塞”。
关于javascript - 尝试为类似 RxJS 的结构开发 "pull mode"会出现 RangeError,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57196709/