我有一个很大的列表 L 需要操作。令 f() 为对 L 进行操作的函数。f() 采用另一个变量,该变量每 15 分钟到期一次,需要更新。这是一个串行示例:
def main():
L = openList()
# START THE CLOCK
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
a = getRenewed()
for item in L:
f(item, a) # operate on item given a
# CHECK TIME REMAINING
clockCur = dt.datetime.now()
clockRem = (clockExp - clockCur).total_seconds()
# RENEW a IF NEEDED
if clockRem < 5: # renew with 5 seconds left
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
a = getRenewed()
由于 f() 需要几秒钟(有时甚至更长),我想并行化代码。关于如何在给定计时器的情况下执行此操作有什么提示吗?我设想共享clockExp和“a”,当进程满足clockRem < 5时,它调用getRenewed()并共享新的“a”和clockExp,然后重复。
最佳答案
如果getRenewed
是幂等的(也就是说,您可以多次调用它而不会产生副作用),您可以简单地将现有的计时器代码移动到您的工作进程中,并让它们每个调用一次他们注意到自己的计时器已经耗尽。这只需要同步您传入的列表中的项目,并且 multiprocessing.Pool
可以轻松处理该问题:
def setup_worker():
global clockExp, a
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
a = getRenewed()
def worker(item):
global clockExp, a
clockCur = dt.datetime.now()
clockRem = (clockExp - clockCur).total_seconds()
if clockRem < 5: # renew with 5 seconds left
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
a = getRenewed()
f(item, a)
def main(L):
pool = multiprocessing.Pool(initializer=setup_worker)
pool.map(worker, L)
如果getRenewed
不是幂等的,事情会需要变得更复杂一些。您无法在每个工作进程中调用它,因此您需要在进程之间设置某种通信方法,以便每个进程都可以在可用时获取最新版本。
我建议使用 multiprocessing.queue
将 a
值从主进程传递给工作线程。您仍然可以对列表项使用Pool
,您只需确保从主进程异步使用它即可。也许像这样:
def setup_worker2(queue):
global x
x = random.random()
global a_queue, a, clockExp
a_queue = queue
a = a_queue.get() # wait for the first `a` value
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
def worker2(item):
global a, clockExp
clockCur = dt.datetime.now()
clockRem = (clockExp - clockCur).total_seconds()
if clockRem < 60: # start checking for a new `a` value 60 seconds before its needed
try:
a = a_queue.get_nowait()
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
except queue.Empty:
pass
return f(item, a)
def main2(L):
queue = multiprocessing.Queue() # setup the queue for the a values
pool = multiprocessing.Pool(initializer=setup_worker2, initargs=(queue,))
result = pool.map_async(worker2, L) # send the items to the pool asynchronously
while True: # loop for sending a values through the queue
a = getRenewed() # get a new item
for _ in range(os.cpu_count()):
queue.put(a) # send one copy per worker process
try:
result.wait(900-5) # sleep for ~15 minutes, or until the result is ready
except multiprocessing.TimeoutError:
pass # if we got a timeout, keep looping!
else:
break # if not, we are done, so break out of the loop!
工作人员仍然需要有一些计时代码,否则您将面临竞争条件,其中一个工作人员可能会消耗在单批中发送到队列的两个 a
值主要流程。如果对 f
的某些调用明显慢于其他调用(如果涉及从网络下载内容,则很可能发生这种情况),则可能会发生这种情况。
关于python - 受计时器影响的多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26169409/