f# - F# 中的并行存在函数

标签 f# parallel-processing task-parallel-library

动机

我有一个长时间运行的 bool 函数,它应该在数组中执行,如果数组中的元素满足条件,我想立即返回。我想并行搜索并在第一个完整线程返回正确答案时终止其他线程。

问题

在 F# 中实现并行存在函数的好方法是什么?由于我的目标是性能,因此高效的解决方案比简单或惯用的解决方案更受欢迎。

测试用例

假设我想查找一个值是否存在于数组中。比较函数 (equals) 被模拟为计算量大的函数:

open System.Diagnostics
open System.Threading

// Source at http://parallelpatterns.codeplex.com/releases/view/50473
let doCpuIntensiveOperation seconds (token:CancellationToken) throwOnCancel =
        if (token.IsCancellationRequested) then
            if (throwOnCancel) then token.ThrowIfCancellationRequested()
            false
        else
            let ms = int64 (seconds * 1000.0)
            let sw = new Stopwatch()
            sw.Start()
            let checkInterval = Math.Min(20000000, int (20000000.0 * seconds))

            // Loop to simulate a computationally intensive operation
            let rec loop i = 
                // Periodically check to see if the user has requested 
                // cancellation or if the time limit has passed
                let check = seconds = 0.0 || i % checkInterval = 0
                if check && token.IsCancellationRequested then
                    if throwOnCancel then token.ThrowIfCancellationRequested()
                    false
                elif check && sw.ElapsedMilliseconds > ms then
                    true
                else 
                  loop (i + 1)

            // Start the loop with 0 as the first value
            loop 0

let inline equals x y =
    doCpuIntensiveOperation 0.01 CancellationToken.None false |> ignore
    x = y

数组由1000个随机生成的元素组成,搜索值保证在数组的第二半(因此顺序搜索至少要经过数组的一半):

let rand = new System.Random()
let m = 1000
let N = 1000000
let xs = [|for _ in 1..m -> rand.Next(N)|]
let i = rand.Next((m-1)/2, m-1);;

#time "on";;
let b1 = parallelExists (equals xs.[i]) xs;; // Parallel
let b2 = Array.exists (equals xs.[i]) xs;; // Sequential

最佳答案

我认为你可以采取以下步骤:

  1. 生成多个 worker(线程或异步计算),并向每个 worker 传递一个相等的数组切片和一个将由所有 worker 共享的取消标记

  2. 当工作人员找到搜索到的项目时,它会在 token 上调用取消(每个工作人员应在每次迭代中检查 token 的取消状态,并在需要时退出)

我目前没有时间编写代码,因此可能会省略一些细节。

answer和相关问题可能会有帮助。

更新

这是我的想法的一个例子

open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

let getChunks size array =
  let rec loop s n =
    seq {
      if n > 0 then
        let r = n - size
        if r > 0 then yield (s, size); yield! loop (s + size) r
        else yield (s, size + r)
    }      
  loop 0 (Array.length array)

[<Literal>]
let CHUNK_SIZE = 3

let parallelExists f (array:_[]) =
  use cts = new CancellationTokenSource()
  let rec checkSlice i n = 
    if n > 0 && not cts.IsCancellationRequested then
      if f array.[i] then cts.Cancel()
      else checkSlice (i + 1) (n - 1)
  let workers =
    array
    |> getChunks CHUNK_SIZE
    |> Seq.map (fun (s, c) -> Task.Factory.StartNew(fun () -> checkSlice s c))
    |> Seq.toArray
  try 
    Task.WaitAll(workers, cts.Token)
    false
  with :? OperationCanceledException -> true

用法

let array = Array.init 10 id
let exists = 
  array |> parallelExists (fun i ->
    Thread.Sleep(500)
    i = 9)
printfn "%b" exists //true

关于f# - F# 中的并行存在函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8535922/

相关文章:

f# - 如何在 F# 中从控制台读取输入

multithreading - 利用 SLURM 上的所有 CPU

scala - 如何为并行集合设置线程数?

python - ipyparallel 奇怪的开销行为

c# - 使用 .Result 的异步方法

c# - 新的 Tasks.Dataflow block 是否在网络上工作?

c# - TPL Parallel.ForEach 中的每线程实例对象

.net - F#作为脚本语言

f# - .NET Core 3预览4: 'AddNewtonsoftJson'未定义

generics - f# 成员约束和 IEnumerable 集合