python - celery |发现 Flask 错误 : expected a bytes-like object, AsyncResult

标签 python flask celery typeerror celery-task

我目前正在开发一个 Flask 应用程序(Python 3.6),并且想要集成 celery,因为我有多个长时间运行的后台任务。

编辑: celery 4.1

集成没有问题, celery 任务执行正确,但我无法访问正在运行的任务的当前状态。

celery , flask 设置:

def make_celery(app):
    celery = Celery(app.import_name,
                    backend=app.config["result_backend"],
                    broker=app.config["broker_url"])

    celery.conf.update(app.config)

    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery


app = Flask(__name__)
app.config["broker_url"] = "redis://localhost:6379"
app.config["result_backend"] = "redis://localhost:6379"
app.config["DB"] = "../pyhodl.sqlite"

celery_app = make_celery(app)

celery 任务:

@celery_app.task(bind=True, name="server.tasks.update_trading_pair")
def update_trading_pair(self, exchange, currency_a, currency_b):
    print(exchange, currency_a, currency_b)
    time.sleep(50)

调用任务并将值存储在字典中:

task_id = update_trading_pair.delay(exchange, currency_a, currency_b)
print("NEW TASK")
print(task_id)
id = exchange_mnemonic + "_" + currency_a + "_" + currency_b
TASK_STATES[id] = task_id

获取任务的状态:

result = update_trading_pair.AsyncResult(TASK_STATES[market.__id__()])
print(result.state)
print(result) # works but only prints the task_id

这是引发错误的地方。当我只打印结果对象时,它只打印task_id。如果我尝试检索当前状态,则会引发以下异常:

TypeError: sequence item 1: expected a bytes-like object, AsyncResult found

最佳答案

说明:

当您调用任务时:

task_id = update_trading_pair.delay(exchange, currency_a, currency_b)

你的变量task_idAsyncResult的一个实例,它不是一个字符串。

因此,您的变量 TASK_STATES[market.__id__()] 也是 AsyncResult 的实例,而它应该是一个字符串。

然后您尝试用它实例化一个 AsyncResult 对象

result = update_trading_pair.AsyncResult(TASK_STATES[market.__id__()])

因此,您正在使用另一个 AsyncResult 对象实例化一个 AsyncResult 对象,而它应该使用字符串实例化。

也许您的困惑来自于您的 print(task_id) 它向您显示了一个字符串,但是当您这样做时,在底层 __str__ 方法中 AsyncResult 对象被调用,如果你在源代码中查看它 here ,

def __str__(self):
    """`str(self) -> self.id`."""
    return str(self.id)

它只是打印 task_id 对象的 id 属性。

解决方案:

您可以通过执行 TASK_STATES[id] = task_id.id 或执行

来修复此问题
result = update_trading_pair.AsyncResult(str(TASK_STATES[market.__id__()]))

关于python - celery |发现 Flask 错误 : expected a bytes-like object, AsyncResult,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49127024/

相关文章:

python ast vs json for str to dict translation

python 3 |多线程在 thread.start() 声明之前开始

python - 将日期操作应用于整个数据框

python - 在 pandas 中将数据从行旋转到具有特定结构的列

python - 布局中的 Flask css 未加载

python - 什么时候应该使用 Flask.g?

python - Flask instance init中的第一个参数怎么理解?

python - 如何按任务名称检查和取消 Celery 任务

postgresql - 使用sqlalchemy + postgresql时如何在 celery 中回滚异常

python - Celery:每个工作人员的 task_acks_late 的不同设置/向 celery 添加自定义选项