# SPDX-License-Identifier: LGPL-3.0-or-later
# Copyright (C) 2021 github.com/thiswillbeyourgithub/

"""Sleep tracker
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

SleepTk is an alarm clock app designed to track body movement throughout the
night to optimize sleep.

Currently a WIP, more information at https://github.com/thiswillbeyourgithub/sleep_tracker_pinetime_wasp-os
"""

import wasp
import widgets
import shell
import fonts
import math
import ppg
from array import array
from micropython import const

# 2-bit RLE, 60x60, 225 bytes, kindly designed by Emanuel Löffler (https://github.com/plan5)
icon = (
    b'\x02'
    b'<<'
    b'?\x11\xd2*\xd2*\xd2*\xd8$\xd82\xca2\xca6'
    b'\xc4\x02\xc24\xc4\x02\xc24\xc4\x04\xc40\xc4\x04\xc40'
    b'\xc4\x04\xc40\xc4\x04\xc42\xc2\x04\xc8.\xc2\x04\xc8.'
    b'\xc2\x04\xc8.\xc2\x04\xc8 \xc4\n\xc2\x06\xc6 \xc4\n'
    b'\xc2\x06\xc6 \xc4\x08\xc6\x04\xc6 \xc4\x08\xc6\x04\xc6\x1e'
    b'\xc8\x06\xc6\x04\xc6\x1e\xc8\x06\xc6\x04\xc6\x1e\xc8\x06\xc6\x04'
    b'\xc6\x1e\xc8\x06\xc6\x04\xc6\x1c\xca\x06\xc6\x04\xc6\x1c\xd6\x04'
    b'\xc6\x1c\xd6\x04\xc6\x1c\xd6\x04\xc6\x07\xed\x08\xc4\x03\xed\x08'
    b'\xc4\x03\xed\x08\xc4\x03\xed\x08\xc4"\xc6\x04\xc4\x06\xc2&'
    b'\xc6\x04\xc4\x06\xc2&\xc6\x04\xc4\x06\xc2&\xc6\x04\xc4\x06'
    b'\xc2&\xc6\x04\xc6\x04\xc2&\xc6\x04\xc6\x04\xc2(\xc2\x04'
    b'\xc8\x02\xc2*\xc2\x04\xc8\x02\xc2.\xce.\xce.\xce.'
    b'\xce*\xd1+\xd1+\xd1+\xd1%\xd7%\xd4\x16\xe6\x16'
    b'\xe6\x16\xe0\x1c\xe0\x1c\xde\x1e\xde"\xd4(\xd4\x1a'
)

# HARDCODED VARIABLES:
_ON = const(1)
_OFF = const(0)
_TRACKING = const(0)
_RINGING = const(1)
_SETTINGS1 = const(2)
_SETTINGS2 = const(3)
_FONT = fonts.sans18
_FONT_COLOR = const(0xf800)  # red font to reduce eye strain at night
_TIMESTAMP = const(946684800)  # unix time and time used by wasp os don't have the same reference date

## USER SETTINGS #################################
_KILL_BT = const(1)
# set to 0 to disable turning off bluetooth while tracking to save battery
# (you have to reboot the watch to reactivate BT, default: 1)
_STOP_LIMIT = const(10)
# number of times to swipe or press the button to turn off ringing (default: 10)
_SNOOZE_TIME = const(300)
# number of seconds to snooze for (default: 5 minutes)
_FREQ = const(5)
# get accelerometer data every X seconds, but process and store them only
# every _STORE_FREQ seconds (default: 5)
_HR_FREQ = const(300)
# how many seconds between heart rate data (default: 300, minimum 120)
_STORE_FREQ = const(300)
# process data and store to file every X seconds (default: 300)
_BATTERY_THRESHOLD = const(15)
# under X% of battery, stop tracking and only keep the alarm, set at -200
# or lower to disable (default: 15)
_ANTICIPATE_ALLOWED = const(2400)
# number of seconds SleepTk can wake you up before the alarm clock you set
# only relevant if smart_alarm is enabled. (default: 2400)
_GRADUAL_WAKE = array("H", [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# nb of minutes before alarm to send a tiny vibration, designed to wake
# you more gently. (default: array("H", [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) )
_MINUTES_TO_FALL_ASLEEP = const(14)
# time you take to fall asleep (default: 14, according to https://sleepyti.me/)
_CYCLE_LENGTH = const(90)
# sleep cycle length in minutes. Currently used only to display best wake up
# time but not to compute smart alarm! (default: 90 or 100, according to
# https://sleepyti.me/)
_SLEEP_GOAL_CYCLE = const(5)
# number of sleep cycles you wish to sleep. With _CYCLE_LENGTH this is used
# to suggest best wake up time to user when setting the alarm. (default: 5)
##################################################


class SleepTkApp():
    """Alarm clock application that can log arm movement and heart rate.

    The app has four pages (`_SETTINGS1`, `_SETTINGS2`, `_TRACKING`,
    `_RINGING`); `self._page` selects which one is drawn and how events
    are interpreted.
    """
    NAME = 'SleepTk'
    ICON = icon

    def __init__(self):
        """Reset every setting to its default and create the log folders."""
        wasp.gc.collect()

        # default button state
        self._state_alarm = _ON
        self._state_body_tracking = _OFF
        self._state_HR_tracking = _OFF
        self._state_gradual_wake = _ON
        self._state_smart_alarm = _OFF
        self._state_spinval_H = _OFF
        self._state_spinval_M = _OFF
        self._hrdata = None
        self._last_HR = _OFF  # if _OFF, no HR to write, if "?": error during last HR, else: heart rate
        self._last_HR_printed = "?"
        self._last_HR_date = _OFF
        self._track_HR_once = _OFF
        self._page = _SETTINGS1
        self._currently_tracking = _OFF
        self._conf_view = _OFF  # confirmation view
        self._smart_offset = _OFF  # number of seconds between the alarm you set manually and the smart alarm time
        self._buff = array("f", [_OFF, _OFF, _OFF])  # contains accelerometer values
        self._last_touch = int(wasp.watch.rtc.time())

        for folder in ("logs/", "logs/sleep"):
            try:
                shell.mkdir(folder)
            except OSError:  # folder already exists
                pass

    def foreground(self):
        """Draw the current page and subscribe to touch/swipe/button events."""
        self.stat_bar = widgets.StatusBar()
        self.stat_bar.clock = True
        self._conf_view = _OFF
        wasp.gc.collect()
        self._draw()
        wasp.system.request_event(wasp.EventMask.TOUCH |
                                  wasp.EventMask.SWIPE_LEFTRIGHT |
                                  wasp.EventMask.SWIPE_UPDOWN |
                                  wasp.EventMask.BUTTON)
        if self._page == _TRACKING and self._track_HR_once:
            # a heart rate measurement is in progress: keep ticking
            wasp.system.request_tick(1000 // 8)

    def sleep(self):
        """Reset the stop-alarm counter when the watch goes to sleep."""
        self._stop_trial = 0
        wasp.gc.collect()
        return True

    def background(self):
        """Turn off the heart rate sensor and drop its buffered data."""
        wasp.watch.hrs.disable()
        self._hrdata = None
        wasp.gc.collect()

    def _try_stop_alarm(self):
        """If button or swipe more than _STOP_LIMIT, then stop ringing"""
        if self._stop_trial + 1 >= _STOP_LIMIT:
            self._n_vibration = 0
            del self._n_vibration
            wasp.system.cancel_alarm(self._WU_t, self._activate_ticks_to_ring)
            self._disable_tracking()
            # back to a pristine settings page
            self.__init__()
            self.foreground()
        else:
            self._stop_trial += 1
            draw = wasp.watch.drawable
            draw.set_color(_FONT_COLOR)
            draw.string("{} to stop".format(_STOP_LIMIT - self._stop_trial), 0, 70)

    def press(self, button, state):
        """stop ringing alarm if pressed physical button"""
        if not state:
            return
        self._last_touch = int(wasp.watch.rtc.time())
        mute = wasp.watch.display.mute
        mute(False)
        if self._page == _RINGING:
            self._try_stop_alarm()
        elif self._page == _TRACKING:
            self._was_touched = 1
            # disable pressing to exit, use swipe up instead
            self._draw()
        else:
            wasp.system.navigate(wasp.EventType.HOME)

    def swipe(self, event):
        """navigate between settings pages

        Returns True for every swipe the app does not handle itself.
        """
        mute = wasp.watch.display.mute
        mute(False)
        self._last_touch = int(wasp.watch.rtc.time())
        if self._page == _SETTINGS1:
            if event[0] == wasp.EventType.LEFT:
                self._page = _SETTINGS2
                self._draw()
            else:
                return True
        elif self._page == _SETTINGS2:
            if event[0] == wasp.EventType.RIGHT:
                self._page = _SETTINGS1
                self._draw()
            else:
                return True
        elif self._page == _RINGING:
            self._try_stop_alarm()
        else:
            return True

    def touch(self, event):
        """either start tracking or disable it, draw the screen in all cases"""
        wasp.gc.collect()
        draw = wasp.watch.drawable
        mute = wasp.watch.display.mute
        mute(False)
        self._last_touch = int(wasp.watch.rtc.time())
        if self._page == _TRACKING:
            self._was_touched = 1
            if self._conf_view is _OFF:
                if self.btn_off.touch(event):
                    self._conf_view = widgets.ConfirmationView()
                    self._conf_view.draw("Stop tracking?")
                    draw.reset()
                    return
            else:
                if self._conf_view.touch(event):
                    if self._conf_view.value:
                        self._disable_tracking()
                        self._page = _SETTINGS1
                    self._conf_view = _OFF
                draw.reset()
        elif self._page == _RINGING:
            if self.btn_snooz.touch(event):
                if self._track_HR_once:  # if currently tracking HR, stop
                    self._track_HR_once = _OFF
                    self._hrdata = None
                    wasp.watch.hrs.disable()
                self._page = _TRACKING
                self._WU_t = int(wasp.watch.rtc.time()) + _SNOOZE_TIME
                wasp.system.set_alarm(self._WU_t, self._activate_ticks_to_ring)
                wasp.system.sleep()
        elif self._page == _SETTINGS1:
            if self._state_alarm and (self._spin_H.touch(event) or self._spin_M.touch(event)):
                # carry the minute spinner wrap-around over to the hours
                if self._state_spinval_M == 0 and self._spin_M.value == 55:
                    self._spin_H.value -= 1
                elif self._state_spinval_M == 55 and self._spin_M.value == 0:
                    self._spin_H.value += 1

                if self._spin_H.value >= 24:
                    self._spin_H.value = 0
                elif self._spin_H.value <= -1:
                    self._spin_H.value = 23

                self._state_spinval_M = self._spin_M.value
                self._spin_M.update()
                self._state_spinval_H = self._spin_H.value
                self._spin_H.update()
                if self._state_alarm:
                    self._draw_duration(draw)
                return
            elif self.check_al.touch(event):
                self._state_alarm = self.check_al.state
                self.check_al.update()
                return
        elif self._page == _SETTINGS2:
            if self._state_body_tracking:
                if self.btn_HR.touch(event):
                    self.btn_HR.draw()
                    self._state_HR_tracking = self.btn_HR.state
                    return
            if self._state_alarm:
                if self.check_grad.touch(event):
                    self._state_gradual_wake = self.check_grad.state
                    self.check_grad.draw()
                    return
                if self._state_body_tracking:
                    if self.check_smart.touch(event):
                        self._state_smart_alarm = self.check_smart.state
                        self.check_smart.draw()
                        return
            if self.btn_sta.touch(event):
                draw.fill()
                draw.string("Loading", 0, 100)
                self._start_tracking()
            elif self.check_body_tracking.touch(event):
                self._state_body_tracking = self.check_body_tracking.state
                self.check_body_tracking.draw()
                if not self._state_body_tracking:
                    # these two options depend on movement tracking
                    self._state_smart_alarm = _OFF
                    self._state_HR_tracking = _OFF
        self._draw()

    def _draw_duration(self, draw):
        """Print sleep duration, number of cycles and a rested/not rested hint.

        On the first settings page the duration is the time left until the
        selected alarm; on the tracking page it is the time elapsed since
        tracking started. `_MINUTES_TO_FALL_ASLEEP` is subtracted in both cases.
        """
        draw.set_font(_FONT)
        if self._page == _SETTINGS1:
            duration = (self._read_time(self._state_spinval_H, self._state_spinval_M)
                        - wasp.watch.rtc.time()) / 60 - _MINUTES_TO_FALL_ASLEEP
            # NOTE(review): this raises AssertionError when the selected alarm
            # is too close to the current time - confirm this is intended.
            assert duration >= _MINUTES_TO_FALL_ASLEEP
            y = 180
        elif self._page == _TRACKING:
            draw.set_color(_FONT_COLOR)
            duration = (wasp.watch.rtc.time() - self._track_start_time) / 60 - _MINUTES_TO_FALL_ASLEEP  # time slept
            if duration <= 0:  # don't print when not yet asleep
                return
            y = 130

        draw.string("Total sleep {:02d}h{:02d}m".format(
            int(duration // 60), int(duration % 60)), 0, y + 20)
        cycl = duration / _CYCLE_LENGTH
        cycl_modulo = cycl % 1
        draw.string("{} cycles ".format(str(cycl)[0:4]), 0, y)
        if duration > 30 and not self._track_HR_once:
            # rested only when close to a whole number of cycles
            if cycl_modulo > 0.10 and cycl_modulo < 0.90:
                draw.string("Not rested!", 0, y + 40)
            else:
                draw.string("Well rested", 0, y + 40)

    def _draw(self):
        """GUI: clear the screen and draw the page selected by self._page"""
        mute = wasp.watch.display.mute
        mute(False)
        draw = wasp.watch.drawable
        draw.fill(0)
        self.stat_bar.draw()
        draw.set_font(_FONT)
        draw.set_color(_FONT_COLOR)
        if self._page == _RINGING:
            if self._smart_offset != 0:
                msg = "WAKE UP ({}m early)".format(str(self._smart_offset//60)[0:2])
            else:
                msg = "WAKE UP"
            draw.string(msg, 0, 50)
            self.btn_snooz = widgets.Button(x=0, y=90, w=240, h=120, label="SNOOZE")
            self.btn_snooz.draw()
            draw.reset()
        elif self._page == _TRACKING:
            ti = wasp.watch.time.localtime(self._track_start_time)
            draw.string('Began at {:02d}:{:02d}'.format(ti[3], ti[4]), 0, 50)
            if self._state_alarm:
                word = "Alarm at "
                if self._state_smart_alarm:
                    word = "Alarm BEFORE "
                ti = wasp.watch.time.localtime(self._WU_t)
                draw.string("{}{:02d}:{:02d}".format(word, ti[3], ti[4]), 0, 70)
                draw.string("Gradual wake: {}".format(bool(self._state_gradual_wake)), 0, 90)
            else:
                draw.string("No alarm set", 0, 70)
            draw.string("data points: {} / {}".format(
                str(self._data_point_nb),
                str(self._data_point_nb * _FREQ // _STORE_FREQ)), 0, 110)
            if self._track_HR_once:
                draw.string("(ongoing)", 0, 170)
            if self._state_HR_tracking:
                draw.string("HR:{}".format(self._last_HR_printed), 160, 170)
            self.btn_off = widgets.Button(x=0, y=200, w=240, h=40, label="Stop")
            self.btn_off.update(txt=_FONT_COLOR, frame=0, bg=0)
            self._draw_duration(draw)
        elif self._page == _SETTINGS1:
            # reset spinval values between runs
            self._state_spinval_H = _OFF
            self._state_spinval_M = _OFF
            self.check_al = widgets.Checkbox(x=0, y=40, label="Wake me up")
            self.check_al.state = self._state_alarm
            self.check_al.draw()
            if self._state_alarm:
                if (self._state_spinval_H, self._state_spinval_M) == (_OFF, _OFF):
                    # suggest wake up time, on the basis of desired sleep goal + time to fall asleep
                    (H, M) = wasp.watch.rtc.get_localtime()[3:5]
                    goal_h = _SLEEP_GOAL_CYCLE * _CYCLE_LENGTH // 60
                    goal_m = _SLEEP_GOAL_CYCLE * _CYCLE_LENGTH % 60
                    M += goal_m + _MINUTES_TO_FALL_ASLEEP
                    while M % 5 != 0:  # the minute spinner moves in steps of 5
                        M += 1
                    self._state_spinval_H = ((H + goal_h) % 24 + (M // 60)) % 24
                    self._state_spinval_M = M % 60
                self._spin_H = widgets.Spinner(30, 70, 0, 23, 2)
                self._spin_H.value = self._state_spinval_H
                self._spin_H.draw()
                self._spin_M = widgets.Spinner(150, 70, 0, 59, 2, 5)
                self._spin_M.value = self._state_spinval_M
                self._spin_M.draw()
                self._draw_duration(draw)
        elif self._page == _SETTINGS2:
            self.check_body_tracking = widgets.Checkbox(x=0, y=40, label="Movement tracking")
            self.check_body_tracking.state = self._state_body_tracking
            self.check_body_tracking.draw()
            if self._state_body_tracking:
                self.btn_HR = widgets.Checkbox(x=0, y=80, label="Heart rate tracking")
                self.btn_HR.state = self._state_HR_tracking
                self.btn_HR.draw()
            if self._state_alarm:
                self.check_grad = widgets.Checkbox(0, 120, "Gradual wake")
                self.check_grad.state = self._state_gradual_wake
                self.check_grad.draw()
                if self._state_body_tracking:
                    self.check_smart = widgets.Checkbox(x=0, y=160, label="Smart alarm")
                    self.check_smart.state = self._state_smart_alarm
                    self.check_smart.draw()
            self.btn_sta = widgets.Button(x=0, y=200, w=240, h=40, label="Start")
            self.btn_sta.draw()
            draw.reset()

    def _start_tracking(self):
        """Free the settings widgets, create the log file and arm all alarms.

        Raises:
            Exception: if the smart alarm is requested but the `SmartAlarm`
                class was deleted by a previous run to save memory.
        """
        # `SmartAlarm` is a module-level class; without this declaration the
        # `del` below would turn the name into an (unassigned) local.
        global SmartAlarm

        # save some memory
        self.check_al = None
        self.check_smart = None
        self.check_body_tracking = None
        self.check_grad = None
        self.btn_sta = None
        self.btn_snooz = None
        self.btn_off = None
        self.btn_HR = None
        self._spin_H = None
        self._spin_M = None
        del self.check_al, self.check_smart, self.check_body_tracking, \
            self.check_grad, self.btn_sta, self.btn_snooz, self.btn_off, \
            self.btn_HR, self._spin_H, self._spin_M

        self._currently_tracking = True

        # accel data not yet written to disk:
        self._data_point_nb = 0  # total number of data points so far
        self._last_checkpoint = 0  # to know when to save to file
        self._track_start_time = int(wasp.watch.rtc.time())  # makes output more compact
        self._last_HR_printed = "?"
        self._was_touched = 0
        wasp.watch.accel.reset()

        # create one file per recording session:
        self.filep = "logs/sleep/{}.csv".format(str(self._track_start_time + _TIMESTAMP))
        with open(self.filep, "wb") as f:
            f.write(b"")

        # if enabled, add alarm to log accel data in _FREQ seconds
        if self._state_body_tracking:
            self.next_al = wasp.watch.rtc.time() + _FREQ
            wasp.system.set_alarm(self.next_al, self._trackOnce)
        else:
            self.next_al = None

        if self._state_gradual_wake and not self._state_alarm:
            # fix incompatible settings
            self._state_gradual_wake = _OFF

        # setting up alarm
        if self._state_alarm:
            self._old_notification_level = wasp.system.notify_level
            self._WU_t = self._read_time(self._state_spinval_H, self._state_spinval_M)
            wasp.system.set_alarm(self._WU_t, self._activate_ticks_to_ring)

            # also set alarm to vibrate a tiny bit before wake up time
            # to wake up gradually
            if self._state_gradual_wake:
                for t in _GRADUAL_WAKE:
                    wasp.system.set_alarm(self._WU_t - int(t*60), self._tiny_vibration)

            # wake up SleepTk 2min before earliest possible wake up
            if self._state_smart_alarm:
                if "SmartAlarm" not in globals():
                    raise Exception("SmartAlarm was removed in previous run to save memory. Restart the watch to use it.")
                self._WU_a = self._WU_t - _ANTICIPATE_ALLOWED - 120
                wasp.system.set_alarm(self._WU_a, self._smart_alarm_start)
            elif "SmartAlarm" in globals():
                # smart alarm not needed for this run: free the class
                del SmartAlarm

        # don't track heart rate right away, wait a few seconds
        if self._state_HR_tracking:
            self._last_HR_date = int(wasp.watch.rtc.time()) + 10
        wasp.system.notify_level = 1  # silent notifications

        # kill bluetooth
        if _KILL_BT:
            import ble
            if ble.enabled():
                ble.disable()

        self._page = _TRACKING
        self._stop_trial = 0

    def _read_time(self, HH, MM):
        """convert time from spinners to seconds

        Returns the timestamp of the next occurrence of HH:MM: today if it
        is still ahead of the current local time, tomorrow otherwise.
        """
        (Y, Mo, d, h, m) = wasp.watch.rtc.get_localtime()[0:5]
        if HH < h or (HH == h and MM <= m):
            d += 1
        return wasp.watch.time.mktime((Y, Mo, d, HH, MM, 0, 0, 0, 0))

    def _disable_tracking(self, keep_main_alarm=False):
        """called by touching "STOP TRACKING" or when computing best alarm
        time to wake you up; disables tracking features and alarms

        keep_main_alarm: when True the main wake up alarm stays armed (used
        when tracking stops because of low battery).
        """
        self._currently_tracking = False
        if self.next_al:
            wasp.system.cancel_alarm(self.next_al, self._trackOnce)
        if self._state_alarm:
            if keep_main_alarm is False:  # to keep the alarm when stopping because of low battery
                wasp.system.cancel_alarm(self._WU_t, self._activate_ticks_to_ring)
            for t in _GRADUAL_WAKE:
                wasp.system.cancel_alarm(self._WU_t - int(t*60), self._tiny_vibration)
            if self._state_smart_alarm:
                wasp.system.cancel_alarm(self._WU_a, self._smart_alarm_start)
                self._state_smart_alarm = _OFF
        wasp.watch.hrs.disable()
        self._periodicSave()
        self._state_HR_tracking = _OFF
        wasp.gc.collect()

    def _trackOnce(self):
        """get one data point of accelerometer every _FREQ seconds, keep
        the average of each axis then store in a file every
        _STORE_FREQ seconds"""
        if self._currently_tracking:
            buff = self._buff
            xyz = wasp.watch.accel.accel_xyz()
            if xyz == (0, 0, 0):
                # all-zero reading: reset the accelerometer and read again
                wasp.watch.accel.reset()
                xyz = wasp.watch.accel.accel_xyz()
            buff[0] += xyz[1]
            buff[1] += xyz[0]
            buff[2] -= xyz[2]
            self._data_point_nb += 1

            # add alarm to log accel data in _FREQ seconds
            self.next_al = wasp.watch.rtc.time() + _FREQ
            wasp.system.set_alarm(self.next_al, self._trackOnce)

            self._periodicSave()

            if wasp.watch.battery.level() <= _BATTERY_THRESHOLD:
                # stop tracking if battery low
                self._disable_tracking(keep_main_alarm=True)
                h, m = wasp.watch.time.localtime(wasp.watch.rtc.time())[3:5]
                body = ("Stopped tracking sleep at {}h{}m because your "
                        "battery went below {}%. Alarm kept on.").format(
                            h, m, _BATTERY_THRESHOLD)
                wasp.system.notify(wasp.watch.rtc.get_uptime_ms(),
                                   {"src": "SleepTk",
                                    "title": "Bat low",
                                    "body": body})
            elif self._state_HR_tracking and \
                    wasp.watch.rtc.time() - self._last_HR_date > _HR_FREQ and \
                    not self._track_HR_once:
                # time for a new heart rate measurement, done in tick()
                self._track_HR_once = _ON
                wasp.system.wake()
                if int(wasp.watch.rtc.time()) - self._last_touch > 5:
                    mute = wasp.watch.display.mute
                    mute(True)
                wasp.system.switch(self)
                wasp.system.request_tick(1000 // 8)
        wasp.gc.collect()

    def _periodicSave(self):
        """save data to csv with row order:
            1. average arm angle
            2. elapsed times
            3/4. heart rate if present, and/or -1 if screen was woken up
                 by user during that time
        arm angle formula from https://www.nature.com/articles/s41598-018-31266-z
        note: math.atan2() gives the same result as math.atan(z / d) for
              d > 0 but does not fail when d == 0
        """
        buff = self._buff
        n = self._data_point_nb - self._last_checkpoint
        if n >= _STORE_FREQ // _FREQ and not self._track_HR_once:
            buff[0] /= n
            buff[1] /= n
            buff[2] /= n
            if self._last_HR != _OFF:
                bpm = ",{}".format(str(self._last_HR))
                self._last_HR = _OFF
            else:
                bpm = ""
            angle = math.atan2(buff[2], buff[0]**2 + buff[1]**2) * 180 / 3.1415926535  # estimated arm angle
            with open(self.filep, "ab") as f:
                f.write("{:7f},{}{}{}\n".format(
                    angle,
                    int(wasp.watch.rtc.time() - self._track_start_time),
                    bpm,
                    ",-1" if self._was_touched else ""
                    ).encode())
            buff[0] = 0  # resets x/y/z to 0
            buff[1] = 0
            buff[2] = 0
            self._last_checkpoint = self._data_point_nb
            self._was_touched = 0
            wasp.gc.collect()

    def _activate_ticks_to_ring(self):
        """listen to ticks every second, telling the watch to vibrate and
        completely wake the user up"""
        wasp.gc.collect()
        wasp.system.notify_level = self._old_notification_level  # restore notification level
        self._page = _RINGING
        self._n_vibration = 0
        if int(wasp.watch.rtc.time()) - self._last_touch > 5:
            mute = wasp.watch.display.mute
            mute(True)
        wasp.system.wake()
        wasp.system.switch(self)
        self._draw()
        wasp.system.request_tick(period_ms=1000)

    def tick(self, ticks):
        """vibrate to wake you up OR track heart rate using code from heart.py"""
        wasp.gc.collect()
        wasp.system.switch(self)
        if self._page == _RINGING:
            wasp.system.keep_awake()
            # in 60 vibrations, ramp up from subtle to strong:
            wasp.watch.vibrator.pulse(duty=max(80 - 1 * self._n_vibration, 20),
                                      ms=min(100 + 6 * self._n_vibration, 500))
            self._n_vibration += 1
        elif self._track_HR_once:
            wasp.watch.hrs.enable()
            if self._hrdata is None:
                self._hrdata = ppg.PPG(wasp.watch.hrs.read_hrs())
            t = wasp.machine.Timer(id=1, period=8000000)
            t.start()
            wasp.system.keep_awake()
            if int(wasp.watch.rtc.time()) - self._last_touch > 5:
                mute = wasp.watch.display.mute
                mute(True)
            # three samples per tick, spaced with a busy wait on the timer
            self._subtick(1)
            while t.time() < 41666:
                pass
            wasp.system.keep_awake()
            self._subtick(1)
            while t.time() < 83332:
                pass
            wasp.system.keep_awake()
            self._subtick(1)
            t.stop()
            del t
            wasp.system.keep_awake()

            if len(self._hrdata.data) >= 240:  # 10 seconds passed
                bpm = self._hrdata.get_heart_rate()
                bpm = int(bpm) if bpm is not None else None
                if bpm is None:
                    # in case of invalid data, write it in the file but
                    # keep trying to read HR
                    self._last_HR = "?"
                    self._hrdata = None
                    self._last_HR_printed = self._last_HR
                elif bpm < 100 and bpm > 40:
                    # if HR was already computed since last periodicSave,
                    # then average the two values
                    if isinstance(self._last_HR, int) and self._last_HR != _OFF:
                        self._last_HR = (self._last_HR + bpm) // 2
                    else:
                        self._last_HR = bpm
                    self._last_HR_printed = self._last_HR
                    self._last_HR_date = int(wasp.watch.rtc.time())
                    self._track_HR_once = _OFF
                    self._hrdata = None
                    wasp.watch.hrs.disable()
                    if int(wasp.watch.rtc.time()) - self._last_touch > 5:
                        wasp.system.sleep()

    def _subtick(self, ticks):
        """track heart rate at 24Hz"""
        self._hrdata.preprocess(wasp.watch.hrs.read_hrs())

    def _tiny_vibration(self):
        """vibrate just a tiny bit before waking up, to gradually return
        to consciousness"""
        wasp.gc.collect()
        if int(wasp.watch.rtc.time()) - self._last_touch > 5:
            mute = wasp.watch.display.mute
            mute(True)
        wasp.system.wake()
        wasp.system.switch(self)
        wasp.watch.vibrator.pulse(duty=60, ms=100)
        if not self._track_HR_once:
            wasp.system.sleep()

    def _smart_alarm_start(self):
        """Alarm callback: run the smart alarm computation."""
        SmartAlarm(self)


class SmartAlarm():
    """Computes the best wake up time from the data logged by a SleepTkApp.

    The computation runs immediately when the object is created.
    """

    def __init__(self, sleeptk):
        self.sleeptk = sleeptk
        self._smart_alarm_compute()

    def _smart_alarm_compute(self):
        """computes best wake up time from sleep data

        Any exception raised during the computation is written to a
        `smart_alarm_error_<time>.txt` file and shown as a notification.
        """
        wasp.gc.collect()
        sleeptk = self.sleeptk
        if not sleeptk._state_smart_alarm:
            t = wasp.watch.time.localtime(wasp.watch.rtc.time())
            body = ("Started computation for the smart alarm "
                    "BY MISTAKE at {:02d}h{:02d}m").format(t[3], t[4])
            wasp.system.notify(wasp.watch.rtc.get_uptime_ms(),
                               {"src": "SleepTk",
                                "title": "Smart alarm computation",
                                "body": body})
            return
        mute = wasp.watch.display.mute
        mute(True)
        wasp.system.wake()
        wasp.system.switch(sleeptk)
        t = wasp.watch.time.localtime(wasp.watch.rtc.time())
        wasp.system.notify(wasp.watch.rtc.get_uptime_ms(),
                           {"src": "SleepTk",
                            "title": "Starting smart alarm computation",
                            "body": "Starting computation for the smart alarm at {:02d}h{:02d}m".format(t[3], t[4])}
                           )
        try:
            start_time = wasp.watch.rtc.time()
            # stop tracking to save memory, keep the alarm just in case
            #sleeptk._disable_tracking(keep_main_alarm=True)

            # read file one character at a time, to get only the 1st
            # value of each row, which is the arm angle
            data = array("f")
            buff = b""
            skip = False
            with open(sleeptk.filep, "rb") as f:
                while True:
                    char = f.read(1)
                    if char == b"":  # end of file
                        break
                    if char == b",":  # start ignoring after the first col
                        skip = True
                    elif char == b"\n":
                        skip = False  # stop skipping because reading a new line
                        if buff:
                            data.append(float(buff))
                        buff = b""
                    elif not skip:  # digit of arm angle value
                        buff += char
            if buff:  # last row had no trailing newline
                data.append(float(buff))
            del char, buff
            wasp.gc.collect()
            wasp.system.keep_awake()

            earlier, cycle = self._signal_processing(data)
            WU_t = sleeptk._WU_t
            wasp.gc.collect()

            # add new alarm
            wasp.system.set_alarm(
                max(WU_t - earlier, int(wasp.watch.rtc.time()) + 3),  # not before right now, to make sure it rings
                sleeptk._activate_ticks_to_ring)

            # replace old gentle alarm by another one
            if sleeptk._state_gradual_wake:
                for t in _GRADUAL_WAKE:
                    wasp.system.cancel_alarm(WU_t - int(t*60), sleeptk._tiny_vibration)
                    if earlier + int(t*60) < _ANTICIPATE_ALLOWED:
                        wasp.system.set_alarm(WU_t - earlier - int(t*60), sleeptk._tiny_vibration)

            sleeptk._smart_offset = earlier  # displayed on the ringing page
            sleeptk._earlier = earlier  # kept for backward compatibility
            sleeptk._page = _TRACKING
            wasp.system.notify(wasp.watch.rtc.get_uptime_ms(),
                               {"src": "SleepTk",
                                "title": "Finished smart alarm computation",
                                "body": "Finished computing best wake up time in {:2f}s. Sleep cycle: {:.2f}h".format(
                                    wasp.watch.rtc.time() - start_time,
                                    cycle / 3600)  # cycle is in seconds
                                })
            if not sleeptk._track_HR_once:
                wasp.system.sleep()
        except Exception as e:
            wasp.gc.collect()
            h, m = wasp.watch.time.localtime(wasp.watch.rtc.time())[3:5]
            msg = "Exception occured at {:02d}h{:02d}m: '{}'%".format(h, m, str(e))
            with open("smart_alarm_error_{}.txt".format(int(wasp.watch.rtc.time())), "wb") as f:
                f.write(msg.encode())
            wasp.system.notify(wasp.watch.rtc.get_uptime_ms(),
                               {"src": "SleepTk",
                                "title": "Smart alarm error",
                                "body": msg})
        wasp.gc.collect()

    def _signal_processing(self, data):
        """signal processing over the data read from the local file

        data: array of arm angles, one value per stored row; it is
            modified in place.

        Returns a tuple (earlier, cycle): `earlier` is the number of
        seconds before the user alarm at which to ring (at most
        _ANTICIPATE_ALLOWED) and `cycle` the estimated sleep cycle
        length in seconds.

        Raises Exception when there is not enough data or peaks, or when
        the predicted wake up time is after the user alarm.
        """
        n = len(data)
        window = int(60*60/_STORE_FREQ)  # one hour of data points
        skip = 1800 // _STORE_FREQ  # skip first 30 minutes of data
        if n < 2 or n < skip + window:
            raise Exception("Not enough data to compute smart alarm")

        # take absolute rate of change of data
        # NOTE(review): the last element is left untouched (raw angle,
        # not a difference) - confirm whether it should be discarded.
        for i in range(n - 1):
            data[i] = abs(data[i+1] - data[i])

        # remove outliers:
        for i in range(n):
            data[i] = min(0.005, data[i])
        wasp.gc.collect()
        wasp.system.keep_awake()

        # smoothen several times
        for _ in range(5):
            for i in range(1, n - 1):
                data[i] += data[i-1]
                data[i] /= 2
        wasp.gc.collect()
        wasp.system.keep_awake()

        # center data
        mean = sum(data) / n
        for i in range(n):
            data[i] = data[i] - mean
        del mean
        wasp.gc.collect()
        wasp.system.keep_awake()

        # find local maxima; plain lists are used because they are short
        # and need membership tests and item deletion
        x_maximas = []
        y_maximas = []
        for start_w in range(skip, n - window + 1):
            m = max(data[start_w:start_w + window])
            if m <= 0:
                continue
            for i in range(start_w, start_w + window):
                if data[i] != m or i in x_maximas:
                    continue
                if x_maximas and i - x_maximas[-1] <= 2:
                    # too close to last maximum, keep highest
                    if y_maximas[-1] < data[i]:
                        x_maximas[-1] = i
                        y_maximas[-1] = data[i]
                else:
                    x_maximas.append(i)
                    y_maximas.append(m)
        del data
        wasp.gc.collect()
        wasp.system.keep_awake()

        # merge the closest peaks while there are more than N peaks
        N = 3
        while len(x_maximas) > N:
            closest = 0  # index of the first of the two closest peaks
            for k in range(1, len(x_maximas) - 1):
                if x_maximas[k+1] - x_maximas[k] < x_maximas[closest+1] - x_maximas[closest]:
                    closest = k
            del x_maximas[closest + 1]
            del y_maximas[closest + 1]
        wasp.gc.collect()
        wasp.system.keep_awake()

        if len(x_maximas) < 2:
            raise Exception("Not enough peaks to estimate sleep cycle")

        # sleep cycle duration is the average time distance between the peaks
        cycle = (x_maximas[-1] - x_maximas[0]) / (len(x_maximas) - 1) * _STORE_FREQ

        last_peak = self.sleeptk._track_start_time + x_maximas[-1] * _STORE_FREQ
        WU_t = self.sleeptk._WU_t

        # check if too late, already woken up:
        if last_peak + cycle > WU_t:
            raise Exception("Took too long to compute!")

        # if smart alarm wants to wake you up too early, limit how early
        if last_peak + cycle < WU_t - _ANTICIPATE_ALLOWED:
            earlier = _ANTICIPATE_ALLOWED
        else:  # will wake you up at computed time
            earlier = int(WU_t - (last_peak + cycle))
        wasp.system.keep_awake()
        return (earlier, cycle)