python RQ : pattern for callback

标签 python parallel-processing python-rq

我现在有大量文档要处理,我正在使用 Python RQ 来并行处理任务。

我希望在每个文档上执行不同的操作时完成一系列工作。例如:A -> B -> C 表示将文档传递给函数A,在之后A 已完成,继续执行 B 和最后的 C

但是,Python RQ 似乎不能很好地支持管道内容。

这是一个简单但有点肮脏的做法。总之,管道中的每个函数都以嵌套的方式调用它的下一个函数。

例如,对于管道A->B->C

在顶层,一些代码是这样写的:

q.enqueue(A, the_doc)

其中 q 是 Queue 实例,在函数 A 中有如下代码:

q.enqueue(B, the_doc)

而在B中,有这样的东西:

q.enqueue(C, the_doc)

还有比这更优雅的方式吗?例如 ONE 函数中的一些代码:

q.enqueue(A, the_doc) q.enqueue(B, the_doc, after = A) q.enqueue(C, the_doc, after= B)

depends_on参数最接近我的要求,但是,运行如下:

A_job = q.enqueue(A, the_doc) q.enqueue(B, depends_on=A_job)

不会工作。因为 q.enqueue(B, depends_on=A_job )A_job = q.enqueue(A, the_doc) 执行后立即执行。当 B 入队时,A 的结果可能还没有准备好,因为它需要时间来处理。

附言:

如果 Python RQ 不太擅长这个,我还可以使用 Python 中的什么工具来实现相同的目的:

  1. 循环并行化
  2. 管道处理支持

最佳答案

By the time B is enqueued, the result from A might not be ready as it takes time to process.

我不确定您最初发布问题时是否确实如此,但无论如何,现在情况并非如此。事实上,depends_on 功能正是为您描述的工作流程而设计的。

确实这两个函数是立即连续执行的。

A_job = q.enqueue(A, the_doc)
B_job = q.enqueue(B, depends_on=A_job )

但是在 A 完成之前,worker 不会执行 B。直到 A_job 成功执行,B.status == 'deferred'。一旦 A.status == 'finished'B 就会开始运行。

这意味着 BC 可以像这样访问和操作它们的依赖项的结果:

import time
from rq import Queue, get_current_job
from redis import StrictRedis

conn = StrictRedis()
q = Queue('high', connection=conn)

def A():
    time.sleep(100)
    return 'result A'

def B():
    time.sleep(100)
    current_job = get_current_job(conn)
    a_job_id = current_job.dependencies[0].id
    a_job_result = q.fetch_job(a_job_id).result
    assert a_job_result == 'result A'
    return a_job_result + ' result B'


def C():
    time.sleep(100)
    current_job = get_current_job(conn)
    b_job_id = current_job.dependencies[0].id
    b_job_result = q.fetch_job(b_job_id).result
    assert b_job_result == 'result A result B'
    return b_job_result + ' result C'

worker最终会打印'result A result B result C'

此外,如果您在队列中有很多作业并且 B 可能在执行之前等待一段时间,您可能需要显着增加 result_ttl 或使其无限期result_ttl=-1。否则,无论为 result_ttl 设置多少秒后,A 的结果都将被清除,在这种情况下,B 将无法再访问它并返回所需的结果。

但是,设置 result_ttl=-1 具有重要的内存含义。这意味着您的作业结果将永远不会被自动清除,并且内存将按比例增长,直到您手动从 Redis 中删除这些结果。

关于 python RQ : pattern for callback,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24295090/

相关文章:

python - 实时能力 python 多处理(队列和管道)

java - 当多个线程并行运行时,如何在 jmeter 中保留线程变量?

python - 绘制n个相同的无重叠和中心重心的圆

python - 你如何调试 Django 模板?

python - 用颜色填充图像但保留 alpha(PIL 中的颜色叠加)

python - 我可以将函数作为参数发送到具有多处理功能的所有进程吗?

python - 在 Django 中测试 django-rq ( python-rq ) 的最佳实践

python - Redis队列: How to prevent chained Jobs from running asynchronously

python - HTTP 错误 503 : Service Unavailable when trying to download MNIST data