python - 如何获得ipython并行处理的中间结果?

标签 python parallel-processing ipython

我的工作是处理大量的xml;为了获得更快的结果,我想使用 ipython 的并行处理;下面是我的示例代码。因为我只是使用 celementTree 模块查找 xml/xsd 的元素数量。

>>> from IPython.parallel import Client
>>> import os
>>> c = Client()
>>> c.ids
>>> lview = c.load_balanced_view()
>>> lview.block =True
>>> def return_len(xml_filepath):
        import xml.etree.cElementTree as cElementTree
        tree = cElementTree.parse(xml_filepath)
        my_count=0
        file_result=[]
        cdict={}
        for elem in tree.getiterator():
            cdict[my_count]={}
            if elem.tag:
                cdict[my_count]['tag']=elem.tag
            if elem.text:
                cdict[my_count]['text']=(elem.text).strip()
            if elem.attrib.items():
                cdict[my_count]['xmlattb']={}
                for key, value in elem.attrib.items():
                    cdict[my_count]['xmlattb'][key]=value
            if list(elem):
                cdict[my_count]['xmlinfo']=len(list(elem))
            if elem.tail:
                cdict[my_count]['tail']=elem.tail.strip()
            my_count+=1
        output=xml_filepath.split('\\')[-1],len(cdict)
        return output
        ## return cdict
>>> def get_dir_list(target_dir, *extensions):
        """
        This function will filter out the files from given dir based on their extensions
        """
        my_paths=[]
        for top, dirs, files in os.walk(target_dir):
            for nm in files:
                fileStats = os.stat(os.path.join(top, nm))
                if nm.split('.')[-1] in extensions:
                    my_paths.append(top+'\\'+nm)
        return my_paths
>>> r=lview.map_async(return_len,get_dir_list('C:\\test_folder','xsd','xml'))

为了得到最终结果我必须做 >>> r.get() 通过这个,我将在流程完成时得到结果

我的问题是我能够在完成时获得中间结果吗?
例如,如果我已将工作应用到包含 1000 个 xml/xsds 文件的文件夹,那么在处理该特定文件后我可以立即获得结果吗?就像第一个文件已完成-->显示其结果...第二个文件已完成-->显示其结果........第1000个文件已完成-->显示其结果不像上面现在的工作; 等到最终文件完成然后它将显示所有这 1000 个文件的完整结果。
还为了处理导入/命名空间错误,我在 return_len 函数中定义了 import ;有没有更好的办法来处理这个问题?

最佳答案

当然。 AsyncMapResult(map_async返回的类型)可以立即迭代, 迭代产生的项与 r.get() 最终产生的列表相同。所以在你这样做之后:

amr = lview.map_async(return_len, get_dir_list('C:\\test_folder','xsd','xml'))

你可以这样做:

for r in amr:
    print r

或者用枚举保留索引

for i,r in enumerate(amr):
    print i, r 

或使用内置的reduce执行缩减:

summary_result = reduce(myfunc, amr)

所有这些都会在您的结果到达时对其进行迭代。如果您不关心顺序并且每个任务的时间差异很大,则可以传递 map_async(...,ordered=False)。如果这样做,当您迭代 AMR 时,您将按照先到先得的原则获得单独的结果,而不是保留提交顺序。

还有更多信息 in the ipython docs .

also to deal with import/namespace error i have defined import inside of return_len function; is there any better way to deal with that?

是和否。有几种方法可以设置引擎命名空间,例如使用模块、@parallel.require("module") 装饰器,或者简单地使用 %px import xml 显式执行导入.etree.cElementTree 和 cElementTree,每种元素在某些场景下都有好处。但我经常发现在函数中引入导入是最简单的方法,而且惊喜最少。

关于python - 如何获得ipython并行处理的中间结果?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13583326/

相关文章:

python - 在 Pandas 中转置几列(不是整个数据框)(与 get_dummies 相反)

R并行: rbind parallely into separate data.帧

python - 来自命令行和 python api 的 docker exec 中的管道

python - 无法在另一个python脚本中使用python函数的属性

python - numpy.apply_along_axis() 的简单并行化?

ipython - 我在 Windows 上使用 jupyter notebook,ipython 3。每当我启动我的 python 3 时,我都会收到 "Kernel Dead"消息

python - 使用多个 Python 和 IPython 路径运行 Jupyter

python - 使用 scipy.signal.find_peaks_cwt

python - 用户定义的模板 Django

c++ - 并行化 SVD 计算 c++