.net - 并行和线程安全串行模式

标签 .net thread-safety task-parallel-library

是否有一种模式可以将并行与并行上的线程安全计算相结合?

需要计算一个结果,其中第一步将从并行中受益,第二步是对并行结果的串行处理。

一种选择是并行运行并将输出保存到集合中,然后串行处理该集合,我可以正常工作。问题在于内存管理,因为集合可能非常大。

以下是串行版本。基本上我想并行 TableQueryGetRowKeys 并以线程安全的方式使用该结果。尝试并行 for 并在最终结果周围加锁,但 rowKeys 可能会关闭。尝试过聚合,但我不知道如何将集合传递给聚合,更不用说在聚合中执行线程安全的相交了。

IEnumerable<string> finalResults = null;
if (partitionKey.Length == 0) return finalResults;
object lockObject = new object();
finalResults = TableQueryGetRowKeys(partitionKey[0], 0);
HashSet<string> rowKeys;
for(int i = 1; i < partitionKey.Length; i++)
{
    // IO operation to Azure Table Storage against the PartitionKey
    // so very amenable to parallel
    rowKeys = TableQueryGetRowKeys(partitionKey[i]);
    // a memory and CPU operation 
    // this should be much faster than TableQueryGetRowKeys
    // going parallel and wrapping this in a lock did not properly synch rowKeys
    finalResults = finalResults.Intersect(rowKeys); 
}
return finalResults;

最佳答案

假设TableQueryGetRowKeys线程安全:

var final = partitionKey.AsParallel()
                        // By returning AsParallel we can get parallel intersect
                        .Select(k => TableQueryGetRowKeys(k).AsParallel())
                        .Aggregate((x, y) => x.Intersect(y));

// Using fake-ish data I see about a 30% speed-up on a 4-core machine:
// static HashSet<string> TableQueryGetRowKeys(string prefix)
// {
//     // Simulate 1s of IO round-trip
//     if (useSleep) Thread.Sleep(1000);
//
//     return new HashSet<string>(
//         Enumerable.Range(0, 500)
//                   .Select(_ => random.Value.Next(0, 500).ToString()));
// }

该算法以逐步方式工作,如下所示:

  1. partitionKey.AsParallel()变成常规IEnumerable<string>进入ParallelQuery<string>它允许并行处理序列。
  2. 下一步,ParallelEnumerable.Select用于调用TableQueryGetRowKeys并行。
  3. 每次调用 TableQueryGetRowKeys 的结果然后包裹在 ParallelQuery<T> 中使用AsParallel() .
  4. ParallelEnumerable.Intersect用作 TableQueryGetRowKeys 返回的每个“启用并行”枚举的聚合函数.

实际上,这可以连续使用,通过删除 AsParallel 来替换您以前的代码。调用,如下所示:

var serialEquivalent = partitionKey.Select(k => TableQueryGetRowKeys(k))
                                   .Aggregate((x,y) => x.Intersect(y));

当您查看实现的主要内容时,您可以“说服”自己这与您的方法等效:

IEnumerable<string> results = SomeMethod(0);
for (int ii = 1; ii < count; ++ii)
{
    results = results.Intersect(SomeMethod(ii));
}

使用+重写上面的内容而不是Intersect :

int results = SomeMethod(0);
for (int ii = 1; ii < count; ++ii)
{
    results = results + SomeMethod(ii);
}

现在很明显 Intersect可以用来代替其他更“常见”的聚合函数(例如数学运算符)。

关于.net - 并行和线程安全串行模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11366539/

相关文章:

c# - 在elasticsearch.net中按字符串数组查询字符串数组

C# 窗口应用程序从其他应用程序的光标位置获取单词

multithreading - 实现线程安全

async-await - 等待 Task.CompletedTask 与返回

c# - 从 Task.WhenAll 获取结果

c# - 在电子邮件中发送表格

javascript - 如何在 Javascript 中打开下载对话框?

c# - 并行 Foreach 超时优雅地关闭每个工作线程

c++ - 当其他库仍在使用时如何删除动态库中的指针

c# - 识别 TPL 数据流中的同时任务