在 Clojure 中,如何阻塞线程( future )直到条件变为真?或者,也许继续重试直到条件变为真?当你有条件变量时这很容易,但我不确定 Clojure 的方法是什么。
更具体地说,我有一个共享变量,可以同时被多个 futures 访问。 future 应该做以下事情:
- 检查变量的状态。
- 如果状态满足特定条件,则将其更新为新状态。
- 如果状态不满足条件, future 应该阻塞或重试,直到满足条件(由另一个线程修改状态)。
最佳答案
Java 平台支持条件变量,参见文档java.util.concurrent.locks.Condition
.
上面页面中的示例很容易转换为 Clojure:
;;; based on the example in java.util.concurrent.locks.Condition
;;; documentation for JDK 1.7, see the link above
(defprotocol PBoundedBuffer
(-put [buf x])
(-take [buf]))
(import (java.util.concurrent.locks ReentrantLock Condition))
(deftype BoundedBuffer [^ReentrantLock lock
^Condition not-full?
^Condition not-empty?
^objects items
^:unsynchronized-mutable ^int putptr
^:unsynchronized-mutable ^int takeptr
^:unsynchronized-mutable ^int cnt]
PBoundedBuffer
(-put [buf x]
(.lock lock)
(try
(while (== cnt (alength items))
(.await not-full?))
(aset items putptr x)
(set! putptr (unchecked-inc-int putptr))
(if (== putptr (alength items))
(set! putptr (int 0)))
(set! cnt (unchecked-inc-int cnt))
(.signal not-empty?)
(finally
(.unlock lock))))
(-take [buf]
(.lock lock)
(try
(while (zero? cnt)
(.await not-empty?))
(let [x (aget items takeptr)]
(set! takeptr (unchecked-inc-int takeptr))
(if (== takeptr (alength items))
(set! takeptr (int 0)))
(set! cnt (unchecked-dec-int cnt))
(.signal not-full?)
x)
(finally
(.unlock lock)))))
(defn bounded-buffer [capacity]
(let [lock (java.util.concurrent.locks.ReentrantLock.)]
(BoundedBuffer. lock
(.newCondition lock)
(.newCondition lock)
(object-array capacity)
0
0
0)))
REPL 试驾:
(def bb (bounded-buffer 3))
(-put bb 1)
(-put bb 2)
(-put bb 3)
(future (-put bb 4) (println :foo))
(-take bb)
根据需要,future block ,然后在最终调用 -take
之后打印 :foo
。
关于concurrency - 在条件变为真之前如何阻塞线程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23098575/