我有一个小的 Clojure 消费者/发布者接收消息,处理它们并将它们发送给其他消费者,所有这些都通过 RabbitMQ。
我已经定义了一个消息处理程序,它在一个单独的线程(与主线程分开)中处理消息。
从下面的代码中可以看出,线程同步接收和发送消息,所有这些都发生在由 启动的事件循环中。 lcm/订阅功能。
那么,问题是,创建这些同步消息处理程序的 N 大小线程池的“Clojure 方式”是什么?我猜非 Clojure 方法是通过 Java 互操作手动生成多个线程。
此外,考虑到处理不是非常占用 CPU 的,这是否会加快消息处理的速度?使这些消息处理程序异步会更好 - 再次考虑到发布时间比处理时间多吗?
最后,我将如何衡量这些竞争方法的性能(我来自 Ruby/Javascript 世界,那里没有任何多线程)?
注意 :
我知道这一切都可以通过水平扩展并产生更多监听消息总线的 JVM 进程来避免,但是由于应用程序将部署在 Heroku 上,我想在每个 dyno/进程中使用尽可能多的资源.
(defn message-handler
[ch metadata ^bytes payload]
(let [msg (json/parse-string (String. payload "UTF-8"))
processed-message (process msg)]
(lb/publish ch "e.events" "" processed-message)))
(defn -main
[& args]
(let [conn (rmq/connect {:uri (System/getenv "MSGQ")})
ch (lch/open conn)
q-name "q.events.tagger"
e-sub-name "e.events.preproc"
e-pub-name "e.events"
routing-key "tasks.taggify"]
(lq/declare ch q-name :exclusive false :auto-delete false)
(le/declare ch e-pub-name "fanout" :durable false)
(lq/bind ch q-name e-sub-name :routing-key routing-key)
(.start (Thread. (fn []
(lcm/subscribe ch q-name message-handler :auto-ack true))))))
在更基本的说明中......我将如何重构此代码以支持使用附加参数注册消息处理程序回调,如下所示:
(.start (Thread. (fn []
(lcm/subscribe ch q-name (message-handler pub-name) :auto-ack true))))))
然后发布引用:
(lb/publish ch pub-name "" processed-message)))
而不是文字:
(lb/publish ch "e.events" "" processed-message)))
最佳答案
对于问题的第二部分,您可以使用部分应用,如下所示:
(defn message-handler
[pub-name ch metadata ^bytes payload]
(let [msg (json/parse-string (String. payload "UTF-8"))
processed-message (process msg)]
(lb/publish ch pub-name "" processed-message)))
(.start
(Thread.
(fn []
(lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true))))))
关于multithreading - Clojure 消息处理/异步、多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12718573/