您总是会听到功能代码比非功能代码本质上更容易并行化,因此我决定编写一个执行以下操作的函数:
给定一个字符串输入,每个字符串的唯一字符总数为。因此,给定输入[ "aaaaa"; "bbb"; "ccccccc"; "abbbc" ]
,我们的方法将返回a: 6; b: 6; c: 8
。
这是我写的:
(* seq<#seq<char>> -> Map<char,int> *)
let wordFrequency input =
input
|> Seq.fold (fun acc text ->
(* This inner loop can be processed on its own thread *)
text
|> Seq.choose (fun char -> if Char.IsLetter char then Some(char) else None)
|> Seq.fold (fun (acc : Map<_,_>) item ->
match acc.TryFind(item) with
| Some(count) -> acc.Add(item, count + 1)
| None -> acc.Add(item, 1))
acc
) Map.empty
该代码在理论上是可并行化的,因为
input
中的每个字符串都可以在其自己的线程上进行处理。它不像看起来那样简单,因为innerloop将项目添加到所有输入之间共享的Map中。我希望将内部循环分解到其自己的线程中,并且我不想使用任何可变状态。如何使用异步工作流程重新编写此功能?
最佳答案
如前所述,如果您尝试让不同的线程处理不同的输入字符串,则存在更新争用,因为每个线程都可以增加每个字母的计数。您可以让每个线程生成自己的Map,然后“加总所有Map”,但是最后一步可能很昂贵(由于共享数据,因此不适合使用线程)。我认为使用以下算法,大型输入可能会更快地运行,其中每个线程处理不同的字母计数(对于输入中的所有字符串)。因此,每个线程都有其自己的独立计数器,因此没有更新争用,也没有合并结果的最后步骤。但是,我们需要进行预处理以发现“唯一字母集”,并且此步骤确实存在相同的争用问题。 (在实践中,您可能预先知道字符的范围,例如字母,然后仅可以创建26个线程来处理az,并绕过此问题。)在任何情况下,大概的问题都在于探索“如何编写F#”。异步代码将工作划分为多个线程,因此下面的代码对此进行了演示。
#light
let input = [| "aaaaa"; "bbb"; "ccccccc"; "abbbc" |]
// first discover all unique letters used
let Letters str =
str |> Seq.fold (fun set c -> Set.add c set) Set.empty
let allLetters =
input |> Array.map (fun str ->
async { return Letters str })
|> Async.Parallel
|> Async.Run
|> Set.union_all // note, this step is single-threaded,
// if input has many strings, can improve this
// Now count each letter on a separate thread
let CountLetter letter =
let mutable count = 0
for str in input do
for c in str do
if letter = c then
count <- count + 1
letter, count
let result =
allLetters |> Seq.map (fun c ->
async { return CountLetter c })
|> Async.Parallel
|> Async.Run
// print results
for letter,count in result do
printfn "%c : %d" letter count
我确实已经“彻底改变了算法”,这主要是因为由于更新争用,您原来的算法并不特别适合直接数据并行化。根据您要学习的内容的不同,此答案可能对您特别满意,也可能不满意。
关于multithreading - 嵌套循环中的并行代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/412227/