node.js - Node : Set highWaterMark of socket object

标签 node.js sockets stream websocket

是否可以在创建套接字对象后设置其 highWaterMark:

var http = require('http');

var server = http.createServer();

server.on('upgrade', function(req, socket, head) {
    socket.on('data', function(chunk) {
        var frame = new WebSocketFrame(chunk);

        // skip invalid frames
        if (!frame.isValid()) return;

        // if the length in the head is unequal to the chunk 
        // node has maybe split it
        if (chunk.length != WebSocketFrame.getLength()) {
            socket.once('data', listenOnMissingChunks);
        });
    });
});

function listenOnMissingChunks(chunk, frame) {
    frame.addChunkToPayload(chunk);

    if (WebSocketFrame.getLength()) {
        // if still corrupted listen once more
    } else {
        // else proceed
    }
}

上面的代码示例不起作用。但我该怎么做呢?

进一步说明: 当我收到大的 WebSocket 帧时,它们会分成多个数据事件。这使得很难解析帧,因为我不知道这是 split 的还是损坏的帧。

最佳答案

我认为您误解了 TCP 套接字的性质。尽管 TCP 通过 IP 数据包发送数据,但 TCP 不是数据包协议(protocol)。 TCP 套接字只是一个数据流。因此,将 data 事件视为逻辑消息 是不正确的。换句话说,一端的一个 socket.write 不等同于另一端的单个 data 事件。

有很多原因导致对套接字的单次写入不能 1:1 映射到单个 data 事件:

  • 发送方的网络堆栈可能会将多个小写入合并到一个 IP 数据包中。 (Nagle algorithm)
  • 如果 IP 数据包的大小超过任何一跳的 MTU,它可能会在其传输过程中被分段(分成多个数据包) .
  • 接收方的网络堆栈可能会将多个数据包合并为一个data 事件(如您的应用程序所见)。

因此,单个 data 事件可能包含多个 消息、单个消息或仅消息的一部分。

为了正确处理通过流发送的消息,您必须 buffer incoming data直到您收到一条完整的消息。

var net = require('net');


var max = 1024 * 1024 // 1 MB, the maximum amount of data that we will buffer (prevent a bad server from crashing us by filling up RAM)
    , allocate = 4096; // how much memory to allocate at once, 4 kB (there's no point in wasting 1 MB of RAM to buffer a few bytes)
    , buffer=new Buffer(allocate) // create a new buffer that allocates 4 kB to start
    , nread=0 // how many bytes we've buffered so far
    , nproc=0 // how many bytes in the buffer we've processed (to avoid looping over the entire buffer every time data is received)
    , client = net.connect({host:'example.com', port: 8124}); // connect to the server

client.on('data', function(chunk) {
    if (nread + chunk.length > buffer.length) { // if the buffer is too small to hold the data
        var need = Math.min(chunk.length, allocate); // allocate at least 4kB
        if (nread + need > max) throw new Error('Buffer overflow'); // uh-oh, we're all full - TODO you'll want to handle this more gracefully

        var newbuf = new Buffer(buffer.length + need); // because Buffers can't be resized, we must allocate a new one
        buffer.copy(newbuf); // and copy the old one's data to the new one
        buffer = newbuf; // the old, small buffer will be garbage collected
    }

    chunk.copy(buffer, nread); // copy the received chunk of data into the buffer
    nread += chunk.length; // add this chunk's length to the total number of bytes buffered

    pump(); // look at the buffer to see if we've received enough data to act
});

client.on('end', function() {
    // handle disconnect
});


client.on('error', function(err) {
    // handle errors
});


function find(byte) { // look for a specific byte in the buffer
    for (var i = nproc; i < nread; i++) { // look through the buffer, starting from where we left off last time
        if (buffer.readUInt8(i, true) == byte) { // we've found one
            return i;
        }
    }
}
function slice(bytes) { // discard bytes from the beginning of a buffer
    buffer = buffer.slice(bytes); // slice off the bytes
    nread -= bytes; // note that we've removed bytes
    nproc = 0; // and reset the processed bytes counter
}

function pump() {
    var pos; // position of a NULL character

    while ((pos = find(0x00)) >= 0) { // keep going while there's a NULL (0x00) somewhere in the buffer
        if (pos == 0) { // if there's more than one NULL in a row, the buffer will now start with a NULL
            slice(1); // discard it
            continue; // so that the next iteration will start with data
        }
        process(buffer.slice(0,pos)); // hand off the message
        slice(pos+1); // and slice the processed data off the buffer
    }
}

function process(msg) { // here's where we do something with a message
    if (msg.length > 0) { // ignore empty messages
        // here's where you have to decide what to do with the data you've received
        // experiment with the protocol
    }
}

关于node.js - Node : Set highWaterMark of socket object,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14537183/

相关文章:

java - FindBugs - 使用 ObjectOutputStream 时为 "may fail to close stream"

node.js - 使用 Angular 时有没有办法从服务器端加载部分 View ?

node.js - 如何将其他ember应用程序集成到现有项目中?

sockets - 是否可以在一个程序上打开多个端口?

sockets - 哪些协议(protocol)值与 socket() 中的哪些域和类型组合兼容?

node.js - 强制 process.exit() 与手动关闭所有打开的资源/套接字

java - 计算下载速度

node.js - 我可以在 node.js 中指定 RabbitMQ 凭据吗?

javascript - Mongoose 查询结果无法与数据对象一起导出

f# - (F#) 从 CryptoStream 读取所有字节的最流畅方式