848 lines
35 KiB
Python
848 lines
35 KiB
Python
# SPDX-License-Identifier: LGPL-3.0-or-later
|
|
# Copyright (C) 2021 github.com/thiswillbeyourgithub/
|
|
|
|
"""Sleep tracker
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
SleepTk is an alarm clock app designed to track body movement throughout
|
|
the night to optimize sleep. Currently a WIP, more information at
|
|
https://github.com/thiswillbeyourgithub/sleep_tracker_pinetime_wasp-os
|
|
"""
|
|
|
|
import wasp
|
|
import widgets
|
|
import shell
|
|
import fonts
|
|
import math
|
|
import ppg
|
|
from array import array
|
|
from micropython import const
|
|
|
|
# App icon: 60x60 pixels, 2-bit RLE encoded, 225 bytes.
# Kindly designed by Emanuel Löffler (https://github.com/plan5)
icon = (
    b'\x02'
    b'<<'
    b'?\x11\xd2*\xd2*\xd2*\xd8$\xd82\xca2\xca6'
    b'\xc4\x02\xc24\xc4\x02\xc24\xc4\x04\xc40\xc4\x04\xc40'
    b'\xc4\x04\xc40\xc4\x04\xc42\xc2\x04\xc8.\xc2\x04\xc8.'
    b'\xc2\x04\xc8.\xc2\x04\xc8 \xc4\n\xc2\x06\xc6 \xc4\n'
    b'\xc2\x06\xc6 \xc4\x08\xc6\x04\xc6 \xc4\x08\xc6\x04\xc6\x1e'
    b'\xc8\x06\xc6\x04\xc6\x1e\xc8\x06\xc6\x04\xc6\x1e\xc8\x06\xc6\x04'
    b'\xc6\x1e\xc8\x06\xc6\x04\xc6\x1c\xca\x06\xc6\x04\xc6\x1c\xd6\x04'
    b'\xc6\x1c\xd6\x04\xc6\x1c\xd6\x04\xc6\x07\xed\x08\xc4\x03\xed\x08'
    b'\xc4\x03\xed\x08\xc4\x03\xed\x08\xc4"\xc6\x04\xc4\x06\xc2&'
    b'\xc6\x04\xc4\x06\xc2&\xc6\x04\xc4\x06\xc2&\xc6\x04\xc4\x06'
    b'\xc2&\xc6\x04\xc6\x04\xc2&\xc6\x04\xc6\x04\xc2(\xc2\x04'
    b'\xc8\x02\xc2*\xc2\x04\xc8\x02\xc2.\xce.\xce.\xce.'
    b'\xce*\xd1+\xd1+\xd1+\xd1%\xd7%\xd4\x16\xe6\x16'
    b'\xe6\x16\xe0\x1c\xe0\x1c\xde\x1e\xde"\xd4(\xd4\x1a'
)
|
|
|
|
# HARDCODED VARIABLES:
# generic on/off values used by the toggles and flags
_ON = const(1)
_OFF = const(0)
# values taken by the app's `_page` attribute
_TRACKING = const(0)
_RINGING = const(1)
_SETTINGS1 = const(2)
_SETTINGS2 = const(3)
_FONT = fonts.sans18
# red font to reduce eye strain at night
_FONT_COLOR = const(0xf800)
# unix time and the time used by wasp-os don't have the same reference date
_TIMESTAMP = const(946684800)

## USER SETTINGS #################################
# set to 0 to disable turning off bluetooth while tracking to save battery
# (you have to reboot the watch to reactivate BT, default: 1)
_KILL_BT = const(1)
# number of times to swipe or press the button to turn off ringing
# (default: 10)
_STOP_LIMIT = const(10)
# number of seconds to snooze for (default: 5 minutes)
_SNOOZE_TIME = const(300)
# get accelerometer data every X seconds, but process and store them only
# every _STORE_FREQ seconds (default: 5)
_FREQ = const(5)
# how many seconds between heart rate data (default: 300, minimum 120)
_HR_FREQ = const(300)
# process data and store to file every X seconds (default: 300)
_STORE_FREQ = const(300)
# under X% of battery, stop tracking and only keep the alarm, set at -200
# or lower to disable (default: 15)
_BATTERY_THRESHOLD = const(15)
# number of seconds SleepTk can wake you up before the alarm clock you set,
# only relevant if smart_alarm is enabled. (default: 2400)
_ANTICIPATE_ALLOWED = const(2400)
# numbers of minutes before the alarm at which to send a tiny vibration,
# designed to wake you more gently.
# (default: array("H", [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) )
_GRADUAL_WAKE = array("H", [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# time you take to fall asleep (default: 14, according to
# https://sleepyti.me/)
_MINUTES_TO_FALL_ASLEEP = const(14)
# sleep cycle length in minutes. Currently used only to display the best
# wake up time but not to compute the smart alarm! (default: 90 or 100,
# according to https://sleepyti.me/)
_CYCLE_LENGTH = const(90)
# number of sleep cycles you wish to sleep. With _CYCLE_LENGTH this is used
# to suggest the best wake up time to the user when setting the alarm.
# (default: 5)
_SLEEP_GOAL_CYCLE = const(5)
##################################################
|
|
|
|
|
|
class SleepTkApp():
|
|
NAME = 'SleepTk'
|
|
ICON = icon
|
|
|
|
def __init__(self):
    """Put the app in its default state.

    Resets the settings toggles, the heart rate bookkeeping and the page
    shown (first settings page), then makes sure the log folders exist.
    """
    wasp.gc.collect()

    # default state of the settings widgets
    self._state_alarm = _ON
    self._state_gradual_wake = _ON
    self._state_body_tracking = _OFF
    self._state_HR_tracking = _OFF
    self._state_smart_alarm = _OFF
    self._state_spinval_H = _OFF
    self._state_spinval_M = _OFF

    # heart rate bookkeeping
    self._hrdata = None
    # _OFF: no HR to write, "?": error during last HR, else: heart rate
    self._last_HR = _OFF
    self._last_HR_printed = "?"
    self._last_HR_date = _OFF
    self._track_HR_once = _OFF

    # general runtime state
    self._page = _SETTINGS1
    self._currently_tracking = _OFF
    self._conf_view = _OFF  # confirmation view
    # number of seconds between the alarm set manually and the smart alarm
    self._smart_offset = _OFF
    self._buff = array("f", [_OFF] * 3)  # accumulated accelerometer values
    self._last_touch = int(wasp.watch.rtc.time())

    # best effort creation of the log folders
    for folder in ("logs/", "logs/sleep"):
        try:
            shell.mkdir(folder)
        except:  # folder already exists
            pass
|
|
|
|
def foreground(self):
    """Build the status bar, draw the current page and subscribe to touch,
    swipe and button events."""
    bar = widgets.StatusBar()
    bar.clock = True
    self.stat_bar = bar
    self._conf_view = _OFF
    wasp.gc.collect()
    self._draw()

    wanted_events = (wasp.EventMask.TOUCH |
                     wasp.EventMask.SWIPE_LEFTRIGHT |
                     wasp.EventMask.SWIPE_UPDOWN |
                     wasp.EventMask.BUTTON)
    wasp.system.request_event(wanted_events)

    # a heart rate measurement is in progress: ask for ticks every 125 ms
    if self._track_HR_once and self._page == _TRACKING:
        wasp.system.request_tick(1000 // 8)
|
|
|
|
def sleep(self):
    """Reset the number of attempts made to stop the alarm, then return
    True.

    NOTE(review): returning True presumably tells wasp-os to keep the app
    active while the watch sleeps -- confirm against the wasp-os app API.
    """
    wasp.gc.collect()
    self._stop_trial = 0
    return True
|
|
|
|
def background(self):
    """Switch the heart rate sensor off and drop any partial HR data."""
    self._hrdata = None
    wasp.watch.hrs.disable()
    wasp.gc.collect()
|
|
|
|
def _try_stop_alarm(self):
    """Register one more attempt (button press or swipe) to stop the alarm.

    Below _STOP_LIMIT attempts only the remaining count is displayed. On
    the attempt that reaches the limit the alarm is cancelled, tracking is
    disabled and the app is reset to its initial state.
    """
    attempts = self._stop_trial + 1
    if attempts < _STOP_LIMIT:
        self._stop_trial = attempts
        draw = wasp.watch.drawable
        draw.set_color(_FONT_COLOR)
        remaining = _STOP_LIMIT - attempts
        draw.string("{} to stop".format(remaining), 0, 70)
        return

    # limit reached: stop ringing and start over
    self._n_vibration = 0  # make sure the attribute exists before deleting
    del self._n_vibration
    wasp.system.cancel_alarm(self._WU_t, self._activate_ticks_to_ring)
    self._disable_tracking()
    self.__init__()
    self.foreground()
|
|
|
|
def press(self, button, state):
    """Handle the physical button.

    Only presses with a truthy `state` are handled. While ringing, a press
    counts as an attempt to stop the alarm; while tracking it only redraws
    the screen (pressing does not exit); on the settings pages it goes
    back to the home screen.
    """
    if not state:
        return
    self._last_touch = int(wasp.watch.rtc.time())
    wasp.watch.display.mute(False)

    page = self._page
    if page == _RINGING:
        self._try_stop_alarm()
        return
    if page == _TRACKING:
        self._was_touched = 1
        # disable pressing to exit, use swipe up instead
        self._draw()
        return
    wasp.system.navigate(wasp.EventType.HOME)
|
|
|
|
def swipe(self, event):
    """Navigate between the settings pages or try to stop the alarm.

    A left swipe on the first settings page shows the second one and a
    right swipe on the second page goes back to the first. While ringing,
    any swipe counts as an attempt to stop the alarm. Every other swipe is
    not handled here and True is returned.
    """
    wasp.watch.display.mute(False)
    self._last_touch = int(wasp.watch.rtc.time())

    page = self._page
    if page == _RINGING:
        self._try_stop_alarm()
        return
    if page == _SETTINGS1 and event[0] == wasp.EventType.LEFT:
        destination = _SETTINGS2
    elif page == _SETTINGS2 and event[0] == wasp.EventType.RIGHT:
        destination = _SETTINGS1
    else:
        return True
    self._page = destination
    self._draw()
|
|
|
|
def touch(self, event):
    """Dispatch a touch event to the widgets of the current page.

    Depending on the page this starts or stops tracking, snoozes the alarm
    or updates a setting. Widgets able to refresh themselves return early;
    every other path ends with a full redraw of the screen.

    NOTE(review): the indentation of this file was lost in the copy this
    was rebuilt from; the nesting below is the most plausible reading of
    the statements -- confirm against the upstream file.
    """
    wasp.gc.collect()
    draw = wasp.watch.drawable
    wasp.watch.display.mute(False)
    self._last_touch = int(wasp.watch.rtc.time())
    page = self._page

    if page == _TRACKING:
        self._was_touched = 1
        if self._conf_view is _OFF:
            if self.btn_off.touch(event):
                # ask for confirmation before stopping
                self._conf_view = widgets.ConfirmationView()
                self._conf_view.draw("Stop tracking?")
                draw.reset()
                return
        else:
            if self._conf_view.touch(event):
                if self._conf_view.value:
                    self._disable_tracking()
                    self._page = _SETTINGS1
                self._conf_view = _OFF
            draw.reset()

    elif page == _RINGING:
        if self.btn_snooz.touch(event):
            if self._track_HR_once:  # if currently tracking HR, stop
                self._track_HR_once = _OFF
                self._hrdata = None
                wasp.watch.hrs.disable()
            self._page = _TRACKING
            self._WU_t = int(wasp.watch.rtc.time()) + _SNOOZE_TIME
            wasp.system.set_alarm(self._WU_t, self._activate_ticks_to_ring)
            wasp.system.sleep()

    elif page == _SETTINGS1:
        if self._state_alarm and (self._spin_H.touch(event) or self._spin_M.touch(event)):
            hours = self._spin_H
            minutes = self._spin_M
            previous_minutes = self._state_spinval_M
            # carry to the hours when the minutes wrap around
            if previous_minutes == 0 and minutes.value == 55:
                hours.value -= 1
            elif previous_minutes == 55 and minutes.value == 0:
                hours.value += 1
            # keep the hours within 0-23
            if hours.value >= 24:
                hours.value = 0
            elif hours.value <= -1:
                hours.value = 23
            self._state_spinval_M = minutes.value
            minutes.update()
            self._state_spinval_H = hours.value
            hours.update()
            if self._state_alarm:
                self._draw_duration(draw)
            return
        elif self.check_al.touch(event):
            self._state_alarm = self.check_al.state
            self.check_al.update()
            return

    elif page == _SETTINGS2:
        if self._state_body_tracking:
            if self.btn_HR.touch(event):
                self.btn_HR.draw()
                self._state_HR_tracking = self.btn_HR.state
                return
        if self._state_alarm:
            if self.check_grad.touch(event):
                self._state_gradual_wake = self.check_grad.state
                self.check_grad.draw()
                return
            if self._state_body_tracking:
                if self.check_smart.touch(event):
                    self._state_smart_alarm = self.check_smart.state
                    self.check_smart.draw()
                    return
        if self.btn_sta.touch(event):
            draw.fill()
            draw.string("Loading", 0, 100)
            self._start_tracking()
        elif self.check_body_tracking.touch(event):
            self._state_body_tracking = self.check_body_tracking.state
            self.check_body_tracking.draw()
            if not self._state_body_tracking:
                # these two features need movement tracking
                self._state_smart_alarm = _OFF
                self._state_HR_tracking = _OFF

    self._draw()
|
|
|
|
def _draw_duration(self, draw):
    """Print the sleep duration, the number of sleep cycles it represents
    and whether it falls close to a whole number of cycles.

    On the first settings page the duration is the time between now and
    the alarm set with the spinners; on the tracking page it is the time
    elapsed since tracking started. _MINUTES_TO_FALL_ASLEEP is subtracted
    in both cases. Nothing is drawn on the other pages.

    draw: the drawable to write on.
    """
    draw.set_font(_FONT)
    if self._page == _SETTINGS1:
        duration = (self._read_time(self._state_spinval_H, self._state_spinval_M) - wasp.watch.rtc.time()) / 60 - _MINUTES_TO_FALL_ASLEEP
        # an alarm closer than the time needed to fall asleep leaves no
        # sleep at all: show zero rather than crashing the UI on an assert
        duration = max(duration, 0)
        y = 180
    elif self._page == _TRACKING:
        draw.set_color(_FONT_COLOR)
        duration = (wasp.watch.rtc.time() - self._track_start_time) / 60 - _MINUTES_TO_FALL_ASLEEP  # time slept
        if duration <= 0:  # don't print when not yet asleep
            return
        y = 130
    else:
        # no duration is defined for the other pages
        return

    draw.string("Total sleep {:02d}h{:02d}m".format(
        int(duration // 60),
        int(duration % 60)), 0, y + 20)
    cycl = duration / _CYCLE_LENGTH
    cycl_modulo = cycl % 1
    draw.string("{} cycles ".format(str(cycl)[0:4]), 0, y)
    # the hint is skipped for short durations and while a heart rate
    # measurement is in progress
    if duration > 30 and not self._track_HR_once:
        if 0.10 < cycl_modulo < 0.90:
            draw.string("Not rested!", 0, y + 40)
        else:
            draw.string("Well rested", 0, y + 40)
|
|
|
|
def _draw(self):
    """Redraw the whole screen for the current page.

    Clears the display, draws the status bar, then (re)creates and draws
    the widgets of the page stored in `self._page`.

    NOTE(review): the indentation of this file was lost in the copy this
    was rebuilt from; the nesting below is the most plausible reading of
    the statements -- confirm against the upstream file.
    """
    wasp.watch.display.mute(False)
    draw = wasp.watch.drawable
    draw.fill(0)
    self.stat_bar.draw()
    draw.set_font(_FONT)
    draw.set_color(_FONT_COLOR)
    page = self._page

    if page == _RINGING:
        msg = "WAKE UP"
        if self._smart_offset != 0:
            msg = "WAKE UP ({}m early)".format(str(self._smart_offset//60)[0:2])
        draw.string(msg, 0, 50)
        self.btn_snooz = widgets.Button(x=0, y=90, w=240, h=120, label="SNOOZE")
        self.btn_snooz.draw()
        draw.reset()

    elif page == _TRACKING:
        began = wasp.watch.time.localtime(self._track_start_time)
        draw.string('Began at {:02d}:{:02d}'.format(began[3], began[4]), 0, 50)
        if not self._state_alarm:
            draw.string("No alarm set", 0, 70)
        else:
            word = "Alarm BEFORE " if self._state_smart_alarm else "Alarm at "
            wake = wasp.watch.time.localtime(self._WU_t)
            draw.string("{}{:02d}:{:02d}".format(word, wake[3], wake[4]), 0, 70)
            draw.string("Gradual wake: {}".format(bool(self._state_gradual_wake)), 0, 90)
        stored = self._data_point_nb * _FREQ // _STORE_FREQ
        draw.string("data points: {} / {}".format(self._data_point_nb, stored), 0, 110)
        if self._track_HR_once:
            draw.string("(ongoing)", 0, 170)
        if self._state_HR_tracking:
            draw.string("HR:{}".format(self._last_HR_printed), 160, 170)
        self.btn_off = widgets.Button(x=0, y=200, w=240, h=40, label="Stop")
        self.btn_off.update(txt=_FONT_COLOR, frame=0, bg=0)
        self._draw_duration(draw)

    elif page == _SETTINGS1:
        # reset spinval values between runs
        self._state_spinval_H = self._state_spinval_M = _OFF
        checkbox = widgets.Checkbox(x=0, y=40, label="Wake me up")
        checkbox.state = self._state_alarm
        self.check_al = checkbox
        checkbox.draw()
        if self._state_alarm:
            if (self._state_spinval_H, self._state_spinval_M) == (_OFF, _OFF):
                # suggest wake up time, on the basis of desired sleep goal
                # + time to fall asleep
                (H, M) = wasp.watch.rtc.get_localtime()[3:5]
                goal_minutes = _SLEEP_GOAL_CYCLE * _CYCLE_LENGTH
                goal_h = goal_minutes // 60
                goal_m = goal_minutes % 60
                M += goal_m + _MINUTES_TO_FALL_ASLEEP
                M += -M % 5  # round up to the next multiple of 5 minutes
                self._state_spinval_H = ((H + goal_h) % 24 + (M // 60)) % 24
                self._state_spinval_M = M % 60
            self._spin_H = widgets.Spinner(30, 70, 0, 23, 2)
            self._spin_H.value = self._state_spinval_H
            self._spin_H.draw()
            self._spin_M = widgets.Spinner(150, 70, 0, 59, 2, 5)
            self._spin_M.value = self._state_spinval_M
            self._spin_M.draw()
            if self._state_alarm:
                self._draw_duration(draw)

    elif page == _SETTINGS2:
        self.check_body_tracking = widgets.Checkbox(x=0, y=40, label="Movement tracking")
        self.check_body_tracking.state = self._state_body_tracking
        self.check_body_tracking.draw()
        if self._state_body_tracking:
            self.btn_HR = widgets.Checkbox(x=0, y=80, label="Heart rate tracking")
            self.btn_HR.state = self._state_HR_tracking
            self.btn_HR.draw()
        if self._state_alarm:
            self.check_grad = widgets.Checkbox(0, 120, "Gradual wake")
            self.check_grad.state = self._state_gradual_wake
            self.check_grad.draw()
            if self._state_body_tracking:
                self.check_smart = widgets.Checkbox(x=0, y=160, label="Smart alarm")
                self.check_smart.state = self._state_smart_alarm
                self.check_smart.draw()
        self.btn_sta = widgets.Button(x=0, y=200, w=240, h=40, label="Start")
        self.btn_sta.draw()

    draw.reset()
|
|
|
|
def _start_tracking(self):
    """Start a tracking session.

    Frees the settings widgets, resets the session counters, creates the
    log file of the session and schedules every alarm required by the
    current settings (accelerometer sampling, main alarm, gradual wake
    vibrations, smart alarm computation). Finally switches to the
    tracking page.

    Raises:
        Exception: the smart alarm is requested but the SmartAlarm class
            was removed from the module by a previous run.
    """
    use_smart_alarm = self._state_alarm and self._state_smart_alarm
    # Checked first, so that a refused start leaves no alarm half set up.
    # The class is looked up by name in the module globals: a local
    # variable can never tell whether the module level class still exists.
    if use_smart_alarm and "SmartAlarm" not in globals():
        raise Exception("SmartAlarm was removed in previous run to save memory. Restart the watch to use it.")

    # save some memory: the settings widgets are not needed while tracking
    for name in ("check_al", "check_smart", "check_body_tracking",
                 "check_grad", "btn_sta", "btn_snooz", "btn_off", "btn_HR",
                 "_spin_H", "_spin_M"):
        setattr(self, name, None)  # make sure it exists before deleting
        delattr(self, name)

    self._currently_tracking = True

    # accel data not yet written to disk:
    self._data_point_nb = 0  # total number of data points so far
    self._last_checkpoint = 0  # to know when to save to file
    self._track_start_time = int(wasp.watch.rtc.time())  # makes output more compact
    self._last_HR_printed = "?"
    self._was_touched = 0
    wasp.watch.accel.reset()

    # create one (empty) file per recording session:
    self.filep = "logs/sleep/{}.csv".format(str(self._track_start_time + _TIMESTAMP))
    with open(self.filep, "wb") as f:
        f.write(b"")

    # if enabled, add alarm to log accel data in _FREQ seconds
    if self._state_body_tracking:
        self.next_al = wasp.watch.rtc.time() + _FREQ
        wasp.system.set_alarm(self.next_al, self._trackOnce)
    else:
        self.next_al = None

    if self._state_gradual_wake and not self._state_alarm:
        # fix incompatible settings
        self._state_gradual_wake = _OFF

    # setting up alarm
    if self._state_alarm:
        self._old_notification_level = wasp.system.notify_level
        self._WU_t = self._read_time(self._state_spinval_H, self._state_spinval_M)
        wasp.system.set_alarm(self._WU_t, self._activate_ticks_to_ring)

        # also set alarm to vibrate a tiny bit before wake up time
        # to wake up gradually
        if self._state_gradual_wake:
            for t in _GRADUAL_WAKE:
                wasp.system.set_alarm(self._WU_t - int(t*60), self._tiny_vibration)

        if self._state_smart_alarm:
            # wake up SleepTk 2min before earliest possible wake up
            self._WU_a = self._WU_t - _ANTICIPATE_ALLOWED - 120
            wasp.system.set_alarm(self._WU_a, self._smart_alarm_start)
        else:
            # the smart alarm is not used: remove its class from the
            # module to save memory (the watch has to be restarted to use
            # it again, as the error raised above explains)
            globals().pop("SmartAlarm", None)

    # don't track heart rate right away, wait a few seconds
    if self._state_HR_tracking:
        self._last_HR_date = int(wasp.watch.rtc.time()) + 10
    # NOTE(review): the indentation of this file was lost; this is assumed
    # to apply whether or not heart rate tracking is enabled -- confirm
    wasp.system.notify_level = 1  # silent notifications

    # kill bluetooth
    if _KILL_BT:
        import ble
        if ble.enabled():
            ble.disable()

    self._page = _TRACKING
    self._stop_trial = 0
|
|
|
|
def _read_time(self, HH, MM):
    """Convert an alarm time given in hours and minutes to a timestamp.

    HH: hour of the alarm.
    MM: minute of the alarm.

    Returns the value of wasp.watch.time.mktime() for the next occurrence
    of HH:MM: today if that time is still ahead, tomorrow otherwise.
    """
    (Y, Mo, d, h, m) = wasp.watch.rtc.get_localtime()[0:5]
    # HH and MM are used as passed by the caller (they used to be silently
    # replaced by the spinner values, which made the arguments useless)
    if HH < h or (HH == h and MM <= m):
        # that time already passed today: ring tomorrow
        # NOTE(review): relies on mktime() normalising a day number past
        # the end of the month -- confirm
        d += 1
    return wasp.watch.time.mktime((Y, Mo, d, HH, MM, 0, 0, 0, 0))
|
|
|
|
def _disable_tracking(self, keep_main_alarm=False):
    """Stop tracking and cancel the alarms that were scheduled.

    Called when touching "Stop", when the alarm is stopped and when the
    battery gets low.

    keep_main_alarm: when exactly False (the default) the wake up alarm
        and the gradual wake vibrations are cancelled too; the low
        battery path passes True to keep the alarm.

    NOTE(review): the indentation of this file was lost in the copy this
    was rebuilt from; the gradual wake alarms are assumed to be cancelled
    together with the main alarm only -- confirm.
    """
    cancel = wasp.system.cancel_alarm
    self._currently_tracking = False

    if self.next_al:
        cancel(self.next_al, self._trackOnce)

    if self._state_alarm:
        if keep_main_alarm is False:  # to keep the alarm when stopping because of low battery
            wake_up = self._WU_t
            cancel(wake_up, self._activate_ticks_to_ring)
            for minutes in _GRADUAL_WAKE:
                cancel(wake_up - int(minutes*60), self._tiny_vibration)
        if self._state_smart_alarm:
            cancel(self._WU_a, self._smart_alarm_start)
            self._state_smart_alarm = _OFF

    wasp.watch.hrs.disable()
    self._periodicSave()
    self._state_HR_tracking = _OFF
    wasp.gc.collect()
|
|
|
|
def _trackOnce(self):
    """Accumulate one accelerometer reading and schedule the next one.

    Runs every _FREQ seconds while tracking. The readings are summed in
    self._buff and handed to _periodicSave(), which averages and stores
    them. Tracking is stopped (alarm kept) when the battery level is at
    or below _BATTERY_THRESHOLD; otherwise a heart rate measurement is
    started when the previous one is older than _HR_FREQ seconds.
    """
    if not self._currently_tracking:
        wasp.gc.collect()
        return

    accel = wasp.watch.accel
    sums = self._buff
    xyz = accel.accel_xyz()
    if xyz == (0, 0, 0):
        # retry once after a reset of the sensor
        accel.reset()
        xyz = accel.accel_xyz()
    sums[0] += xyz[1]
    sums[1] += xyz[0]
    sums[2] -= xyz[2]
    self._data_point_nb += 1

    # add alarm to log accel data in _FREQ seconds
    self.next_al = wasp.watch.rtc.time() + _FREQ
    wasp.system.set_alarm(self.next_al, self._trackOnce)

    self._periodicSave()

    if wasp.watch.battery.level() <= _BATTERY_THRESHOLD:
        # stop tracking if battery low
        self._disable_tracking(keep_main_alarm=True)
        h, m = wasp.watch.time.localtime(wasp.watch.rtc.time())[3:5]
        body = ("Stopped "
                "tracking sleep at {}h{}m because your battery went below {}%. Alarm kept "
                "on.").format(h, m, _BATTERY_THRESHOLD)
        wasp.system.notify(wasp.watch.rtc.get_uptime_ms(),
                           {"src": "SleepTk",
                            "title": "Bat low",
                            "body": body})
    elif (self._state_HR_tracking
            and wasp.watch.rtc.time() - self._last_HR_date > _HR_FREQ
            and not self._track_HR_once):
        # time for a new heart rate measurement, carried out by tick()
        self._track_HR_once = _ON
        wasp.system.wake()
        if int(wasp.watch.rtc.time()) - self._last_touch > 5:
            wasp.watch.display.mute(True)
        wasp.system.switch(self)
        wasp.system.request_tick(1000 // 8)

    wasp.gc.collect()
|
|
|
|
def _periodicSave(self):
    """Append one row to the csv file of the session, when enough data
    points were accumulated.

    Nothing is written while fewer than _STORE_FREQ // _FREQ data points
    were collected since the last row, nor while a heart rate measurement
    is in progress. Columns of a row:
        1. average arm angle
        2. elapsed time since tracking started, in seconds
        3/4. heart rate if present, and/or -1 if the screen was woken up
             by the user during that time
    arm angle formula from https://www.nature.com/articles/s41598-018-31266-z

    NOTE(review): the cited formula divides by sqrt(x**2 + y**2) whereas
    the denominator here is x**2 + y**2. Left as is because the smart
    alarm thresholds depend on the current scale -- confirm.
    """
    buff = self._buff
    n = self._data_point_nb - self._last_checkpoint
    # `n <= 0` protects the averages below against a division by zero
    if n <= 0 or n < _STORE_FREQ // _FREQ or self._track_HR_once:
        return

    buff[0] /= n
    buff[1] /= n
    buff[2] /= n

    if self._last_HR != _OFF:
        bpm = ",{}".format(str(self._last_HR))
        self._last_HR = _OFF  # consumed: don't write it twice
    else:
        bpm = ""

    # atan2(z, d) equals atan(z / d) for d > 0 and stays defined for d == 0
    angle = math.atan2(buff[2], buff[0]**2 + buff[1]**2)*180/3.1415926535
    # one placeholder per value: the fourth one was missing, so the ",-1"
    # marker was never written
    row = "{:7f},{}{}{}\n".format(
        angle,  # estimated arm angle
        int(wasp.watch.rtc.time() - self._track_start_time),
        bpm,
        ",-1" if self._was_touched else ""
        )
    with open(self.filep, "ab") as f:
        f.write(row.encode())

    buff[0] = 0  # resets x/y/z to 0
    buff[1] = 0
    buff[2] = 0
    self._last_checkpoint = self._data_point_nb
    self._was_touched = 0
    wasp.gc.collect()
|
|
|
|
def _activate_ticks_to_ring(self):
    """Alarm callback: show the ringing page and request a tick every
    second, each tick making the watch vibrate (see tick())."""
    wasp.gc.collect()
    system = wasp.system
    system.notify_level = self._old_notification_level  # restore notification level
    self._n_vibration = 0
    self._page = _RINGING

    idle_for = int(wasp.watch.rtc.time()) - self._last_touch
    if idle_for > 5:
        wasp.watch.display.mute(True)

    system.wake()
    system.switch(self)
    self._draw()
    system.request_tick(period_ms=1000)
|
|
|
|
def tick(self, ticks):
    """Vibrate to wake the user up OR measure the heart rate, using code
    from heart.py.

    ticks: value passed by the system with each tick, unused.

    On the ringing page each tick produces one vibration, getting longer
    from one tick to the next. Otherwise, when a heart rate measurement
    is in progress, each tick reads three sensor samples; once 240
    samples are available the heart rate is computed and, when it lies
    strictly between 40 and 100, stored and the measurement is stopped.

    NOTE(review): the indentation of this file was lost in the copy this
    was rebuilt from; the statements ending the measurement are assumed
    to belong to the valid heart rate branch -- confirm.
    """
    wasp.gc.collect()
    wasp.system.switch(self)

    if self._page == _RINGING:
        wasp.system.keep_awake()
        # in 60 vibrations, ramp up from subtle to strong:
        wasp.watch.vibrator.pulse(duty=max(80 - 1 * self._n_vibration, 20),
                                  ms=min(100 + 6 * self._n_vibration, 500))
        self._n_vibration += 1

    elif self._track_HR_once:
        wasp.watch.hrs.enable()
        if self._hrdata is None:
            self._hrdata = ppg.PPG(wasp.watch.hrs.read_hrs())
        t = wasp.machine.Timer(id=1, period=8000000)
        t.start()
        wasp.system.keep_awake()
        if int(wasp.watch.rtc.time()) - self._last_touch > 5:
            wasp.watch.display.mute(True)

        # three samples per tick, spaced using the timer
        self._subtick(1)
        while t.time() < 41666:
            pass
        wasp.system.keep_awake()
        self._subtick(1)
        while t.time() < 83332:
            pass
        wasp.system.keep_awake()
        self._subtick(1)
        t.stop()
        del t

        wasp.system.keep_awake()
        if len(self._hrdata.data) >= 240:  # 10 seconds passed
            bpm = self._hrdata.get_heart_rate()
            bpm = int(bpm) if bpm is not None else None
            if bpm is None:
                # in case of invalid data, write it in the file but
                # keep trying to read HR
                self._last_HR = "?"
                self._hrdata = None
                self._last_HR_printed = self._last_HR
            elif bpm < 100 and bpm > 40:
                # if HR was already computed since last periodicSave,
                # then average the two values. isinstance() takes the
                # object first and the type second; the "?" error marker
                # is not an int and is therefore never averaged.
                if self._last_HR != _OFF and isinstance(self._last_HR, int):
                    self._last_HR = (self._last_HR + bpm) // 2
                else:
                    self._last_HR = bpm
                self._last_HR_printed = self._last_HR
                self._last_HR_date = int(wasp.watch.rtc.time())
                # measurement done: release the sensor
                self._track_HR_once = _OFF
                self._hrdata = None
                wasp.watch.hrs.disable()
                if int(wasp.watch.rtc.time()) - self._last_touch > 5:
                    wasp.system.sleep()
|
|
|
|
def _subtick(self, ticks):
    """Read one sample from the heart rate sensor and feed it to the PPG
    processor (called three times per tick, i.e. at 24Hz).

    ticks: unused.
    """
    sample = wasp.watch.hrs.read_hrs()
    self._hrdata.preprocess(sample)
|
|
|
|
def _tiny_vibration(self):
    """Alarm callback: vibrate just a tiny bit before the wake up time, to
    gradually return to consciousness.

    The watch goes back to sleep afterwards, unless a heart rate
    measurement is in progress.
    """
    wasp.gc.collect()
    idle_for = int(wasp.watch.rtc.time()) - self._last_touch
    if idle_for > 5:
        wasp.watch.display.mute(True)
    wasp.system.wake()
    wasp.system.switch(self)
    wasp.watch.vibrator.pulse(duty=60, ms=100)
    if self._track_HR_once:
        return
    wasp.system.sleep()
|
|
|
|
def _smart_alarm_start(self):
    """Alarm callback: run the smart alarm computation, which starts as
    soon as a SmartAlarm object is created for this app."""
    SmartAlarm(sleeptk=self)
|
|
|
|
class SmartAlarm():
    """Compute the best wake up time from the movement data logged by a
    SleepTkApp and reschedule its alarms accordingly.

    Creating an instance runs the computation right away.
    """

    def __init__(self, sleeptk):
        """sleeptk: the SleepTkApp instance whose log file, settings and
        alarms are used."""
        self.sleeptk = sleeptk
        self._smart_alarm_compute()

    def _smart_alarm_compute(self):
        """Read the arm angles logged so far, estimate the sleep cycle and
        schedule the alarm (and the gradual wake vibrations) earlier.

        Any error is written to a smart_alarm_error_<time>.txt file and
        notified to the user; the alarm initially set is never cancelled
        here, so it still rings in that case.
        """
        wasp.gc.collect()
        app = self.sleeptk
        # the app stores this setting in _state_smart_alarm
        if not app._state_smart_alarm:
            t = wasp.watch.time.localtime(wasp.watch.rtc.time())
            wasp.system.notify(wasp.watch.rtc.get_uptime_ms(),
                               {"src": "SleepTk",
                                "title": "Smart alarm computation",
                                "body": ("Started computation for the smart alarm "
                                         "BY MISTAKE at {:02d}h{:02d}m").format(t[3], t[4])})
            return

        wasp.watch.display.mute(True)
        wasp.system.wake()
        wasp.system.switch(app)
        t = wasp.watch.time.localtime(wasp.watch.rtc.time())
        wasp.system.notify(wasp.watch.rtc.get_uptime_ms(),
                           {"src": "SleepTk",
                            "title": "Starting smart alarm computation",
                            "body": "Starting computation for the smart alarm at {:02d}h{:02d}m".format(t[3], t[4])})
        try:
            start_time = wasp.watch.rtc.time()

            # keep only the first value of each row, which is the arm
            # angle. Empty rows are skipped: the file ends with a newline
            # and converting the empty remainder made every run fail.
            data = array("f")
            with open(app.filep, "rb") as f:
                for row in f:
                    angle = row.split(b",")[0].strip()
                    if angle:
                        data.append(float(angle))
            wasp.gc.collect()
            wasp.system.keep_awake()

            # _signal_processing() is a method of this class, not of the app
            earlier, cycle = self._signal_processing(data)
            del data
            WU_t = app._WU_t
            wasp.gc.collect()

            # add new alarm
            wasp.system.set_alarm(max(WU_t - earlier, int(wasp.watch.rtc.time()) + 3),  # not before right now, to make sure it rings
                                  app._activate_ticks_to_ring)

            # replace old gentle alarm by another one
            if app._state_gradual_wake:
                for minutes in _GRADUAL_WAKE:
                    wasp.system.cancel_alarm(WU_t - int(minutes*60), app._tiny_vibration)
                    if earlier + int(minutes*60) < _ANTICIPATE_ALLOWED:
                        wasp.system.set_alarm(WU_t - earlier - int(minutes*60), app._tiny_vibration)

            # _smart_offset is what the ringing page displays; _earlier is
            # still set in case other code reads it
            app._smart_offset = earlier
            app._earlier = earlier
            app._page = _TRACKING
            wasp.system.notify(wasp.watch.rtc.get_uptime_ms(), {"src": "SleepTk",
                               "title": "Finished smart alarm computation",
                               # cycle is in seconds, the message shows hours
                               "body": "Finished computing best wake up time in {:2f}s. Sleep cycle: {:.2f}h".format(wasp.watch.rtc.time() - start_time, cycle / 3600)
                               })
            if not app._track_HR_once:
                wasp.system.sleep()
        except Exception as e:
            wasp.gc.collect()
            h, m = wasp.watch.time.localtime(wasp.watch.rtc.time())[3:5]
            msg = "Exception occured at {:02d}h{:02d}m: '{}'%".format(h, m, str(e))
            with open("smart_alarm_error_{}.txt".format(int(wasp.watch.rtc.time())), "wb") as f:
                f.write(msg.encode())
            wasp.system.notify(wasp.watch.rtc.get_uptime_ms(), {"src": "SleepTk",
                               "title": "Smart alarm error",
                               "body": msg})
        wasp.gc.collect()

    def _signal_processing(self, data):
        """Estimate the sleep cycle from the logged arm angles.

        data: sequence of arm angles, one per stored row; modified in
            place.

        Returns (earlier, cycle): `earlier` is the number of seconds
        before the alarm time at which to ring (at most
        _ANTICIPATE_ALLOWED) and `cycle` the estimated sleep cycle length
        in seconds.

        Raises:
            Exception: when there is not enough data to estimate a cycle,
                or when the predicted wake up time is after the alarm.
        """
        n = len(data)
        if n == 0:
            raise Exception("No sleep data to process")

        # take absolute rate of change of data (the last value has no
        # successor and is left untouched)
        for i in range(n - 1):
            data[i] = abs(data[i + 1] - data[i])

        # remove outliers:
        for i in range(n):
            data[i] = min(0.005, data[i])
        wasp.gc.collect()
        wasp.system.keep_awake()

        # smoothen several times
        for _ in range(5):
            for i in range(1, n - 1):
                data[i] = (data[i] + data[i - 1]) / 2
        wasp.gc.collect()
        wasp.system.keep_awake()

        # center data
        mean = sum(data) / n
        for i in range(n):
            data[i] = data[i] - mean
        wasp.gc.collect()
        wasp.system.keep_awake()

        # find local maximas
        window = int(60*60/_STORE_FREQ)  # one hour of rows
        skip = 1800 // _STORE_FREQ  # skip first 30 minutes of data
        x_maximas = self._find_peaks(data, window, skip)
        wasp.gc.collect()
        wasp.system.keep_awake()

        # merge the closest peaks while there are more than N peaks
        N = 3
        x_maximas = self._merge_closest_peaks(x_maximas, N)
        wasp.gc.collect()
        wasp.system.keep_awake()

        # sleep cycle duration is the average time distance between the
        # peaks: P peaks are separated by P - 1 gaps, whose sum is the
        # distance between the first and the last peak
        if len(x_maximas) < 2:
            raise Exception("Not enough movement peaks to estimate the sleep cycle")
        cycle = (x_maximas[-1] - x_maximas[0]) / (len(x_maximas) - 1) * _STORE_FREQ
        if cycle <= 0:
            raise Exception("Not enough movement peaks to estimate the sleep cycle")

        last_peak = self.sleeptk._track_start_time + x_maximas[-1] * _STORE_FREQ
        WU_t = self.sleeptk._WU_t
        predicted = last_peak + cycle

        # check if too late, already woken up:
        if predicted > WU_t:
            raise Exception("Took too long to compute!")

        if predicted < WU_t - _ANTICIPATE_ALLOWED:
            # if smart alarm wants to wake you up too early, limit how early
            earlier = _ANTICIPATE_ALLOWED
        else:
            # will wake you up at computed time: the caller rings at
            # WU_t - earlier, so earlier is counted back from the alarm
            earlier = int(WU_t - predicted)
        wasp.system.keep_awake()
        return (earlier, cycle)

    @staticmethod
    def _find_peaks(data, window, skip):
        """Return the indexes where `data` reaches the positive maximum of
        a sliding window of `window` values, ignoring the first `skip`
        values. A candidate within 2 indexes of the last peak kept
        replaces it when higher instead of being added."""
        x_maximas = []
        y_maximas = []
        for start_w in range(skip, len(data) - window + 1):
            m = max(data[start_w:start_w + window])
            if m <= 0:
                continue
            for i in range(start_w, start_w + window):
                if data[i] != m or i in x_maximas:
                    continue
                if x_maximas and i - x_maximas[-1] <= 2:
                    # too close to last maximum, keep highest
                    if y_maximas[-1] < data[i]:
                        x_maximas[-1] = i
                        y_maximas[-1] = data[i]
                else:
                    x_maximas.append(i)
                    y_maximas.append(m)
        return x_maximas

    @staticmethod
    def _merge_closest_peaks(x_maximas, keep):
        """Return a copy of `x_maximas` where, while more than `keep`
        peaks remain, the second peak of the closest pair is dropped."""
        peaks = list(x_maximas)
        while len(peaks) > keep and len(peaks) > 1:
            gaps = [peaks[k + 1] - peaks[k] for k in range(len(peaks) - 1)]
            # removed by position: removing by value could drop another
            # entry holding the same value
            peaks.pop(gaps.index(min(gaps)) + 1)
        return peaks
|