考虑一下我需要经常向我的客户发送自动报告电子邮件。由于我可能有多个客户,并且一个客户可能需要每日和/或每周报告,因此我想将这些作为 report_config.json
保存在 json 文件中,并使用 python schedule 自动执行
模块。
到目前为止我已经尝试过:
report_config.json
看起来像这样:
{
"cusomer1": [
{
"report_type": "twitter",
"report_frequency": "daily",
"receivers": [
"<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="acc9d4cdc1dcc0c9ecc9d4cdc1dcc0c982cfc3c1" rel="noreferrer noopener nofollow">[email protected]</a>"
]
},
{
"report_type": "twitter",
"report_frequency": "weekly",
"receivers": [
"<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="cca9b4ada1bca0a98ca9b4ada1bca0a9e2afa3a1" rel="noreferrer noopener nofollow">[email protected]</a>"
]
}
],
"customer2": {
"report_type": "twitter",
"report_frequency": "daily",
"receivers": [
"<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="513429303c213d34113429303c213d347f323e3c" rel="noreferrer noopener nofollow">[email protected]</a>"
]
}
}
这就像我想将客户的电子邮件需求保留在列表中作为其在配置文件中的值。然后循环并安排它们。即:
if __name__ == "__main__":
for customer, reports in report_config.items():
for report in reports:
# below is an example of scheduling and not working as I want to achieve right now
schedule.every().days.at('03:00').do(preprare_report,
report['report_type'], report['report_frequency'])
schedule.every().monday.at('03:00').do(preprare_report,
report['report_type'], report['report_frequency'])
while True:
schedule.run_pending()
time.sleep(1)
但正如您所看到的,客户可能只需要每天发送电子邮件。另一个人可能想要每个星期一和每天。但我无法自动化代码的 schedule.every().monday/days
部分,因为它们是方法而不是参数。
我知道我可以用几个 if-else 语句来做我想做的事情。但随着客户及其需求的增长,硬编码将变得难以维护。你知道,这会很痛苦:
if report['day'] == 'monday':
schedule.every().monday.at('03:00')...
elif report['day'] == 'tue':
...
而且日期甚至不是这里唯一的一个参数。那么,关于如何实现全自动系统有什么建议吗?我可以提供您需要的任何其他信息。谢谢。
最佳答案
使用 getattr
函数,您可以自动化获取作业实例。
为了方便起见,我插入了一些类型提示。
from typing import Literal, Optional
import schedule
UnitType = Literal["weeks", "days", "hours", "minutes", "seconds",
"week", "day", "hour", "minute", "second",
"monday", "tuesday", "wednesday", "thursday",
"friday", "saturday", "sunday"]
def get_schedule_job(
unit: UnitType,
interval: int = 1,
time: Optional[str] = None
) -> schedule.Job:
schedule_job: schedule.Job = getattr(schedule.every(interval), unit)
if time is not None:
return schedule_job.at(time)
return schedule_job
report_schedule_config_example = {
"interval": 1,
"unit": "monday",
"time": "03:00"
}
get_schedule_job(**report_schedule_config_example).do(prepare_report)
关于python - 如何通过附加配置自动调度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61042391/