打造基于Python 自动检测windows服务运行状况,包括检测服务状态和端口探测,以及异常情况通过email报警的脚本
1 入口程序
checkServices.py
# -*- encoding: utf-8 -*- # blog.cuixh.com import json import os import time import win32service import win32serviceutil import win32api import socket from Logger import Logger import threading from SendMail import SendMail log = Logger('.\\log\\' + time.strftime("%Y%m%d", time.localtime()) + '-exception.txt', level='debug') #服务配置文件 with open('config.json', 'r', encoding='utf-8') as f: ServiceName = json.load(f) #端口配置文件 with open('port.json', 'r', encoding='utf-8') as f1: PortName = json.load(f1) #mail配置参数和脚本运行配置 with open('mail.json', 'r', encoding='utf-8') as f2: Mail = json.load(f2) if not os.path.exists(".\\log\\"): os.mkdir(".\\log\\") def port_status(ip, port): server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: server.connect((ip, port)) log.logger.info(str(port) + ' 端口正常') server.close() except Exception as err: log.logger.error(str(port) + " 端口关闭 原因:%s" % err) s = SendMail(Mail['smtpServer'], Mail['senDer'], Mail['password']) s.Send_email_text(str(port) +" 端口关闭 原因:%s" % err, Mail['Title'], Mail['mailList']) def GetSvcStatus(svcname): svcstatusdic = { # The service continue is pending. win32service.SERVICE_CONTINUE_PENDING: "continue pending", # The service pause is pending. win32service.SERVICE_PAUSE_PENDING: "pause pending", # The service is paused. win32service.SERVICE_PAUSED: "paused", # The service is running. win32service.SERVICE_RUNNING: "running", # The service is starting. win32service.SERVICE_START_PENDING: "start pending", # The service is stopping. win32service.SERVICE_STOP_PENDING: "stop pending", # The service is not running. win32service.SERVICE_STOPPED: "stopped" } status = win32serviceutil.QueryServiceStatus(svcname) if status: return svcstatusdic.get(status[1], "unknown") else: return "error" def step_port(): for i in PortName: port_status("127.0.0.1", int(PortName[i])) def main(): svcname = ServiceName for service in svcname: status = GetSvcStatus(svcname[service]) if status == "running": log.logger.info("{} current status : {}".format(svcname[service], status)) elif status == "stopped" or status == "paused": log.logger.info("{} current status : {}".format(svcname[service], status)) log.logger.info(svcname[service] + " 服务没有启动,开始启动服务中...") s = SendMail(Mail['smtpServer'], Mail['senDer'], Mail['password']) s.Send_email_text(svcname[service] + " 服务没有启动,开始启动服务中...", Mail['Title'], Mail['mailList']) # 打印服务状态 win32serviceutil.StartService(svcname[service]) win32api.Sleep(1000) status = GetSvcStatus(svcname[service]) log.logger.info("After Control,{} current status : {}".format(svcname[service], status)) # 关闭文件 log.logger.info('重启 ' + svcname[service] + ' 成功...') s1 = SendMail(Mail['smtpServer'], Mail['senDer'], Mail['password']) s1.Send_email_text('重启 ' + svcname[service] + ' 成功...', Mail['Title'], Mail['mailList']) else: log.logger.error(svcname[service] +" 程序异常无法重启,请查看相关服务和日志!") s2 = SendMail(Mail['smtpServer'], Mail['senDer'], Mail['password']) s2.Send_email_text(svcname[service] + " 程序异常无法重启,请查看相关服务和日志!", Mail['Title'], Mail['mailList']) def thread(name): th = threading.Thread(target=name) th.start() if __name__ == "__main__": while True: thread(main) thread(step_port) time.sleep(Mail['interval']) # 从mail.json 读取interval的值
Logger.py #日志模块
# -*- encoding: utf-8 -*- # blog.cuixh.com import logging from logging import handlers class Logger(object): level_relations = { 'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'crit': logging.CRITICAL } # 日志级别关系映射 def __init__(self, filename, level='info', when='D', backCount=3, fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'): self.logger = logging.getLogger(filename) format_str = logging.Formatter(fmt) # 设置日志格式 self.logger.setLevel(self.level_relations.get(level)) # 设置日志级别 sh = logging.StreamHandler() # 往屏幕上输出 sh.setFormatter(format_str) # 设置屏幕上显示的格式 th = handlers.TimedRotatingFileHandler(filename=filename, when=when, backupCount=backCount, encoding='utf-8') # 往文件里写入#指定间隔时间自动生成文件的处理器 # 实例化TimedRotatingFileHandler # interval是时间间隔,backupCount是备份文件的个数,如果超过这个个数,就会自动删除,when是间隔的时间单位,单位有以下几种: # S 秒 # M 分 # H 小时、 # D 天、 # W 每星期(interval==0时代表星期一) # midnight 每天凌晨 th.setFormatter(format_str) # 设置文件里写入的格式 self.logger.addHandler(sh) # 把对象加到logger里 self.logger.addHandler(th)
SendMail.py #邮件发送模块
# -*- encoding: utf-8 -*- # blog.cuixh.com import smtplib # 发邮件 from email.mime.text import MIMEText # 邮件文本 import time from Logger import Logger log = Logger('.\\log\\' + time.strftime("%Y%m%d", time.localtime()) + '-mail.txt', level='debug') class SendMail: def __init__(self, host, Sender, password): self.host = host # 服务器 self.Sender = Sender # 邮件发送方 self.password = password # 密码 self.mailsever = smtplib.SMTP_SSL(self.host, 465) # 邮件服务器端口25 self.mailsever.login(self.Sender, self.password) def Send_email_text(self, Message, title, maillist): msg = MIMEText(Message) # 转化为邮件文本格式 msg["Subject"] = title # 邮件标题 msg["From"] = self.Sender # 邮件发送者 msg["To"] = "maillist" # 邮件接收方 try: self.mailsever.sendmail(self.Sender, maillist, msg.as_string()) log.logger.info("邮件发送成功") except smtplib.SMTPException as e: log.logger.error('error:%s' % e) def exit(self): self.mailsever.quit()
2 配置文件
config.json #服务配置文件
{ "mysqlService": "mysql" }
port.json # 端口配置文件
{ "mysql":"3306" }
mail.json #邮件配置和循环运行间隔时间
{ "smtpServer": "smtp.163.com", "senDer": "xxx@163.com", "password": "xxxx", "Title": "提示", "mailList": ["xxxxx@qq.com"], "interval" : 60 }
3 用pyinstaller 打包成exe文件
pyinstaller -F checkServices.py
4 放到windows服务器目录中,包括mail.json config.json port.json #注意配置相应的服务和端口以及正确的邮件地址和密码,运行即可