python - 如何将实例方法用作 APscheduler 的作业并结合持久数据存储[未绑定(bind)方法错误]

标签 python mongodb methods apscheduler unbound

[注:使用Python 2.7和jupyter notebook环境]

我希望能够使用 APscheduler 将实例方法安排为作业,并将这些作业存储在持久性数据库中 [在本例中为 mongodb]。

然而,当尝试这样做时,我遇到了以下错误:unbound method use_variable() must be called with Job instance as first argument (got NoneType instance instead)

在此之前我已经成功: (a) 将实例方法调度为作业 (b) 在 mongodb 中存储作业

但是,我无法让这两者一起工作。

什么有效:

(a) 将实例方法调度为作业的基本示例...

from apscheduler.schedulers.background import BackgroundScheduler

class Job:
    def __init__(self, config):
        self.variable = config['variable']

    def use_variable(self):
        print(self.variable)

job=Job({'variable': 'test'})
scheduler = BackgroundScheduler()
scheduler.add_job(job.use_variable, trigger='interval', seconds=5)
scheduler.start()

按预期运行,每 5 秒打印一次“test”。

什么中断:

但是,一旦我添加了一个工作商店...

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore

class Job:
    def __init__(self, config):
        self.variable = config['variable']

    def use_variable(self):
        print(self.variable)

job=Job({'variable': 'test'})

jobstores = {'default': MongoDBJobStore(
    database='apscheduler', collection='jobs')}
scheduler = BackgroundScheduler(jobstores=jobstores)
scheduler.add_job(job.use_variable, trigger='interval', seconds=5)
scheduler.start()

作业第一次成功运行,但由于“未绑定(bind)方法”导致所有后续调用错误。我猜测作业实例没有从作业存储中检索,因此没有“self”传递给 use_variable 方法...

来自文档...

来自文档:在绑定(bind)方法的情况下,将未绑定(bind)版本 (YourClass.method_name) 作为目标函数传递给 add_job() 并将类实例作为第一个参数(因此它作为 self 传递参数)

因此,我试过: scheduler.add_job(Job.use_variable, args=[job] trigger='interval', seconds=5)

也不走运。

当前的解决方法

我目前正在使用以下解决方法,但是它很老套,我想找到一个更优雅的解决方案!

def proxy(job):
   job.use_variable()

scheduler.add_job(proxy, args=[job], trigger='interval', seconds=5)

按照 Alex 的建议使用阻塞调度程序进行更新

最简单的示例代码:

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore

class Job:
    def __init__(self, config):
        self.variable = config['variable']

    def use_variable(job):
        print(job.variable)


job = Job({'variable': 'test'})

jobstores = {'default': MongoDBJobStore(
    database='apscheduler', collection='jobs')}
scheduler = BlockingScheduler(jobstores=jobstores)

def my_listener(event):
    if (event.exception):
        print('Exception: {}'.format(event.exception))

scheduler.add_listener(my_listener)
scheduler.add_job(job.use_variable, trigger='interval',seconds=5)
// Also tried:
// scheduler.add_job(Job.use_variable, args=[job] trigger='interval',seconds=5)

scheduler.start()

重现错误。第一次调用成功打印“测试”,但后续调用遇到未绑定(bind)错误 [至少在 jupyter notebook 环境中]...

最佳答案

我在 APScheduler 3.6.0 中发现了一个错误,它只在 Python 2.7 上出现。它试图在方法调度方面做正确的事情,但在 py2.7 上它做错了。我将在下一个版本中修复此问题。与此同时,您可以继续使用您的解决方法。

关于python - 如何将实例方法用作 APscheduler 的作业并结合持久数据存储[未绑定(bind)方法错误],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56927791/

相关文章:

Python3 DictWriter 在 Linux 上使用 CRLF 而不是 LF

python - 如何从 Matplotlib 格式化等高线

python - 寻找更简单的解决方案来分组和选择 Pandas 中的行

javascript - Mongoose 中的自定义排序功能

mongodb - Mongo-Express 未显示所有具有 root 管理员权限的数据库

methods - 在函数内部定义结构的方法

python - 测试 python 函数的非确定性行为

node.js - 使用 Mongoose 删除子文档返回错误?

javascript - 无法调用未定义的方法但警报(方法)仍在工作

java - 如何以编程方式分析java类中使用的其他类的方法调用和对象