python - celery任务的动态注册

标签 python celery

我想知道哪种方法是将 celery 用于运行时注册的任务的最佳方式。我的工作流程如下:

  • 启动 celery 应用
  • 启动python应用
  • python 应用程序创建了一个我想在 celery 中安排的新任务

最佳答案

我完成的方法是基于与 click package has with custom subcommands 相同的想法的“插件”概念。有。

应用程序结构(基于python 3):

.
├── dynamic_tasks.py
├── run.py
└── tasks
    └── get_rate.py

celery 任务 dynamic_tasks.py 定义如下:

import os
import celery

app = celery.Celery('dynamic_tasks', broker='amqp://guest@192.168.169.1/', backend='rpc://')

PLUGIN_FOLDER = os.path.join(os.path.dirname(__file__), 'tasks')
def _absolutepath(filename):
    """ Return the absolute path to the filename"""
    return os.path.join(PLUGIN_FOLDER, filename)

@app.task
def tasks(funcname, *args, **kwargs):
    try:
        funcname = funcname.replace('-', '_')
        funcname += '.py'
        func = _absolutepath(funcname)
        ns = {}
        with open(func) as f:
            code = compile(f.read(), func, 'exec')
            eval(code, ns, ns)
        return ns['task'](*args, **kwargs)
    except IOError as e:
       # Manage IOError
       raise e

可插入任务示例tasks/get_rate.py:

""" This task get the currency rate between a pair of currencies """    
import urllib.request

URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p'

def task(pair='EURSEK', url_tmplt=URL):
    with urllib.request.urlopen(url_tmplt.format(pair)) as res:
        body = res.read()
    return (pair, float(body.strip()))

并且,简单地说,从 run.py 运行示例:

from dynamic_tasks import tasks

print(tasks.delay('get_rate', 'EURSEK').get())

已编辑 由于 celery 在不同的机器上运行,因此不可能依赖本地文件系统。我的新方法是发送要作为字符串执行的函数:

@app.task
def dynamic_tasks(funcname, funccode, *args, **kwargs):
    try:
        ns = {}
        code = compile(funccode, funcname, 'exec')
        eval(code, ns, ns)
        logger.info('execute %r with args %r, %r', funcname, args, kwargs)
        return ns['task'](*args, **kwargs)
    except IOError:
        logger.error("Error loading the dynamic function from text %s", funcname)

关于python - celery任务的动态注册,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37724522/

相关文章:

python - 将环境变量传递给 GAE 实例

jquery - 如何将 "submit"表单发送到 ajax 脚本而不是返回到 Flask?

rabbitmq - Celery 任务计划(Celery、Django 和 RabbitMQ)

python-3.x - 测试 Celery 工作的 Redis 命令

python - 如何阻止 QSvgWidget 生成它自己的窗口

python - 遍历图像目录并将它们全部旋转 x 度并保存到目录

python - tf.nn.conv2d() 对输入张量形状有什么影响?

python - 从文件中删除特定的 JSON 对象,然后存储该文件

python - Celery 为什么任务停留在队列中

python - django celery 和弦任务错误