python - 受计时器影响的多处理

标签 python timer multiprocessing

我有一个很大的列表 L 需要操作。令 f() 为对 L 进行操作的函数。f() 采用另一个变量,该变量每 15 分钟到期一次,需要更新。这是一个串行示例:

def main():
    L = openList()
    # START THE CLOCK
    clockStart = dt.datetime.now()
    clockExp = clockStart + dt.timedelta(seconds=900)
    a = getRenewed()
    for item in L:
        f(item, a)   # operate on item given a
        # CHECK TIME REMAINING
        clockCur = dt.datetime.now()
        clockRem = (clockExp - clockCur).total_seconds()
        # RENEW a IF NEEDED
        if clockRem < 5: # renew with 5 seconds left
            clockStart = dt.datetime.now()
            clockExp = clockStart + dt.timedelta(seconds=900)
            a = getRenewed()

由于 f() 需要几秒钟(有时甚至更长),我想并行化代码。关于如何在给定计时器的情况下执行此操作有什么提示吗?我设想共享clockExp和“a”,当进程满足clockRem < 5时,它调用getRenewed()并共享新的“a”和clockExp,然后重复。

最佳答案

如果getRenewed是幂等的(也就是说,您可以多次调用它而不会产生副作用),您可以简单地将现有的计时器代码移动到您的工作进程中,并让它们每个调用一次他们注意到自己的计时器已经耗尽。这只需要同步您传入的列表中的项目,并且 multiprocessing.Pool 可以轻松处理该问题:

def setup_worker():
    global clockExp, a

    clockStart = dt.datetime.now()
    clockExp = clockStart + dt.timedelta(seconds=900)
    a = getRenewed()

def worker(item):
    global clockExp, a

    clockCur = dt.datetime.now()
    clockRem = (clockExp - clockCur).total_seconds()

    if clockRem < 5: # renew with 5 seconds left
        clockStart = dt.datetime.now()
        clockExp = clockStart + dt.timedelta(seconds=900)
        a = getRenewed()

    f(item, a)

def main(L):
    pool = multiprocessing.Pool(initializer=setup_worker)

    pool.map(worker, L)

如果getRenewed不是幂等的,事情会需要变得更复杂一些。您无法在每个工作进程中调用它,因此您需要在进程之间设置某种通信方法,以便每个进程都可以在可用时获取最新版本。

我建议使用 multiprocessing.queuea 值从主进程传递给工作线程。您仍然可以对列表项使用Pool,您只需确保从主进程异步使用它即可。也许像这样:

def setup_worker2(queue):
    global x
    x = random.random()
    global a_queue, a, clockExp

    a_queue = queue
    a = a_queue.get()    # wait for the first `a` value
    clockStart = dt.datetime.now()
    clockExp = clockStart + dt.timedelta(seconds=900)

def worker2(item):
    global a, clockExp

    clockCur = dt.datetime.now()
    clockRem = (clockExp - clockCur).total_seconds()
    if clockRem < 60: # start checking for a new `a` value 60 seconds before its needed
        try:
            a = a_queue.get_nowait()
            clockStart = dt.datetime.now()
            clockExp = clockStart + dt.timedelta(seconds=900)
        except queue.Empty:
            pass

    return f(item, a)

def main2(L):
    queue = multiprocessing.Queue()     # setup the queue for the a values

    pool = multiprocessing.Pool(initializer=setup_worker2, initargs=(queue,))

    result = pool.map_async(worker2, L) # send the items to the pool asynchronously

    while True:                   # loop for sending a values through the queue
        a = getRenewed()          # get a new item
        for _ in range(os.cpu_count()):
            queue.put(a)          # send one copy per worker process

        try:
            result.wait(900-5)    # sleep for ~15 minutes, or until the result is ready
        except multiprocessing.TimeoutError:
            pass                  # if we got a timeout, keep looping!
        else:
            break                 # if not, we are done, so break out of the loop!

工作人员仍然需要有一些计时代码,否则您将面临竞争条件,其中一个工作人员可能会消耗在单批中发送到队列的两个 a 值主要流程。如果对 f 的某些调用明显慢于其他调用(如果涉及从网络下载内容,则很可能发生这种情况),则可能会发生这种情况。

关于python - 受计时器影响的多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26169409/

相关文章:

python - 在 Python 中通过 stdin/stdout/pipes 进行通信的预定义协议(protocol)?

python - 如何通过python slackclient发送application/x-www-form-urlencoded到slack?

java - 可以在控制台应用程序中将 Thread.sleep 用作计时器吗?

python - 想要在python中返回全零矩阵的行数

Android - 安排事件每 10 毫秒发生一次?

javascript - 使用javascript的计时器

Python 3.5 多处理池和队列不起作用

c++ - 获取优先于提升进程的互斥量

python - 来自 joblib 的多处理不并行化?

python - 使用索引列表对 pandas DataFrame 进行索引并填充值