javascript - 尝试为类似 RxJS 的结构开发 "pull mode"会出现 RangeError

标签 javascript node.js data-structures reactive-programming

我正在尝试开发一个类似 RxJS 的方案:一个基于事件的“可观察”对象。到目前为止,一切都很好 !

当我尝试在默认的类似 RxJS 的推送模式之外添加“拉模式”到方案中时,问题就出现了。

我试图了解代码的哪一部分似乎耗尽了堆栈。

这是一个运行良好的代码片段:

const { Channel } = require('./channel');

var c = new Channel();

c.onClosed( (e) => {
    console.log(e);
    console.log("Done");
});

c.listen( (e) => {
    console.log(e);
});

let i = 0;
for (i=0; i<1000000; i++) {
    c.send(++i);
}
c.close(i);

这是触发异常的代码片段:

const { Channel } = require('./channel');

var sc = new Channel();

let i = 0;

sc.onClosed( (e) => {
    console.log(e);
    console.log("Done");
});

sc.sendAll( () => {
    if (i<100000) {
        sc.send(++i);
        console.log(i); 
    }
    sc.close(i);
});

sc.listen( (e) => {
    console.log(e);
    sc.next();
});

sc.send(1);

这是库的相关部分:

const EventEmitter = require('events');


class Channel {

    id: string;
    emitter: any;
    isClosed: boolean;

    constructor() {
        this.emitter = new EventEmitter();
        this.isClosed = false;
    }

    listen = (callback) => {
        this.emitter.on("data", callback);
    }

    send = (value) => {
        this.emitter.emit("data", value);
    }

    next = () => {
        this.emitter.emit("received");
    }

    sendAll = (callback) => {
        this.emitter.on("received", callback);
    }

    close = (value) => {
        this.isClosed = true;
        this.emitter.emit("closed", value);
    }

    onClosed = (callback) => {
        this.isClosed = true;
        this.emitter.on("closed", callback);
    }

}


module.exports = {
    Channel: Channel
}

如果迭代次数(“ channel ”上的“send()”次数超过 10.000 次调用),第二个代码段将触发 RangeError 异常

最佳答案

简短回答:sc.sendAll() 和 sc.listen() 之间的来回调用创建了一种两阶段递归函数调用:

function f() {
    g();
}

function g() {
    f();
}

f();

因为在这个方案中,每个新的事件监听器都会递归地生成一个新的回调(在回调中,等等......)。

更准确地说:事件对于实现“推”(“发送”)方案(参见 RxJS 或 Kefir)效果很好,但它们不能用于“拉”(“发送并等待确认”)方案(至少不是最初问题中的预期)。

从这个意义上说,RxJS observables 或 Node 事件(更一般的 JavaScript 响应式(Reactive)数据结构)与迭代器不同:observable 必须“按原样”(而不是“按需”)处理传入的数据流。从这个意义上说,最初帖子提出的“ channel ”结构也不同于 Golang channel ,因为它没有“阻塞”模式。

结论:可以使用 Node 事件在 JavaScript 中实现类似 golang 的“ channel ”对象,但不能使用相同的方案使其“阻塞”。

关于javascript - 尝试为类似 RxJS 的结构开发 "pull mode"会出现 RangeError,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57196709/

相关文章:

javascript - 从 Javascript 闭包循环访问外部变量

javascript - AB1234567 的正则表达式

javascript - 将外部 JavaScript 添加到 MediaWiki 1.28

node.js - 在 sails.js 中创建模块/HMVC

javascript - 如何停止 Node.JS 通知流?

algorithm - 快速检查集合是否是存储集合的超集

javascript - 为什么 Redux 使用开关而不是利用多态行为?

node.js - expressjs项目中设置swagger出错

python - 在python中高效存储一组整数序列

javascript - 对于这种特殊情况,哪种 JavaScript 结构的访问时间更快?