[注:使用Python 2.7和jupyter notebook环境]
我希望能够使用 APscheduler 将实例方法安排为作业,并将这些作业存储在持久性数据库中 [在本例中为 mongodb]。
然而,当尝试这样做时,我遇到了以下错误:unbound method use_variable() must be called with Job instance as first argument (got NoneType instance instead)
在此之前我已经成功: (a) 将实例方法调度为作业 (b) 在 mongodb 中存储作业
但是,我无法让这两者一起工作。
什么有效:
(a) 将实例方法调度为作业的基本示例...
from apscheduler.schedulers.background import BackgroundScheduler
class Job:
def __init__(self, config):
self.variable = config['variable']
def use_variable(self):
print(self.variable)
job=Job({'variable': 'test'})
scheduler = BackgroundScheduler()
scheduler.add_job(job.use_variable, trigger='interval', seconds=5)
scheduler.start()
按预期运行,每 5 秒打印一次“test”。
什么中断:
但是,一旦我添加了一个工作商店...
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
class Job:
def __init__(self, config):
self.variable = config['variable']
def use_variable(self):
print(self.variable)
job=Job({'variable': 'test'})
jobstores = {'default': MongoDBJobStore(
database='apscheduler', collection='jobs')}
scheduler = BackgroundScheduler(jobstores=jobstores)
scheduler.add_job(job.use_variable, trigger='interval', seconds=5)
scheduler.start()
作业第一次成功运行,但由于“未绑定(bind)方法”导致所有后续调用错误。我猜测作业实例没有从作业存储中检索,因此没有“self”传递给 use_variable 方法...
来自文档...
来自文档:在绑定(bind)方法的情况下,将未绑定(bind)版本 (YourClass.method_name) 作为目标函数传递给 add_job() 并将类实例作为第一个参数(因此它作为 self 传递参数)
因此,我试过:
scheduler.add_job(Job.use_variable, args=[job] trigger='interval', seconds=5)
也不走运。
当前的解决方法
我目前正在使用以下解决方法,但是它很老套,我想找到一个更优雅的解决方案!
def proxy(job):
job.use_variable()
scheduler.add_job(proxy, args=[job], trigger='interval', seconds=5)
按照 Alex 的建议使用阻塞调度程序进行更新
最简单的示例代码:
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
class Job:
def __init__(self, config):
self.variable = config['variable']
def use_variable(job):
print(job.variable)
job = Job({'variable': 'test'})
jobstores = {'default': MongoDBJobStore(
database='apscheduler', collection='jobs')}
scheduler = BlockingScheduler(jobstores=jobstores)
def my_listener(event):
if (event.exception):
print('Exception: {}'.format(event.exception))
scheduler.add_listener(my_listener)
scheduler.add_job(job.use_variable, trigger='interval',seconds=5)
// Also tried:
// scheduler.add_job(Job.use_variable, args=[job] trigger='interval',seconds=5)
scheduler.start()
重现错误。第一次调用成功打印“测试”,但后续调用遇到未绑定(bind)错误 [至少在 jupyter notebook 环境中]...
最佳答案
我在 APScheduler 3.6.0 中发现了一个错误,它只在 Python 2.7 上出现。它试图在方法调度方面做正确的事情,但在 py2.7 上它做错了。我将在下一个版本中修复此问题。与此同时,您可以继续使用您的解决方法。
关于python - 如何将实例方法用作 APscheduler 的作业并结合持久数据存储[未绑定(bind)方法错误],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56927791/