wasp-os/wasp/ppg.py

189 lines
4.8 KiB
Python

# SPDX-License-Identifier: LGPL-3.0-or-later
# Copyright (C) 2020 Daniel Thompson
"""Photoplethysmogram (PPG) Signal Processing
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Algorithms and signal processing primatives that can be used to convert
raw PPG signals into something useful.
"""
import array
import micropython
import watch
@micropython.viper
def _compare(d1, d2, count: int, shift: int) -> int:
"""Compare two sequences of (signed) bytes and quantify how dissimilar
they are.
"""
p1 = ptr8(d1)
p2 = ptr8(d2)
e = 0
for i in range(count):
s1 = int(p1[i])
if s1 > 127:
s1 -= 256
s2 = int(p2[i])
if s2 > 127:
s2 -= 256
d = s1 - s2
e += d*d
return e
class Biquad():
"""Direct Form II Biquad Filter"""
def __init__(self, b0, b1, b2, a1, a2):
self._coeff = (b0, b1, b2, a1, a2)
self._v1 = 0
self._v2 = 0
def step(self, x):
c = self._coeff
v1 = self._v1
v2 = self._v2
v = x - (c[3] * v1) - (c[4] * v2)
y = (c[0] * v) + (c[1] * v1) + (c[2] * v2)
self._v2 = v1
self._v1 = v
return y
class PTAGC():
"""Peak Tracking Automatic Gain Control
In order for the correlation checks to work correctly we must
aggressively reject spikes caused by fast DC steps. Setting a
threshold based on the median is very effective at killing
spikes but needs an extra 1k for sample storage which isn't
really plausible for a microcontroller.
"""
def __init__(self, start, decay, threshold):
self._peak = start
self._decay = decay
self._boost = 1 / decay
self._threshold = threshold
def step(self, spl):
# peak tracking
peak = self._peak
if abs(spl) > peak:
peak *= self._boost
else:
peak *= self._decay
self._peak = peak
# rejection filter (clipper)
threshold = self._threshold
if spl > (peak * threshold) or spl < (peak * -threshold):
return 0
# booster
spl = 100 * spl / (2 * peak)
return spl
class PPG():
"""
"""
def __init__(self, spl):
self._offset = spl
self.data = array.array('b')
self.debug = None
self._hpf = Biquad(0.87033078, -1.74066156, 0.87033078,
-1.72377617, 0.75754694)
self._agc = PTAGC(20, 0.971, 2)
self._lpf = Biquad(0.11595249, 0.23190498, 0.11595249,
-0.72168143, 0.18549138)
def preprocess(self, spl):
"""Preprocess a PPG sample.
Must be called at 24Hz for accurate heart rate calculations.
"""
if self.debug != None:
self.debug.append(spl)
spl -= self._offset
spl = self._hpf.step(spl)
spl = self._agc.step(spl)
spl = self._lpf.step(spl)
spl = int(spl)
self.data.append(spl)
return spl
def _get_heart_rate(self):
def compare(d, shift):
return _compare(d[shift:], d[:-shift], len(d)-shift, shift)
def trough(d, mn, mx):
z2 = compare(d, mn-2)
z1 = compare(d, mn-1)
for i in range(mn, mx+1):
z = compare(d, i)
if z2 > z1 and z1 < z:
return i
z2 = z1
z1 = z
return -1
data = memoryview(self.data)
# Search initially from ~210 to 30 bpm
t0 = trough(data, 7, 48)
if t0 < 0:
return None
# Check the second cycle ...
t1 = t0 * 2
t1 = trough(data, t1 - 5, t1 + 5)
if t1 < 0:
return None
# ... and the third
t2 = (t1 * 3) // 2
t2 = trough(data, t2 - 5, t2 + 4)
if t2 < 0:
return None
# If we can find a fourth cycle then use that for the extra
# precision otherwise report whatever we've found
t3 = (t2 * 4) // 3
t3 = trough(data, t3 - 4, t3 + 4)
if t3 < 0:
return (60 * 24 * 3) // t2
return (60 * 24 * 4) // t3
def get_heart_rate(self):
if len(self.data) < 200:
return None
hr = self._get_heart_rate()
# Clear out the accumulated data
self.data = array.array('b')
# Dump the debug data
if self.debug:
with open('hrs.data', 'ab') as f:
# Re-sync marker
f.write(b'\xff\xff')
now = watch.rtc.get_localtime()
f.write(array.array('H', now[:6]))
f.write(self.debug)
self.debug = array.array('H')
return hr
def enable_debug(self):
if self.debug == None:
self.debug = array.array('H')