Files
@ 7b33cc2da5f0
Branch filter:
Location: HydroBot/hydrobot-software/alerts.py
7b33cc2da5f0
6.3 KiB
text/x-python
Added new gas sensing and other stuff from a long time ago
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 | import smtplib
from email.mime.text import MIMEText
from abc import ABCMeta, abstractmethod
import configparser
import logging
class AlertManager:
def __init__(self, database, config):
self.logger = logging.getLogger('hydrobot')
self.database = database
self.from_addr = config.get("alerts", "from_address")
self.to_addr = config.get("alerts", "to_address")
self.mail_server = config.get("alerts", "mail_server")
self.username = config.get("alerts", "username")
self.password = config.get("alerts", "password")
self.alerts = []
self.load_alerts(config)
def load_alerts(self, config):
for section in config.sections():
if "alert" in section and section != "alerts":
type = config.get(section, "type")
if type == "output_feedback":
name = config.get(section, "name")
output_module = config.get(section, "output_module")
output_type = config.get(section, "output_type")
output_number = config.get(section, "output_number")
input_module = config.get(section, "input_module")
input_type = config.get(section, "input_type")
input_number = config.get(section, "input_number")
on_threshold = config.getfloat(section, "on_threshold")
off_threshold = config.getfloat(section, "off_threshold")
hysteresis = config.getfloat(section, "hysteresis")
self.alerts.append(OutputFeedbackAlert(self, name, output_module, output_type, output_number, input_module, input_type, input_number, on_threshold, off_threshold, hysteresis))
elif type == "measurement":
name = config.get(section, "name")
input_module = config.get(section, "input_module")
input_type = config.get(section, "input_type")
input_number = config.get(section, "input_number")
high_threshold = config.getfloat(section, "high_threshold")
low_threshold = config.getfloat(section, "low_threshold")
hysteresis = config.getfloat(section, "hysteresis")
self.alerts.append(MeasurementAlert(self, name, input_module, input_type, input_number, high_threshold, low_threshold, hysteresis))
def send_alert(self, message):
msg = MIMEText(message)
msg['Subject'] = "HydroBot Alert"
msg['From'] = self.from_addr
msg['To'] = self.to_addr
try:
server = smtplib.SMTP(self.mail_server)
server.ehlo()
server.starttls()
server.login(self.username, self.password)
server.send_message(msg)
server.quit()
self.logger.info("Sent email alert: " + message)
except:
self.logger.error("Sending email alert failed!")
def evaluate_alerts(self):
for alert in self.alerts:
alert.evalutate()
class Alert(metaclass=ABCMeta):
def __init__(self, manager, name):
self.manager = manager
self.name = name
@abstractmethod
def evalutate(self):
pass
class OutputFeedbackAlert(Alert):
def __init__(self, manager, name, output_module, output_type, output_number, input_module, input_type, input_number, on_threshold, off_threshold, hysteresis):
super(OutputFeedbackAlert, self).__init__(manager, name)
self.output_module = output_module
self.output_type = output_type
self.output_number = output_number
self.input_module = input_module
self.input_type = input_type
self.input_number = input_number
self.on_threshold = on_threshold
self.off_threshold = off_threshold
self.hysteresis = hysteresis
def evalutate(self):
#get output status
output_status = self.manager.database.read_value(self.output_module, self.output_type, int(self.output_number))
print("Output status: " + str(output_status))
#get input status
input_status = self.manager.database.read_value(self.input_module, self.input_type, int(self.input_number))
print("Input status: " + str(input_status))
if input_status != None:
if output_status == 1:
if input_status < self.on_threshold:
#alert!
#print("Alert! " + self.name + " did not turn on!")
self.manager.send_alert("Alert! " + self.name + " did not turn on!")
if output_status == 0:
if input_status > self.off_threshold:
#alert!
#print("Alert! " + self.name + " did not turn off!")
self.manager.send_alert("Alert! " + self.name + " did not turn off!")
class MeasurementAlert(Alert):
def __init__(self, manager, name, input_module, input_type, input_number, high_threshold, low_threshold, hysteresis):
super(MeasurementAlert, self).__init__(manager, name)
self.input_module = input_module
self.input_type = input_type
self.input_number = input_number
self.high_threshold = high_threshold
self.low_threshold = low_threshold
self.hysteresis = hysteresis
def evalutate(self):
#get measurement
measurement = self.manager.database.read_value(self.input_module, self.input_type, int(self.input_number))
print("Measurement: " + str(measurement))
if measurement != None:
if measurement > self.high_threshold:
#alert!
#print("Alert! " + self.name + " is too high!")
self.manager.send_alert("Alert! " + self.name + " is too high!")
if measurement < self.low_threshold:
#alert!
#print("Alert! " + self.name + " is too low!")
self.manager.send_alert("Alert! " + self.name + " is too low!")
|