From 88a6235dddb0f2bec0ec8f575d087ac542328ad6 Mon Sep 17 00:00:00 2001 From: Ian Adam Naval Date: Sun, 20 Sep 2015 23:46:25 -0400 Subject: [PATCH] Refactor into nice(r) classes --- laundry_notifier/run.py | 204 +++++++++++++++++++++++++--------------- 1 file changed, 130 insertions(+), 74 deletions(-) diff --git a/laundry_notifier/run.py b/laundry_notifier/run.py index 1ce7899..107e083 100644 --- a/laundry_notifier/run.py +++ b/laundry_notifier/run.py @@ -1,3 +1,7 @@ +"""Notifies users when laundry is done.""" + +# pylint: disable=import-error + import configparser import logging import math @@ -22,49 +26,53 @@ G_RANGE = 2 INTERVAL = 0.005 # seconds WINDOW_SIZE = 1000 # intervals THRESHOLD = 0.025 # G's -MAX_EMAIL_FREQUENCY = 60 # seconds -MAX_NOTIFICATION_FREQUENCY = 60 # seconds +MIN_NOTIFICATION_PERIOD = 60 # seconds CONFIG_FILE_PATH = '/home/pi/laundry-notifier/laundry_notifier/laundry_notifier.conf' +LOG_FORMAT = '%(asctime)-15s %(message)s' -def average(s): - return sum(s) * 1.0 / len(s) +def average(a_list): + """Computes the average of `a_list`. + + :param a_list: The list of numbers + :return: The average of the list + """ + return sum(a_list) * 1.0 / len(a_list) -def stdev(s): - if not s: +def stdev(a_list): + """Computes the standard deviation for `a_list`. + + :param a_list: The list of numbers + :return: The standard deviation of the list + """ + if not a_list: return 0 - avg = average(list(s)) - variance = map(lambda x: (x - avg) ** 2, s) + avg = average(list(a_list)) + variance = [(item - avg) ** 2 for item in a_list] return math.sqrt(average(list(variance))) -def notify_user(username, password, recipient_email_address): - logging.info("Alerting " + recipient_email_address) - - msg = MIMEText(ALERT_EMAIL_TEXT) - - # me == the sender's email address - # you == the recipient's email address - msg['Subject'] = 'Laundry Finished' - msg['From'] = 'pi@localhost' - msg['To'] = recipient_email_address - - # Send the message via our own SMTP server. - server = smtplib.SMTP('smtp.gmail.com:587') - server.starttls() - server.login(username, password) - server.send_message(msg) - server.quit() - - def enqueue(sliding_window, item): + """Enqueues an item onto the sliding window. + + Drops items if adding the item would exceed `WINDOW_SIZE` items. + + :param sliding_window: List object for the window + :param item: Item to add + """ if len(sliding_window) == WINDOW_SIZE: sliding_window.pop(0) sliding_window.append(item) def amplitude_stdev(sliding_window): + """Gets the standard deviations for each axis of the sliding window. + + :param sliding_window: List of dicts with 'x', 'y', and 'z' keys + :return: List of dicts with the standard deviations of the 'x', + 'y', and 'z' + """ standard_deviations = {} for key in ('x', 'y', 'z'): values = [measurement[key] for measurement in sliding_window] @@ -72,32 +80,103 @@ def amplitude_stdev(sliding_window): return standard_deviations -def send_notifications(last_notification_sent_at, iftttkey): - seconds_since_last_notification = \ - (datetime.now() - last_notification_sent_at).seconds - logging.info("Sending notification after %ds" % seconds_since_last_notification) - # limit frequency of notifications - if seconds_since_last_notification > MAX_NOTIFICATION_FREQUENCY: - [notify_user(email) for email in RECIPIENT_EMAILS] - # Notify if this, then that - requests.get( - 'https://maker.ifttt.com/trigger/laundry_done/with/key/%s' - % iftttkey) - return datetime.now() - return last_notification_sent_at +class Notifier(object): + """Notifies users.""" + + def __init__(self, last_sent_at=datetime(1970, 1, 1, 0, 0, 0)): + self._last_sent_at = last_sent_at + + config = configparser.ConfigParser() + config.read(CONFIG_FILE_PATH) + notifications_section = config['notifications'] + self._email_username = notifications_section['email_username'] + self._email_password = notifications_section['email_password'] + self._iftttkey = notifications_section['ifttt_key'] + + + def _notify_email(self, recipient_email_address): + """Notifies a user via email. + + :param recipient_email_address: The recipient email address + """ + logging.info("Alerting " + recipient_email_address) + + msg = MIMEText(ALERT_EMAIL_TEXT) + msg['Subject'] = 'Laundry Finished' + msg['From'] = self._email_username + msg['To'] = recipient_email_address + + # Send the message via our own SMTP server. + server = smtplib.SMTP('smtp.gmail.com:587') + server.starttls() + server.login(self._email_username, self._email_password) + server.send_message(msg) + server.quit() + + def _notify_ifttt(self): + """Notifies If This, Then That.""" + url_format = 'https://maker.ifttt.com/trigger/laundry_done/with/key/{}' + requests.get(url_format.format(self._iftttkey)) + + def send_notifications(self): + """Sends notifications to users. + + Will do nothing if called more frequently than + `MIN_NOTIFICATION_PERIOD` seconds. + """ + time_since_last_sent = datetime.now() - self._last_sent_at + seconds_since_last_sent = time_since_last_sent.seconds + logging.info("Sending notification after %ds", seconds_since_last_sent) + # limit frequency of notifications + if seconds_since_last_sent > MIN_NOTIFICATION_PERIOD: + for email in RECIPIENT_EMAILS: + self._notify_email(email) + # Notify if this, then that + self._notify_ifttt() + self._last_sent_at = datetime.now() + + +class LaundryStateDetector(object): + """Detects the current state of the laundry machine.""" + + def __init__(self, accelerometer): + self.accelerometer = accelerometer + self._sliding_window = [] + self._dryer_state = 'off' + self._notifier = Notifier() + + def step(self): + """Computes the next step of the sliding window.""" + g_values = self.accelerometer.get_xyz() + enqueue(self._sliding_window, g_values) + sliding_stdev = amplitude_stdev(self._sliding_window) + if g_values: + if sliding_stdev['x'] < THRESHOLD: + # Notify recipients on state transitions from 'on' to 'off' + if self._dryer_state == 'on': + logging.info('Dryer turned off; sliding stdev is %f', + sliding_stdev['x']) + self._notifier.send_notifications() + self._dryer_state = 'off' + else: + # Log state transitions from 'off' to 'on' + if self._dryer_state == 'off': + logging.info('Dryer turned on; sliding stdev is %f', + sliding_stdev['x']) + self._dryer_state = 'on' + + def run(self): + """Main loop for the laundry state detector.""" + logging.info('Started laundry notifier') + while True: + self.step() + time.sleep(INTERVAL) def main(): - LOG_FORMAT = '%(asctime)-15s %(message)s' + """Main function to run as a script.""" logging.basicConfig(format=LOG_FORMAT) logging.getLogger().setLevel(logging.INFO) - logging.info('Started laundry notifier') - config = configparser.ConfigParser() - config.read(CONFIG_FILE_PATH) - notifications_section = config['notifications'] - iftttkey = notifications_section['ifttt_key'] - email_username = notifications_section['email_username'] - email_password = notifications_section['email_password'] with MMA8452Q() as accelerometer: # Configure accelerometer accelerometer.standby() @@ -107,32 +186,9 @@ def main(): # Settle time.sleep(INTERVAL) - sliding_window = [] - dryer_state = 'off' - last_notification_sent_at = datetime(1970, 1, 1, 0, 0, 0) - - while True: - g_values = accelerometer.get_xyz() - enqueue(sliding_window, g_values) - sliding_stdev = amplitude_stdev(sliding_window) - if g_values: - if sliding_stdev['x'] < THRESHOLD: - # Notify recipients on state transitions from 'on' to 'off' - if dryer_state == 'on': - logging.info('Dryer turned off; sliding stdev is %f' % sliding_stdev['x']) - last_notification_sent_at = send_notifications( - last_notification_sent_at, - email_username, - email_password, - iftttkey) - dryer_state = 'off' - else: - # Log state transitions from 'off' to 'on' - if dryer_state == 'off': - logging.info('Dryer turned on; sliding stdev is %f' % sliding_stdev['x']) - dryer_state = 'on' - - time.sleep(INTERVAL) + # Run the laundry state detector. + detector = LaundryStateDetector(accelerometer) + detector.run() if __name__ == '__main__':