c# - 如何正确并行工作任务?

标签 c# multithreading task task-parallel-library

考虑以下代码片段,并注意将 numberTasksToSpinOff 设置为 1 和 3,4 或更大(取决于计算机上的线程资源)之间的总运行时间差异。我注意到当分拆更多任务时,运行时间会更长。

我特意将数据集合传递到每个工作实例中,每个工作任务同时从中读取数据。我认为只要这些操作只是读取或枚举,任务就可以无阻塞地访问共享数据结构。

我的目标是分拆多个任务,这些任务通过读取操作迭代相同的共享数据结构,并几乎同时完成,无论分拆的任务数量如何。

编辑:请参阅第二个代码片段,我在其中实现 Parallel.Foreach() 并创建每个工作人员自己的数据集,因此不同任务/线程不会访问相同的数据结构。但我仍然看到大量的开销令人无法接受。

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine($"Entry Main Function Thread Id: {Thread.CurrentThread.ManagedThreadId}");

        //run
        var task = Task.Run(async () =>
        {
            Console.WriteLine($"Entry RunMe Task Thread Id: {Thread.CurrentThread.ManagedThreadId}");
            await RunMe();

            Console.WriteLine($"Exit RunMe Task Thread Id: {Thread.CurrentThread.ManagedThreadId}");
        });

        task.Wait();

        Console.WriteLine($"Exit Main Function Thread Id: {Thread.CurrentThread.ManagedThreadId}");

        Console.WriteLine("Press key to quit");
        Console.ReadLine();
    }

    private static async Task RunMe()
    {
        var watch = new Stopwatch();
        var numberTasksToSpinOff = 6;
        var numberItems = 20000;
        var random = new Random((int)DateTime.Now.Ticks);
        var dataPoints = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
        var tasks = new List<Task>();
        var workers = new List<Worker>();

        //structure workers
        for (int i = 1; i <= numberTasksToSpinOff; i++)
        {
            workers.Add(new Worker(i, dataPoints));
        }

        //start timer
        watch.Restart();

        //spin off tasks
        foreach (var worker in workers)
        {
            tasks.Add(Task.Run(() =>
            {
                Console.WriteLine($"Entry WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");
                worker.DoSomeWork();
                Console.WriteLine($"Exit WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");
            }));

        }

        //completion tasks
        await Task.WhenAll(tasks);

        //stop timer
        watch.Stop();

        Console.WriteLine($"Time it took to complete in Milliseconds: {watch.ElapsedMilliseconds}");
    }
}

public class Worker
{
    public int WorkerId { get; set; }
    private List<double> _data;

    public Worker(int workerId, List<double> data)
    {
        WorkerId = workerId;
        _data = data;
    }

    public void DoSomeWork()
    {
        var indexPos = 0;

        foreach (var dp in _data)
        {
            var subSet = _data.Skip(indexPos).Take(_data.Count - indexPos).ToList();
            indexPos++;
        }
    }
}

第二个代码片段:

class Program
{
    static void Main(string[] args)
    {
        var watch = new Stopwatch();
        var numberTasksToSpinOff = 1;
        var numberItems = 20000;
        //var random = new Random((int)DateTime.Now.Ticks);
        //var dataPoints = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
        var workers = new List<Worker>();

        //structure workers
        for (int i = 1; i <= numberTasksToSpinOff; i++)
        {
            workers.Add(new Worker(i));
        }

        //start timer
        watch.Restart();

        //parellel work

        if (workers.Any())
        {
            var processorCount = Environment.ProcessorCount;
            var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = processorCount };
            Parallel.ForEach(workers, parallelOptions, DoSomeWork);
        }

        //stop timer
        watch.Stop();
        Console.WriteLine($"Time it took to complete in Milliseconds: {watch.ElapsedMilliseconds}");

        Console.WriteLine("Press key to quit");
        Console.ReadLine();
    }

    private static void DoSomeWork(Worker worker)
    {
        Console.WriteLine($"WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");

        var indexPos = 0;

        foreach (var dp in worker.Data)
        {
            var subSet = worker.Data.Skip(indexPos).Take(worker.Data.Count - indexPos).ToList();
            indexPos++;
        }
    }
}

public class Worker
{
    public int WorkerId { get; set; }
    public List<double> Data { get; set; }

    public Worker(int workerId)
    {
        WorkerId = workerId;

        var numberItems = 20000;
        var random = new Random((int)DateTime.Now.Ticks);
        Data = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();

    }
}

最佳答案

注意:以下答案基于测试和观察,而不是明确的知识。

分拆的任务越多,产生的开销就越多,因此总执行时间也会增加。 但是如果您从另一个角度考虑它,您会发现实际处理的“数据点”将增加您启动的更多任务(直到达到可用硬件线程的限制):

以下值是在我的机器 (4C/8T) 上生成的,每个列表有 10000 个点:

  • 1 名 worker -> 1891 毫秒 -> 5288 p/s
  • 2 名工作人员 -> 1921 毫秒 -> 10411 p/s
  • 4 名工作人员 -> 2670 毫秒 -> 14981 p/s
  • 8 个工作人员 -> 4871 毫秒 -> 16423 p/s
  • 12 名工作人员 -> 7449 毫秒 -> 16109 p/s

在那里你看到,直到我达到我的“核心限制”,处理的数据显着增加,然后直到我达到我的“线程限制”,它的增加仍然很明显,但之后它再次减少,因为开销增加并且没有更多可用的硬件资源。

关于c# - 如何正确并行工作任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48637097/

相关文章:

c# - Nginx 作为 Ubuntu 上带有 UseRewriter 的 Asp.Net Core 2.0 Web 应用程序的反向代理

c# - 删除未使用代码的最佳实践

java - 在线程内使用 System.exit() 后显示主线程的输出

c# - 如何使用线程完成,然后重用线程?

JavaFX 和多线程 : IllegalStateException on ObservableList. add()

c# - 在 .NET Web 应用程序中填充缓存时的线程管理

python - Scikit-learn 多线程

azure - 无法使用 SSIS 将文件上传到 Azure 存储

grails - Gant构建脚本,如何检索我要执行的任务

c# - 如何在多个 RDLC 报告中共享和嵌入图像