node.js - ZeroMQ 推/拉和 Nodejs 读取流

标签 node.js sockets stream zeromq

我试图通过打开读取流来读取一些文件,并通过 ZMQ 将文件 block 发送到另一个进程来使用它们。该流正在正常工作,但是当我启动工作程序时,它看不到已发送的数据。

我尝试每 500 毫秒通过套接字发送数据,而不是在回调中,当我启动工作程序时,它会收集所有以前的数据 block :

sender = zmq.socket('push')
setInterval(() ->
  console.log('sending work');
  sender.send('some work')
, 500)
<小时/>
receiver = zmq.socket("pull")
receiver.on "message", (msg) ->
  console.log('work is here: %s', msg.toString())
<小时/>

输出:

sending work
sending work
sending work
sending work
sending work
// here I start the worker
sending work
work is here: some work
work is here: some work
work is here: some work
work is here: some work
work is here: some work
work is here: some work
sending work
work is here: some work
sending work
work is here: some work
sending work
work is here: some work

因此,当工作进程启动时,它首先提取所有以前的数据,然后每次有新数据进入时都会提取它。当我这样做时,这不适用:

readStream = fs.createReadStream("./data/pg2701.txt", {'bufferSize': 100 * 1024})
readStream.on "data", (data) ->
  console.log('sending work');
  sender.send('some work'); // I'd send 'data' if it worked..

在这种情况下,工作线程根本不会提取任何数据。 那些类型的套接字是否应该创建队列?我在这里缺少什么?

最佳答案

是的,推送套接字在达到 HWM 之前一直处于阻塞状态,并且没有人可以发送。 也许发件人尚未绑定(bind),请尝试如下操作:

sender.bind('address', function(err) {
  if (err) throw err;
  console.log('sender bound!');

  // the readStream code.
}

您的代码示例中还缺少 connect,我敢打赌它就在那里,但也许您忘记了。

关于node.js - ZeroMQ 推/拉和 Nodejs 读取流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13276680/

相关文章:

mongodb - 我需要在 Mongoose、Express-session-mongo 和 mongolian 中指定副本集的所有主机吗?

javascript - 为什么 Express 渲染的页面不是 res.render 中指定的页面

c# - 如何在不关闭套接字的情况下多次调用套接字发送和接收?

java - 发送邮件到 Gmail 帐户

node.js - 使用Node JS中的流串联解决Promise中的错误

audio - WP7音频流问题

node.js - 将 session 存储在express(node.js)和neo4j数据库上

node.js - 无法在 ubuntu 中运行 nw.js 应用程序

python - recv 和 recvfrom,使用 python 进行套接字编程

javascript - 在 Node.js 中使用流来缓冲 HTTP 通信