linux - 通过 websocket 将一个 STDIN 流式传输到多个客户端

标签 linux unix websocket

目前我在 Linux 中有“streamgenerator”,它将数据输出到 stdout/namedpipe 等。我在网上找到了 websocketd,这有点接近我需要的,但问题是它为每个连接的客户端生成新进程。
我正在寻找的是将相同的数据(将数据生成到标准输出的单个进程)流式传输到多个客户端。

$ websocketd --port=8080 ./streamgenerator  

将为每个新的 websocket 连接创建新的 streamgenerator 进程。我想要的是 streamgenerator 的一个实例,并为所有客户端复制了它的输出。

有什么简单的方法吗?我现在唯一想到的是编写 C 程序,它将 STDIN 放入大小为 X 的缓冲区,并且每个客户端都有指向该缓冲区的指针(指向客户端能够读取的位置)......如果客户端连接他将开始只获取新数据......如果客户端太慢......他的“READ”指针将超出缓冲区他的连接将被丢弃,因为他无法跟上。

我的问题是不开发这个工具有什么办法吗?首先我想到管道到命名管道,然后让 websocketd 从中读取...但这当然行不通,因为 websocket 的第一个客户端将读取数据并将它们丢弃...

最佳答案

根据websocketd source code ,它似乎使用基于进程的服务器(每个连接一个新进程)......我不是 GO 程序员,所以我不确定我是否正确阅读,但是 README似乎表示相同的概念。

因此...

problem is it spawns new process for every client that connects

这是无法避免的。该设计意味着新连接以及它们的 STDIN 和 STDOUT 本质上是 fork 的。无法从新连接访问流式传输到原始 STDIN 的数据...

...所以将单个 streamgeneratorwebsocketd 一起使用不是一种选择。

Only thing that comes into my mind right now is writing C program...

我认为这可能是避免多进程设计的唯一方法。

您不必为此使用 C。您可能会同样轻松地使用 Ruby 或 node.js(它们可能更容易编写,同时您会为方便而付出性能代价)。

My question is is there any way without developing this tool?

我不这么认为。

然而,facil.io可以使编写使用其 native Pub/Sub API 广播数据的 C 网络套接字工具变得相当简单(允许您在将来使用 Redis 轻松扩展)......但作为作者,我有偏见。

编辑

这是一个简短的 Ruby 脚本,它将数据从管道广播到任何连接的网络套接字连接(数据由行分隔)。

文件script.rb:

#!/usr/bin/env ruby
require 'iodine'

class Example
    def self.call(env)
      if env['upgrade.websocket?'.freeze] && env["HTTP_UPGRADE".freeze] =~ /websocket/i.freeze
        env['upgrade.websocket'.freeze] = Example.new
        return [0,{}, []] # It's possible to set cookies for the response.
      end
      [404, {"Content-Length" => "12"}, ["Bad Request."] ]
    end 

    def on_open
        subscribe channel: :stream, force: :text
    end
    def on_message data
        close # are we expecting any messages?
    end
    def on_close
        # nothing to do
    end
    def on_shutdown
        # server is going away, notify client.
    end
end

Iodine::Rack.app = Example

# remove these two lines for automatice, core related, detection
Iodine.processes = 1;
Iodine.threads = 1;

# initialize the Redis engine for each iodine process.
require 'uri'
if ENV["REDIS_URL"]
  uri = URI(ENV["REDIS_URL"])
  Iodine.default_pubsub = Iodine::PubSub::RedisEngine.new(uri.host, uri.port, 0, uri.password)
else
  puts "* No Redis, it's okay, pub/sub will still run on the whole process cluster."
end

# Create the loop that reads from ARGF (the pipe)
# defer threading because we might fork the main server
root_pid = Process.pid
Iodine.run do 
    puts "Starting to listen to pipe"
    if(root_pid == Process.pid)
        Thread.new do
            ARGF.each_line do |s|
                Iodine.publish channel: :stream, message: s
                puts "read:", s
            end
        end
    end
end

# start iodine
Iodine.start

您可以从终端使用它:

$ streamgenerator | ruby script.rb

这只是一个肮脏的例子,但它表明这可能是多么容易。

哦,它需要the iodine gem ,这是 fail.io 到 Ruby 的端口(也是我的)。

编辑 2

我添加了一些代码,允许您将 Redis 与我提供的示例代码一起使用,并将发布限制为单个进程。

Redis 引擎原生于 facil.io(它使用 C 语言,带有用于 iodine 的 Ruby 桥),您可以使用它来发送命令以及 Pub/Sub。

如果您使用 Redis 来扩展多台机器,我会考虑将脚本拆分为发布者脚本和服务器应用程序。

此外,如果您使用多台机器,则只需要 Redis。如果您使用 Iodine.processes = 8 运行 Iodine,发布/订阅引擎仍然可以工作。

如果您需要,Iodine 还有许多其他功能,例如静态文件服务等。

您还可以将整个东西打包到中间件中,并使用 iodine 作为服务器(用于 Websocket 支持)使其成为现有 Rails/Sintara/Rack 项目的一部分。

...

至于:

  $ ./streamgenerator | pub --channel "redisstream"
  $ websocketd --port=8080 sub --channel "redisstream"

听起来这会缓解这个问题,虽然我认为 websocketd 仍然会为每个连接打开一个新进程,它使用比事件“ react 器模式”(例如服务器使用的 react 器模式)更多的资源例如 nginx(以及 iodine、passenger、puma 和其他一些)。

关于linux - 通过 websocket 将一个 STDIN 流式传输到多个客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46215756/

相关文章:

android - 在 Linux fedora 上获取 Google map key 时,我得到 "bash: keytool: command not found."

c++ - 如何为跨平台 C++ 开发设置 Visual Studio

linux - 按 ID 拆分内容(第一列)并根据格式生成新的数据文件

linux - 替代向上箭头 + Enter 运行上一个命令?

ajax - 如何处理多台服务器上的WebSocket?

c# - 关闭 websocket 时出现 SignalR 错误 - 无效句柄

python - 使用 nohup (LINUX) 访问后台工作的进度 - 获取前台

linux - 谁可以在 Linux/UNIX 上访问具有八进制权限 "000"的文件?

c++ - 回溯是如何工作的?

ruby - WebSocket 不是一个类(类型错误)