我有一个包含大量数据(大于内存)的迭代器,我希望能够对这些数据执行一些操作。为了快速完成此操作,我使用了多处理模块。
def __init__(self, poolSize, spaceTimeTweetCollection=None):
super().__init__()
self.tagFreq = {}
if spaceTimeTweetCollection is not None:
q = Queue()
processes = [Process(target=self.worker, args=((q),)) for i in range(poolSize)]
for p in processes:
p.start()
for tweet in spaceTimeTweetCollection:
q.put(tweet)
for p in processes:
p.join()
我的目的是创建一些监听队列的进程
def worker(self, queue):
tweet = queue.get()
self.append(tweet) #performs some actions on data
然后我遍历迭代器并将数据添加到队列中,因为 worker 方法中的 queue.get()
正在阻塞工作人员应该在从中接收数据时开始对数据执行操作队列。
然而,每个处理器上的每个 worker 只运行一次,仅此而已!因此,如果 poolSize 为 8,它将读取队列中的前 8 个项目,对 8 个不同的进程执行操作,然后完成!有谁知道为什么会这样?我在 Windows 上运行它。
编辑 我想提一下,甚至认为这一切都是在一个类中完成的,该类在 _main_like 中被调用
if __name__ == '__main__':
tweetDatabase = Database()
dataSet = tweetDatabase.read2dBoundingBox(boundaryBox)
freq = TweetCounter(8, dataSet) # this is where the multiprocessing is done
最佳答案
我相信你的 worker 是罪魁祸首。它只做一件事然后就死了。尝试:
def worker(self, queue):
while True:
tweet = queue.get()
self.append(tweet)
(不过我会看一下 Pool)
关于python多处理从多处理队列访问数据不读取所有数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22977564/