是否有一种模式可以将并行与并行上的线程安全计算相结合?
需要计算一个结果,其中第一步将从并行中受益,第二步是对并行结果的串行处理。
一种选择是并行运行并将输出保存到集合中,然后串行处理该集合,我可以正常工作。问题在于内存管理,因为集合可能非常大。
以下是串行版本。基本上我想并行 TableQueryGetRowKeys 并以线程安全的方式使用该结果。尝试并行 for 并在最终结果周围加锁,但 rowKeys 可能会关闭。尝试过聚合,但我不知道如何将集合传递给聚合,更不用说在聚合中执行线程安全的相交了。
IEnumerable<string> finalResults = null;
if (partitionKey.Length == 0) return finalResults;
object lockObject = new object();
finalResults = TableQueryGetRowKeys(partitionKey[0], 0);
HashSet<string> rowKeys;
for(int i = 1; i < partitionKey.Length; i++)
{
// IO operation to Azure Table Storage against the PartitionKey
// so very amenable to parallel
rowKeys = TableQueryGetRowKeys(partitionKey[i]);
// a memory and CPU operation
// this should be much faster than TableQueryGetRowKeys
// going parallel and wrapping this in a lock did not properly synch rowKeys
finalResults = finalResults.Intersect(rowKeys);
}
return finalResults;
最佳答案
假设TableQueryGetRowKeys
线程安全:
var final = partitionKey.AsParallel()
// By returning AsParallel we can get parallel intersect
.Select(k => TableQueryGetRowKeys(k).AsParallel())
.Aggregate((x, y) => x.Intersect(y));
// Using fake-ish data I see about a 30% speed-up on a 4-core machine:
// static HashSet<string> TableQueryGetRowKeys(string prefix)
// {
// // Simulate 1s of IO round-trip
// if (useSleep) Thread.Sleep(1000);
//
// return new HashSet<string>(
// Enumerable.Range(0, 500)
// .Select(_ => random.Value.Next(0, 500).ToString()));
// }
该算法以逐步方式工作,如下所示:
-
partitionKey.AsParallel()
变成常规IEnumerable<string>
进入ParallelQuery<string>
它允许并行处理序列。 - 下一步,
ParallelEnumerable.Select
用于调用TableQueryGetRowKeys
并行。 - 每次调用
TableQueryGetRowKeys
的结果然后包裹在ParallelQuery<T>
中使用AsParallel()
. -
ParallelEnumerable.Intersect
用作TableQueryGetRowKeys
返回的每个“启用并行”枚举的聚合函数.
实际上,这可以连续使用,通过删除 AsParallel
来替换您以前的代码。调用,如下所示:
var serialEquivalent = partitionKey.Select(k => TableQueryGetRowKeys(k))
.Aggregate((x,y) => x.Intersect(y));
当您查看实现的主要内容时,您可以“说服”自己这与您的方法等效:
IEnumerable<string> results = SomeMethod(0);
for (int ii = 1; ii < count; ++ii)
{
results = results.Intersect(SomeMethod(ii));
}
使用+
重写上面的内容而不是Intersect
:
int results = SomeMethod(0);
for (int ii = 1; ii < count; ++ii)
{
results = results + SomeMethod(ii);
}
现在很明显 Intersect
可以用来代替其他更“常见”的聚合函数(例如数学运算符)。
关于.net - 并行和线程安全串行模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11366539/