javascript - 用响应式编程重写事件发射器(信号量示例)

标签 javascript node.js reactive-programming rxjs bacon.js

我使用事件发射器作为同步原语。例如,我有一个类在 Redis 中询问类似信号量的结构。如果设置了信号量,它会发出一个事件。代码如下:

var redis = require("redis"),
    async = require('async'),
    client = redis.createClient(),
    assert = require('assert'),
    EventEmitter = require('events').EventEmitter;
    util = require('util');
var isMaster = false,
    SEMAPHORE_ADDRESS = 'semaphore';
var SemaphoreAsker = function() {
  var self = this;
  var lifeCycle = function (next) {
    client.set([SEMAPHORE_ADDRESS, true, 'NX', 'EX', 5], function(err, val) {
      console.log('client');
      if(err !== null) { throw err; }
      if(val === 'OK') {
        self.emit('crown');
      } else {
        console.log('still a minion');
      }
    });
  };
  async.forever(
    function(next) {
      setTimeout(
        lifeCycle.bind(null, next),
        1000
      );
    }
  );
};
util.inherits(SemaphoreAsker,  EventEmitter);
(new SemaphoreAsker()).on('crown', function() {
  console.log('I`m master');
});

它可以工作,但看起来有点重。是否可以用 BaconJS(RxJS/whateverRPlibrary) 重写示例?

最佳答案

以下应该在 RXJS 中工作:

var callback = Rx.Observable.fromNodeCallback(client.set, client);

var source = Rx.Observable.interval(1000)
.selectMany(function() {
  return callback([SEMAPHORE_ADDRESS, true, 'NX', 'EX', 5]);
})
.filter(function(x) { return x === 'OK'; })
.take(1);


source.subscribe(function(x) {
  console.log("I am master");
});

如果您愿意另外包含 rx-node 模块,您还可以使用

保留您的事件发射器结构
var emitter = RxNode.toEventEmitter(source, 'crown');

emitter.on('crown', function(){});
emitter.on('error', function(){});
emitter.on('end', function(){});

关于javascript - 用响应式编程重写事件发射器(信号量示例),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30343068/

相关文章:

Javascript:两个元素之间的垂直间距

javascript - 在 Reactjs 中选择多个选项

javascript - 滚动表格以将其应用于 tbody

javascript - 如何将外部 css/js 文件提供给 NodeJS 应用程序显示的 html 文件?

javascript - 似乎无法找到有关测试 Cycle.js 应用程序的资源

angularjs - API请求超时问题

node.js - 如何设置 mongoose 等待 mongodb 连接的最长时间

javascript - Sequelize.js 一对多关系外键

swift - ReactiveCocoa - 具有一般错误处理功能的信号生成器序列

java - 如何在另一个 Mono 终止后触发 Mono 执行