diff --git a/hydrobot.py b/hydrobot.py --- a/hydrobot.py +++ b/hydrobot.py @@ -1,39 +1,30 @@ #!/usr/bin/env python import sys -import os import time -import _thread -import ast +import signal import configparser -import datetime -from canard import can, messaging -from canard.hw import socketcan -from canard.file import jsondb -from canard.utils import queue -from influxdb import InfluxDBClient -from influxdb import SeriesHelper -from apscheduler.schedulers.background import BackgroundScheduler -import PID -from pytz import timezone import logging -import signal import network import network_interface from message import HydroBotMessage import module +import protocol +from database import Database +from scheduler import Scheduler + #TODO -#fix temperature offsets #add periodic output refresh #add output cleanup on shutdown +#validate config settings on load -# load config file +#load config file config = configparser.ConfigParser(allow_no_value = True) config.read("hydrobot.conf") -# set up logger +#set up logger logger = logging.getLogger('hydrobot') log_level = config.get("system", "log_level") levels = {"CRITICAL" : 50, "ERROR" : 40, "WARNING" : 30, "INFO" : 20, "DEBUG" : 10, "NOTSET" : 0} @@ -41,268 +32,42 @@ logger.setLevel(levels[log_level]) fh = logging.FileHandler('hydrobot.log') fh.setLevel(logging.INFO) -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +formatter = logging.Formatter('[%(asctime)s][%(levelname)s][%(module)s][%(funcName)s] %(message)s') #('%(asctime)s - %(name)s - %(levelname)s - %(message)s') fh.setFormatter(formatter) logger.addHandler(fh) - - -logger.info("Starting HydroBot!") - -class MySeriesHelper(SeriesHelper): - - # Meta class stores time series helper configuration. - class Meta: - # The client should be an instance of InfluxDBClient. - #client = myclient - # The series name must be a string. Add dependent fields/tags in curly brackets. - series_name = '{measurement}' - # Defines all the fields in this time series. - fields = ['value'] - # Defines all the tags for the series. - tags = ['measurement'] - # Defines the number of data points to store prior to writing on the wire. - bulk_size = 5 - # autocommit must be set to True when using bulk_size - autocommit = True - - -class Database: - - def __init__(self): - host = config.get("database", "host") - port = config.get("database", "port") - username = config.get("database", "username") - password = config.get("database", "password") - database = config.get("database", "database") - self.name = config.get("system", "name") - try: - self.client = InfluxDBClient(host, port, username, password, database) - MySeriesHelper.Meta.client = self.client - MySeriesHelper.Meta.series_name = self.name + '.{measurement}' - except: - logger.error("Could not connect to database") - - # To manually submit data points which are not yet written, call commit: - #MySeriesHelper.commit() - - def log_data(self, msgdb, message): - try: - if message == msgdb.AirSense: - MySeriesHelper(measurement='air_temp', value=(float)(message.Temperature.value)) - MySeriesHelper(measurement='air_humidity', value=(float)(message.Humidity.value)) - MySeriesHelper(measurement='air_pressure', value=(float)(message.Pressure.value)) - if message == msgdb.RelayDriveIn: - MySeriesHelper(measurement='water_flow_rate', value=(float)(message.FlowRate.value)) - MySeriesHelper(measurement='input_1', value=(float)(message.Input1.value)) - MySeriesHelper(measurement='input_2', value=(float)(message.Input2.value)) - MySeriesHelper(measurement='input_3', value=(float)(message.Input3.value)) - MySeriesHelper(measurement='input_4', value=(float)(message.Input4.value)) - if message == msgdb.WaterSense: - MySeriesHelper(measurement='water_level', value=(float)(message.PercentFull.value)) - MySeriesHelper(measurement='water_temp', value=(float)(message.Temperature.value)) - except: - logger.error("Could not connect to database") - -class CanBus: - - def __init__(self, database, interface): - - # Bring up CAN interface (maybe do this in a systemd service file) - # Passing random arguments to sudo is super dangerous - os.system("sudo ip link set " + interface + " up type can bitrate 500000") - - self.database = database - self.dev = socketcan.SocketCanDev(interface) - self.queue = queue.CanQueue(self.dev) - - parser = jsondb.JsonDbParser() - self.msgdb = parser.parse('hydrobot_can.json') - - self.temp_msg = self.msgdb.AirSense - self.relay_msg = self.msgdb.RelayDriveIn - self.relay_send_msg = self.msgdb.RelayDriveOut - - def start(self): - self.queue.start() - _thread.start_new_thread(self.process_can, ()) +ch = logging.StreamHandler(sys.stdout) +ch.setLevel(logging.DEBUG) +formatter = logging.Formatter('[%(asctime)s][%(levelname)s][%(module)s][%(funcName)s] %(message)s') #('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +ch.setFormatter(formatter) +logger.addHandler(ch) - def process_can(self): - while True: - frame = self.queue.recv() - if frame != None: - message = self.msgdb.decode(frame) - if message: - logger.debug("Received CAN message! ID: " + hex(message.id)) - self.database.log_data(self.msgdb, message) - - def send_can(self): - self.relay_send_msg.Nothing.value = 0 - if self.relay_send_msg.Output1.value == 0: - self.relay_send_msg.Output1.value = 1 - else: - self.relay_send_msg.Output1.value = 0 - self.relay_send_msg.Output2.value = 1 - self.relay_send_msg.Output3.value = 1 - self.relay_send_msg.Output4.value = 1 - logger.debug("Send CAN message! ID: " + hex(self.relay_send_msg.id) + " Data: " + str(self.relay_send_msg.data)) - self.queue.send(self.relay_send_msg.encode()) - - def set_output(self, module, output, state): - logger.info("Output! " + module + " " + output + " " + str(state)) - msg = self.msgdb.lookup_message(module) - msg.lookup_signal(output).value = state - self.queue.send(msg.encode()) - - if msg.lookup_signal(output) == msg.Output1: - try: - MySeriesHelper(measurement='output_1', value=state) - except: - logger.error("Could not connect to database") - if msg.lookup_signal(output) == msg.Output2: - try: - MySeriesHelper(measurement='output_2', value=state) - except: - logger.error("Could not connect to database") - if msg.lookup_signal(output) == msg.Output3: - try: - MySeriesHelper(measurement='output_3', value=state) - except: - logger.error("Could not connect to database") - if msg.lookup_signal(output) == msg.Output4: - try: - MySeriesHelper(measurement='output_4', value=state) - except: - logger.error("Could not connect to database") - - -class Scheduler: - - def __init__(self, canbus): - self.canbus = canbus - self.apscheduler = BackgroundScheduler() - self.apscheduler.configure(timezone=timezone('US/Eastern')) - - def start(self): - self.apscheduler.start() - - for section in config.sections(): - if "timer" in section: - items = config.items(section) - for item in items: - if item[0] == 'trigger': - trigger = item[1] - if item[0] == 'module': - module = item[1] - if item[0] == 'output': - output = item[1] - if item[0] == 'on_time': - on_time = ast.literal_eval(item[1]) - if item[0] == 'off_time': - off_time = ast.literal_eval(item[1]) - if item[0] == 'on_duration': - on_duration = ast.literal_eval(item[1]) - if item[0] == 'off_duration': - off_duration = ast.literal_eval(item[1]) - if trigger == 'cron': - on_job = self.apscheduler.add_job(self.canbus.set_output, trigger, [module, output, 1], day = on_time[0], day_of_week = on_time[1], hour = on_time[2], minute = on_time[3], second = on_time[4]) - off_job = self.apscheduler.add_job(self.canbus.set_output, trigger, [module, output, 0], day = off_time[0], day_of_week = off_time[1], hour = off_time[2], minute = off_time[3], second = off_time[4]) - - #evalute the current state of the cron trigger and set output on if needed - if on_job.next_run_time > off_job.next_run_time: - self.canbus.set_output(module, output, 1) - else: - self.canbus.set_output(module, output, 0) - - if trigger == 'interval': - self.apscheduler.add_job(self.interval_off, trigger, [module, output, section + "_off", section + "_on", on_duration], id = section + "_off", weeks = off_duration[0], days = off_duration[1], hours = off_duration[2], minutes = off_duration[3], seconds = off_duration[4]) - timer = self.apscheduler.add_job(self.interval_on, trigger, [module, output, section + "_off", section + "_on", off_duration], id = section + "_on", weeks = on_duration[0], days = on_duration[1], hours = on_duration[2], minutes = on_duration[3], seconds = on_duration[4]) - timer.pause() - self.interval_off(module, output, section + "_off", section + "_on", on_duration) - - if "pid" in section: - items = config.items(section) - for item in items: - if item[0] == 'module_out': - module_out = item[1] - if item[0] == 'output': - signal_out = item[1] - if item[0] == 'module_in': - module_in = item[1] - if item[0] == 'input': - signal_in = item[1] - if item[0] == 'kp': - kp = item[1] - if item[0] == 'ki': - ki = item[1] - if item[0] == 'kd': - kd = item[1] - if item[0] == 'rate': - rate = item[1] - if item[0] == 'setpoint': - setpoint = item[1] - - pid = PID.PID(float(setpoint)) - pid.SetKp(kp) - pid.SetKi(ki) - pid.SetKd(kd) - pid.Reset() - self.apscheduler.add_job(self.process_PID, "interval", [pid, module_out, signal_out, module_in, signal_in], seconds = int(rate)) - - - def interval_off(self, module, output, timer_off, timer_on, on_duration): - self.canbus.set_output(module, output, 1) - self.apscheduler.get_job(timer_on).reschedule("interval", weeks = on_duration[0], days = on_duration[1], hours = on_duration[2], minutes = on_duration[3], seconds = on_duration[4]) - self.apscheduler.get_job(timer_on).resume() - self.apscheduler.get_job(timer_off).pause() - logger.info("Turning " + timer_on + "!") - - def interval_on(self, module, output, timer_off, timer_on, off_duration): - self.canbus.set_output(module, output, 0) - self.apscheduler.get_job(timer_off).reschedule("interval", weeks = off_duration[0], days = off_duration[1], hours = off_duration[2], minutes = off_duration[3], seconds = off_duration[4]) - self.apscheduler.get_job(timer_off).resume() - self.apscheduler.get_job(timer_on).pause() - logger.info("Turning " + timer_off + "!") - - def process_PID(self, pid, module_out, signal_out, module_in, signal_in): - msg = self.canbus.msgdb.lookup_message(module_in) - input_reading = msg.lookup_signal(signal_in).value - input_reading = 19 - pid_out = pid.Update(input_reading) - if pid_out > 0: - self.canbus.set_output(module_out, signal_out, 1) - run_time = datetime.datetime.now() + datetime.timedelta(milliseconds=pid_out) - self.apscheduler.add_job(self.canbus.set_output, "date", run_date=run_time, args=[module_out, signal_out, 0]) - -def func(): - print(1) - +logger.info('Logger initialized') def main(argv): + + logger.info("Starting HydroBot!") if len(argv) < 1: - print("Error: please specify a can interface") + logger.error("Please specify a can interface") return 1 - database = Database() - + database = Database(config) - module_network = network.Network(logger) - module_network.add_interface(network_interface.CanBusNetworkInterface(module_network, argv[0], logger)) + module_network = network.Network(database, config) module_network.start_all_interfaces() - message = HydroBotMessage(0x0001, 0x81, 0x0007, 0x0001, 0x0111) - module_network.send_message(message) - + scheduler = Scheduler(module_network, config) + scheduler.start() - #canbus = CanBus(database, argv[0]) - #canbus.start() - - #scheduler = Scheduler(canbus) - #scheduler.start() + #module = module_network.module_list.lookup_module_by_name("RelayDrive1") + #message = HydroBotMessage(module.uuid, (0x80 | protocol.lookup_command_key_by_name("config")), protocol.lookup_data_key_by_name("can_id"), 0, 0x300) + #module.send_message(message) + #message = HydroBotMessage(module.uuid, (0x80 | protocol.lookup_command_key_by_name("config")), protocol.lookup_data_key_by_name("data_rate"), 0, 10) + #module.send_message(message) while True: - time.sleep(0.001) + time.sleep(0.1) def close_program(signum, frame): logger.info("Closing now!") @@ -311,4 +76,4 @@ def close_program(signum, frame): signal.signal(signal.SIGINT, close_program) if __name__ == "__main__": - sys.exit(main(sys.argv[1:])) + main(sys.argv[1:])