python concurrent.futures.ProcessPoolExecutor : Performance of . submit() vs .map()

标签 python performance python-3.x concurrency concurrent.futures

我正在使用 concurrent.futures.ProcessPoolExecutor从数字范围中查找数字的出现。目的是调查从并发中获得的加速性能。为了基准性能,我有一个控制 - 一个串行代码来执行所述任务(如下所示)。我写了 2 个并发代码,一个使用 concurrent.futures.ProcessPoolExecutor.submit()另一个使用 concurrent.futures.ProcessPoolExecutor.map()执行相同的任务。它们如下所示。前者和后者的起草建议可见herehere , 分别。

分配给所有三个代码的任务是在 0 到 1E8 的数字范围内找到数字 5 的出现次数。两者 .submit().map()分配了 6 个 worker ,和 .map()块大小为 10,000。在并发代码中离散工作负载的方式是相同的。但是,用于查找两个代码中出现的函数的函数是不同的。这是因为参数传递给 .submit() 调用的函数的方式和 .map()是不同的。

所有 3 个代码报告的出现次数相同,即 56,953,279 次。但是,完成任务所需的时间却大不相同。 .submit()执行速度比控制快 2 倍,而 .map()完成其任务所需的时间是控件的两倍。

问题:

  • 我想知道.map()的性能是否慢是我编码的人工制品还是它本质上很慢?”如果是前者,我该如何改进它。我只是惊讶它的执行速度比控件慢,因为没有太多使用它的动力。
  • 我想知道是否有任何制作.submit()代码执行得更快。我有一个条件是函数 _concurrent_submit()必须返回一个包含数字 5 的数字/出现次数的可迭代对象。

  • 基准测试结果
    benchmark results

    concurrent.futures.ProcessPoolExecutor.submit()
    #!/usr/bin/python3.5
    # -*- coding: utf-8 -*-
    
    import concurrent.futures as cf
    from time import time
    from traceback import print_exc
    
    def _findmatch(nmin, nmax, number):
        '''Function to find the occurrence of number in range nmin to nmax and return
           the found occurrences in a list.'''
        print('\n def _findmatch', nmin, nmax, number)
        start = time()
        match=[]
        for n in range(nmin, nmax):
            if number in str(n):
                match.append(n)
        end = time() - start
        print("found {0} in {1:.4f}sec".format(len(match),end))
        return match
    
    def _concurrent_submit(nmax, number, workers):
        '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
           find the occurences of a given number in a number range in a parallelised
           manner.'''
        # 1. Local variables
        start = time()
        chunk = nmax // workers
        futures = []
        found =[]
        #2. Parallelization
        with cf.ProcessPoolExecutor(max_workers=workers) as executor:
            # 2.1. Discretise workload and submit to worker pool
            for i in range(workers):
                cstart = chunk * i
                cstop = chunk * (i + 1) if i != workers - 1 else nmax
                futures.append(executor.submit(_findmatch, cstart, cstop, number))
            # 2.2. Instruct workers to process results as they come, when all are
            #      completed or .....
            cf.as_completed(futures) # faster than cf.wait()
            # 2.3. Consolidate result as a list and return this list.
            for future in futures:
                for f in future.result():
                    try:
                        found.append(f)
                    except:
                        print_exc()
            foundsize = len(found)
            end = time() - start
            print('within statement of def _concurrent_submit():')
            print("found {0} in {1:.4f}sec".format(foundsize, end))
        return found
    
    if __name__ == '__main__':
        nmax = int(1E8) # Number range maximum.
        number = str(5) # Number to be found in number range.
        workers = 6     # Pool of workers
    
        start = time()
        a = _concurrent_submit(nmax, number, workers)
        end = time() - start
        print('\n main')
        print('workers = ', workers)
        print("found {0} in {1:.4f}sec".format(len(a),end))
    

    concurrent.futures.ProcessPoolExecutor.map()
    #!/usr/bin/python3.5
    # -*- coding: utf-8 -*-
    
    import concurrent.futures as cf
    import itertools
    from time import time
    from traceback import print_exc
    
    def _findmatch(listnumber, number):    
        '''Function to find the occurrence of number in another number and return
           a string value.'''
        #print('def _findmatch(listnumber, number):')
        #print('listnumber = {0} and ref = {1}'.format(listnumber, number))
        if number in str(listnumber):
            x = listnumber
            #print('x = {0}'.format(x))
            return x 
    
    def _concurrent_map(nmax, number, workers):
        '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
           find the occurrences of a given number in a number range in a parallelised
           manner.'''
        # 1. Local variables
        start = time()
        chunk = nmax // workers
        futures = []
        found =[]
        #2. Parallelization
        with cf.ProcessPoolExecutor(max_workers=workers) as executor:
            # 2.1. Discretise workload and submit to worker pool
            for i in range(workers):
                cstart = chunk * i
                cstop = chunk * (i + 1) if i != workers - 1 else nmax
                numberlist = range(cstart, cstop)
                futures.append(executor.map(_findmatch, numberlist,
                                            itertools.repeat(number),
                                            chunksize=10000))
            # 2.3. Consolidate result as a list and return this list.
            for future in futures:
                for f in future:
                    if f:
                        try:
                            found.append(f)
                        except:
                            print_exc()
            foundsize = len(found)
            end = time() - start
            print('within statement of def _concurrent(nmax, number):')
            print("found {0} in {1:.4f}sec".format(foundsize, end))
        return found
    
    if __name__ == '__main__':
        nmax = int(1E8) # Number range maximum.
        number = str(5) # Number to be found in number range.
        workers = 6     # Pool of workers
    
        start = time()
        a = _concurrent_map(nmax, number, workers)
        end = time() - start
        print('\n main')
        print('workers = ', workers)
        print("found {0} in {1:.4f}sec".format(len(a),end))
    

    序列号:
    #!/usr/bin/python3.5
    # -*- coding: utf-8 -*-
    
    from time import time
    
    def _serial(nmax, number):    
        start = time()
        match=[]
        nlist = range(nmax)
        for n in nlist:
            if number in str(n):match.append(n)
        end=time()-start
        print("found {0} in {1:.4f}sec".format(len(match),end))
        return match
    
    if __name__ == '__main__':
        nmax = int(1E8) # Number range maximum.
        number = str(5) # Number to be found in number range.
    
        start = time()
        a = _serial(nmax, number)
        end = time() - start
        print('\n main')
        print("found {0} in {1:.4f}sec".format(len(a),end))
    

    2017 年 2 月 13 日更新:

    除了@niemmi 的回答之外,我还根据一些个人研究提供了一个答案:
  • 如何进一步加速@niemmi 的 .map().submit()解决方案,以及
  • ProcessPoolExecutor.map()可以导致比 ProcessPoolExecutor.submit() 更多的加速.
  • 最佳答案

    概览:

    我的回答有两部分:

  • 第 1 部分展示了如何从 @niemmi 的 ProcessPoolExecutor.map() 获得更多加速解决方案。
  • 第 2 部分显示何时 ProcessPoolExecutor的子类 .submit().map()产生不等价的计算时间。

  • ================================================== ======================

    第 1 部分:ProcessPoolExecutor.map() 的更多加速​​

    背景:
    本节基于@niemmi 的 .map()解决方案,这本身就很好。在对他的离散化方案进行一些研究以更好地理解它如何与 .map() 块大小争论相互作用时,我发现了这个有趣的解决方案。

    我认为@niemmi 对 chunk = nmax // workers 的定义是块大小的定义,即工作池中每个工作人员要处理的实际数字范围(给定任务)的较小尺寸。现在,这个定义的前提是假设一台计算机有 x 个 worker ,在每个 worker 之间平均分配任务将导致每个 worker 的最佳使用,因此整个任务将最快完成。因此,将给定任务分解成的块数应始终等于池 worker 的数量。然而,这个假设正确吗?

    提案:在这里,我建议当与 ProcessPoolExecutor.map() 一起使用时,上述假设并不总是导致最快的计算时间。 .相反,将任务离散化为大于池 worker 数量的数量可以导致加速,即更快地完成给定任务 .

    实验:我修改了@niemmi 的代码,允许离散化任务的数量超过池 worker 的数量。下面给出了这段代码,用于计算数字 5 在 0 到 1E8 的数字范围内出现的次数。我已经使用 1、2、4 和 6 个池 worker 执行了此代码,并且离散化任务的数量与池 worker 的数量的比例不同。对于每个场景,进行了 3 次运行,并列出了计算时间。 “加速”在这里被定义为当离散化任务的数量大于池工作人员的数量时,在平均计算时间上使用相同数量的块和池工作人员的平均计算时间。

    调查结果:

    nchunk over nworkers
  • 左图显示了实验部分提到的所有场景所花费的计算时间。它表明 块数/工作器数 = 1 所花费的计算时间始终大于块数 > 工作器数所花费的计算时间。 也就是说,前一种情况总是比后一种情况效率低。
  • 右图显示 当块数/工作器数达到 14 或更多的阈值时,获得了 1.2 倍或更多的加速 .有趣的是,在ProcessPoolExecutor.map()时也出现了加速趋势。与 1 名 worker 一起处决。

  • 结论:在自定义 ProcessPoolExecutor.map()` 应用于解决给定任务的离散任务数量时,谨慎地确保该数量大于池工作线程的数量,因为这种做法会缩短计算时间。

    concurrent.futures.ProcessPoolExecutor.map() 代码。 (仅修订部分)
    def _concurrent_map(nmax, number, workers, num_of_chunks):
        '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
           find the occurrences of a given number in a number range in a parallelised
           manner.'''
        # 1. Local variables
        start = time()
        chunksize = nmax // num_of_chunks
        futures = []
        found =[]
        #2. Parallelization
        with cf.ProcessPoolExecutor(max_workers=workers) as executor:
            # 2.1. Discretise workload and submit to worker pool
            cstart = (chunksize * i for i in range(num_of_chunks))
            cstop = (chunksize * i if i != num_of_chunks else nmax
                     for i in range(1, num_of_chunks + 1))
            futures = executor.map(_findmatch, cstart, cstop,
                                   itertools.repeat(number))
            # 2.2. Consolidate result as a list and return this list.
            for future in futures:
                #print('type(future)=',type(future))
                for f in future:
                    if f:
                        try:
                            found.append(f)
                        except:
                            print_exc()
            foundsize = len(found)
            end = time() - start
            print('\n within statement of def _concurrent(nmax, number):')
            print("found {0} in {1:.4f}sec".format(foundsize, end))
        return found
    
    if __name__ == '__main__':
        nmax = int(1E8) # Number range maximum.
        number = str(5) # Number to be found in number range.
        workers = 4     # Pool of workers
        chunks_vs_workers = 14 # A factor of =>14 can provide optimum performance  
        num_of_chunks = chunks_vs_workers * workers
    
        start = time()
        a = _concurrent_map(nmax, number, workers, num_of_chunks)
        end = time() - start
        print('\n main')
        print('nmax={}, workers={}, num_of_chunks={}'.format(
              nmax, workers, num_of_chunks))
        print('workers = ', workers)
        print("found {0} in {1:.4f}sec".format(len(a),end))
    

    ================================================== ======================

    第 2 部分:使用 ProcessPoolExecutor 子类 .submit() 和 .map() 的总计算时间在返回排序/排序结果列表时可能不同。

    背景:我已经修改了.submit().map()允许“苹果对苹果”比较它们的计算时间和可视化主代码的计算时间的能力,主代码调用的 _concurrent 方法的计算时间来执行并发操作,以及计算_concurrent 方法调用的每个离散化任务/ worker 的时间。此外,这些代码中的并发方法被构造为直接从 .submit() 的 future 对象返回结果的无序和有序列表。和 .map() 的迭代器.下面提供了源代码(希望对您有所帮助。)。

    实验这两个新改进的代码用于执行第 1 部分中描述的相同实验,只是只考虑了 6 个池 worker 和 python 内置 listsorted方法分别用于将结果的无序和有序列表返回到代码的主要部分。

    调查结果:
    .submit vs .map plus list vs sorted
  • 从_concurrent方法的结果中,我们可以看到_concurrent方法用于创建ProcessPoolExecutor.submit()的所有Future对象的计算次数。 ,并创建 ProcessPoolExecutor.map() 的迭代器,作为离散化任务数量与池 worker 数量的函数,是等效的。这个结果仅仅意味着 ProcessPoolExecutor子类.submit().map()同样高效/快速。
  • 比较 main 和它的 _concurrent 方法的计算时间,我们可以看到 main 的运行时间比它的 _concurrent 方法长。这是可以预料的,因为它们的时间差反射(reflect)了 list 的计算时间量。和 sorted方法(以及包含在这些方法中的其他方法)。清晰可见,list方法比 sorted 花费更少的计算时间来返回结果列表方法。 list的平均计算时间.submit() 和 .map() 代码的方法相似,大约为 0.47 秒。 .submit() 和 .map() 代码的排序方法的平均计算时间分别为 1.23 秒和 1.01 秒。换句话说,list方法执行速度比 sorted 快 2.62 倍和 2.15 倍.submit() 和 .map() 代码的方法,分别。
  • 不清楚为什么 sorted方法从.map()比来自 .submit() 更快, 作为离散化的数量
    任务增加的数量超过池 worker 的数量,保存时
    离散化任务的数量等于池 worker 的数量。
    也就是说,这些发现表明决定使用同样快的 .submit().map()子类可以被 sorted 方法所阻碍。例如,如果目的是在尽可能短的时间内生成有序列表,则应该优先使用 ProcessPoolExecutor.map() 而非 ProcessPoolExecutor.submit().map()可以允许最短的总计算时间。
  • 此处显示了我的答案的第 1 部分中提到的离散化方案,以加快 .submit() 的性能。和 .map()子类。当离散化任务的数量等于池 worker 的数量时,加速量可以高达 20%。

  • 改进的 .map() 代码
    #!/usr/bin/python3.5
    # -*- coding: utf-8 -*-
    
    import concurrent.futures as cf
    from time import time
    from itertools import repeat, chain 
    
    
    def _findmatch(nmin, nmax, number):
        '''Function to find the occurence of number in range nmin to nmax and return
           the found occurences in a list.'''
        start = time()
        match=[]
        for n in range(nmin, nmax):
            if number in str(n):
                match.append(n)
        end = time() - start
        #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
        #      format(nmin, nmax, number, len(match),end))
        return match
    
    def _concurrent(nmax, number, workers, num_of_chunks):
        '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
           find the occurrences of a given number in a number range in a concurrent
           manner.'''
        # 1. Local variables
        start = time()
        chunksize = nmax // num_of_chunks
        #2. Parallelization
        with cf.ProcessPoolExecutor(max_workers=workers) as executor:
            # 2.1. Discretise workload and submit to worker pool
            cstart = (chunksize * i for i in range(num_of_chunks))
            cstop = (chunksize * i if i != num_of_chunks else nmax
                     for i in range(1, num_of_chunks + 1))
            futures = executor.map(_findmatch, cstart, cstop, repeat(number))
        end = time() - start
        print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
        print("found in {0:.4f}sec".format(end))
        return list(chain.from_iterable(futures)) #Return an unordered result list
        #return sorted(chain.from_iterable(futures)) #Return an ordered result list
    
    if __name__ == '__main__':
        nmax = int(1E8) # Number range maximum.
        number = str(5) # Number to be found in number range.
        workers = 6     # Pool of workers
        chunks_vs_workers = 30 # A factor of =>14 can provide optimum performance 
        num_of_chunks = chunks_vs_workers * workers
    
        start = time()
        found = _concurrent(nmax, number, workers, num_of_chunks)
        end = time() - start
        print('\n main')
        print('nmax={}, workers={}, num_of_chunks={}'.format(
              nmax, workers, num_of_chunks))
        #print('found = ', found)
        print("found {0} in {1:.4f}sec".format(len(found),end))    
    

    改进的 .submit() 代码。
    此代码与 .map 代码相同,只是您将 _concurrent 方法替换为以下内容:
    def _concurrent(nmax, number, workers, num_of_chunks):
        '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
           find the occurrences of a given number in a number range in a concurrent
           manner.'''
        # 1. Local variables
        start = time()
        chunksize = nmax // num_of_chunks
        futures = []
        #2. Parallelization
        with cf.ProcessPoolExecutor(max_workers=workers) as executor:
            # 2.1. Discretise workload and submit to worker pool
            for i in range(num_of_chunks):
                cstart = chunksize * i
                cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
                futures.append(executor.submit(_findmatch, cstart, cstop, number))
        end = time() - start
        print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
        print("found in {0:.4f}sec".format(end))
        return list(chain.from_iterable(f.result() for f in cf.as_completed(
            futures))) #Return an unordered list
        #return list(chain.from_iterable(f.result() for f in cf.as_completed(
        #    futures))) #Return an ordered list
    

    ================================================== ======================

    关于python concurrent.futures.ProcessPoolExecutor : Performance of . submit() vs .map(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42074501/

    相关文章:

    python - 将数字分解为其他数字

    python - 纬度、经度和 RSSI 转 3D 图形

    mysql - 查询 firebird slow order by/distinct

    python - `async for` 的语义 - __anext__ 调用可以重叠吗?

    javascript - 将图像存储在高速缓存中

    performance - 快速遍历ACL的策略

    python - 在没有ctypes的情况下修改bdb中的变量值

    python - Selenium Python : Can't Get Path To Chat Element On Google Hangouts

    python - 如何在 Pandas 中做 R dplyr?

    python - Pandas 有条件地用行的均值/中值替换单元格值