动机
我有一个长时间运行的 bool 函数,它应该在数组中执行,如果数组中的元素满足条件,我想立即返回。我想并行搜索并在第一个完整线程返回正确答案时终止其他线程。
问题
在 F# 中实现并行存在函数的好方法是什么?由于我的目标是性能,因此高效的解决方案比简单或惯用的解决方案更受欢迎。
测试用例
假设我想查找一个值是否存在于数组中。比较函数 (equals
) 被模拟为计算量大的函数:
open System.Diagnostics
open System.Threading
// Source at http://parallelpatterns.codeplex.com/releases/view/50473
let doCpuIntensiveOperation seconds (token:CancellationToken) throwOnCancel =
if (token.IsCancellationRequested) then
if (throwOnCancel) then token.ThrowIfCancellationRequested()
false
else
let ms = int64 (seconds * 1000.0)
let sw = new Stopwatch()
sw.Start()
let checkInterval = Math.Min(20000000, int (20000000.0 * seconds))
// Loop to simulate a computationally intensive operation
let rec loop i =
// Periodically check to see if the user has requested
// cancellation or if the time limit has passed
let check = seconds = 0.0 || i % checkInterval = 0
if check && token.IsCancellationRequested then
if throwOnCancel then token.ThrowIfCancellationRequested()
false
elif check && sw.ElapsedMilliseconds > ms then
true
else
loop (i + 1)
// Start the loop with 0 as the first value
loop 0
let inline equals x y =
doCpuIntensiveOperation 0.01 CancellationToken.None false |> ignore
x = y
数组由1000个随机生成的元素组成,搜索值保证在数组的第二半(因此顺序搜索至少要经过数组的一半):
let rand = new System.Random()
let m = 1000
let N = 1000000
let xs = [|for _ in 1..m -> rand.Next(N)|]
let i = rand.Next((m-1)/2, m-1);;
#time "on";;
let b1 = parallelExists (equals xs.[i]) xs;; // Parallel
let b2 = Array.exists (equals xs.[i]) xs;; // Sequential
最佳答案
我认为你可以采取以下步骤:
生成多个 worker(线程或异步计算),并向每个 worker 传递一个相等的数组切片和一个将由所有 worker 共享的取消标记
当工作人员找到搜索到的项目时,它会在 token 上调用取消(每个工作人员应在每次迭代中检查 token 的取消状态,并在需要时退出)
我目前没有时间编写代码,因此可能会省略一些细节。
这answer和相关问题可能会有帮助。
更新
这是我的想法的一个例子
open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks
let getChunks size array =
let rec loop s n =
seq {
if n > 0 then
let r = n - size
if r > 0 then yield (s, size); yield! loop (s + size) r
else yield (s, size + r)
}
loop 0 (Array.length array)
[<Literal>]
let CHUNK_SIZE = 3
let parallelExists f (array:_[]) =
use cts = new CancellationTokenSource()
let rec checkSlice i n =
if n > 0 && not cts.IsCancellationRequested then
if f array.[i] then cts.Cancel()
else checkSlice (i + 1) (n - 1)
let workers =
array
|> getChunks CHUNK_SIZE
|> Seq.map (fun (s, c) -> Task.Factory.StartNew(fun () -> checkSlice s c))
|> Seq.toArray
try
Task.WaitAll(workers, cts.Token)
false
with :? OperationCanceledException -> true
用法
let array = Array.init 10 id
let exists =
array |> parallelExists (fun i ->
Thread.Sleep(500)
i = 9)
printfn "%b" exists //true
关于f# - F# 中的并行存在函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8535922/