"""Laundry notifier: alert people when the dryer stops shaking.

The script samples an MMA8452Q accelerometer every ``INTERVAL`` seconds,
keeps the most recent ``WINDOW_SIZE`` samples, and looks at the standard
deviation of the x axis over that window.  A standard deviation at or
above ``THRESHOLD`` is treated as "dryer on"; a transition from on to off
triggers an email to every address in ``RECIPIENT_EMAILS`` and an IFTTT
maker event.
"""
import configparser
import math
import smtplib
import time
from datetime import datetime
from email.mime.text import MIMEText

import requests
from microstacknode.hardware.accelerometer.mma8452q import MMA8452Q

# NOTE(review): these credentials are hard-coded in source control.  Consider
# loading them from CONFIG_FILE_PATH (as is already done for the IFTTT key)
# and rotating this password.
EMAIL_USERNAME = 'noreply@ianonavy.com'
EMAIL_PASSWORD = 'S2PENjQbO6=cHgchw@CXs.bJ'
RECIPIENT_EMAILS = ['ianonavy@gmail.com']
# NOTE(review): the line breaks inside this message were reconstructed from a
# whitespace-collapsed copy of the file -- confirm against the original.
ALERT_EMAIL_TEXT = """Hi,

Your laundry is done. Or maybe your roomate's. I don't know.

Regretfully,
Ianonavy Bot
"""
G_RANGE = 2
INTERVAL = 0.005  # seconds
WINDOW_SIZE = 400  # intervals
THRESHOLD = 0.025  # G's
MAX_NOTIFICATION_FREQUENCY = 60  # seconds
IFTTT_TIMEOUT = 10  # seconds; keeps a stalled request from freezing the loop
CONFIG_FILE_PATH = '/home/pi/laundry-notifier/laundry_notifier/laundry_notifier.conf'


def average(s):
    """Return the arithmetic mean of the sized collection ``s``.

    Raises:
        ZeroDivisionError: if ``s`` is empty.
    """
    return sum(s) / len(s)


def stdev(s):
    """Return the population standard deviation of ``s`` (0 when empty).

    The input is materialised once so that one-shot iterators are handled
    the same way as lists.
    """
    values = list(s)
    if not values:
        return 0
    avg = average(values)
    return math.sqrt(average([(x - avg) ** 2 for x in values]))


def notify_user(recipient_email_address):
    """Email the "laundry finished" alert to one recipient.

    Args:
        recipient_email_address: address placed in the ``To`` header.

    Raises:
        smtplib.SMTPException: if the TLS upgrade, login or send fails.
        OSError: if the connection to the mail server cannot be made.
    """
    log("Alerting " + recipient_email_address)
    msg = MIMEText(ALERT_EMAIL_TEXT)
    msg['Subject'] = 'Laundry Finished'
    msg['From'] = 'pi@localhost'
    msg['To'] = recipient_email_address

    # Send the message through Gmail's submission port.  The context manager
    # issues QUIT and closes the socket even when a step below raises.
    with smtplib.SMTP('smtp.gmail.com:587') as server:
        server.starttls()
        server.login(EMAIL_USERNAME, EMAIL_PASSWORD)
        server.send_message(msg)


def enqueue(sliding_window, item):
    """Append ``item`` to ``sliding_window``, keeping at most WINDOW_SIZE items.

    The list is modified in place; the oldest entries are discarded first.
    A window that is already longer than ``WINDOW_SIZE`` is trimmed back
    down instead of being left to grow.
    """
    overflow = len(sliding_window) - WINDOW_SIZE + 1
    if overflow > 0:
        del sliding_window[:overflow]
    sliding_window.append(item)


def amplitude_stdev(sliding_window):
    """Return the per-axis standard deviation of the samples in the window.

    Args:
        sliding_window: iterable of measurements, each indexable by the keys
            ``'x'``, ``'y'`` and ``'z'``.

    Returns:
        dict mapping ``'x'``, ``'y'`` and ``'z'`` to that axis's standard
        deviation (0 for an empty window).
    """
    return {
        axis: stdev([measurement[axis] for measurement in sliding_window])
        for axis in ('x', 'y', 'z')
    }


def send_notifications(last_notification_sent_at, iftttkey):
    """Send the email and IFTTT alerts unless one was sent too recently.

    Args:
        last_notification_sent_at: naive ``datetime`` of the previous alert.
        iftttkey: IFTTT maker key used in the trigger URL.

    Returns:
        The time of this alert if one was sent, otherwise
        ``last_notification_sent_at`` unchanged.

    Raises:
        requests.RequestException: if the IFTTT request fails or takes longer
            than ``IFTTT_TIMEOUT`` seconds.
        smtplib.SMTPException, OSError: propagated from ``notify_user``.
    """
    # total_seconds() rather than .seconds: .seconds is only the sub-day
    # remainder, so an alert sent N days and a few seconds after the previous
    # one (or just after midnight on the first run, given the 1970 sentinel)
    # would be wrongly rate-limited.
    elapsed = datetime.now() - last_notification_sent_at
    seconds_since_last_notification = elapsed.total_seconds()

    # Logged on every call, including calls that end up rate-limited.
    log("Sending notification after %ds" % seconds_since_last_notification)

    # Limit frequency of notifications.
    if seconds_since_last_notification <= MAX_NOTIFICATION_FREQUENCY:
        return last_notification_sent_at

    for email in RECIPIENT_EMAILS:
        notify_user(email)

    # Notify "if this, then that".
    requests.get(
        'https://maker.ifttt.com/trigger/laundry_done/with/key/%s' % iftttkey,
        timeout=IFTTT_TIMEOUT)
    return datetime.now()


def main():
    """Run the monitoring loop forever.

    Reads the IFTTT key from ``CONFIG_FILE_PATH``, configures the
    accelerometer, then repeatedly samples it and sends notifications when
    the dryer state changes from ``'on'`` to ``'off'``.

    Raises:
        KeyError: if the config file lacks ``[notifications] ifttt_key``
            (a missing file is silently ignored by ``ConfigParser.read``).
    """
    log('started laundry notifier')
    config = configparser.ConfigParser()
    config.read(CONFIG_FILE_PATH)
    iftttkey = config['notifications']['ifttt_key']

    with MMA8452Q() as accelerometer:
        # Configure accelerometer
        accelerometer.standby()
        accelerometer.set_g_range(G_RANGE)
        accelerometer.activate()
        time.sleep(INTERVAL)  # Settle

        sliding_window = []
        dryer_state = 'off'
        # Far-past sentinel so the first alert is never rate-limited.
        last_notification_sent_at = datetime(1970, 1, 1, 0, 0, 0)

        while True:
            g_values = accelerometer.get_xyz()
            if not g_values:
                # Skip empty readings *before* they reach the window;
                # amplitude_stdev() would fail indexing them.
                time.sleep(INTERVAL)
                continue

            enqueue(sliding_window, g_values)
            x_stdev = amplitude_stdev(sliding_window)['x']

            if x_stdev < THRESHOLD:
                # Notify recipients on state transitions from 'on' to 'off'
                if dryer_state == 'on':
                    log('Dryer turned off; sliding stdev is %f' % x_stdev)
                    last_notification_sent_at = send_notifications(
                        last_notification_sent_at, iftttkey)
                dryer_state = 'off'
            else:
                # Log state transitions from 'off' to 'on'
                if dryer_state == 'off':
                    log('Dryer turned on; sliding stdev is %f' % x_stdev)
                dryer_state = 'on'

            time.sleep(INTERVAL)


def log(message):
    """Print ``message`` to stdout prefixed by an ISO-8601 timestamp and a tab."""
    print('%s\t%s' % (datetime.now().isoformat(), message))


if __name__ == '__main__':
    main()