考虑以下代码片段,并注意将 numberTasksToSpinOff
设置为 1 和 3,4 或更大(取决于计算机上的线程资源)之间的总运行时间差异。我注意到当分拆更多任务时,运行时间会更长。
我特意将数据集合传递到每个工作实例中,每个工作任务同时从中读取数据。我认为只要这些操作只是读取或枚举,任务就可以无阻塞地访问共享数据结构。
我的目标是分拆多个任务,这些任务通过读取操作迭代相同的共享数据结构,并几乎同时完成,无论分拆的任务数量如何。
编辑:请参阅第二个代码片段,我在其中实现 Parallel.Foreach()
并创建每个工作人员自己的数据集,因此不同任务/线程不会访问相同的数据结构。但我仍然看到大量的开销令人无法接受。
class Program
{
static void Main(string[] args)
{
Console.WriteLine($"Entry Main Function Thread Id: {Thread.CurrentThread.ManagedThreadId}");
//run
var task = Task.Run(async () =>
{
Console.WriteLine($"Entry RunMe Task Thread Id: {Thread.CurrentThread.ManagedThreadId}");
await RunMe();
Console.WriteLine($"Exit RunMe Task Thread Id: {Thread.CurrentThread.ManagedThreadId}");
});
task.Wait();
Console.WriteLine($"Exit Main Function Thread Id: {Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine("Press key to quit");
Console.ReadLine();
}
private static async Task RunMe()
{
var watch = new Stopwatch();
var numberTasksToSpinOff = 6;
var numberItems = 20000;
var random = new Random((int)DateTime.Now.Ticks);
var dataPoints = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
var tasks = new List<Task>();
var workers = new List<Worker>();
//structure workers
for (int i = 1; i <= numberTasksToSpinOff; i++)
{
workers.Add(new Worker(i, dataPoints));
}
//start timer
watch.Restart();
//spin off tasks
foreach (var worker in workers)
{
tasks.Add(Task.Run(() =>
{
Console.WriteLine($"Entry WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");
worker.DoSomeWork();
Console.WriteLine($"Exit WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");
}));
}
//completion tasks
await Task.WhenAll(tasks);
//stop timer
watch.Stop();
Console.WriteLine($"Time it took to complete in Milliseconds: {watch.ElapsedMilliseconds}");
}
}
public class Worker
{
public int WorkerId { get; set; }
private List<double> _data;
public Worker(int workerId, List<double> data)
{
WorkerId = workerId;
_data = data;
}
public void DoSomeWork()
{
var indexPos = 0;
foreach (var dp in _data)
{
var subSet = _data.Skip(indexPos).Take(_data.Count - indexPos).ToList();
indexPos++;
}
}
}
第二个代码片段:
class Program
{
static void Main(string[] args)
{
var watch = new Stopwatch();
var numberTasksToSpinOff = 1;
var numberItems = 20000;
//var random = new Random((int)DateTime.Now.Ticks);
//var dataPoints = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
var workers = new List<Worker>();
//structure workers
for (int i = 1; i <= numberTasksToSpinOff; i++)
{
workers.Add(new Worker(i));
}
//start timer
watch.Restart();
//parellel work
if (workers.Any())
{
var processorCount = Environment.ProcessorCount;
var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = processorCount };
Parallel.ForEach(workers, parallelOptions, DoSomeWork);
}
//stop timer
watch.Stop();
Console.WriteLine($"Time it took to complete in Milliseconds: {watch.ElapsedMilliseconds}");
Console.WriteLine("Press key to quit");
Console.ReadLine();
}
private static void DoSomeWork(Worker worker)
{
Console.WriteLine($"WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");
var indexPos = 0;
foreach (var dp in worker.Data)
{
var subSet = worker.Data.Skip(indexPos).Take(worker.Data.Count - indexPos).ToList();
indexPos++;
}
}
}
public class Worker
{
public int WorkerId { get; set; }
public List<double> Data { get; set; }
public Worker(int workerId)
{
WorkerId = workerId;
var numberItems = 20000;
var random = new Random((int)DateTime.Now.Ticks);
Data = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
}
}
最佳答案
注意:以下答案基于测试和观察,而不是明确的知识。
分拆的任务越多,产生的开销就越多,因此总执行时间也会增加。 但是如果您从另一个角度考虑它,您会发现实际处理的“数据点”将增加您启动的更多任务(直到达到可用硬件线程的限制):
以下值是在我的机器 (4C/8T) 上生成的,每个列表有 10000 个点:
- 1 名 worker -> 1891 毫秒 -> 5288 p/s
- 2 名工作人员 -> 1921 毫秒 -> 10411 p/s
- 4 名工作人员 -> 2670 毫秒 -> 14981 p/s
- 8 个工作人员 -> 4871 毫秒 -> 16423 p/s
- 12 名工作人员 -> 7449 毫秒 -> 16109 p/s
在那里你看到,直到我达到我的“核心限制”,处理的数据显着增加,然后直到我达到我的“线程限制”,它的增加仍然很明显,但之后它再次减少,因为开销增加并且没有更多可用的硬件资源。
关于c# - 如何正确并行工作任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48637097/