python-3.x - 测试 Celery 工作的 Redis 命令

标签 python-3.x redis celery

我正在试验使用 Redis 作为代理的 Celery worker。

这是我对 Celery worker 的测试代码:

from celery import Celery
app = Celery('tasks', broker='redis://xxxxx.net:6379/0')

@app.task
def nextexec(payload):
    print(payload)

使用 redis-cli,我运行以下命令将一个值插入 celery 队列(由 Celery 自动创建)

RPUSH celery somekey 'somevalue'

但是当执行查询时,我的 worker 系统性地崩溃了,我得到一个 Unrecoverable error: JSONDecodeError。它似乎接收到 None 而不是应该解码的 JSON 字符串。

知道我需要执行什么 Redis 查询或者我需要对这个(但简单的)工作脚本进行哪些更改吗?

最佳答案

celery 使用 serializers在客户和 worker 之间传输数据。每条消息都需要序列化,并且有一个 content_type header ,描述用于对其进行编码的序列化方法。

这是使用 json 序列化的示例消息。

{'body': 'W1sxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d',
 'content-encoding': 'utf-8',
 'content-type': 'application/json',
 'headers': {'argsrepr': '(1,)',
  'eta': None,
  'expires': None,
  'group': None,
  'id': '5ce9a8d8-41d7-47a4-9074-beedabd88dcc',
  'kwargsrepr': '{}',
  'lang': 'py',
  'origin': 'gen5339@pavilion',
  'parent_id': None,
  'retries': 0,
  'root_id': '5ce9a8d8-41d7-47a4-9074-beedabd88dcc',
  'task': 't.wait',
  'timelimit': [None, None]},
 'properties': {'body_encoding': 'base64',
  'correlation_id': '5ce9a8d8-41d7-47a4-9074-beedabd88dcc',
  'delivery_info': {'exchange': '', 'routing_key': 'celery'},
  'delivery_mode': 2,
  'delivery_tag': '0177eb65-344e-4b1c-ab5f-8e2f5d75b8d3',
  'priority': 0,
  'reply_to': 'a5c611b8-18b3-3bbb-b598-c3757f06c4fd'}}

Celery worker 需要接收指定格式的消息。您不能将某些值推送给 broker(redis) 并期望 celery 执行它。

使用 python 对您的任务进行排队。

from mymodule import nextexec
payload ='some payload'
nextexec.delay(payload)

关于python-3.x - 测试 Celery 工作的 Redis 命令,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43056659/

相关文章:

python - 回调 celery apply_async

python - 比较条件条件中的对象,Python 3

python - 如何在Python中优化打印帕斯卡三角形?

python - OptionMenu 命令函数需要参数

Docker Stack/Compose Redis 实例不持久化数据

performance - 可能在 redis 中使用 lua 将存储在集合中的所有 key 作为哈希列表返回?

python - Pandas groupby object.aggregate 具有自定义列表操作功能

amazon-web-services - AWS ElastiCache Redis - 一个可用区中的所有节点以避免数据传输成本

python - 用 celery 运行 "unique"任务

django - 使用来自 django 应用程序的 kafka 消息