javascript - 如何使用事件和 promise 来控制程序流?

标签 javascript node.js tcp promise ecmascript-6

我有这样一个类:

import net from 'net';
import {EventEmitter} from 'events';
import Promise from 'bluebird';

class MyClass extends EventEmitter {
    constructor(host = 'localhost', port = 10011) {
        super(EventEmitter);
        this.host = host;
        this.port = port;
        this.socket = null;
        this.connect();
    }
    connect() {
        this.socket = net.connect(this.port, this.host);
        this.socket.on('connect', this.handle.bind(this));
    }
    handle(data) {
        this.socket.on('data', data => {

        });
    }
    send(data) {
        this.socket.write(data);
    }
}

我如何将 send 方法变成一个 promise ,它从套接字的 data 事件返回一个值?服务器仅在向其发送数据时才发回数据,而不是可以轻松抑制的连接消息。

我试过类似的方法:

handle(data) {
    this.socket.on('data', data => {
        return this.socket.resolve(data);
    });
    this.socket.on('error', this.socket.reject.bind(this));
}
send(data) {
    return new Promise((resolve, reject) => {
        this.socket.resolve = resolve;
        this.socket.reject = reject;
        this.socket.write(data);
    });
}

显然这是行不通的,因为 resolve/reject 会在并行链接和/或调用 send 时相互覆盖。

还有并行调用 send 两次的问题,它会解决先返回的响应。

我目前有一个使用队列和 defer 的实现,但感觉很乱,因为队列不断被检查。

我希望能够做到以下几点:

let c = new MyClass('localhost', 10011);
c.send('foo').then(response => {
    return c.send('bar', response.param);
    //`response` should be the data returned from `this.socket.on('data')`.
}).then(response => {
    console.log(response);
}).catch(error => console.log(error));

补充一下,我无法控制接收到的数据,这意味着它不能在流外修改。

编辑:看来这是不可能的,因为 TCP 没有请求-响应流。这如何仍然使用 promise 来实现,但使用单次执行(一次一个请求) promise 链或队列。

最佳答案

我将问题提炼到最低限度并使其可在浏览器中运行:

  1. 模拟了套接字类。
  2. EventEmitter 中删除了有关端口、主机和继承的信息。

该解决方案通过将新请求附加到 promise 链来工作,但在任何给定时间点最多允许一个打开/未答复的请求。 .send 每次被调用时都会返回一个新的 promise ,并且该类负责所有内部同步。因此,.send 可能会被多次调用,并且请求处理的正确顺序 (FIFO) 得到保证。如果没有待处理的请求,我添加的另一个功能是 trim promise 链。


警告 我完全省略了错误处理,但它无论如何都应该根据您的特定用例进行定制。


DEMO

class SocketMock {

  constructor(){
    this.connected = new Promise( (resolve, reject) => setTimeout(resolve,200) ); 
    this.listeners = {
  //  'error' : [],
    'data' : []
    }
  }

  send(data){

    console.log(`SENDING DATA: ${data}`);
    var response = `SERVER RESPONSE TO: ${data}`;
    setTimeout( () => this.listeners['data'].forEach(cb => cb(response)),               
               Math.random()*2000 + 250); 
  }

  on(event, callback){
    this.listeners[event].push(callback); 
  }

}

class SingleRequestCoordinator {

    constructor() {
        this._openRequests = 0; 
        this.socket = new SocketMock();
        this._promiseChain = this.socket
            .connected.then( () => console.log('SOCKET CONNECTED'));
      this.socket.on('data', (data) => {
        this._openRequests -= 1;
        console.log(this._openRequests);
        if(this._openRequests === 0){
          console.log('NO PENDING REQUEST --- trimming the chain');
          this._promiseChain = this.socket.connected
        }
        this._deferred.resolve(data);
      });

    }

    send(data) {
      this._openRequests += 1;
      this._promiseChain = this._promiseChain
        .then(() => {
            this._deferred = Promise.defer();
            this.socket.send(data);
            return this._deferred.promise;
        });
      return this._promiseChain;
    }
}

var sender = new SingleRequestCoordinator();

sender.send('data-1').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
sender.send('data-2').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
sender.send('data-3').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));

setTimeout(() => sender.send('data-4')
    .then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)), 10000);

关于javascript - 如何使用事件和 promise 来控制程序流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31342363/

相关文章:

node.js - 任何用于在 Node.js 中可视化模块依赖关系的库?

javascript - 使用socket.io的聊天服务器,消息未附加到列表中

javascript - 使用 VS Code、TypeScript 和 Node.js 的断点位置不正确

c# - 升级到 Android 9 后无法从 Xamarin.Forms 应用程序打开 ESP32 套接字

javascript - 如何与来自 Node.js 进程的 C++ 程序输入流进行交互?

javascript - 在 SVG 中以特定 Angular 放置路径

javascript - 在 jQuery 中识别具有相对路径的链接

javascript - 检查用户名或电话是否已存在(正面)

c - Loopback tcp连接加速

tcp - 如何查看 Apache Kafka 生成的完整 TCP 数据包?