我正在寻找构建实时流媒体服务器的 clojure 方法。我正在努力解决的特定问题是如何将值从单个提供程序(网络摄像头)发送到未定义数量的线程(连接的客户端)。显然,当客户端连接时,它对来自网络摄像头的完整视频文件不感兴趣,基本上需要向它发送一个 header ,然后是当时从网络摄像头到达的任何包。
在直接 java 中,我认为这很容易。每当客户端连接时,将连接添加到数组,当它断开连接时,从数组中删除连接,每当来自网络摄像头的新包到达时,将其发送到数组中的每个条目。锁定数组,以便我们添加/删除条目,或者循环遍历它以将数据包发送到。当然,我们可以在 Clojure 中构建相同的内容,但这听起来真的很糟糕。
在消息传递的多线程架构中,这听起来同样容易。
在 Clojure 中,我能想到的唯一解决方案是使用惰性 promise 序列。它确实有效,但我想知道是否有另一种方法可以使代码更清晰,更 clojure-zen :)
只是为了说明:一个简化的问题,有 promises 和 atoms:
一个提供者函数生成数据,一个线程读取该数据。后来创建了一些其他线程,它们想从第一个线程获取数据,但无法获取。
(defn provider []
(lazy-seq
(do
(Thread/sleep 100)
(cons (rand) (provider)))))
(def printer (agent nil))
(defn log [& line]
(send-off printer (fn [x] (apply println line))))
(def promises (atom (repeatedly promise)))
(defn client-connected-thread [x input]
(log "Client connection " x " is connected with the provider and just received" @(first input))
(recur x (rest input)))
(.start (Thread. (fn []
(loop [stream (provider)]
(when-let [item (first stream)]
(log "I received " item", will share now")
(deliver (first @promises) item)
(swap! promises rest))
(recur (rest stream))))))
(Thread/sleep 300)
(.start (Thread. #(client-connected-thread 1 @promises)))
(Thread/sleep 100)
(.start (Thread. #(client-connected-thread 2 @promises)))
(Thread/sleep 50)
(.start (Thread. #(client-connected-thread 3 @promises)))
所以,基本上问题是:这是解决这个问题的正确方法吗?
另外,我们这里说的是流媒体服务器,所以provider函数每秒会提供几万个item,可能有10个clients连接。 promise 系统是否意味着如此频繁的使用?
最佳答案
Clojure 为您需要异步发送信息的情况提供了代理,这似乎很适合您的用例。
你确实非常接近,只是将工作代理放在几个地方以完成这个过程。
“在直接的 Clojure 中,我认为这很容易。每当客户端连接时,将连接添加到 代理中的代理向量,当它断开连接时删除来自agents of agents的连接,每当来自网络摄像头的新包到达时,将其发送给agent中的每个agent。”
确保使用 send-off
而不是 send
以防止清空线程池。
与“锁定数组”方法相比,这有很多优点:
- 一个慢速客户端不会阻止您添加或删除或添加连接
- 客户最终将获得所有帧,而无需单独跟踪每个帧
- 你不必担心锁定
- 您不必手动分配线程
- 您可以使用 watch 等来报告性能,而无需更改算法的简单核心。
粗略的轮廓如下:
user> (def connections-stub (range))
user> (def connections (agent []))
#'user/connections
user> (defn accept-connection [connection]
(send connections conj (agent connection)))
#'user/accept-connection
user> (map accept-connection (take 10 connections-stub))
(#<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8> #<Agent@159177b9: 9>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8> #<Agent@159177b9: 9>]>)
user> (defn send-frame [con-agent frame]
(send con-agent
(fn [connection frame]
(println "sending " frame " to " connection) connection) frame))
#'user/send-frame
user> (send-frame (first @connections) "hello")
sending hello to 0
#<Agent@da69a9c: 0>
user> (defn dispatch-frame [frame]
(doall (map #(send-frame % frame) @connections)))
#'user/dispatch-frame
user> (dispatch-frame "hello")
sending hello to 0
sending hello to 1
sending hello to 2
sending hello to 3
sending hello to 4
sending hello to 5
sending hello to 6
sending hello to 7
sending hello to 8
sending hello to 9
(#<Agent@da69a9c: 0> #<Agent@34f07ec4: 1> #<Agent@11ee68d1: 2> #<Agent@3b237a89: 3> #<Agent@1641d6b4: 4> #<Agent@3c76ced6: 5> #<Agent@1c05629d: 6> #<Agent@258d3fca: 7> #<Agent@5c56fa08: 8> #<Agent@52395294: 9>)
user>
关于multithreading - Clojure - 构建直播服务器的方式 - 有 promise ,但它是正确的方式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9575483/