Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change from cycle based delays to time delay #2

Merged
merged 8 commits into from
Mar 1, 2024
89 changes: 48 additions & 41 deletions PVCharge.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import os
import math
import time
import logging
import tomllib
from time import sleep
from routines import PowerUsage, TeslaCommands, MqttCallbacks
import routines

# Load config file
with open("config.toml", mode="rb") as fp:
Expand All @@ -24,19 +24,18 @@


# Initialize classes
Energy = PowerUsage()
Car = TeslaCommands()
Messages = MqttCallbacks()
Energy = routines.PowerUsage()
Car = routines.TeslaCommands()
Messages = routines.MqttCallbacks()

# Control loop variables
car_is_charging = False
report_due_fast = 0
report_due_slow = 0
report_delay_fast = round((60 / (config["FAST_POLLING"] + 1.5))) # Print a report roughly every minute
report_delay_slow = round((60 / config["SLOW_POLLING"])) # Print a report roughly every minute
start_charging_count = 0
stop_charging_count = 0
stop_charging_time = 0
start_charging_time = 0
report_time = 0
while True:
# Record loop start time
loop_time = time.time()
# Check if we are allowed to charge
charge_tesla = Messages.calculate_charge_tesla()
logging.debug(f"Current calculated charge enable: {charge_tesla}")
Expand All @@ -46,7 +45,7 @@
if not charge_tesla:
for poll in range(0, config["SLOW_POLLING"]), config["SLOW_POLLING_CHK"]: # While waiting ensure that the car isn't charging
if prevent_non_solar_charge:
logging.info("Slow poll wait, ensure car isn't charging")
logging.debug("Slow poll wait, ensure car isn't charging")
Energy.sample_sensor()
if round(Energy.charge_rate_sensor) >= config["MIN_CHARGE"]:
if Car.stop_charging(): # Stop if it is charging
Expand All @@ -55,25 +54,26 @@
else:
logging.warning("Slow poll, Car discovered charging and was NOT stopped successfully")
else:
logging.info("Slow poll wait, ignore charging")
sleep(config["SLOW_POLLING_CHK"])
if report_due_slow >= report_delay_slow:
status = Energy.status_report(charge_tesla, car_is_charging, new_sample=True)
logging.info(f"Slow poll {status}")
Messages.client.publish(topic=config["TOPIC_STATUS"], payload=status, qos=1)
report_due_slow = 0
report_due_slow += 1
logging.debug("Slow poll wait, ignore charging")
# Wait configured time before reporting status
report_is_due, report_time = routines.check_elapsed_time(loop_time, report_time, config["REPORT_DELAY"])
if report_is_due:
status = Energy.status_report(charge_tesla, car_is_charging, new_sample=True)
logging.info(f"{status}")
Messages.client.publish(topic=config["TOPIC_STATUS"], payload=status, qos=1)
report_time = 0
time.sleep(config["SLOW_POLLING_CHK"])

if charge_tesla: # If we are allowed to charge
if car_is_charging: # Is the car currently charging?
if Energy.sufficient_generation(config["MIN_CHARGE"]):
# Reset stop counter
stop_charging_count = 0
# Reset stop time
stop_charging_time = 0
# Calculate new charge rate
# Use math.floor() on calculate_charge_rate to ensure we are always just "under" the available PV generation capacity
# Use round() on charge_rate_sensor to prevent constant requests when on the edge of a value
new_charge_rate = math.floor(Energy.calculate_charge_rate(new_sample=False))
logging.info(f"Car charging, new rate calculated: {new_charge_rate}, current rate: {round(Energy.charge_rate_sensor)}")
logging.debug(f"Car charging, new rate calculated: {new_charge_rate}, current rate: {round(Energy.charge_rate_sensor)}")
if new_charge_rate != round(Energy.charge_rate_sensor):
# Set new charge rate
if Car.set_charge_rate(new_charge_rate):
Expand All @@ -91,41 +91,47 @@
Messages.client.publish(topic=config["TOPIC_CHARGE_RATE"], payload=config["MIN_CHARGE"], qos=1)
else:
logging.warning(f"Car charging, Available Energy Reduced, new rate was NOT successfully set")
else: # We are already at min charge, begin stopping sequence
stop_charging_count += 1
logging.info(f"Car charging, Available Energy Reduced, charging at min rate, stopping count: {stop_charging_count}")
if stop_charging_count >= 30:

else: # We are already at min charge
# Wait configured time before stopping
waited_long_enough, stop_charging_time = routines.check_elapsed_time(loop_time, stop_charging_time, config["DELAYED_STOP_TIME"])
if waited_long_enough:
if Car.stop_charging():
logging.info(f"Car charging, Available Energy Reduced, charging was successfully stopped")
car_is_charging = False
stop_charging_count = 0
stop_charging_time = 0
else:
logging.warning(f"Car charging, Available Energy Reduced, charging was NOT successfully stopped")
else:
logging.info(f"Car charging, Available Energy Reduced, charging at min rate, stopping in: {round(config['DELAYED_STOP_TIME'] - (loop_time - stop_charging_time))} seconds")

else: # Car isn't charging, should it be?
if Energy.sufficient_generation(config["MIN_CHARGE"]): # If we have enough sun to charge
if round(Energy.charge_rate_sensor) < config["MIN_CHARGE"]: # Make sure car isn’t already charging
start_charging_count += 1
logging.info(f"Car is NOT charging, Energy is Available, starting count: {start_charging_count}")
if start_charging_count >= 5:
# Wait configured time before starting
waited_long_enough, start_charging_time = routines.check_elapsed_time(loop_time, start_charging_time, config["DELAYED_START_TIME"])
if waited_long_enough:
if Car.wake():
logging.info(f"Car is NOT charging, Energy is Available, car woken successfully")
sleep(5) # Wait until car is awake
time.sleep(5) # Wait until car is awake
if Car.start_charging():
logging.info(f"Car Started Charging Successfully")
sleep(10) # Wait until charging is fully started
time.sleep(10) # Wait until charging is fully started
if Energy.verify_new_charge_rate(config["MIN_CHARGE"]):
logging.info(f"Charge Rate is greater than min charge")
car_is_charging = True
start_charging_count = 0
start_charging_time = 0
# Optionally we could set a new charge rate here
else:
logging.warning(f"Car Charging NOT Started Successfully")
else:
logging.warning(f"Car was NOT woken successfully")
else:
logging.info(f"Car is NOT charging, Energy is Available, starting in: {round(config['DELAYED_START_TIME'] - (loop_time - start_charging_time))} seconds")

else: # Car is already charging, set the flag
car_is_charging = True
start_charging_count = 0
start_charging_time = 0

else: # Sun isn't generating enough power to charge
if prevent_non_solar_charge: # If true, prevent after-hours charging
Expand All @@ -146,12 +152,13 @@
logging.info(f"Charge Stopping, did NOT stop successfully")
car_is_charging = False # Clear the flag even if it fails

if report_due_fast >= report_delay_fast:
# Wait configured time before reporting status
report_is_due, report_time = routines.check_elapsed_time(loop_time, report_time, config["REPORT_DELAY"])
if report_is_due:
status = Energy.status_report(charge_tesla, car_is_charging, new_sample=True)
logging.info(f"Fast poll {status}")
logging.info(f"{status}")
Messages.client.publish(topic=config["TOPIC_STATUS"], payload=status, qos=1)
report_due_fast = 0
report_due_fast += 1
report_time = 0

# Main loop delay
sleep(config["FAST_POLLING"])
# Control loop delay
time.sleep(config["FAST_POLLING"])
14 changes: 9 additions & 5 deletions example_config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Configuration file
LOG_FILE = 'PVCharge.log'
LOG_LEVEL = "INFO" # Default INFO, change to DEBUG to diagnose issues
LOG_FILE = 'PVCharge.log' # Log file name to use
LOG_LEVEL = "INFO" # Default INFO, change to DEBUG to diagnose issues
PREVENT_NON_SOLAR_CHARGE = "False" # Default for after-hours charging, unless changed via MQTT

# MQTT Control topics
TOPIC_PREVENT_NON_SOLAR_CHARGE = "topic_base/prevent_non_solar_charge"
Expand All @@ -12,8 +13,11 @@ TOPIC_STATUS = "topic_base/status"
TOPIC_CHARGE_RATE = "topic_base/new_charge_rate"

# Control loop parameters
SLOW_POLLING = 30 # Charging disabled, control topic check interval (seconds)
SLOW_POLLING_CHK = 5 # Charging disabled, prevent non-solar charge check interval (seconds)
FAST_POLLING = 1 # Charging enabled, update rate (seconds)
MIN_CHARGE = 7 # Slowest allowed charge rate (Amps)
MAX_CHARGE_LIMIT = 80 # Max charge limit (%)
SLOW_POLLING = 30 # Charging disabled, control topic check interval (seconds)
SLOW_POLLING_CHK = 5 # Charging disabled, prevent non-solar charge check interval (seconds)
FAST_POLLING = 1 # Charging enabled, loop delay (seconds)
DELAYED_START_TIME = 10 # When Energy is Available how long do we wait before starting charge (seconds)
DELAYED_STOP_TIME = 90 # When Available Energy is Reduced how long do we wait before stopping charge (seconds)
REPORT_DELAY = 60 # Send status string to MQTT every x (seconds), delayed by as much as SLOW_POLLING_CHK when charging disabled
23 changes: 19 additions & 4 deletions routines.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
import sys
import subprocess
import math
import time
import logging
from time import sleep
import tomllib
from dotenv import load_dotenv
from egauge import webapi
import paho.mqtt.client as mqtt
from egauge.webapi.device import Register, Local
import paho.mqtt.client as mqtt


# Load parameters from .env
load_dotenv()
Expand Down Expand Up @@ -83,7 +84,7 @@ def verify_new_charge_rate(self, new_charge_rate):
if round(self.charge_rate_sensor) >= new_charge_rate:
logging.debug(f"New charge rate verified")
return True
sleep(0.5)
time.sleep(0.5)
logging.debug(f"New charge rate NOT verified")
return False

Expand Down Expand Up @@ -160,6 +161,17 @@ def call_sub_error_handler(cmd):
return False
return True

def check_elapsed_time(loop_time, compare_time, wait_time):
if compare_time == 0:
compare_time = time.time() # Set counter to current time
return False, compare_time
elif (loop_time - compare_time) >= wait_time:
# Compare current loop time to first time
return True, compare_time
else:
# We haven't waited long enough, keep waiting
return False, compare_time


class MqttCallbacks:
"""Class to handle MQTT"""
Expand All @@ -173,7 +185,10 @@ def __init__(self):
self.topic_teslamate_plugged_in = config["TOPIC_TESLAMATE_PLUGGED_IN"]
self.topic_teslamate_battery_level = config["TOPIC_TESLAMATE_BATTERY_LEVEL"]
self.max_charge_limit = config["MAX_CHARGE_LIMIT"]
self.var_topic_prevent_non_solar_charge = False
if config["PREVENT_NON_SOLAR_CHARGE"] == "True":
self.var_topic_prevent_non_solar_charge = True
else:
self.var_topic_prevent_non_solar_charge = False
self.var_topic_teslamate_geofence = False
self.var_topic_teslamate_plugged_in = False
self.var_topic_teslamate_battery_level = 0
Expand Down