multithreading - Clojure - 构建直播服务器的方式 - 有 promise ,但它是正确的方式

标签 multithreading clojure promise

我正在寻找构建实时流媒体服务器的 clojure 方法。我正在努力解决的特定问题是如何将值从单个提供程序(网络摄像头)发送到未定义数量的线程(连接的客户端)。显然,当客户端连接时,它对来自网络摄像头的完整视频文件不感兴趣,基本上需要向它发送一个 header ,然后是当时从网络摄像头到达的任何包。

在直接 java 中,我认为这很容易。每当客户端连接时,将连接添加到数组,当它断开连接时,从数组中删除连接,每当来自网络摄像头的新包到达时,将其发送到数组中的每个条目。锁定数组,以便我们添加/删除条目,或者循环遍历它以将数据包发送到。当然,我们可以在 Clojure 中构建相同的内容,但这听起来真的很糟糕。

在消息传递的多线程架构中,这听起来同样容易。

在 Clojure 中,我能想到的唯一解决方案是使用惰性 promise 序列。它确实有效,但我想知道是否有另一种方法可以使代码更清晰,更 clojure-zen :)

只是为了说明:一个简化的问题,有 promises 和 atoms:

一个提供者函数生成数据,一个线程读取该数据。后来创建了一些其他线程,它们想从第一个线程获取数据,但无法获取。

(defn provider []                                                                                                                                                                                                                                                                                                             
  (lazy-seq                                                                                                                                                                                                                                                                                                                   
    (do                                                                                                                                                                                                                                                                                                                       
      (Thread/sleep 100)                                                                                                                                                                                                                                                                                                      
      (cons (rand) (provider)))))                                                                                                                                                                                                                                                                                             

(def printer (agent nil))                                                                                                                                                                                                                                                                                                     
(defn log [& line]                                                                                                                                                                                                                                                                                                            
  (send-off printer (fn [x] (apply println line))))                                                                                                                                                                                                                                                                           

(def promises (atom (repeatedly promise)))                                                                                                                                                                                                                                                                                    

(defn client-connected-thread [x input]                                                                                                                                                                                                                                                                                       
  (log "Client connection " x " is connected with the provider and just received" @(first input))                                                                                                                                                                                                                             
  (recur x (rest input)))                                                                                                                                                                                                                                                                                                     

(.start (Thread. (fn []                                                                                                                                                                                                                                                                                                       
                   (loop [stream (provider)]                                                                                                                                                                                                                                                                                  
                     (when-let [item (first stream)]                                                                                                                                                                                                                                                                          
                       (log "I received " item", will share now")                                                                                                                                                                                                                                                             
                       (deliver (first @promises) item)                                                                                                                                                                                                                                                                       
                       (swap! promises rest))                                                                                                                                                                                                                                                                                 
                       (recur (rest stream))))))                                                                                                                                                                                                                                                                              


(Thread/sleep 300)                                                                                                                                                                                                                                                                                                            
(.start (Thread. #(client-connected-thread 1 @promises)))                                                                                                                                                                                                                                                                     
(Thread/sleep 100)                                                                                                                                                                                                                                                                                                            
(.start (Thread. #(client-connected-thread 2 @promises)))                                                                                                                                                                                                                                                                     
(Thread/sleep 50)                                                                                                                                                                                                                                                                                                             
(.start (Thread. #(client-connected-thread 3 @promises)))            

所以,基本上问题是:这是解决这个问题的正确方法吗?

另外,我们这里说的是流媒体服务器,所以provider函数每秒会提供几万个item,可能有10个clients连接。 promise 系统是否意味着如此频繁的使用?

最佳答案

Clojure 为您需要异步发送信息的情况提供了代理,这似乎很适合您的用例。

你确实非常接近,只是将工作代理放在几个地方以完成这个过程。

“在直接的 Clojure 中,我认为这很容易。每当客户端连接时,将连接添加到 代理中的代理向量,当它断开连接时删除来自agents of agents的连接,每当来自网络摄像头的新包到达时,将其发送给agent中的每个agent。”

确保使用 send-off 而不是 send 以防止清空线程池。

与“锁定数组”方法相比,这有很多优点:

  • 一个慢速客户端不会阻止您添加或删除或添加连接
  • 客户最终将获得所有帧,而无需单独跟踪每个帧
  • 你不必担心锁定
  • 您不必手动分配线程
  • 您可以使用 watch 等来报告性能,而无需更改算法的简单核心。

粗略的轮廓如下:

user> (def connections-stub (range))
user> (def connections (agent []))
#'user/connections
user> (defn accept-connection [connection] 
    (send connections conj (agent connection)))
#'user/accept-connection
user> (map accept-connection (take 10 connections-stub))
(#<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8> #<Agent@159177b9: 9>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8> #<Agent@159177b9: 9>]>)

user> (defn send-frame [con-agent frame] 
       (send con-agent 
         (fn [connection frame] 
           (println "sending " frame " to " connection) connection) frame))
#'user/send-frame

user> (send-frame (first @connections) "hello")
sending  hello  to  0
#<Agent@da69a9c: 0>

user> (defn dispatch-frame [frame] 
        (doall (map #(send-frame % frame) @connections)))
#'user/dispatch-frame

user> (dispatch-frame "hello")
sending  hello  to  0
sending  hello  to  1
sending  hello  to  2
sending  hello  to  3
sending  hello  to  4
sending  hello  to  5
sending  hello  to  6
sending  hello  to  7
sending  hello  to  8
sending  hello  to  9
(#<Agent@da69a9c: 0> #<Agent@34f07ec4: 1> #<Agent@11ee68d1: 2> #<Agent@3b237a89: 3> #<Agent@1641d6b4: 4> #<Agent@3c76ced6: 5> #<Agent@1c05629d: 6> #<Agent@258d3fca: 7> #<Agent@5c56fa08: 8> #<Agent@52395294: 9>)
user> 

关于multithreading - Clojure - 构建直播服务器的方式 - 有 promise ,但它是正确的方式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9575483/

相关文章:

javascript - 你能用一个预定义的 finally 返回一个 Promise 吗?

javascript - AngularJS : calling a factory method within the loop

c# - 有什么理由立即使用 await 和 async 吗?

Python:threading.Thread,值是按值传递还是按引用传递?

c++ - 数据结构的线程安全,在哪里添加同步原语?

excel - 使用 clojure docjure 编辑一些 Excel 单元格

rest - 从 Spring Boot RestController 返回 Clojure PersistentVector

python - 如何在python中一个接一个地运行两个进程

java - 对于现有的 Java 应用程序来说,什么是好的嵌入式语言?

javascript - Javascript "Promises"和函数式编程的 "Task"有什么区别?