python - 如何通过附加配置自动调度

标签 python schedule

考虑一下我需要经常向我的客户发送自动报告电子邮件。由于我可能有多个客户,并且一个客户可能需要每日和/或每周报告,因此我想将这些作为 report_config.json 保存在 json 文件中,并使用 python schedule 自动执行 模块。

到目前为止我已经尝试过:

report_config.json 看起来像这样:

{
    "cusomer1": [
        {
            "report_type": "twitter",
            "report_frequency": "daily",
            "receivers": [
                "<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="acc9d4cdc1dcc0c9ecc9d4cdc1dcc0c982cfc3c1" rel="noreferrer noopener nofollow">[email protected]</a>"
            ]
        },
        {
            "report_type": "twitter",
            "report_frequency": "weekly",
            "receivers": [
                "<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="cca9b4ada1bca0a98ca9b4ada1bca0a9e2afa3a1" rel="noreferrer noopener nofollow">[email protected]</a>"
            ]
        }
    ],
    "customer2": {
        "report_type": "twitter",
        "report_frequency": "daily",
        "receivers": [
                "<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="513429303c213d34113429303c213d347f323e3c" rel="noreferrer noopener nofollow">[email protected]</a>"
            ]
    }
}

这就像我想将客户的电子邮件需求保留在列表中作为其在配置文件中的值。然后循环并安排它们。即:

if __name__ == "__main__":
    for customer, reports in report_config.items():
        for report in reports:
            # below is an example of scheduling and not working as I want to achieve right now
            schedule.every().days.at('03:00').do(preprare_report,
                                                 report['report_type'], report['report_frequency'])
            schedule.every().monday.at('03:00').do(preprare_report,
                                                   report['report_type'], report['report_frequency'])

    while True:
        schedule.run_pending()
        time.sleep(1)

但正如您所看到的,客户可能只需要每天发送电子邮件。另一个人可能想要每个星期一和每天。但我无法自动化代码的 schedule.every().monday/days 部分,因为它们是方法而不是参数。

我知道我可以用几个 if-else 语句来做我想做的事情。但随着客户及其需求的增长,硬编码将变得难以维护。你知道,这会很痛苦:

if report['day'] == 'monday':
    schedule.every().monday.at('03:00')...
elif report['day'] == 'tue':
    ...

而且日期甚至不是这里唯一的一个参数。那么,关于如何实现全自动系统有什么建议吗?我可以提供您需要的任何其他信息。谢谢。

最佳答案

使用 getattr 函数,您可以自动化获取作业实例。

为了方便起见,我插入了一些类型提示。

from typing import Literal, Optional

import schedule

UnitType = Literal["weeks", "days", "hours", "minutes", "seconds",
                   "week", "day", "hour", "minute", "second",
                   "monday", "tuesday", "wednesday", "thursday", 
                   "friday", "saturday", "sunday"]


def get_schedule_job(
        unit: UnitType,
        interval: int = 1,
        time: Optional[str] = None
) -> schedule.Job:
    schedule_job: schedule.Job = getattr(schedule.every(interval), unit)

    if time is not None:
        return schedule_job.at(time)
    return schedule_job


report_schedule_config_example = {
    "interval": 1,
    "unit": "monday",
    "time": "03:00"
}

get_schedule_job(**report_schedule_config_example).do(prepare_report)

关于python - 如何通过附加配置自动调度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61042391/

相关文章:

python - Elasticsearch 和日期范围过滤器

python - 如何使用 python 获取 Youtube 视频的持续时间?

python - 包装一个函数,该函数返回一个指向带有 ctypes 的 python 对象的指针

java - ScheduledExecutorService 任务运行晚于预期

java - 用于 Java 的通用分布式调度库

Scala - 在一天中的给定时间安排任务 : run a task everyday at 6pm

python - 用于测试的 MQTT 代理

python - Django 功能性 LiveServerTestCase - 使用 selenium 提交表单后,对象保存到非测试数据库

java - 让主线程等待,直到新的单线程在java中完全执行

iOS iPad 自定义日历控件