python - 高级 Python 调度程序和 SQLAlchemyJobStore

标签 python

我在 APS 中使用 sqlalchemy 作业存储.使用函数 add_cron_job 也可以将作业添加到 cron。但是数据库表中没有条目。请帮助我:以下是代码

import time
import logging
from threading import Timer
from threading import Lock
from gadgetplatform.classes.utils.utilities import import_module
from apscheduler.scheduler import Scheduler

from apscheduler.jobstores.sqlalchemy_store import SQLAlchemyJobStore

class SchedulerManager(object):
    _instance = None
    _initialised = False
    lock = Lock()
    log = logging.getLogger(__name__)
    jobDict=[]

    def __new__(cls):

        if not cls._instance or not cls._initialised:
            cls.lock.acquire()
            if not cls._instance:
                cls.log.info("creating instance")
                cls._instance = super(SchedulerManager, cls).__new__(cls)
                cls._initialised = True
                cls._instance.init()
                cls.log.info("instance created")
            cls.lock.release()
            cls.log.info("lock released")
            cls.log.info("returning instance")
        return cls._instance    

    def init(self):
        self.sched=Scheduler()
        self.sched.add_jobstore(SQLAlchemyJobStore('mysql://root@localhost/mygola?charset=utf8&use_unicode=0'), 'apschedulerJobs')        

    def addToCron(self,source,time):
        self.log.info("called to add schedular")

        time = time.split(' ')

        m=str(time[0])        
        h=str(time[1])        
        d=str(time[2])        
        mnth=str(time[3])                
        yr=str(time[4])

        func=self.convertStringToFunction(source)
        self.sched.add_cron_job(func, year=yr, month=mnth, day=d, hour=h, minute=m)        
        self.jobDict.append(source)

        self.log.info("added with the time")            

    def removeFromCron(self,source):        
        func=self.convertStringToFunction(source)
        self.sched.unschedule_func(func)

    def start(self):
        self.sched.start()
        self.log.info("Schedular Started")

    def stop(self):
        self.sched.shutdown()

    def getRunningJobs(self):
        return self.jobDict

    def convertStringToFunction(self,source):
        strArr = source.rsplit('.',1)
        mod = import_module(strArr[0])
        func = getattr(mod, strArr[1])
        return func

最佳答案

我从未使用过 APScheduler,但从文档来看,您似乎必须指定要将作业添加到的作业存储。

您可以通过给它指定特殊名称 default 来确保 SQL Alchemy 作业存储是默认的:

self.sched.add_jobstore(SQLAlchemyJobStore('...'), 'default')

或者您可以在添加作业时指定作业商店名称:

self.sched.add_cron_job(func, 
                        jobstore="apschedulerJobs", 
                        year=yr, month=mnth, day=d, hour=h, minute=m)

关于python - 高级 Python 调度程序和 SQLAlchemyJobStore,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10104682/

相关文章:

python - lxml 中的错误处理编码

python - 最大递归深度误差,与列表理解符号有某种关系

python - Apache Airflow 如何将 xcom_pull() 值转换为 DAG?

python - 确保千克转换为克 pandas

python - 接近 MySQL 中的串行文本文件读取性能

python - Zamzar API 下载失败

python - Pandas df.to_csv() 将字典值保存为字符串。调用 pd.read_csv() 时如何取回字典?

python - Elasticsearch 聚合到 pandas Dataframe

python - 使用类在 python 中使用计数器函数创建顺序搜索

python - Tornado:在运行所有测试之前重置数据库