python - 将客户端功能传递给 celery 任务

标签 python celery

我想将客户端创建的函数传递给 celery 任务并执行它。例如下面,我正在尝试编写一个 map 函数,它接受一个函数 f 和一个列表 l 并执行 map (f, l) 在 celery 任务中。

大概是函数没有正确序列化(可以理解,这很难)。但是,有什么办法可以做到这一点?最佳做法是什么?我想我可以传递一个字符串,然后 exec 它,但我宁愿不只是我的应用程序预期的工作方式。

编辑:我找到了一种方法 serialize a function ......我想我可以把它包起来做我需要做的事情。有更好的想法吗?


from celery import Celery

app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')

@app.task
def cp_map(f, l):
    return map(f, l)

然后,我尝试将此任务与此结合使用:

In [20]: from tasks import cp_map

In [21]: def f(x): return x + 1

In [22]: cp_map.delay(f, [1,2,3])
Out[22]: <AsyncResult: 27baf8bf-8ef3-496d-a445-ebd7ee94e206>

In [23]: _.status
Out[23]: 'PENDING'

在 worker 身上,我得到这个:

[2014-02-09 22:27:00,828: CRITICAL/MainProcess] Can't decode message body: DecodeError(AttributeError("'module' object has no attribute 'f'",),) (type:u'application/x-python-serialize' encoding:u'binary' raw:"'\\x80\\x02}q\\x01(U\\x07expiresq\\x02NU\\x03utcq\\x03\\x88U\\x04argsq\\x04c__main__\\nf\\nq\\x05]q\\x06(K\\x01K\\x02K\\x03e\\x86q\\x07U\\x05chordq\\x08NU\\tcallbacksq\\tNU\\x08errbacksq\\nNU\\x07tasksetq\\x0bNU\\x02idq\\x0cU$27baf8bf-8ef3-496d-a445-ebd7ee94e206q\\rU\\x07retriesq\\x0eK\\x00U\\x04taskq\\x0fU\\x0ctasks.cp_mapq\\x10U\\ttimelimitq\\x11NN\\x86U\\x03etaq\\x12NU\\x06kwargsq\\x13}q\\x14u.' (233b)"')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 585, in _receive_callback
    decoded = None if on_m else message.decode()
  File "/usr/local/lib/python2.7/dist-packages/kombu/message.py", line 142, in decode
    self.content_encoding, accept=self.accept)
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 184, in loads
    return decode(data)
  File "/usr/lib/python2.7/contextlib.py", line 35, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 59, in _reraise_errors
    reraise(wrapper, wrapper(exc), sys.exc_info()[2])
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 55, in _reraise_errors
    yield
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 184, in loads
    return decode(data)
  File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 64, in pickle_loads
    return load(BytesIO(s))
DecodeError: 'module' object has no attribute 'f'

最佳答案

您可以使用 marshal 将函数序列化为字符串,然后在任务中将其反序列化。我不知道这是否是最好的方法,但它会起作用。您可能还想看看 dill .

这是从 another stackoverflow answer 复制的一些示例代码:

import marshal
def foo(x): return x*x
code_string = marshal.dumps(foo.func_code)

然后在任务中:

import marshal, types

code = marshal.loads(code_string)
func = types.FunctionType(code, globals(), "some_func_name")

func(10)  # gives 100

关于python - 将客户端功能传递给 celery 任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21668657/

相关文章:

python - 如果输入在索引之外,如何打印列表中的特定项目

python - 拆分列表并存储在python中的字典中

python - celery 结果,不起作用

python - RabbitMQ IOError : Socket closed

django - 停止 celeryd 的问题

python - 按名称删除 celery 任务(使用通配符?)

python - 读取数据时删除列包含某些字符串: python

python - 这个 psycopg2 代码可以安全地注入(inject)吗?

Python 3.6 Tkinter 和多重处理

python - 同步 celery 队列