python - 具有共享内存复杂对象的多处理大型 XML 文件

标签 python python-3.x parallel-processing shared-memory python-multiprocessing

我正在改进一个解析 XML 并对它的子树进行分类和索引的程序。实际程序太大,无法在此处显示,因此我将其简化为一个最小的测试用例,以显示我遇到的问题。

思路是:

  1. 逐个处理目录中的 XML 文件
  2. 处理所有alpino_ds文件中的节点,并行
  3. 在此过程中,该过程需要对共享变量进行读/写访问,以便例如我们可以检查某个属性总共出现了多少次,或者跟踪文件句柄

请注意,在实际代码中还有一些注意事项:

  • 简单地为每个进程返回新值,然后将它们合并到主线程中似乎不可取,而且可能相当慢,因为实际数据结构是 dict四层深的 s 由 dicts 组成, set小号,int s 和 string s,以及 dict-to-filehandle 和 Counter()对象;
  • 我尝试使用线程(使用 ThreadPoolExecutor ),尽管有一些 增益(我计算出速度提高了大约 5%),但这对我来说还不够好;
  • 我正在处理的实际数据可能包含超过 60GB 或多达 1500 万的 XML 文件 alpino_ds每个文件的标签。这是我想要并行运行事物的主要原因 - 只有这么多数据。这意味着嵌套对象也变得相当大,因此在进程之间合并/共享这些对象本身可能是一个瓶颈。

示例代码:

from pathlib import Path
from collections import Counter
from copy import copy
from lxml import etree

import concurrent.futures


class XmlGrinder:
    def __init__(self, m=1):
        if m is False:
            self.m = 1
        elif m == 0:
            self.m = None
        else:
            self.m = m

        self.max_a = 7
        self.max_b = 1000

        self.pdin = self.pdout = None
        self.pattern_counter = self.fhs = self.corpus = None

    def grind(self, din, dout):
        self.pdin = Path(din)
        self.pdout = Path(dout)

        for file in self.pdin.glob('*.xml'):
            self._grind_xml(file)

    def _grind_xml(self, pfin):
        self.pattern_counter = Counter()
        self.filenames = set()
        self.fhs = {}
        self.corpus = pfin.stem

        with concurrent.futures.ProcessPoolExecutor(max_workers=self.m) as executor:
            jobs = []
            context = etree.iterparse(str(pfin), tag='alpino_ds')

            for _, node in context:
                attrs = node.attrib
                # node has to have id
                if 'id' not in attrs:
                    continue

                jobs.append(executor.submit(self._process_node, etree.tostring(node)))

                # Makes sure our memory usage is kept in check by getting rid of unused elements
                # Borrowed from https://stackoverflow.com/a/7171543/1150683
                node.clear()
                # Also eliminate now-empty references from the root node to elem
                for ancestor in node.xpath('ancestor-or-self::*'):
                    while ancestor.getprevious() is not None:
                        del ancestor.getparent()[0]

            # Get rid of xml iterator
            del context

            sentence_nr = 0
            for job in concurrent.futures.as_completed(jobs):
                sentence_nr += 1
                print(f"Processed {self.corpus} sentence {sentence_nr:,d}", job.result(), flush=True)

            # self.* variables are empty! :-(
            print('pattern counter:', self.pattern_counter)
            print('filenames:', self.filenames)
            print('filehandles:', self.fhs)

            # won't do anything because fh is empty:
            for fh in self.fhs.values():
                fh.close()

    def _process_node(self, xml_str):
        node = etree.fromstring(xml_str)

        all_cats = ''
        for subnode in node.iter('node'):
            children_size = sum(1 for _ in subnode.iterchildren('node'))
            descendants_size = sum(1 for _ in subnode.iter('node'))
            # Size requirements of children and descendants
            if children_size < 1 \
                    or self.max_a < children_size \
                    or descendants_size > self.max_b:
                continue

            # get attribute of node
            cat = subnode.attrib['cat']
            all_cats += cat
            self.pattern_counter[cat] += 1

            # Create new XML tree
            tree_xml = etree.Element('tree', {
                'index': f"{cat}-{self.pattern_counter[cat]}"
            })
            tree_xml.append(copy(subnode))

            # open filehandle and write new tree to file
            if cat not in self.fhs:
                (self.pdout / self.corpus).mkdir(exist_ok=True, parents=True)
                tree_filename = self.pdout / self.corpus / f"{self.corpus}-{cat}-trees.xml"
                # open file handle and keep it open, only close after loop
                self.fhs[cat] = tree_filename.open(mode='a', encoding='utf-8')

            self.fhs[cat].write('\n\t\t' + etree.tostring(tree_xml, encoding='unicode'))

        return all_cats


if __name__ == '__main__':
    # use m=[int] to enable multiple cores or m=0 to utilise all cores
    xml_grindr = XmlGrinder()
    xml_grindr.grind(r'../data', r'../output')

示例 XML(将其保存到 XML 文件并将其放入目录;使用该目录作为 xml_grindr.grind() 的第一个参数):

<?xml version="1.0" encoding="UTF-8"?><treebank><alpino_ds version="1.3" id="18.head.1.s.1"><node begin="0" cat="top" end="4" id="0" rel="top"><node begin="0" cat="conj" end="4" id="1" rel="--"><node begin="0" end="1" frame="within_word_conjunct" id="2" lcat="np" lemma="_" pos="prefix" postag="SPEC(afgebr)" pt="spec" rel="cnj" root="taal" sense="taal" spectype="afgebr" word="taal-"/><node begin="1" conjtype="neven" end="2" frame="conj(en)" id="3" lcat="vg" lemma="en" pos="vg" postag="VG(neven)" pt="vg" rel="crd" root="en" sense="en" word="en"/><node begin="2" cat="mwu" end="4" id="4" mwu_root="spraaktechnologienieuws jul&apos;03" mwu_sense="spraaktechnologienieuws jul&apos;03" rel="cnj"><node begin="2" end="3" frame="proper_name(both)" genus="onz" getal="ev" graad="basis" id="5" lcat="np" lemma="spraaktechnologienieuws" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,onz,stan)" pt="n" rel="mwp" root="spraaktechnologienieuws" sense="spraaktechnologienieuws" word="spraaktechnologienieuws"/><node begin="3" end="4" frame="proper_name(both)" id="6" lcat="np" lemma="_" num="both" pos="name" postag="SPEC(symb)" pt="spec" rel="mwp" root="jul&apos;03" sense="jul&apos;03" spectype="symb" word="jul&apos;03"/></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.2.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="noun(de,count,sg)" gen="de" id="1" lcat="np" lemma="1" num="sg" numtype="hoofd" pos="noun" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="1" sense="1" word="1"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.2.s.2"><node begin="0" cat="top" end="5" id="0" rel="top"><node begin="0" cat="mwu" end="5" id="1" mwu_root="DISCUSSIE OVER TAALTECHNOLOGIE IN TAALSCHRIFT" mwu_sense="DISCUSSIE OVER TAALTECHNOLOGIE IN TAALSCHRIFT" rel="--"><node begin="0" buiging="met-e" end="1" frame="proper_name(both)" graad="basis" id="2" lcat="np" lemma="Discussie" naamval="stan" num="both" pos="name" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mwp" root="DISCUSSIE" sense="DISCUSSIE" word="DISCUSSIE"/><node begin="1" end="2" frame="proper_name(both)" id="3" lcat="np" lemma="over" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="OVER" sense="OVER" vztype="init" word="OVER"/><node begin="2" end="3" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="4" lcat="np" lemma="taaltechnologie" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="TAALTECHNOLOGIE" sense="TAALTECHNOLOGIE" word="TAALTECHNOLOGIE"/><node begin="3" end="4" frame="proper_name(both)" id="5" lcat="np" lemma="iN" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="IN" sense="IN" vztype="init" word="IN"/><node begin="4" end="5" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="6" lcat="np" lemma="taalschrift" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="TAALSCHRIFT" sense="TAALSCHRIFT" word="TAALSCHRIFT"/></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.3.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="number(hoofd(pl_num))" id="1" infl="pl_num" lcat="np" lemma="2" numtype="hoofd" pos="num" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="2" sense="2" special="hoofd" word="2"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.3.s.2"><node begin="0" cat="top" end="7" id="0" rel="top"><node begin="0" cat="mwu" end="7" id="1" mwu_root="AMERIKAANSE OVERHEID KIEST VOOR LINKFACTORY VAN L&amp;C" mwu_sense="AMERIKAANSE OVERHEID KIEST VOOR LINKFACTORY VAN L&amp;C" rel="--"><node begin="0" buiging="met-e" end="1" frame="proper_name(both)" graad="basis" id="2" lcat="np" lemma="Amerikaans" naamval="stan" num="both" pos="name" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mwp" root="AMERIKAANSE" sense="AMERIKAANSE" word="AMERIKAANSE"/><node begin="1" end="2" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="3" lcat="np" lemma="overheid" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="OVERHEID" sense="OVERHEID" word="OVERHEID"/><node begin="2" end="3" frame="proper_name(both)" id="4" lcat="np" lemma="kiezen" num="both" pos="name" postag="WW(pv,tgw,met-t)" pt="ww" pvagr="met-t" pvtijd="tgw" rel="mwp" root="KIEST" sense="KIEST" word="KIEST" wvorm="pv"/><node begin="3" end="4" frame="proper_name(both)" id="5" lcat="np" lemma="voor" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="VOOR" sense="VOOR" vztype="init" word="VOOR"/><node begin="4" conjtype="neven" end="5" frame="proper_name(both)" id="6" lcat="np" lemma="linkfactory" num="both" pos="name" postag="VG(neven)" pt="vg" rel="mwp" root="LINKFACTORY" sense="LINKFACTORY" word="LINKFACTORY"/><node begin="5" end="6" frame="proper_name(both)" id="7" lcat="np" lemma="van" num="both" pos="name" postag="VZ(init)" pt="vz" rel="mwp" root="VAN" sense="VAN" vztype="init" word="VAN"/><node begin="6" end="7" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="8" lcat="np" lemma="l&amp;amp;C" naamval="stan" ntype="soort" num="both" pos="name" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="mwp" root="L&amp;C" sense="L&amp;C" word="L&amp;C"/></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.4.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="number(hoofd(pl_num))" id="1" infl="pl_num" lcat="np" lemma="3" numtype="hoofd" pos="num" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="3" sense="3" special="hoofd" word="3"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.4.s.2"><node begin="0" cat="top" end="6" id="0" rel="top"><node begin="0" cat="np" end="6" id="1" rel="--"><node begin="0" buiging="met-e" end="1" frame="noun(both,both,both)" gen="both" graad="basis" id="2" lcat="np" lemma="Spraakgestuurde" naamval="stan" num="both" pos="noun" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="hd" root="spraakgestuurde" sense="spraakgestuurde" word="SPRAAKGESTUURDE"/><node begin="1" cat="mwu" end="6" id="3" mwu_root="LAST-MINUTE TAALCURSUS VIA DE TELEFOON" mwu_sense="LAST-MINUTE TAALCURSUS VIA DE TELEFOON" rel="app"><node begin="1" buiging="met-e" end="2" frame="proper_name(both)" graad="basis" id="4" lcat="np" lemma="Last-minuat" naamval="stan" num="both" pos="name" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mwp" root="LAST-MINUTE" sense="LAST-MINUTE" word="LAST-MINUTE"/><node begin="2" end="3" frame="proper_name(both)" getal="mv" graad="basis" id="5" lcat="np" lemma="taalcursus" ntype="soort" num="both" pos="name" postag="N(soort,mv,basis)" pt="n" rel="mwp" root="TAALCURSUS" sense="TAALCURSUS" word="TAALCURSUS"/><node begin="3" end="4" frame="proper_name(both)" genus="zijd" getal="ev" graad="basis" id="6" lcat="np" lemma="Via" naamval="stan" ntype="eigen" num="both" pos="name" postag="N(eigen,ev,basis,zijd,stan)" pt="n" rel="mwp" root="VIA" sense="VIA" word="VIA"/><node begin="4" end="5" frame="proper_name(both)" id="7" lcat="np" lemma="dE" lwtype="bep" naamval="stan" npagr="rest" num="both" pos="name" postag="LID(bep,stan,rest)" pt="lid" rel="mwp" root="DE" sense="DE" word="DE"/><node begin="5" end="6" frame="proper_name(both)" getal="mv" graad="basis" id="8" lcat="np" lemma="telefoon" ntype="soort" num="both" pos="name" postag="N(soort,mv,basis)" pt="n" rel="mwp" root="TELEFOON" sense="TELEFOON" word="TELEFOON"/></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.head.5.s.1"><node begin="0" cat="top" end="2" id="0" rel="top"><node begin="0" end="1" frame="number(hoofd(pl_num))" id="1" infl="pl_num" lcat="np" lemma="4" numtype="hoofd" pos="num" positie="vrij" postag="TW(hoofd,vrij)" pt="tw" rel="--" root="4" sense="4" special="hoofd" word="4"/><node begin="1" end="2" frame="punct(punt)" id="2" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds><alpino_ds version="1.3" id="18.head.5.s.2"><node begin="0" cat="top" end="5" id="0" rel="top"><node begin="0" cat="np" end="5" id="1" rel="--"><node begin="0" end="1" frame="noun(de,count,pl)" gen="de" id="2" lcat="np" lemma="_" num="pl" pos="noun" postag="SPEC(deeleigen)" pt="spec" rel="hd" root="aio_vacature" sense="aio_vacature" spectype="deeleigen" word="AIO-VACATURES"/><node begin="1" cat="pp" end="5" id="3" rel="mod"><node begin="1" end="2" frame="preposition(in,[])" id="4" lcat="pp" lemma="iN" pos="prep" postag="VZ(init)" pt="vz" rel="hd" root="in" sense="in" vztype="init" word="IN"/><node begin="2" cat="conj" end="5" id="5" rel="obj1"><node begin="2" end="3" frame="proper_name(both,'LOC')" genus="onz" getal="ev" graad="basis" id="6" lcat="np" lemma="Tilburg" naamval="stan" neclass="LOC" ntype="eigen" num="both" pos="name" postag="N(eigen,ev,basis,onz,stan)" pt="n" rel="cnj" root="TILBURG" sense="TILBURG" word="TILBURG"/><node begin="3" conjtype="neven" end="4" frame="conj(en)" id="7" lcat="vg" lemma="eN" pos="vg" postag="VG(neven)" pt="vg" rel="crd" root="en" sense="en" word="EN"/><node begin="4" end="5" frame="proper_name(both,'LOC')" genus="onz" getal="ev" graad="basis" id="8" lcat="np" lemma="Amsterdam" naamval="stan" neclass="LOC" ntype="eigen" num="both" pos="name" postag="N(eigen,ev,basis,onz,stan)" pt="n" rel="cnj" root="AMSTERDAM" sense="AMSTERDAM" word="AMSTERDAM"/></node></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.p.1.s.1"><node begin="0" cat="top" end="7" id="0" rel="top"><node begin="0" cat="smain" end="7" id="1" rel="--"><node begin="0" end="1" frame="noun(de,count,pl)" gen="de" getal="mv" graad="basis" id="2" index="1" lcat="np" lemma="computer" ntype="soort" num="pl" pos="noun" postag="N(soort,mv,basis)" pt="n" rel="su" root="computer" sense="computer" word="Computers"/><node begin="1" end="2" frame="verb(hebben,pl,aux(te_inf))" id="3" infl="pl" lcat="smain" lemma="hoeven" pos="verb" postag="WW(pv,tgw,mv)" pt="ww" pvagr="mv" pvtijd="tgw" rel="hd" root="hoef" sc="aux(te_inf)" sense="hoef" tense="present" word="hoeven" wvorm="pv"/><node begin="0" cat="ti" end="7" id="4" rel="vc"><node begin="4" end="5" frame="complementizer(te)" id="5" lcat="cp" lemma="te" pos="comp" postag="VZ(init)" pt="vz" rel="cmp" root="te" sc="te" sense="te" vztype="init" word="te"/><node begin="0" cat="inf" end="7" id="6" rel="body"><node begin="0" end="1" id="7" index="1" rel="su"/><node begin="5" buiging="zonder" end="6" frame="verb('hebben/zijn',inf,aux(inf))" id="8" infl="inf" lcat="inf" lemma="kunnen" pos="verb" positie="vrij" postag="WW(inf,vrij,zonder)" pt="ww" rel="hd" root="kan" sc="aux(inf)" sense="kan" word="kunnen" wvorm="inf"/><node begin="0" cat="inf" end="7" id="9" rel="vc"><node begin="0" end="1" id="10" index="1" rel="su"/><node begin="2" cat="np" end="4" id="11" rel="obj1"><node begin="2" buiging="zonder" end="3" frame="determiner(geen,nwh,mod,pro,yparg,nwkpro,geen)" id="12" infl="geen" lcat="detp" lemma="geen" naamval="stan" npagr="agr" pdtype="det" pos="det" positie="prenom" postag="VNW(onbep,det,stan,prenom,zonder,agr)" pt="vnw" rel="det" root="geen" sense="geen" vwtype="onbep" wh="nwh" word="geen"/><node begin="3" end="4" frame="noun(het,mass,sg)" gen="het" genus="onz" getal="ev" graad="basis" id="13" lcat="np" lemma="Nederlands" naamval="stan" ntype="eigen" num="sg" pos="noun" postag="N(eigen,ev,basis,onz,stan)" pt="n" rel="hd" root="Nederlands" sense="Nederlands" word="Nederlands"/></node><node begin="6" buiging="zonder" end="7" frame="verb(hebben,inf(no_e),transitive)" id="14" infl="inf(no_e)" lcat="inf" lemma="verstaan" pos="verb" positie="vrij" postag="WW(inf,vrij,zonder)" pt="ww" rel="hd" root="versta" sc="transitive" sense="versta" word="verstaan" wvorm="inf"/></node></node></node></node></node></alpino_ds><alpino_ds version="1.3" id="18.p.2.s.2"><node begin="0" cat="top" end="14" id="0" rel="top"><node begin="7" end="8" frame="punct(komma)" id="1" lcat="punct" lemma="," pos="punct" postag="LET()" pt="let" rel="--" root="," sense="," special="komma" word=","/><node begin="10" end="11" frame="punct(komma)" id="2" lcat="punct" lemma="," pos="punct" postag="LET()" pt="let" rel="--" root="," sense="," special="komma" word=","/><node begin="0" cat="smain" end="13" id="3" rel="--"><node begin="0" end="1" frame="er_adverb(voor)" id="4" lcat="pp" lemma="daarvoor" pos="pp" postag="BW()" pt="bw" rel="mod" root="daarvoor" sense="daarvoor" special="er" word="Daarvoor"/><node begin="1" end="2" frame="verb(unacc,sg3,intransitive)" id="5" infl="sg3" lcat="smain" lemma="verlopen" pos="verb" postag="WW(pv,tgw,met-t)" pt="ww" pvagr="met-t" pvtijd="tgw" rel="hd" root="verloop" sc="intransitive" sense="verloop" tense="present" word="verloopt" wvorm="pv"/><node begin="2" cat="np" end="5" id="6" rel="su"><node begin="2" end="3" frame="determiner(de)" id="7" infl="de" lcat="detp" lemma="de" lwtype="bep" naamval="stan" npagr="rest" pos="det" postag="LID(bep,stan,rest)" pt="lid" rel="det" root="de" sense="de" word="de"/><node aform="base" begin="3" buiging="met-e" end="4" frame="adjective(e)" graad="basis" id="8" infl="e" lcat="ap" lemma="menselijk" naamval="stan" pos="adj" positie="prenom" postag="ADJ(prenom,basis,met-e,stan)" pt="adj" rel="mod" root="menselijk" sense="menselijk" vform="adj" word="menselijke"/><node begin="4" end="5" frame="noun(de,count,sg)" gen="de" genus="zijd" getal="ev" graad="basis" id="9" lcat="np" lemma="communicatie" naamval="stan" ntype="soort" num="sg" pos="noun" postag="N(soort,ev,basis,zijd,stan)" pt="n" rel="hd" root="communicatie" sense="communicatie" word="communicatie"/></node><node begin="5" cat="ap" end="7" id="10" rel="mod"><node begin="5" end="6" frame="intensifier" id="11" lcat="advp" lemma="te" pos="adv" postag="BW()" pt="bw" rel="mod" root="te" sense="te" special="intensifier" word="te"/><node aform="base" begin="6" buiging="zonder" end="7" frame="adjective(no_e(adv))" graad="basis" id="12" infl="no_e" lcat="ap" lemma="subtiel" pos="adj" positie="vrij" postag="ADJ(vrij,basis,zonder)" pt="adj" rel="hd" root="subtiel" sense="subtiel" vform="adj" word="subtiel"/></node><node begin="8" cat="ppart" end="10" id="13" rel="mod"><node begin="8" end="9" frame="intensifier" id="14" lcat="advp" lemma="te" pos="adv" postag="VZ(init)" pt="vz" rel="mod" root="te" sense="te" special="intensifier" vztype="init" word="te"/><node aform="base" begin="9" buiging="zonder" end="10" frame="adjective(ge_no_e(adv))" id="15" infl="no_e" lcat="ppart" lemma="nuanceren" pos="adj" positie="vrij" postag="WW(vd,vrij,zonder)" pt="ww" rel="hd" root="genuanceerd" sense="genuanceerd" vform="psp" word="genuanceerd" wvorm="vd"/></node><node begin="11" cat="ap" end="13" id="16" rel="mod"><node begin="11" end="12" frame="intensifier" id="17" lcat="advp" lemma="te" pos="adv" postag="BW()" pt="bw" rel="mod" root="te" sense="te" special="intensifier" word="te"/><node aform="base" begin="12" buiging="zonder" end="13" frame="adjective(no_e(adv))" graad="basis" id="18" infl="no_e" lcat="ap" lemma="rijk" pos="adj" positie="vrij" postag="ADJ(vrij,basis,zonder)" pt="adj" rel="hd" root="rijk" sense="rijk" vform="adj" word="rijk"/></node></node><node begin="13" end="14" frame="punct(punt)" id="19" lcat="punct" lemma="." pos="punct" postag="LET()" pt="let" rel="--" root="." sense="." special="punt" word="."/></node></alpino_ds></treebank>

Pipfile ,以防您希望设置本地测试环境(尽管上述脚本可能适用于 3.4 及更高版本):

[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
lxml = ">=4.2.1"

[dev-packages]

[requires]
python_version = "3.6"

根据您命名 XML 的方式,脚本的输出将如下所示:

Processing WRPEE-dummy sentence 1 topconjmwu
Processing WRPEE-dummy sentence 2 top
Processing WRPEE-dummy sentence 3 topmwu
Processing WRPEE-dummy sentence 4 top
Processing WRPEE-dummy sentence 5 topmwu
Processing WRPEE-dummy sentence 6 top
Processing WRPEE-dummy sentence 7 topnpmwu
Processing WRPEE-dummy sentence 8 top
Processing WRPEE-dummy sentence 9 topnpppconj
Processing WRPEE-dummy sentence 10 topsmaintiinfinfnp
Processing WRPEE-dummy sentence 11 topsmainnpapppartap
pattern counter: Counter()
filenames: set()
filehandles: {}

这说明我要在进程间共享的变量确实没有共享和修改。 (我知道发生这种情况是因为进程及其变量是 fork 的。)在这个虚拟示例中,它可能看起来不是很重要(尽管很明显现在我无法关闭打开的文件句柄),但在大对这些变量和其他变量进行编程,并在派生进程中进行修改和使用。那么,问题是如何让它发挥作用。

我阅读了有关 Pipe() 和 Queue() 的内容,在我看来我需要 Queue() .此外,因为我会经常从不同的进程读取和写入同一个对象,所以我想我需要一个 Manager() 。以及。然而,这是我不确定的部分。我试着阅读 this topic但这只会让我更加困惑。此外,评论suggest即使在 3.6.4 上,使用复杂对象时也可能存在问题。请记住,我共享的实际数据是嵌套的并且由不同类型组成。

总而言之,问题是:我如何重写上面的示例代码以确保所有 Process es 对 _process_node 中的实例变量具有(非阻塞)读/写访问权限以及从该过程中调用的方法?我愿意将我当前的 Python 版本 (3.6.4) 更新到 3.7,并使用其他库。

最佳答案

multiprocessing 库让您可以在并发 Python 代码中使用并行。如果没有 multiprocessing,Python GIL 往往会妨碍真正的并行执行,但您应该看到 multiprocessing 代码与其他并发技术没有什么不同。基本上,multiprocessing 和线程之间的最大区别是状态是通过慢速 IPC 调用共享的。

这意味着您需要小心处理共享资源。您当前的实现在这方面做得不好;您有多个并发任务访问共享资源,而不考虑其他人可能在做什么。您的代码中存在多种竞争条件机会,其中多个任务可以写入同一个文件,或者更新嵌套数据结构而不考虑其他更新。

当您必须更新共享数据结构或文件时,通常可以在两个选项之间进行选择:

  • 使用同步;任何需要改变资源的人都需要先获得共享锁,或者使用一些 other form of synchronisation primitive协调访问。
  • 让一个任务负责改变资源。这通常涉及一个或多个 queues .

请注意,您必须将这些对象(同步原语或队列)中的任何一个显式传递给子进程,请参阅 programming guidelines ;不要在实例上使用引用来共享状态。

对于你的情况,我会选择队列和专门的任务;您的瓶颈是数据处理,将数据写入磁盘并根据分析任务的结果更新一些数据结构相比之下相对较快。

所以使用单个任务写入文件;只需将序列化的 XML 字符串与 cat 值一起放入专用队列中,并有一个单独的任务从队列中提取这些字符串并将它们写入文件。这个单独的任务负责所有文件访问,包括打开和关闭。这序列化文件访问并消除竞争条件和破坏写入的可能性。如果这些文件的数据来得又多又快以至于使该任务成为瓶颈,请为每个目标文件创建任务。

对共享数据结构做同样的事情;将突变发送到队列,将其留给专门的任务来合并数据。更新代理对象并不合适,因为它们的更改通过 RPC 调用传播到其他进程,增加了竞争条件的机会,并且锁定不能保证数据在所有任务进程中保持一致!

对于您的简单示例,Counter() 对象的更新实际上并未共享;每个子进程在 fork 和更新本地副本时都会继承一个副本,而父进程永远不会看到所做的更改。因此,您将使用一个本地新Counter() 实例,并将其插入队列。然后,一个专门的任务可以从队列中接收这些,并使用 total_counter.update(queued_counter) 用这些值更新本地 Counter() 实例,再次确保更新是序列化的.

为了说明,这里有一个计算 Lorem Ipsum 数据的人为示例;一系列 count_words 任务进行计数,但将它们生成的 Counter() 对象传递给队列,以供单独的整理任务组合成最终的单词计数。一个单独的日志记录任务将数据从日志记录队列写入磁盘:

import datetime
import random
import re
import time

from collections import Counter
from functools import partial
from multiprocessing import Manager, Pool
from io import TextIOWrapper
from urllib.request import urlopen

COMPLETE = "COMPLETE"

def collating_task(countsqueue, logqueue):
    wordcounts = Counter()

    # Loop until COMPLETE is found in the queue
    for counts in iter(countsqueue.get, COMPLETE):
        wordcounts.update(counts)
        logqueue.put(
            f"collating: updating with {len(counts)} words "
            f"(total {len(wordcounts)})"
        )

    return wordcounts

def logging_task(logqueue):
    # Loop until COMPLETE is found in the queue
    with open('logfile.txt', 'w') as logf:
        for message in iter(logqueue.get, COMPLETE):
            print(datetime.datetime.now(), message, flush=True, file=logf)

def count_words(line, countsqueue, logqueue):
    findwords = re.compile(r"\w+").findall
    counts = Counter(findwords(line))
    logqueue.put(f"counting: counted {len(counts)} words")
    # a random short delay to make this task 'heavy'
    time.sleep(random.uniform(0.0, 0.05))
    countsqueue.put(counts)

def main():
    # Random latin text, 1000 paragraphs
    loripsum_response = urlopen("https://loripsum.net/api/1000/long/plaintext")
    text = list(TextIOWrapper(loripsum_response, encoding="utf8"))
    print(f"Will process {len(text)} lines of data")

    # create managed queues that can be passed in as arguments. The alternative
    # is to create globals or Process() objects with queues passed in.
    manager = Manager()
    countsqueue, logqueue = manager.Queue(), manager.Queue()
    with Pool() as pool:
        # start processing tasks, these loop forever until signalled
        collator = pool.apply_async(collating_task, (countsqueue, logqueue))
        logger = pool.apply_async(logging_task, (logqueue,))

        # process lines, blocks until complete
        pool.map(partial(count_words, countsqueue=countsqueue, logqueue=logqueue), text)

        countsqueue.put(COMPLETE)
        wordcounts = collator.get()
        logqueue.put(COMPLETE)
        logger.wait()

    print(f"Counted {len(wordcounts)} different words; top 5 is:")
    for word, count in wordcounts.most_common(5):
        print(f'{word:<10} {count:4d}')

if __name__ == "__main__":
    main()

产生类似的东西:

Will process 2000 lines of data
Counted 5651 different words; top 5 is:
et         2078
in         2074
est        2036
non        1911
ut         1477

和一个较大的 logfile.txt,其中包含推送到日志记录队列的信息。

关于python - 具有共享内存复杂对象的多处理大型 XML 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52927322/

相关文章:

logging - Log4Net 和来自并行实例的日志记录

python - 将元素添加到列表更改所有元素

windows - 并行化具有多个导出点的算法?

python - 如何在 python 中以优雅的方式动态创建对象?

python - 我需要创建一个字典来计算名称在列表中出现的次数

python - 我需要一些帮助来修复一个 python 脚本,该脚本为 chatgpt 提供人形语音,并允许我使用自己的声音与它交谈

python - 为什么我在编码二叉搜索树时遇到错误?

r - 如何从 clusterApply 打印?

python - 使用 numexpr 优化 numpy 中的数值运算

python实例变量初始化