python - celery 异步获取超时永不超时

标签 python redis celery django-celery

我的 celery 应用程序如下。我使用celery 4.4.2和Python 2.7.5以及OS是CentOs 7.4。这个想法是通过celery,通过在java_zip_sign_exe变量中定义的shell脚本,对分布在不同主机上的多个工作程序分布的shell脚本,使用celery对沉重的文件进行签名(有时,每个文件有时需要长达30分钟的签名)。 sign_heavy_java_zip_files()进行sign_heavy_java_zip_file()的异步调用,并为其提供了所有文件名。

sign_celery_app.py

app = Celery('tasks', broker='redis://:<hostname>:6379/0',backend='redis://:<hostname>:6379/0')
app.conf['worker_prefetch_multiplier'] = 1
app.conf['task_acks_late'] = True
app.conf.task_default_queue = 'default'
app.conf.tasks_queues = (
    Queue('default', exchange='default', routing_key='default'),
    Queue('heavy_java_zip', exchange='heavy_java_zip', routing_key='heavy_java_zip'),

@app.task

def sign_java_zip_file(filename,User,MaxPerJavaZipFileTime,Site):
  print ("Started Signing " + filename + java_zip_sign_exe )
  process=subprocess.Popen([java_zip_sign_exe,filename,User,str(MaxPerJavaZipFileTime),Site],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
  out,err = process.communicate()
  print ("Finished Signing " + filename )
  return (process.returncode,out,err)

@app.task(queue='heavy_java_zip')
def sign_heavy_java_zip_files(filenames,User,MaxPerJavaZipFileTime,Site):
  job = group(sign_java_zip_file.s(filename,User,MaxPerJavaZipFileTime,Site) for filename in filenames )
  job_results=job.apply_async(queue='heavy_java_zip')
  return job_results

celery worker 开始如下
celery -A sign_celery_app  worker --loglevel=info --concurrency=2  -O fair -Q "heavy_java_zip"

我的main()python调用者文件如下
results_heavy_java_zip=sign_heavy_java_zip_files.delay(heavy_java_zip_file_list,User,int(MaxPerJavaZipFileTime),Site)

results_heavy_java_zip.get(timeout=(MaxTotalJavaZipTime*60))

如预期的那样,当没有工作线程时,超时将引发Timeout Exception。但是,如果有工作人员,并且一旦任务开始异步工作,则超时不会有任何影响。我希望即使它们正在运行,它们也会在超时结束时被打断。我的理解错了吗?

最佳答案

我相信错误在于方式,函数调用是在主例程中完成的。

results_heavy_java_zip=sign_heavy_java_zip_files.delay(heavy_java_zip_file_list,User,int(MaxPerJavaZipFileTime),Site)

必须(没有延迟-或程序需要在任务调用中注意异步调用)
results_heavy_java_zip=sign_heavy_java_zip_files(heavy_java_zip_file_list,User,int(MaxPerJavaZipFileTime),Site)

关于python - celery 异步获取超时永不超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62178043/

相关文章:

python - Python 是否也有一些内置数据集(如 R)来执行测试?

java - DefaultCookieSerializer.setJvmRoute 不工作

Redis Mass Insertion 值换行

python - 如何在 Apache Airflow 中混合使用 Celery Executor 和 Kubernetes Executor?

python - celery - 链中的组

python - 基于 Python 中的字典/列表标记单词

python - 为什么 Pydev 给出内置关键字的语法错误?

python - Anaconda 已安装但无法启动 Navigator

php - Laravel Echo 服务器、Redis、Socket.IO : Can't seem to make them work

python - django + celery 离线时的处理