我想定义一个谓词,将一些谓词作为输入
具有相应的输入(它们可以作为惰性调用序列给出),
并行运行它们并计算结果的逻辑或,
这样,在谓词调用终止的那一刻,返回 true
,
整个计算也终止(返回 true
)。
除了提供时间优化外,这还有助于避免
在某些情况下不终止(某些谓词调用可能不会终止)。
实际上,将非终止解释为第三个 undefined
值(value),
此谓词模拟 Kleene's K3 logic 中的 or 操作
(加入初始居中 Kleene algebra )。
提出了类似的东西 here为 haskell 家族。
在 Clojure 中是否有任何(最好是简单的)方法来做到这一点?
编辑 : 我决定在阅读评论后添加一些说明。
(a) 首先,线程池耗尽后会发生什么并不重要。我认为创建一个足够大的线程池来满足我们的需求是一个合理的约定。
(b) 最关键的要求是谓词调用开始并行运行,一旦谓词调用终止,返回 true
,所有其他正在运行的线程都会被中断。预期的行为是:
true
: 并行或返回 true
false
换句话说,它的行为类似于
false
给出的 3 元素格中的连接。 < undefined
< true
, 与 undefined
代表不终止。(c) 并行或应该能够将许多谓词和许多谓词输入(每个对应一个谓词)作为输入。但如果将惰性序列作为输入,那就更好了。然后,命名并行或
pany
(对于“parallel any”),我们可以有如下调用:(pany (map (comp eval list) predicates inputs))
(pany (map (comp eval list) predicates (repeat input)))
(pany (map (comp eval list) (repeat predicate) inputs))
相当于 (pany (map predicate (unchunk inputs)))
最后说一句,我认为要求诸如
pany
之类的东西是很自然的。 ,双pall
或者构建这种提前终止并行缩减的机制,使其易于实现,甚至内置于 Clojure 等面向并行性的语言中。
最佳答案
我将根据归约函数定义我们的谓词。实际上,我们可以重新实现所有 Clojure 迭代函数来支持这种并行操作,但我仅以 reduce 为例。
我将定义一个计算函数。我只会使用同一个,但没有什么能阻止你拥有很多。如果累积 1000,则该函数为“真”。
(defn computor [acc val]
(let [new (+' acc val)] (if (> new 1000) (reduced new) new)))
(reduce computor 0 (range))
;; =>
1035
(reduce computor 0 (range Long/MIN_VALUE 0))
;; =>
;; ...this is a proxy for a non-returning computation
;; wrap these up in a form suitable for application of reduction
(def predicates [[computor 0 (range)]
[computor 0 (range Long/MIN_VALUE 0)]])
现在让我们来看看这个。我想在每次计算中迈出一步,如果其中一个计算完成,我想返回它。实际上,使用 pmap 一次执行一个步骤非常慢——工作单元太小,不值得线程化。在继续之前,我已经改变了每个工作单元的 1000 次迭代。您可能会根据您的工作量和步骤的成本来调整它。
(defn p-or-reducer* [reductions]
(let [splits (map #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
complete (some #(if (empty? (second %)) (last (first %))) splits)]
(or complete (recur (map second splits)))))
然后我将它包装在一个驱动程序中。
(defn p-or [s]
(p-or-reducer* (map #(apply reductions %) s)))
(p-or predicates)
;; =>
1035
在哪里插入CPU并行性? p-or-reducer* 中的 s/map/pmap/应该这样做。我建议只对第一个操作进行并行化,因为这将插入减少序列的计算。
(defn p-or-reducer* [reductions]
(let [splits (pmap #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
complete (some #(if (empty? (second %)) (last (first %))) splits)]
(or complete (recur (map second splits)))))
(def parallelism-tester (conj (vec (repeat 40000 [computor 0 (range Long/MIN_VALUE 0)]))
[computor 0 (range)]))
(p-or parallelism-tester) ;; terminates even though the first 40K predicates will not
定义一个高性能的通用版本是非常困难的。在不知道每次迭代的成本的情况下,很难推导出有效的并行策略——如果一次迭代需要 10 秒,那么我们可能一次只执行一个步骤。如果需要 100ns,那么我们需要一次采取许多步骤。
关于clojure - 如何在 Clojure 中实现并行逻辑或提前终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55925196/