Node.js - 持久连接适配器

标签 node.js tcp persistent-connection

我的要求有点不同,不知道能不能实现。

我正在使用 Node.js 开发后端应用程序服务器。这个服务器基本上做了两个工作:

(1) 服务客户:我的客户都是手机,他们将发送 HTTP(S) 请求,并在收到响应后关闭 session 。

(2) 调用其他一些异步工作的服务:另一方面,服务器将连接到其他一些仅通过 TCP/IP 连接而非 HTTP 工作的服务器。这里的异步意味着,服务器将发送请求并且不应该等待响应。将通过相同的 TCP/IP 连接接收响应。

所以我要实现的流程是:

  1. 手机向服务器发送HTTP请求
  2. 服务器收到 HTTP 请求后,调用 TCP/IP 上的服务
  3. 服务器通过 TCP/IP 连接接收来自 TCP/IP 服务的响应
  4. 服务器用响应响应电话。

为了表示上面的流程,我附上了下面的图片。

enter image description here

在上图中,TCP/IP 服务器由其他供应商管理。

我在 node.js 中编写了以下代码,它有时完全按照我们的要求工作,但有时它会向 HTTP 请求发送错误的响应。我没有编写任何代码来处理这个问题。

    var net = require('net');

    var client = new net.Socket();
    client.connect(2202, 'example_ip', function () {
    console.log('Connected');
    // client.write('Hello, server! Love, Client.');
    });

    //Lets require/import the HTTP module
    var http = require('http');

    //Lets define a port we want to listen to
    const PORT = 8080;

    //We need a function which handles requests and send response
    function handleRequest(request, response) {
    var body = '';

    request.on('data', function (chunk) {
        body += chunk;
    });

    request.on('end', function () {
        console.log('Received request from JMeter------------>>>');
        // console.log(body);
        client.write(body);

    var count = 0;
    client.on('data', function (data) {
            console.log('<<<------------Received from SSM: ' + data);
            response.end(data);
            // client.destroy(); // kill client after server's response
        });


    });



    client.on('close', function () {
        console.log('Connection closed');
    });

}

//Create a server
var server = http.createServer(handleRequest);

//Lets start our server
server.listen(PORT, function () {
    //Callback triggered when server is successfully listening. Hurray!
    console.log("Server listening on: http://localhost:%s", PORT);
});

请有人指导我解决这个问题。

最佳答案

TCP 流不像 WebSocket 流那样工作(如您所料)。您需要使用自己的协议(protocol)与 TCP 服务器通信。请记住,HTTP 客户端有很多,而您只有一个 TCP 连接来处理它们,因此请使用如下所示的 requestId,代码会自行解释。

未经测试,但您可以理解。

共享.js

exports.tcp = {
    host: 'example_ip',
    port: 2202
};

exports.http = {
    host: 'localhost',
    port: 8080
};

/**
 *  TCP "guarantees" that a receiver will receive the reconstituted 
 *   stream of --> BYTES <-- as it was originally sent by the sender.
 *
 *  eg. if written message = 'How are you today?'
 *   the messages can come to us as follows:
 *     
 *     'How ar'
 *     'e you '
 *     'today?'
 *  
 *  so we need to use a simple protocol to handle messages
 */
exports.protocol = protocol;

function protocol(options) {

    if (!options) options = {};

    this.END_OF_MESSAGE = options.endOfMessage || '\0';
    this.END_OF_PART = options.endOfPart || '\1';

    this.dataBuffer = '';
}

protocol.prototype.packMessage = function(id, body) {
    return [id, body].join( this.END_OF_PART ) + this.END_OF_MESSAGE;
};

protocol.prototype.unpackMessage = function(message) {

    var parts = message.toString('utf8').split( this.END_OF_PART );
    return {id: parts.shift(), body: parts.shift()};
};

protocol.prototype.extractMessages = function(data, callback) {

    this.dataBuffer += data.toString('utf8');

    if (this.dataBuffer.indexOf(this.END_OF_MESSAGE) !== -1)
    {
        var messages = this.dataBuffer.split(this.END_OF_MESSAGE);
        var incomplete = this.dataBuffer.slice(-1) === this.END_OF_MESSAGE
            ? '' : messages.pop();

        messages.forEach(function(message)
        {
            if (message !== '') {
                callback( this.unpackMessage(message) );
            }
        });

        this.dataBuffer = incomplete;
        // rest of 'data'
    }

    /**
    if (Buffer.byteLength(this.dataBuffer, 'utf8') > 10240) { // 10KB
        console.log('[!] socket flooded');
        this.dataBuffer = '';
    }
    */
};

protocol.prototype.reset = function() {
    this.dataBuffer = '';
};

httpServer.js

var http = require('http');
var net = require('net');

var shared = require('./shared.js');
var protocol = new shared.protocol();

var server = http.createServer(handleRequest);
server.listen(shared.http.port, shared.http.host, function() {
    console.log('HTTP server listening: %s:%s', shared.http.host, shared.http.port);
});

function handleRequest(request, response) {

    var body = '';

    var requestId = nextId++;
    var eventName = 'message' + requestId;

    request.on('data', function(chunk) {
        body += chunk.toString('utf8');
    });

    request.on('end', function()
    {
        // ref#2
        client.write( protocol.packMessage(requestId, body) );

        // ref#3
        client.once(eventName, function(data) {

            clearTimeout(timeoutId);
            response.end(data);
        });
    });

    var timeoutId = setTimeout(function() {

        client.removeListener(eventName);
        response.end('timeout');

    }, 10000); // 10 sec.

    /** 
     * [!] Don't do this; you are adding just another 'data' event to
     *  the TCP client for EVERY http request !?
     *  
     *  request: UNIQUE obj. for every http request
     *  client: a PERSISTENT (TCP) stream obj.
     *
    client.on('data', function() { });
    **/
}

var client = new net.Socket();

// ref#1
client.connect(shared.tcp.port, shared.tcp.host, function() {
    console.log('TCP conn. established to: ', shared.tcp.host, shared.tcp.port);
});

var nextId = 0;
// unique per http req.

/**
 * [!] Do this ( once ) ( not for every request )
 */
client.on('data', function(data)
{
    protocol.extractMessages(data, function(message) {

        client.emit('message' + message.id, message.body);
        // ref#3
    });
});

client.on('close', function()
{
    console.log('TCP conn. closed');
    client.removeAllListeners();
})

client.on('error', function()
{
    console.log('TCP conn. error', arguments);
    // client.destroy(); // and reconnect here
});

tcpServer.js

var net = require('net');

var shared = require('./shared.js');
var protocol = new shared.protocol();

var server = net.createServer(handleConnection);
server.listen(shared.tcp, function() {
    console.log('TCP server listening %s:%s', shared.tcp.host, shared.tcp.port);
});

// [!] CONNECTION handler ( ref#1 )
function handleConnection(client)
{
    var this.dataBuffer = '';

    // [!] DATA handler ( ref#2 )
    client.on('data', function(data) {

        protocol.extractMessages(data, function(message)
        {
            var requestId = message.id;
            var body = message.body;

            // Do whatever you want with 'body' here

            /**
             * And return back to 'client' with 'requestId' using same protocol again
             *  so the 'client' ( from httpServer.js ) can handle your response
             */
            client.write( protocol.packMessage(requestId, body) );
        });
    });
}

关于Node.js - 持久连接适配器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37202052/

相关文章:

node.js - 错误 : Platform can NOT be empty at new Payload in Dialogflow

node.js - Expressjs 无法识别静态文件

amazon-web-services - 如果从同一实例转发,则 Amazon Web Services 负载均衡器转发到监听器不起作用

PHPUnit 重用 PDO 连接

node.js - 如何使用 Mongoose 将多个引用保存到 MongoDB 中的其他文档?

sql - Node.js 和 SQL Server Sequelize 连接表

linux - 一台 linux 机器可以有多少个打开的 udp 或 tcp/ip 连接?

c++ - boost::asio 中 TCP 数据包的数量由什么决定?

php - 使用 PHP 无限循环持久连接到服务器