clojure - 为什么以下带有 channel sub/unsub 的代码会出现内存泄漏?

标签 clojure core.async

我正在使用 [org.clojure/clojure "1.10.1"],[org.clojure/core.async "1.2.603"]以及最新的 Amazon Corretto 11 JVM(如果与它们有关)。
以下代码是生产中使用的代码的简化版本,它确实会导致内存泄漏。我不知道为什么会发生这种情况,但怀疑这可能是由于 channel 的订阅/取消订阅所致。任何人都可以帮助指出我的代码可能出错的地方或我如何修复内存泄漏?

(ns test-gc.core
  (:require [clojure.core.async :as a :refer [chan put! close! <! go >! go-loop timeout]])
  (:import [java.util UUID]))

(def global-msg-ch (chan (a/sliding-buffer 200)))

(def global-msg-pub (a/pub global-msg-ch :id))

(defn io-promise []
  (let [id (UUID/randomUUID)
        ch (chan)]
    (a/sub global-msg-pub id ch)
    [id (go
          (let [x (<! ch)]
            (a/unsub global-msg-pub id ch)
            (:data x)))]))

(defn -main []
  (go-loop []
    (<! (timeout 1))
    (let [[pid pch] (io-promise)
          cmd {:id   pid
               :data (rand-int 1E5)}]
      (>! global-msg-ch cmd)
      (println (<! pch)))
    (recur))
  (while true
    (Thread/yield)))
例如,快速堆转储提供以下统计信息:
  • 按实例数分类
  • java.util.LinkedList 5,157,128 (14.4%)
  • java.util.concurrent.atomic.AtomicReference 3,698,382 (10.3%)
  • clojure.lang.Atom 3,094,279 (8.6%)
  • ...

  • 按实例大小分类
  • java.lang.Object[] 210,061,752 B (13.8%)
  • java.util.LinkedList 206,285,120 B (13.6%)
  • clojure.lang.Atom 148,525,392 B (9.8%)
  • clojure.core.async.impl.channels.ManyToManyChannel 132,022,336 B (8.7%)
  • ...

  • 最佳答案

    我终于知道为什么了。通过查看源代码,我们得到以下片段:

    (defn pub
      "Creates and returns a pub(lication) of the supplied channel, ..."
      ...
         (let [mults (atom {}) ;;topic->mult
               ensure-mult (fn [topic]
                             (or (get @mults topic)
                                 (get (swap! mults
                                             #(if (% topic) % (assoc % topic (mult (chan (buf-fn topic))))))
                                      topic)))
               p (reify
                  Mux
                  (muxch* [_] ch)
    
                  Pub
                  (sub* [p topic ch close?]
                        (let [m (ensure-mult topic)]
                          (tap m ch close?)))
                  (unsub* [p topic ch]
                          (when-let [m (get @mults topic)]
                            (untap m ch)))
                  (unsub-all* [_] (reset! mults {}))
                  (unsub-all* [_ topic] (swap! mults dissoc topic)))]
           ...
           p)))
    
    我们可以看到mults店铺所有 topic因此,如果我们不清除它,它将单调增加。我们可以添加类似 (a/unsub-all* global-msg-pub pid) 的内容解决这个问题。

    关于clojure - 为什么以下带有 channel sub/unsub 的代码会出现内存泄漏?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63030659/

    相关文章:

    clojure - 如何确定何时从作者端关闭 core.async channel ?

    networking - 如何在 Clojure (/Java) 中稳健地发出大量并发 HTTPS 请求

    multithreading - Clojure 多线程输出流

    clojure - 将插入事实从 Prolog 转换为 core.logic

    clojure - 在 clojure 中定期运行任务的最简单方法是什么

    clojure - 使用 `case`中的 channel

    clojure - 为什么是 "Assert failed: >! used not in (go ...) block"

    clojure - 自动刷新/自动重新加载资源

    clojure - 如何展平和延迟连接列表列表

    Clojure:定期轮询数据库 - core.async w/timeout channel VS vanilla recursive Thread w/sleep?