使用 Python RQ ,我们正在尝试动态管理工作进程。我们使用定制的 worker 脚本,其(简化形式)如下:
from rq import Connection, Worker
queues_to_listen_on = get_queues_to_listen_on()
with Connection(connection = get_worker_connection()):
w = Worker(queues_to_listen_on)
w.work()
我们对 worker 停工特别感兴趣。我们主要关心的是如何优雅地关闭 worker,以一种能够在关闭之前完成当前工作的方式。适当的 Worker
对象上的 request_stop(...)
信号处理程序似乎可以满足我们的需要,但似乎没有办法(至少据我所知)发射它,除非它是通过在终端中运行的工作进程上按 CTRL+C
。
在我看来,有两种可能的解决方案(肯定还有更多)- 按优先顺序排列:
- 以编程方式,使用
rq
库,将信号发送到request_stop
,从而触发正常关闭。 - 以某种方式获取正确进程的 pid(不确定是主力进程还是工作监听器进程)并使用其他方法向该进程发送适当的信号。我们有一些方法可以做到这一点,但它很可能需要更多的工作并引入其他变量来解决我希望被忽略的问题(例如,使用
Fabric
运行远程命令或类似的东西)。
如果有更好的方法来解决这个问题或可以实现相同目标的不同替代方法,我将不胜感激您的建议。
最佳答案
选项 1 在设计方面肯定更好。
然而,为了解决您必须使用 CTRL + C
退出进程的特定问题(我也讨厌这样),您可以为您的工作人员使用以下策略:
# WORKER_NAME.py
import os
PID = os.getpid()
@atexit.register
def clean_shut():
print "Clean shut performed"
try:
os.unlink("WORKER_NAME.%d" % PID)
except:
pass
# Worker main
def main():
f = open("WORKER_NAME.%d" % PID, "w")
f.write("Delete this to end WORKER_NAME gracefully")
f.close()
while os.path.exists("WORKER_NAME.%d" % PID):
# Worker working
然后在您的主脚本中,按照@Borys 的建议获取工作人员 PID,发送热停止请求,然后 os.unlink("path/to/WORKER_NAME.%d"% worker_PID)
以确保安全优雅的关机:)
虽然这只适用于运行无限循环的工作人员。如果工作进程调用的东西甚至会阻塞普通的顺序一次性作业,则您必须进一步跟踪可能阻塞的例程以从那里解决问题,例如应用某种超时策略。
关于python - 如何正确地动态关闭 Python RQ 工作进程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15119646/