"""Notifies users when laundry is done."""
# pylint: disable=import-error

import collections
import configparser
import logging
import math
import smtplib
import time
from datetime import datetime
from email.mime.text import MIMEText

# NOTE: the third-party dependencies (`requests` and the MMA8452Q driver) are
# imported at their point of use so that this module, and its pure helper
# functions, can be imported on machines without the accelerometer hardware
# stack installed.

RECIPIENT_EMAILS = ['ianonavy@gmail.com']
ALERT_EMAIL_TEXT = """Hi,

Your laundry is done. Or maybe your roomate's. I don't know.

Regretfully,
Ianonavy Bot
"""
G_RANGE = 2
INTERVAL = 0.005  # seconds
WINDOW_SIZE = 1000  # intervals
THRESHOLD = 0.025  # G's
MIN_NOTIFICATION_PERIOD = 60  # seconds
NETWORK_TIMEOUT = 30  # seconds; upper bound for each SMTP/HTTP operation
CONFIG_FILE_PATH = (
    '/home/pi/laundry-notifier/laundry_notifier/laundry_notifier.conf')
LOG_FORMAT = '%(asctime)-15s %(message)s'


def average(a_list):
    """Computes the average of `a_list`.

    :param a_list: The non-empty, sized collection of numbers
    :return: The average of the collection as a float
    :raises ZeroDivisionError: If `a_list` is empty
    """
    return sum(a_list) / len(a_list)


def stdev(a_list):
    """Computes the (population) standard deviation for `a_list`.

    :param a_list: The list of numbers
    :return: The standard deviation of the list, or 0 if it is empty
    """
    if not a_list:
        return 0
    mean = average(a_list)
    return math.sqrt(average([(item - mean) ** 2 for item in a_list]))


def enqueue(sliding_window, item):
    """Enqueues an item onto the sliding window.

    Drops the oldest items if adding the item would exceed `WINDOW_SIZE`
    items.

    :param sliding_window: Mutable sequence for the window (a `list` or a
        `collections.deque`)
    :param item: Item to add
    """
    # `del seq[0]` works for both lists and deques (where it is O(1)).
    while len(sliding_window) >= WINDOW_SIZE:
        del sliding_window[0]
    sliding_window.append(item)


def amplitude_stdev(sliding_window):
    """Gets the standard deviation for each axis of the sliding window.

    :param sliding_window: Sequence of dicts with 'x', 'y', and 'z' keys
    :return: Dict mapping 'x', 'y', and 'z' to the standard deviation of
        that axis over the window
    """
    return {
        axis: stdev([measurement[axis] for measurement in sliding_window])
        for axis in ('x', 'y', 'z')
    }


class Notifier:
    """Notifies users by email and IFTTT, rate-limited.

    Credentials are read from the `notifications` section of the config
    file at `CONFIG_FILE_PATH`.
    """

    def __init__(self, last_sent_at=datetime(1970, 1, 1, 0, 0, 0)):
        """Loads the credentials from the config file.

        :param last_sent_at: When the last notification was sent
        :raises KeyError: If the config file or one of its expected entries
            is missing
        """
        self._last_sent_at = last_sent_at
        config = configparser.ConfigParser()
        config.read(CONFIG_FILE_PATH)
        notifications_section = config['notifications']
        self._email_username = notifications_section['email_username']
        self._email_password = notifications_section['email_password']
        self._iftttkey = notifications_section['ifttt_key']

    @property
    def last_sent_at(self):
        """When the last batch of notifications was sent."""
        return self._last_sent_at

    @last_sent_at.setter
    def last_sent_at(self, value):
        self._last_sent_at = value

    def _notify_email(self, recipient_email_address):
        """Notifies a user via email.

        Delivery failures are logged rather than raised so that a network
        problem does not stop the monitoring loop.

        :param recipient_email_address: The recipient email address
        """
        logging.info("Alerting %s", recipient_email_address)
        msg = MIMEText(ALERT_EMAIL_TEXT)
        msg['Subject'] = 'Laundry Finished'
        msg['From'] = self._email_username
        msg['To'] = recipient_email_address

        try:
            # The context manager closes the connection even if the login
            # or the send fails.
            with smtplib.SMTP('smtp.gmail.com', 587,
                              timeout=NETWORK_TIMEOUT) as server:
                server.starttls()
                server.login(self._email_username, self._email_password)
                server.send_message(msg)
        except (smtplib.SMTPException, OSError):
            logging.exception("Failed to email %s", recipient_email_address)

    def _notify_ifttt(self):
        """Notifies If This, Then That.

        Failures (connection errors, timeouts, HTTP error statuses) are
        logged rather than raised.
        """
        import requests

        url_format = 'https://maker.ifttt.com/trigger/laundry_done/with/key/{}'
        try:
            # Without a timeout a stalled connection would block the
            # detector loop forever.
            response = requests.get(url_format.format(self._iftttkey),
                                    timeout=NETWORK_TIMEOUT)
            response.raise_for_status()
        except requests.RequestException:
            logging.exception("Failed to trigger IFTTT")

    def _seconds_since_last_sent(self, now):
        """Returns the seconds elapsed between the last send and `now`.

        :param now: The current time as a `datetime`
        :return: Elapsed time in seconds, including whole days
        """
        # `timedelta.seconds` would discard the days component.
        return (now - self._last_sent_at).total_seconds()

    def send_notifications(self):
        """Sends notifications to users.

        Will do nothing if called more frequently than
        `MIN_NOTIFICATION_PERIOD` seconds.
        """
        seconds_since_last_sent = self._seconds_since_last_sent(
            datetime.now())

        # limit frequency of notifications
        if seconds_since_last_sent <= MIN_NOTIFICATION_PERIOD:
            logging.info("Suppressing notification; last one was sent %ds ago",
                         seconds_since_last_sent)
            return

        logging.info("Sending notification after %ds", seconds_since_last_sent)
        for email in RECIPIENT_EMAILS:
            self._notify_email(email)

        # Notify if this, then that
        self._notify_ifttt()
        self._last_sent_at = datetime.now()


class LaundryStateDetector:
    """Detects the current state of the laundry machine.

    The machine is considered 'on' while the standard deviation of the
    x-axis acceleration over the sliding window is at least `THRESHOLD`.
    """

    def __init__(self, accelerometer):
        """Creates the detector.

        :param accelerometer: Object whose `get_xyz()` returns a dict with
            'x', 'y', and 'z' keys
        """
        self.accelerometer = accelerometer
        # A deque makes dropping the oldest sample O(1).
        self._sliding_window = collections.deque()
        self._dryer_state = 'off'
        self._notifier = Notifier()

    def step(self):
        """Computes the next step of the sliding window."""
        g_values = self.accelerometer.get_xyz()
        if not g_values:
            # Skip missing readings instead of storing them; they cannot
            # be indexed by axis when computing the deviations.
            return

        enqueue(self._sliding_window, g_values)
        x_stdev = amplitude_stdev(self._sliding_window)['x']

        if x_stdev < THRESHOLD:
            # Notify recipients on state transitions from 'on' to 'off'
            if self._dryer_state == 'on':
                logging.info('Dryer turned off; sliding stdev is %f', x_stdev)
                self._notifier.send_notifications()
            self._dryer_state = 'off'
        else:
            # Log state transitions from 'off' to 'on'
            if self._dryer_state == 'off':
                logging.info('Dryer turned on; sliding stdev is %f', x_stdev)
            self._dryer_state = 'on'

    def run(self):
        """Main loop for the laundry state detector. Never returns."""
        logging.info('Started laundry notifier')
        while True:
            self.step()
            time.sleep(INTERVAL)


def main():
    """Main function to run as a script."""
    from microstacknode.hardware.accelerometer.mma8452q import MMA8452Q

    logging.basicConfig(format=LOG_FORMAT)
    logging.getLogger().setLevel(logging.INFO)

    with MMA8452Q() as accelerometer:
        # Configure accelerometer
        accelerometer.standby()
        accelerometer.set_g_range(G_RANGE)
        accelerometer.activate()

        # Settle
        time.sleep(INTERVAL)

        # Run the laundry state detector.
        detector = LaundryStateDetector(accelerometer)
        detector.run()


if __name__ == '__main__':
    main()