python - Celery/Redis 同一任务并行执行多次

标签 python django redis celery

我有 2 个自定义任务(TaskATaskB),它们都继承自 celery.Task。调度程序时不时地启动 TaskATaskA 每次都启动 NTaskB,参数不同。但出于某种原因,有时具有相同参数的相同 TaskB 同时执行两次,这会导致数据库出现不同的问题。

class TaskA(celery.Task):

    def run(self, *args, **kwargs):
        objects = MyModel.objects.filter(processed=False)\
                                 .values_list('id', flat=True)
        task_b = TaskB()
        for o in objects:
            o.apply_async(args=[o, ])

class TaskB(celery.Task):

    def run(self, obj_id, *args, **kwargs):
        obj = MyModel.objects.get(id=obj_id)
        # do some stuff with obj

我尝试过的事情

我尝试使用 celery.group 希望它能解决这些问题,但我得到的只是错误,说 run 有 2 个参数但没有提供。

这就是我尝试使用 celery.group 启动 TaskB 的方式:

# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.s(id) for id in objects])
g.apply_async()

我也这样试过:

# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.run(id) for id in objects])
g.apply_async()

g.apply_async() 之前执行任务。

问题

问题是出在我启动任务的方式上还是其他原因?这是正常行为吗?

附加信息

在我的本地机器上,我使用 RabbitMQ 3.3.4 运行 celery 3.1.13,并在服务器上运行 celery 3.1.13 Redis 2.8.9。 在本地机器上我看不到这样的行为,每个任务都执行一次。在服务器上,我看到有 1 到 10 个这样的任务连续执行两次。

这是我在本地机器和服务器上运行 celery 的方式:

celery_beat: celery -A proj beat -l info

celery1: celery -A proj worker -Q default -l info --purge -n default_worker -P eventlet -c 50

celery2: celery -A proj worker -Q long -l info --purge -n long_worker -P eventlet -c 200

可行的解决方法

我根据收到的参数在 TaskB 上引入了一个锁。经过大约 10 个小时的测试,我看到究竟是什么被执行了两次,但是锁防止了数据库冲突。 这确实解决了我的问题,但我仍然想了解为什么会这样。

最佳答案

您是否按照 Using Redis 中的描述设置了 fanout_prefixfanout_patterns celery 的文档?我将 Celery 与 Redis 结合使用,但没有遇到此问题。

关于python - Celery/Redis 同一任务并行执行多次,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24912447/

相关文章:

python - django 测试文件下载 - "ValueError: I/O operation on closed file"

python - 如何检查哪个 Python 解释器 Spyder 在其控制台上运行?

java - RMI - 线程池子线程套接字权限问题

python - 将字符串转换为具有规范的列表

python - createsuperuser 没有要求用户名

python - Django get_queryset 过滤 pk 上的对象

python - 运行 Django 测试时出现类型错误

Redis Cluster vs Twemproxy - 移动响应

redis - 使用 Redis 跟踪在线用户

python - Dataflow Streaming 使用 Python SDK : Transform for PubSub Messages to BigQuery Output