c# - 为什么在C#中首次调用的并行处理要慢得多?

标签 c# parallel-processing task parallel.foreach partitioner

我正在尝试使用C#应用程序尽快处理数字。我使用Thread.Sleep()模拟处理和随机数。我使用3种不同的技术。

这是我使用的测试代码:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Test
{
    internal class Program
    {
        private static void Main()
        {
            var data = new int[500000];
            var random = new Random();

            for (int i = 0; i < 500000; i++)
            {
                data[i] = random.Next();
            }

            var partialTimes = new Dictionary<int, double>();
            var iterations = 5;

            for (int i = 1; i < iterations + 1; i++)
            {
                Console.Write($"ProcessData3 {i}\t");
                StartProcessing(data, partialTimes, ProcessData3);
                GC.Collect();
            }

            Console.WriteLine();
            Console.WriteLine("Press Enter to Exit");
            Console.ReadLine();
        }

        private static void StartProcessing(int[] data, Dictionary<int, double> partialTimes, Action<int[], Dictionary<int, double>> processData)
        {
            var stopwatch = Stopwatch.StartNew();

            try
            {
                processData?.Invoke(data, partialTimes);
                stopwatch.Stop();

                Console.WriteLine($"{stopwatch.Elapsed.ToString(@"mm\:ss\:fffffff")} total = {partialTimes.Sum(s => s.Value)} max = {partialTimes.Values.Max()}");
            }
            finally
            {
                partialTimes.Clear();
            }
        }

        private static void ProcessData1(int[] data, Dictionary<int, double> partialTimes)
        {
            Parallel.ForEach(data, number =>
            {
                var partialStopwatch = Stopwatch.StartNew();

                Thread.Sleep(1);

                partialStopwatch.Stop();

                lock (partialTimes)
                {
                    partialTimes[number] = partialStopwatch.Elapsed.TotalMilliseconds;
                }
            });
        }

        private static void ProcessData3(int[] data, Dictionary<int, double> partialTimes)
        {
            // Partition the entire source array.
            var rangePartitioner = Partitioner.Create(0, data.Length);

            // Loop over the partitions in parallel.
            Parallel.ForEach(rangePartitioner, (range, loopState) =>
            {
                // Loop over each range element without a delegate invocation.
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    var number = data[i];
                    var partialStopwatch = Stopwatch.StartNew();

                    Thread.Sleep(1);

                    partialStopwatch.Stop();

                    lock (partialTimes)
                    {
                        partialTimes[number] = partialStopwatch.Elapsed.TotalMilliseconds;
                    }
                }
            });
        }

        private static void ProcessData2(int[] data, Dictionary<int, double> partialTimes)
        {
            var tasks = new Task[data.Count()];
            for (int i = 0; i < data.Count(); i++)
            {
                var number = data[i];

                tasks[i] = Task.Factory.StartNew(() =>
                {
                    var partialStopwatch = Stopwatch.StartNew();

                    Thread.Sleep(1);

                    partialStopwatch.Stop();

                    lock (partialTimes)
                    {
                        partialTimes[number] = partialStopwatch.Elapsed.TotalMilliseconds;
                    }
                });
            }

            Task.WaitAll(tasks);
        }
    }
}


对于每种技术,我都会重新启动程序。我得到这些结果,
Thread.Sleep( 1 )

ProcessData1 1  00:56:1796688 total = 801335,282599955 max = 16,8783
ProcessData1 2  00:23:5390014 total = 816167,642100022 max = 14,5913
ProcessData1 3  00:14:7090566 total = 827589,675899998 max = 13,2617
ProcessData1 4  00:10:8929177 total = 829296,528300007 max = 15,0175
ProcessData1 5  00:10:6333310 total = 839282,123200008 max = 29,2738

ProcessData2 1  00:37:8084153 total = 824507,174200022 max = 112,071
ProcessData2 2  00:16:3762096 total = 849272,47810001  max = 77,1514
ProcessData2 3  00:12:9177717 total = 854012,353100029 max = 67,5684
ProcessData2 4  00:10:4798701 total = 857396,642899983 max = 92,9408
ProcessData2 5  00:09:2206146 total = 870966,655499989 max = 51,8945

ProcessData3 1  01:13:6814541 total = 803581,718699918 max = 25,6815
ProcessData3 2  01:07:9809277 total = 814069,532899922 max = 26,0671
ProcessData3 3  01:07:9857984 total = 814148,329399928 max = 21,3116
ProcessData3 4  01:07:4812183 total = 808042,695499966 max = 16,8601
ProcessData3 5  01:07:2954614 total = 805895,325499903 max = 23,8517


哪里
total总共花费在每个Parallel.ForEach()函数内部的时间,并且
max是每个功能的最长时间。

为什么第一个循环这么慢?其他尝试如何如此迅速地处理?第一次尝试如何实现更快的并行处理?



编辑:

所以我也尝试了Thread.Sleep( 10 )
结果是:

ProcessData1 1  02:50:2845698 total = 5109831,95429994 max = 12,0612
ProcessData1 2  00:56:3361645 total = 5125884,05919954 max = 12,7666
ProcessData1 3  00:53:4911541 total = 5131105,15209993 max = 12,7486
ProcessData1 4  00:49:5665628 total = 5144654,75829992 max = 13,2678
ProcessData1 5  00:46:0218194 total = 5152955,19509996 max = 13,702

ProcessData2 1  01:21:7207557 total = 5121889,31579983 max = 73,8152
ProcessData2 2  00:39:6660074 total = 5175557,68889969 max = 59,369
ProcessData2 3  00:31:9036416 total = 5193819,89889973 max = 56,2895
ProcessData2 4  00:27:4616803 total = 5207168,56969977 max = 65,5495
ProcessData2 5  00:24:4270755 total = 5222567,9044998  max = 65,368

ProcessData3 1  02:44:9985645 total = 5110117,19019997 max = 11,7172
ProcessData3 2  02:25:6533128 total = 5237779,27010012 max = 26,3171
ProcessData3 3  02:22:2771259 total = 5116123,45259975 max = 12,0581
ProcessData3 4  02:22:1678911 total = 5112574,93779995 max = 11,5334
ProcessData3 5  02:21:9418178 total = 5104980,07120004 max = 11,5583


因此,第一个循环仍比其他循环花费更多的时间。

最佳答案

您所看到的行为完全由ThreadPool类延迟创建新线程的事实来解释,直到经过一小段时间(大约1秒钟的时间…多年来一直在变化)。

将仪器添加到自己的程序中可能会很有帮助。在您的示例中,一个非常有用的工具是对线程池管理的并发线程数进行计数,确定“高水位线”(即最终确定的最大线程数),然后使用该数字覆盖线程池的行为。

当我这样做时,我发现在第一种方法的第一次运行中,您最多可以看到25个线程。但是,由于线程池的默认设置是仅创建与计算机上的内核数量相同的线程数量(在我的情况下为8),因此创建其他线程可能会花费大量时间。当然,在那段时间内,您获得的吞吐量要比其他情况要少得多(因此,导致的延迟比仅仅20秒左右的延迟要大得多)。

在该测试的后续运行中,最大线程数逐渐增加(因为每次新运行都从上次运行开始,已经在线程池中添加了更多线程),最高达到53。

如果您预先知道线程池需要多少个线程才能有效地执行工作,则可以使用SetMinThreads()方法来增加线程数量,该数量将在切换到受限制的线程之前按需立即创建,创建算法。例如,手头有53个线程的高水位标记,您可以将最小线程数设置为该数量(或一个不错的第一轮,例如50)。

当我这样做时,您的第一个测试的所有五次运行(以前耗时在25秒到1分钟之间(当然,更长的运行时间在更早的时间)),大约需要19秒才能完成。

我想强调您应该非常小心地使用SetMinThreads()。通常,线程池对于管理工作负载非常有用。您上面展示的场景显然只是出于示例的目的,并不现实,但是确实存在一个问题,即您实际上并没有在每个Parallel.ForEach()迭代中真正做那么多工作。并发似乎不太适合并发,因为花费的大量时间都花在了开销上。在任何类似情况下使用SetMinThreads()都可以解决更隐患的潜在问题。

您会发现,如果您调整工作负载以更好地匹配可用资源,并最大程度地减少任务和线程之间的转换,则可以获得良好的吞吐量,而不必覆盖默认的线程池数。


有关此特定测试的其他注意事项…

请注意,如果您更改程序以在同一会话中运行所有三个测试(每个运行五次),则“第一次运行时间较长”仅发生在第一个测试中。供以后参考,您应该始终着眼于这种“第一次时间较慢”的问题,以测试不同的组合和顺序,以验证这是否是受此效果影响的特定实现,或者您是否看到第一个效果测试,无论首先运行哪个实现。有许多实现和平台详细信息,包括JIT,线程池,磁盘缓存,它们可能会影响任何算法的初始运行,并且您需要确保快速缩小搜索范围,以了解是否要处理在您自己的算法中遇到其中一种或某些真正的问题。

顺便说一句,并不是真的对您的问题很重要,但是我发现使用data数组中的随机数作为计时字典的键很奇怪。由于随机数的冲突,此恕我直言使这些时序值无用。您不会每次都计数(发生碰撞时,只会存储该数字的最后一个实例),这意味着显示的“总”时间少于实际花费的总时间,即使最大值也不会一定是正确的(如果使用相同的键将真实的最大值替换为以后的值,则将丢失)。


这是我对第一个测试的修改后的版本,其中显示了我添加的诊断代码以及(注释掉)用于设置线程池计数以产生更快,更一致的行为的语句:

private static int _threadCount1;
private static int _maxThreadCount1;

private static void ProcessData1(int[] data, Dictionary<int, double> partialTimes)
{
    const int minOverride = 50;
    int minMain, minIOCP, maxMain, maxIOCP;

    ThreadPool.GetMinThreads(out minMain, out minIOCP);
    ThreadPool.GetMaxThreads(out maxMain, out maxIOCP);

    WriteLine($"cores: {Environment.ProcessorCount}");
    WriteLine($"threads: {minMain} min, {maxMain} max");

    // Uncomment two lines below to see uniform behavior across test runs:

    //ThreadPool.SetMinThreads(minOverride, minIOCP);
    //ThreadPool.SetMaxThreads(minOverride, maxIOCP);

    _threadCount1 = _maxThreadCount1 = 0;

    Parallel.ForEach(data, number =>
    {
        int threadCount = Interlocked.Increment(ref _threadCount1);

        var partialStopwatch = Stopwatch.StartNew();

        Thread.Sleep(1);

        partialStopwatch.Stop();

        lock (partialTimes)
        {
            partialTimes[number] = partialStopwatch.Elapsed.TotalMilliseconds;
            if (_maxThreadCount1 < threadCount)
            {
                _maxThreadCount1 = threadCount;
            }
        }

        Interlocked.Decrement(ref _threadCount1);
    });

    ThreadPool.SetMinThreads(minMain, minIOCP);
    ThreadPool.SetMaxThreads(maxMain, maxIOCP);
    WriteLine($"max thread count: {_maxThreadCount1}");
}

关于c# - 为什么在C#中首次调用的并行处理要慢得多?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47035547/

相关文章:

c# - 等待与 task.Result 相同的已完成任务?

c# - Entity Framework ,带有 dispose 方法的 UnitofWork 模式

c# - 自动属性值和默认值

python - 多处理的并行处理比顺序处理慢

c - MPI 中派生数据类型的范围

c# - 如何正确使用 Task.WhenAll

task - 艾达。构建 "main"文件在执行任务时需要很长时间

c# - EF6如何自定义复数

c# - WCF 数据服务 (OData) 和 CORS

c++ - 在 C++ 中使用 OpenMP 并行化递归函数