python - Django - 每 x 秒运行一个函数

标签 python django

我正在开发一个 Django 应用程序。我有一个 API 端点,如果需要,它必须执行一个必须重复几次的功能(直到某个条件为真)。我现在的处理方式是——

def shut_down(request):
  # Do some stuff
  while True:
    result = some_fn()
    if result:
      break
    time.sleep(2)

  return True

虽然我知道这是一种糟糕的方法,而且我不应该阻塞 2 秒,但我不知道如何绕过它。
在等待 4 秒后,此方法有效。但我想要一些让循环在后台运行的东西,并在 some_fn 返回 True 时停止。 (另外,肯定 some_fn 会返回 True)

编辑-
阅读 Oz123 的回复给了我一个似乎可行的想法。这就是我所做的 -

def shut_down(params):
    # Do some stuff
    # Offload the blocking job to a new thread

    t = threading.Thread(target=some_fn, args=(id, ), kwargs={})
    t.setDaemon(True)
    t.start()

    return True

def some_fn(id):
    while True:
        # Do the job, get result in res
        # If the job is done, return. Or sleep the thread for 2 seconds before trying again.

        if res:
            return
        else:
            time.sleep(2)

这对我有用。很简单,就是不知道多线程配合Django的效率如何。
如果有人能指出其中的缺陷,欢迎批评。

最佳答案

对于许多小型项目celery是矫枉过正。对于这些项目,您可以使用 schedule ,非常好用。

使用这个库,您可以让任何函数定期执行任务:

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)

while True:
    schedule.run_pending()
    time.sleep(1) 

示例以阻塞方式运行,但是如果你查看FAQ,你会发现你也可以在并行线程中运行任务,这样你就不会阻塞,一旦不再需要就删除任务:

import threading    
import time 

from schedule import Scheduler

def run_continuously(self, interval=1):
    """Continuously run, while executing pending jobs at each elapsed
    time interval.
    @return cease_continuous_run: threading.Event which can be set to
    cease continuous run.
    Please note that it is *intended behavior that run_continuously()
    does not run missed jobs*. For example, if you've registered a job
    that should run every minute and you set a continuous run interval
    of one hour then your job won't be run 60 times at each interval but
    only once.
    """

    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):

        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                self.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.setDaemon(True)
    continuous_thread.start()
    return cease_continuous_run


Scheduler.run_continuously = run_continuously

这是一个在类方法中使用的例子:

    def foo(self):
        ...
        if some_condition():
           return schedule.CancelJob  # a job can dequeue it

    # can be put in __enter__ or __init__
    self._job_stop = self.scheduler.run_continuously()

    logger.debug("doing foo"...)
    self.foo() # call foo
    self.scheduler.every(5).seconds.do(
        self.foo) # schedule foo for running every 5 seconds
    
    ...
    # later on foo is not needed any more:
    self._job_stop.set()
    
    ...
    
    def __exit__(self, exec_type, exc_value, traceback):
        # if the jobs are not stop, you can stop them
        self._job_stop.set()
    

关于python - Django - 每 x 秒运行一个函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44896618/

相关文章:

python - 查找元素的上一次出现

python - 我在这个脚本中正确地计算了数组比较吗?

python - 在不知道嵌套级别数的情况下递归迭代到所有嵌套数组的最佳方法?

python - 如何扩展 Django 组并仍然在 Django 用户中使用

python - 如何使用 Django REST 序列化程序对保留键进行验证?

python - Django REST 框架 : get field of related model in serializer

javascript - 将元素 id 从 div 发送到 javascript 文件

python - 如何在 for 循环中使用 django 模板点表示法

python - 使用 selenium 自动爬取

python线程混淆代码 'int'对象不可调用