python - 每 X 秒作为用户输入运行 python 脚本

标签 python timer scheduled-tasks scheduling apscheduler

我有以下 python 代码:

#!/usr/bin/python
# -*- coding: utf-8 -*-
import json
import requests
import sys
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

#Function to get the health of the cluster
def getClusterHealth():
    try:
        response = requests.get('http://127.0.0.1:2379/health')
        data = response.json()
        if data['health']=="true":
            print("Cluster is healthy")
            getClusterMetrics()
        elif data['health']!="true":
            print ("Cluster is not healthy")
            sendEmail()
    except requests.exceptions.ConnectionError as e:
        print e
        print("Cluster is down")
        sendEmail()

#Function to get the netrics of the cluster
def getClusterMetrics():
    try:
        response = requests.get('http://127.0.0.1:2379/metrics')
        with open('clusterMetrics.txt','w') as f:
            f.write(response.text)
            f.close()
            print("Cluster Metrics saved in file: clusterMetrics.txt")
    except requests.exceptions.ConnectionError as e:
        print e
        sendEmail()

#Function to send emails in case of failures
def sendEmail():
    msg = MIMEText("etcd Cluster Down Sample Mail")
    sender = "etcd Cluster - 10.35.14.141"
    recipients = ["sample@email.com"]
    msg["Subject"] = "etcd Cluster Monitoring Test Multiple ID"
    msg['From'] = sender
    msg['To'] = ", ".join(recipients)
    s = smtplib.SMTP('localhost')
    s.sendmail(sender,recipients,msg.as_string())
    s.quit()

if __name__ == "__main__":

    if(len(sys.argv) < 2):
        print("Usage : python etcdMonitoring.py [health|metrics]")
    elif(sys.argv[1] == "health"):
        getClusterHealth()
    elif(sys.argv[1] == "metrics"):
        getClusterMetrics()

我想在用户输入时运行整个脚本 X 秒。但是,使用此输入,我想在我的函数中执行一些基于计时器的功能。它应该按照我的意愿显示输出/做某些事情(不能在这里显示代码),但是内部函数将针对用户给定的输入倍数运行。例如,如果输入为 30,则脚本应每 30 秒运行一次,但我应该能够每两分钟检查一次集群是否健康。

最佳答案

我前段时间遇到过这样的问题,把你想每X秒执行一次的代码放在一个函数中然后使用threading python模块将它作为一个线程运行,但是不要忘记关闭线程在关闭程序之前使用 thread.cancel()thread.exit()(使用 atexit 或您自己的实现),因为函数将继续如果你不这样做,就执行。

import atexit
import threading
X = input("Repeat code every ? sec")
def code(X):
  thread = threading.Timer(float(X), code)
  thread.start()
  '''
  your
  code
  '''
def exit():
  thread.cancel() #or thread.exit()
atexit.register(exit)

更多信息:
Run certain code every n seconds
How to close a thread from within?

关于python - 每 X 秒作为用户输入运行 python 脚本,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51102631/

相关文章:

python - 防止 numpy 将数值向上转换为字符串

python - 使用 pyplot.scatter() 方法绘制元组列表(x 坐标、y 坐标、颜色)

python - 使用补丁添加 OverflowError 副作用 - Python

python - 每秒运行 Python 函数一分钟

spring - 管理插入更新计划任务

amazon-web-services - 安排 AWS EC2 Spot 实例创建/终止

python - 使用 python 正则表达式在 HTML 中查找随机句子

java - 在主类中使用定时器

当设置为长时间后时,Python 计时器脚本不会运行

python - python中的非阻塞事件调度