python - 连续多处理

标签 python import multiprocessing os.system

我正在使用 multiprocessing.py 过滤巨大的文本文件。该代码主要是打开文本文件,对其进行处理,然后将其关闭。

问题是,我希望能够在多个文本文件上连续启动它。因此,我尝试添加一个循环,但由于某种原因它不起作用(虽然代码适用于每个文件)。我认为这是一个问题:

    if __name__ == '__main__':    

但是,我正在寻找其他东西。我尝试创建一个 Launcher 和一个 LauncherCount 文件,如下所示:

    LauncherCount.py:

    def setLauncherCount(n):
        global LauncherCount
        LauncherCount = n

和,

    Launcher.py:

import os
import LauncherCount

LauncherCount.setLauncherCount(0)

os.system("OrientedFilterNoLoop.py")

LauncherCount.setLauncherCount(1)

os.system("OrientedFilterNoLoop.py")

...

我导入 LauncherCount.py,并使用 LauncherCount.LauncherCount 作为我的循环索引。

当然,这也不起作用,因为它在本地编辑变量 LauncherCount.LauncherCount,所以它不会在 LauncherCount 的导入版本中被编辑。

有没有办法全局编辑导入文件中的变量?或者,有没有办法以任何其他方式做到这一点?我需要的是多次运行代码,更改一个值,并且显然不使用任何循环。

谢谢!

编辑:如有必要,这是我的主要代码。抱歉风格不好...

import multiprocessing
import config
import time
import LauncherCount

class Filter:

    """ Filtering methods """
    def __init__(self):
        print("launching methods")

        #   Return the list: [Latitude,Longitude]  (elements are floating point numbers)
    def LatLong(self,line):

        comaCount = []
        comaCount.append(line.find(','))
        comaCount.append(line.find(',',comaCount[0] + 1))
    comaCount.append(line.find(',',comaCount[1] + 1))
    Lat = line[comaCount[0] + 1 : comaCount[1]]
    Long = line[comaCount[1] + 1 : comaCount[2]]

    try:
        return [float(Lat) , float(Long)]
    except ValueError:
        return [0,0]

#   Return a boolean:
#   - True if the Lat/Long is within the Lat/Long rectangle defined by:
#           tupleFilter = (minLat,maxLat,minLong,maxLong)
#   - False if not                                                                   
def LatLongFilter(self,LatLongList , tupleFilter) :
    if tupleFilter[0] <= LatLongList[0] <= tupleFilter[1] and
       tupleFilter[2] <= LatLongList[1] <= tupleFilter[3]:
        return True
    else:
        return False

def writeLine(self,key,line):
    filterDico[key][1].write(line)



def filteringProcess(dico):

    myFilter = Filter()

    while True:
        try:
            currentLine = readFile.readline()
        except ValueError:
            break
        if len(currentLine) ==0:                    # Breaks at the end of the file
            break
        if len(currentLine) < 35:                    # Deletes wrong lines (too short)
            continue
        LatLongList = myFilter.LatLong(currentLine)
        for key in dico:
            if myFilter.LatLongFilter(LatLongList,dico[key][0]):
                myFilter.writeLine(key,currentLine)


###########################################################################
                # Main
###########################################################################

# Open read files:
readFile = open(config.readFileList[LauncherCount.LauncherCount][1], 'r')

# Generate writing files:
pathDico = {}
filterDico = config.filterDico

# Create outputs
for key in filterDico:
    output_Name = config.readFileList[LauncherCount.LauncherCount][0][:-4] 
                  + '_' + key +'.log'
    pathDico[output_Name] = config.writingFolder + output_Name
    filterDico[key] = [filterDico[key],open(pathDico[output_Name],'w')]


p = []
CPUCount = multiprocessing.cpu_count()
CPURange = range(CPUCount)

startingTime = time.localtime()

if __name__ == '__main__':
    ### Create and start processes:
    for i in CPURange:
        p.append(multiprocessing.Process(target = filteringProcess , 
                                            args = (filterDico,)))
        p[i].start()

    ### Kill processes:
    while True:
        if [p[i].is_alive() for i in CPURange] == [False for i in CPURange]:
            readFile.close()
            for key in config.filterDico:
                config.filterDico[key][1].close()
                print(key,"is Done!")
                endTime = time.localtime()
            break

    print("Process started at:",startingTime)
    print("And ended at:",endTime)

最佳答案

在并行处理一组文件的同时按顺序处理一组文件:

#!/usr/bin/env python
from multiprocessing import Pool

def work_on(args):
    """Process a single file."""
    i, filename = args
    print("working on %s" % (filename,))
    return i

def files():
    """Generate input filenames to work on."""
    #NOTE: you could read the file list from a file, get it using glob.glob, etc
    yield "inputfile1"
    yield "inputfile2"

def process_files(pool, filenames):
    """Process filenames using pool of processes.

    Wait for results.
    """
    for result in pool.imap_unordered(work_on, enumerate(filenames)):
        #NOTE: in general the files won't be processed in the original order
        print(result) 

def main():
   p = Pool()

   # to do "successive" multiprocessing
   for filenames in [files(), ['other', 'bunch', 'of', 'files']]:
       process_files(p, filenames)

if __name__=="__main__":
   main()

每个 process_file() 在前一个完成后按顺序调用,即来自对 process_files() 的不同调用的文件 不是 并行处理。

关于python - 连续多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9010577/

相关文章:

python - 将 QML 元素锚定到窗口

javascript - 如何使用 javascript Google Contact API 以更详细的格式导入 gmail 联系人数据?

import - 如何使用自定义包

python - 告诉多个生产者的消费者没有更多结果

python - 如何在 Python 多处理池中运行清理代码?

python - 如何使用多处理 python 更新和检索图像?

python - "accuracy"的 Caffe 自定义 python 层

python - 如何将图像转换为 URL Discord Py

swift - 这是什么意思 ?声明仅在文件范围内有效

Python Json 和 Python 的多线程/进程组合?