python - 向 Django 中长时间运行的方法发送信号

标签 python django signals celery

我想向 Celery 中长时间运行的任务发送“暂停”信号,我正在尝试找出最好的方法来做到这一点。在 View 中,我可以从数据库中提取对象的实例并告诉其保存,但它与 Celery 中的对象实例不同。该对象不会回来检查它是否已暂停。

从长时间运行的类和任务中轮询数据库感觉很奇怪且不切实际,因此我正在考虑向我的实例发送一条消息。我考虑过使用 pubsub 但我更喜欢使用 Django 信号,因为它已经是一个 Django 项目。我可能会以错误的方式处理这个问题。

这是一个不起作用的示例:

模型.py

class LongRunningClass(models.Model):
    is_paused = models.BooleanField(default=False)
    processed_files = models.IntegerField(default=0)
    total_files = models.IntegerField(default=100)

    def long_task(self):
        remaining_files = self.total_files - self.processed_files
        for i in xrange(remaining_files):
            if not self.is_paused:
                self.processed_files += 1
                time.sleep(1)

        # Task complete, let's save.
        self.save()

View .py

def pause_task(self, pk):
     lrc = LongRunningClass.objects.get(pk=pk)
     lrc.is_paused = True
     lrc.save()
     return HttpResponse(json.dumps({'is_paused': lrc.is_paused}))


def resume_task(self, pk):
    lrc = LongRunningClass.objects.get(pk=pk)
    lrc.is_paused = False
    lrc.save()

    # Pretend this is a Celery task
    lrc.long_task()

因此,如果我修改 models.py 以使用信号,我可以添加这些行,但它仍然不太有效:

pause_signal = django.dispatch.Signal(providing_args=['is_paused'])

@django.dispatch.receiver(pause_signal)
def pause_callback(sender, **kwargs):
    if 'is_paused' in kwargs:
        sender.is_paused = kwargs['is_paused']
        sender.save()

这也不会影响已经运行的实例化类。如何告诉任务中运行的模型实例暂停?

最佳答案

Celery 任务是一个单独的进程。 Django 信号是标准的“观察者”模式,它在一个线程内工作,因此无法使用信号组织线程之间的通信。您需要从数据库加载对象以了解其属性是否已更改。

class LongRunningClass(models.Model):
    is_paused = models.BooleanField(default=False)
    processed_files = models.IntegerField(default=0)
    total_files = models.IntegerField(default=100)

    def get_is_paused(self):
        db_obj = LongRunningClass.objects.get(pk=self.pk)
        return db_obj.is_paused

    def long_task(self):
        remaining_files = self.total_files - self.processed_files
        for i in xrange(remaining_files):
            if not self.get_is_paused:
                self.processed_files += 1
                time.sleep(1)

    # Task complete, let's save.
    self.save() 

设计上不太好 - 你最好将 long_task 移动到其他地方,并使用新加载的 LongRunningClass 实例进行操作,但它会完成这项工作。如果您不想经常打扰数据库,您可以在此处添加一些内存缓存。

顺便说一句:我不是 100% 确定,但您可能在这里遇到另一个设计问题。当您以这种周期运行很长时间的任务时,这种情况相当罕见。考虑从程序中删除循环(您有队列!)。看看:

@celery.task(run_every=2minutes)  # adding XX files for processing every XX minutes
def scheduled_task(lr_pk):
    lr = LongRunningClass.objects.get(pk=lr_pk)
    if not lr.is paused:
        remaining_files = self.total_files - self.processed_files
        for i in xrange(lr.files_per_iteration):
            process_file.delay(lr.pk,i)

@celery.task(rate=1/m,queue='process_file')  # processing each file
def process_file(lr_pk,i):
    #  do somthing with i
    lr = LongRunningClass.objects.get(pk=lr_pk)
    lr.processed_files += 1
    lr.save() 

您必须设置 celerybeat,并为此类任务创建单独的队列,才能实现此解决方案。但结果是,您将对程序有很多控制权 - 速度、并行执行,并且您的代码不会因 sleep(1) 而挂起。如果为每个文件创建另一个模型,您可以控制处理哪些文件、不处理哪些文件、处理错误等。

关于python - 向 Django 中长时间运行的方法发送信号,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15982080/

相关文章:

python - 如何将请求对象传递给 Django 类 View 中的 method_decorator?

python - 如何在线程 python 程序中捕获 SIGINT?

python - 如何在 PyQT5 中暂停/播放线程?

Python 抓取谷歌金融

python - 具有三个列表的逻辑运算符

python - 本地pypi包依赖于python pypi服务器

python - Reportlab 不在 Google App Engine 上的 Django 应用程序中生成图表

django - Windows 上的最小生产 Django 服务器

c - 如何使用 C 中的信号将多个值从子进程传递到父进程?

c - Windows 控制台应用程序 - 关闭事件的信号