python - How do I implement per-worker synchronization to avoid race conditions in methods connected to signals in Celery?

Tags: python synchronization locking celery race-condition

I am using the task_prerun and task_postrun signals to keep track of how many tasks a particular worker is actually executing at any given moment.

Every time a task comes in, I increment an integer stored in a file by one. When the task leaves, I decrement it by one.

Since I write these values to a file, I have to account for a race condition: two tasks could start simultaneously under the same worker, causing the task_prerun signal to fire twice at the same time and access the same file concurrently.

How would I handle this? Could I have a threading.Lock object in the global scope? The lock has to work on a per-worker basis, so I suppose declaring it globally is acceptable, even though it is not great practice.

Note that I do not want the total number of tasks being processed; I want the number of tasks being processed by that worker.

The reason why is to protect the instances from being removed when the autoscaling group minimum size changes in an AWS stack... I don't want AWS to kill machines that are still processing tasks.

Consider the following example:

import os
import time

from celery import Celery
from celery.signals import task_prerun, task_postrun

app = Celery('tasks', broker='pyamqp://guest@localhost/')

# Text file that keeps track of how many tasks are still computing.
counter_file = os.path.join(os.path.dirname(__file__), 'counter.txt')
if not os.path.exists(counter_file):
    with open(counter_file, 'w') as f:
        f.write('0')

@task_prerun.connect
def before(*args, **kwargs):
    """ Open the counter file and increment the value in it. """
    with open(counter_file, 'r+') as f:
        count = int(f.read())
        f.seek(0)
        f.write(str(count + 1))

@task_postrun.connect
def after(*args, **kwargs):
    """ Open the counter file and decrement the value in it. """
    with open(counter_file, 'r+') as f:
        count = int(f.read())
        f.seek(0)
        f.write(str(count - 1))

@app.task
def add(x, y):
    time.sleep(5)
    return x + y
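A global threading.Lock would only serialize the signal handlers within a single worker process. With the default prefork pool, however, the handlers run in separate OS processes, so a thread lock cannot protect the shared file. One option is an OS-level file lock. Below is a minimal sketch using fcntl.flock (Unix-only); the helper name change_counter and the temp-file path are my own, not part of the original script:

```python
import fcntl
import os
import tempfile

# Hypothetical stand-in for the counter file used in the script above.
counter_file = os.path.join(tempfile.gettempdir(), 'counter.txt')
if os.path.exists(counter_file):
    os.remove(counter_file)

def change_counter(delta):
    """Atomically add `delta` to the integer stored in counter_file."""
    # O_CREAT makes the first call safe; the exclusive lock is held for the
    # whole read-modify-write cycle, so concurrent workers serialize here.
    fd = os.open(counter_file, os.O_RDWR | os.O_CREAT)
    with os.fdopen(fd, 'r+') as f:
        fcntl.flock(f, fcntl.LOCK_EX)   # exclusive, cross-process lock
        raw = f.read().strip()
        count = (int(raw) if raw else 0) + delta
        f.seek(0)
        f.truncate()
        f.write(str(count))
        return count                    # lock released when f is closed
```

With this in place, the task_prerun handler would call change_counter(1) and the task_postrun handler change_counter(-1).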

Solution

I went with the solution proposed by @DejanLekic, using the Inspect class, and it worked. This is the final script, which I loaded into celery on two machines:

# tasks.py

import os
import random
import socket
import threading
import time

from celery import Celery
from celery.signals import task_prerun, task_postrun

app = Celery('tasks', broker=os.getenv('BROKER_ADDR', 'pyamqp://guest@localhost//'))

def get_number_of_tasks_being_executed_by_this_worker(wait_before=0.01):
    time.sleep(wait_before)
    # Do not rely on the worker name; we are only sure of the hostname, so we
    # cannot use the destination= keyword of the inspect call.
    # .active() returns None when no worker replies, hence the "or {}".
    active_tasks_by_all_workers = app.control.inspect().active() or {}

    # Filter the tasks of the workers on this machine.
    active_tasks_by_this_worker = [
        val for key, val in active_tasks_by_all_workers.items()
        if socket.gethostname() in key
    ]

    # Get the list of tasks of the first (and only, ideally) match.
    active_tasks_by_this_worker = active_tasks_by_this_worker[0] if active_tasks_by_this_worker else []

    return active_tasks_by_this_worker

def check_if_should_protect_from_autoscaling():
    tasks = get_number_of_tasks_being_executed_by_this_worker()

    if tasks:
        print("%d tasks are still running in this worker. Ensure protection is set." % len(tasks))
        # if is_not_protected_against_auto_scaling_group:
        #     set_aws_autoscaling_protection()

    else:
        print("This worker is not executing any tasks. Unsetting protection.")
        # unset_aws_autoscaling_protection()

@task_postrun.connect
def after(*args, **kwargs):
    # Get the number of tasks with a little delay (0.01 seconds suffice), otherwise at
    # this point the current task that executed this method is shown as active.
    threading.Thread(target=check_if_should_protect_from_autoscaling).start()

@app.task
def add(x, y):
    time.sleep(3 * random.random())
    return x + y

I am dispatching many tasks from this script:

# dispatcher.py

import asyncio

from tasks import add

async def task():
    add.delay(3, 4)


async def main():
    await asyncio.gather(*[task() for i in range(200)])


if __name__ == '__main__':
    asyncio.run(main())

The output log seems to confirm the expected behavior:

[2019-09-23 07:50:28,507: WARNING/ForkPoolWorker-3] 10 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:28,625: WARNING/ForkPoolWorker-1] 8 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:28,627: WARNING/ForkPoolWorker-7] 8 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:28,993: WARNING/ForkPoolWorker-4] 7 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:29,027: INFO/ForkPoolWorker-2] Task tasks.add[c3af9378-5666-42c3-9a37-5d0720b2065a] succeeded in 1.6377690890221857s: 7
[2019-09-23 07:50:29,204: INFO/ForkPoolWorker-9] Task tasks.add[9ca176ce-1590-4670-9947-4656166d224d] succeeded in 2.7913955969852395s: 7
[2019-09-23 07:50:29,224: INFO/ForkPoolWorker-5] Task tasks.add[38d005bc-ff13-4514-aba0-8601e79e67c8] succeeded in 2.0496858750120737s: 7
[2019-09-23 07:50:29,311: WARNING/ForkPoolWorker-8] 5 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:29,316: WARNING/ForkPoolWorker-6] 5 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:29,510: WARNING/ForkPoolWorker-10] 4 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:30,059: WARNING/ForkPoolWorker-2] 3 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:30,199: INFO/ForkPoolWorker-3] Task tasks.add[991d984a-4434-47a0-8c98-9508ca980f0b] succeeded in 2.7176807850482874s: 7
[2019-09-23 07:50:30,239: WARNING/ForkPoolWorker-9] 1 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:30,250: WARNING/ForkPoolWorker-5] 1 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:31,226: WARNING/ForkPoolWorker-3] This worker is not executing any tasks. Unsetting protection.

All good! :D

Best Answer

We implemented Celery autoscaling (on AWS) using some off-the-shelf Celery features. For your requirement, we use Celery's control API (https://docs.celeryproject.org/en/latest/reference/celery.app.control.html). The key piece is its inspect part. The Inspect class can take a destination parameter, which is the Celery node you want to inspect. We do not use it, since we want to inspect all nodes in the cluster, but you may need to do things differently. You should get familiar with this class and its .active() method, which gives you a list of active tasks, either on a set of workers or across the whole cluster (if no destination is given).
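To make the shape of that data concrete: .active() returns a mapping from worker node names to lists of task dicts. The structure below is illustrative (hostnames and task ids are made up), and tasks_on_host is a hypothetical helper mirroring the hostname filtering done in the script above:

```python
# Illustrative return value of app.control.inspect().active();
# keys are worker node names, values are lists of active-task dicts.
# Hostnames and ids here are invented for the example.
active_tasks_by_all_workers = {
    "celery@worker-host-1": [
        {"id": "c3af9378-5666-42c3-9a37-5d0720b2065a", "name": "tasks.add", "args": [3, 4]},
        {"id": "9ca176ce-1590-4670-9947-4656166d224d", "name": "tasks.add", "args": [1, 2]},
    ],
    "celery@worker-host-2": [],
}

def tasks_on_host(active, hostname):
    # Same idea as the script above: match node names by hostname substring,
    # then take the first (ideally only) match.
    matches = [tasks for node, tasks in active.items() if hostname in node]
    return matches[0] if matches else []
```

For example, tasks_on_host(active_tasks_by_all_workers, "worker-host-1") yields the two task dicts, while an unknown hostname yields an empty list.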

A similar question can be found on Stack Overflow: https://stackoverflow.com/questions/57998807/
