打造基于Python 自动检测windows服务运行状况,包括检测服务状态和端口探测,以及异常情况通过email报警的脚本
1 入口程序
checkServices.py
# -*- encoding: utf-8 -*-
# blog.cuixh.com
import json
import os
import time
import win32service
import win32serviceutil
import win32api
import socket
from Logger import Logger
import threading
from SendMail import SendMail
log = Logger('.\\log\\' + time.strftime("%Y%m%d", time.localtime()) + '-exception.txt', level='debug')
#服务配置文件
with open('config.json', 'r', encoding='utf-8') as f:
ServiceName = json.load(f)
#端口配置文件
with open('port.json', 'r', encoding='utf-8') as f1:
PortName = json.load(f1)
#mail配置参数和脚本运行配置
with open('mail.json', 'r', encoding='utf-8') as f2:
Mail = json.load(f2)
if not os.path.exists(".\\log\\"):
os.mkdir(".\\log\\")
def port_status(ip, port):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
server.connect((ip, port))
log.logger.info(str(port) + ' 端口正常')
server.close()
except Exception as err:
log.logger.error(str(port) + " 端口关闭 原因:%s" % err)
s = SendMail(Mail['smtpServer'], Mail['senDer'], Mail['password'])
s.Send_email_text(str(port) +" 端口关闭 原因:%s" % err, Mail['Title'], Mail['mailList'])
def GetSvcStatus(svcname):
svcstatusdic = {
# The service continue is pending.
win32service.SERVICE_CONTINUE_PENDING: "continue pending",
# The service pause is pending.
win32service.SERVICE_PAUSE_PENDING: "pause pending",
# The service is paused.
win32service.SERVICE_PAUSED: "paused",
# The service is running.
win32service.SERVICE_RUNNING: "running",
# The service is starting.
win32service.SERVICE_START_PENDING: "start pending",
# The service is stopping.
win32service.SERVICE_STOP_PENDING: "stop pending",
# The service is not running.
win32service.SERVICE_STOPPED: "stopped"
}
status = win32serviceutil.QueryServiceStatus(svcname)
if status:
return svcstatusdic.get(status[1], "unknown")
else:
return "error"
def step_port():
for i in PortName:
port_status("127.0.0.1", int(PortName[i]))
def main():
svcname = ServiceName
for service in svcname:
status = GetSvcStatus(svcname[service])
if status == "running":
log.logger.info("{} current status : {}".format(svcname[service], status))
elif status == "stopped" or status == "paused":
log.logger.info("{} current status : {}".format(svcname[service], status))
log.logger.info(svcname[service] + " 服务没有启动,开始启动服务中...")
s = SendMail(Mail['smtpServer'], Mail['senDer'], Mail['password'])
s.Send_email_text(svcname[service] + " 服务没有启动,开始启动服务中...", Mail['Title'], Mail['mailList'])
# 打印服务状态
win32serviceutil.StartService(svcname[service])
win32api.Sleep(1000)
status = GetSvcStatus(svcname[service])
log.logger.info("After Control,{} current status : {}".format(svcname[service], status))
# 关闭文件
log.logger.info('重启 ' + svcname[service] + ' 成功...')
s1 = SendMail(Mail['smtpServer'], Mail['senDer'], Mail['password'])
s1.Send_email_text('重启 ' + svcname[service] + ' 成功...', Mail['Title'], Mail['mailList'])
else:
log.logger.error(svcname[service] +" 程序异常无法重启,请查看相关服务和日志!")
s2 = SendMail(Mail['smtpServer'], Mail['senDer'], Mail['password'])
s2.Send_email_text(svcname[service] + " 程序异常无法重启,请查看相关服务和日志!", Mail['Title'], Mail['mailList'])
def thread(name):
th = threading.Thread(target=name)
th.start()
if __name__ == "__main__":
while True:
thread(main)
thread(step_port)
time.sleep(Mail['interval']) # 从mail.json 读取interval的值Logger.py #日志模块
# -*- encoding: utf-8 -*-
# blog.cuixh.com
import logging
from logging import handlers
class Logger(object):
level_relations = {
'debug': logging.DEBUG,
'info': logging.INFO,
'warning': logging.WARNING,
'error': logging.ERROR,
'crit': logging.CRITICAL
} # 日志级别关系映射
def __init__(self, filename, level='info', when='D', backCount=3,
fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'):
self.logger = logging.getLogger(filename)
format_str = logging.Formatter(fmt) # 设置日志格式
self.logger.setLevel(self.level_relations.get(level)) # 设置日志级别
sh = logging.StreamHandler() # 往屏幕上输出
sh.setFormatter(format_str) # 设置屏幕上显示的格式
th = handlers.TimedRotatingFileHandler(filename=filename, when=when, backupCount=backCount,
encoding='utf-8') # 往文件里写入#指定间隔时间自动生成文件的处理器
# 实例化TimedRotatingFileHandler
# interval是时间间隔,backupCount是备份文件的个数,如果超过这个个数,就会自动删除,when是间隔的时间单位,单位有以下几种:
# S 秒
# M 分
# H 小时、
# D 天、
# W 每星期(interval==0时代表星期一)
# midnight 每天凌晨
th.setFormatter(format_str) # 设置文件里写入的格式
self.logger.addHandler(sh) # 把对象加到logger里
self.logger.addHandler(th)SendMail.py #邮件发送模块
# -*- encoding: utf-8 -*-
# blog.cuixh.com
import smtplib # 发邮件
from email.mime.text import MIMEText # 邮件文本
import time
from Logger import Logger
log = Logger('.\\log\\' + time.strftime("%Y%m%d", time.localtime()) + '-mail.txt', level='debug')
class SendMail:
def __init__(self, host, Sender, password):
self.host = host # 服务器
self.Sender = Sender # 邮件发送方
self.password = password # 密码
self.mailsever = smtplib.SMTP_SSL(self.host, 465) # 邮件服务器端口25
self.mailsever.login(self.Sender, self.password)
def Send_email_text(self, Message, title, maillist):
msg = MIMEText(Message) # 转化为邮件文本格式
msg["Subject"] = title # 邮件标题
msg["From"] = self.Sender # 邮件发送者
msg["To"] = "maillist" # 邮件接收方
try:
self.mailsever.sendmail(self.Sender,
maillist,
msg.as_string())
log.logger.info("邮件发送成功")
except smtplib.SMTPException as e:
log.logger.error('error:%s' % e)
def exit(self):
self.mailsever.quit()2 配置文件
config.json #服务配置文件
{
"mysqlService": "mysql"
}port.json # 端口配置文件
{
"mysql":"3306"
}mail.json #邮件配置和循环运行间隔时间
{
"smtpServer": "smtp.163.com", "senDer": "xxx@163.com", "password": "xxxx", "Title": "提示", "mailList": ["xxxxx@qq.com"], "interval" : 60
}3 用pyinstaller 打包成exe文件
pyinstaller -F checkServices.py
4 放到windows服务器目录中,包括mail.json config.json port.json #注意配置相应的服务和端口以及正确的邮件地址和密码,运行即可










