From 95e129e331b3f66b9d2b639301f3f93766b42381 Mon Sep 17 00:00:00 2001 From: Daniel Thompson Date: Thu, 25 Jun 2020 21:59:32 +0100 Subject: [PATCH] ppg: Pull the PPG signal processing into a seperate library Signed-off-by: Daniel Thompson --- wasp/apps/heart.py | 138 ++----------------------- wasp/boards/pinetime/manifest.py | 1 + wasp/ppg.py | 170 +++++++++++++++++++++++++++++++ 3 files changed, 181 insertions(+), 128 deletions(-) create mode 100644 wasp/ppg.py diff --git a/wasp/apps/heart.py b/wasp/apps/heart.py index fdf8336..367d17a 100644 --- a/wasp/apps/heart.py +++ b/wasp/apps/heart.py @@ -2,112 +2,8 @@ # Copyright (C) 2020 Daniel Thompson import wasp -import array import machine - -class Biquad(): - """Direct Form II Biquad Filter""" - - def __init__(self, b0, b1, b2, a1, a2): - self._coeff = (b0, b1, b2, a1, a2) - self._v1 = 0 - self._v2 = 0 - - def step(self, x): - c = self._coeff - v1 = self._v1 - v2 = self._v2 - - v = x - (c[3] * v1) - (c[4] * v2) - y = (c[0] * v) + (c[1] * v1) + (c[2] * v2) - - self._v2 = v1 - self._v1 = v - return y - -class PTAGC(): - """Peak Tracking Automatic Gain Control - - In order for the correlation checks to work correctly we must - aggressively reject spikes caused by fast DC steps. Setting a - threshold based on the median is very effective at killing - spikes but needs an extra 1k for sample storage which isn't - really plausible for a microcontroller. - """ - def __init__(self, start, decay, threshold): - self._peak = start - self._decay = decay - self._boost = 1 / decay - self._threshold = threshold - - def step(self, spl): - # peak tracking - peak = self._peak - if abs(spl) > peak: - peak *= self._boost - else: - peak *= self._decay - self._peak = peak - - # rejection filter (clipper) - threshold = self._threshold - if spl > (peak * threshold) or spl < (peak * -threshold): - return 0 - - # booster - spl = 100 * spl / (2 * peak) - - return spl - -def _compare(d1, d2, count, shift): - e = 0 - for i in range(count): - d = d1[i] - d2[i] - e += d*d - return e - -def compare(d, shift): - return _compare(d[shift:], d[:-shift], len(d)-shift, shift) - -def trough(d, mn, mx): - z2 = compare(d, mn-2) - z1 = compare(d, mn-1) - for i in range(mn, mx+1): - z = compare(d, i) - if z2 > z1 and z1 < z: - return i - z2 = z1 - z1 = z - - return -1 - -def get_hrs(d): - din = memoryview(d) - - # Search initially from ~210 to 30 bpm - t0 = trough(din, 7, 48) - if t0 < 0: - return None - - # Check the second cycle ... - t1 = t0 * 2 - t1 = trough(din, t1 - 5, t1 + 5) - if t1 < 0: - return None - - # ... and the third - t2 = (t1 * 3) // 2 - t2 = trough(din, t2 - 5, t2 + 4) - if t2 < 0: - return None - - # If we can find a fourth cycle then use that for the extra - # precision otherwise report whatever we've found - t3 = (t2 * 4) // 3 - t3 = trough(din, t3 - 4, t3 + 4) - if t3 < 0: - return (60 * 24 * 3) // t2 - return (60 * 24 * 4) // t3 +import ppg class HeartApp(): """Heart Rate Sensing application. @@ -125,40 +21,26 @@ class HeartApp(): draw.fill() draw.string('PPG graph', 0, 6, width=240) - self._hpf = Biquad(0.87033078, -1.74066156, 0.87033078, -1.72377617, 0.75754694) - self._agc = PTAGC(20, 0.971, 2) - self._lpf = Biquad(0.11595249, 0.23190498, 0.11595249, -0.72168143, 0.18549138) - - self._x = 0 - self._offset = wasp.watch.hrs.read_hrs() - - self._hrdata = array.array('b') - wasp.system.request_tick(1000 // 8) + self._hrdata = ppg.PPG(wasp.watch.hrs.read_hrs()) + self._x = 0 + def background(self): wasp.watch.hrs.disable() - del self._hpf - del self._agc - del self._lpf + del self._hrdata def _subtick(self, ticks): """Notify the application that its periodic tick is due.""" draw = wasp.watch.drawable - spl = wasp.watch.hrs.read_hrs() - spl -= self._offset - spl = self._hpf.step(spl) - spl = self._agc.step(spl) - spl = self._lpf.step(spl) - spl = int(spl) + spl = self._hrdata.preprocess(wasp.watch.hrs.read_hrs()) - self._hrdata.append(spl) - if len(self._hrdata) >= 240: - draw.string('{} bpm'.format(get_hrs(self._hrdata)), 0, 6, width=240) - del self._hrdata - self._hrdata = array.array('b') + if len(self._hrdata.data) >= 240: + draw.string('{} bpm'.format(self._hrdata.get_heart_rate()), + 0, 6, width=240) + # Graph is orange by default... color = 0xffc0 # If the maths goes wrong lets show it in the chart! diff --git a/wasp/boards/pinetime/manifest.py b/wasp/boards/pinetime/manifest.py index 02b5649..0193fed 100644 --- a/wasp/boards/pinetime/manifest.py +++ b/wasp/boards/pinetime/manifest.py @@ -29,6 +29,7 @@ freeze('../..', 'fonts/sans28.py', 'fonts/sans36.py', 'icons.py', + 'ppg.py', 'shell.py', 'wasp.py', 'widgets.py', diff --git a/wasp/ppg.py b/wasp/ppg.py new file mode 100644 index 0000000..324392d --- /dev/null +++ b/wasp/ppg.py @@ -0,0 +1,170 @@ +# SPDX-License-Identifier: LGPL-3.0-or-later +# Copyright (C) 2020 Daniel Thompson + +"""Photoplethysmogram (PPG) Signal Processing +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Algorithms and signal processing primatives that can be used to convert +raw PPG signals into something useful. +""" + +import array +import micropython + +@micropython.viper +def _compare(d1, d2, count: int, shift: int) -> int: + """Compare two sequences of (signed) bytes and quantify how dissimilar + they are. + """ + p1 = ptr8(d1) + p2 = ptr8(d2) + + e = 0 + for i in range(count): + s1 = int(p1[i]) + if s1 > 127: + s1 -= 256 + + s2 = int(p2[i]) + if s2 > 127: + s2 -= 256 + + d = s1 - s2 + e += d*d + return e + +class Biquad(): + """Direct Form II Biquad Filter""" + + def __init__(self, b0, b1, b2, a1, a2): + self._coeff = (b0, b1, b2, a1, a2) + self._v1 = 0 + self._v2 = 0 + + def step(self, x): + c = self._coeff + v1 = self._v1 + v2 = self._v2 + + v = x - (c[3] * v1) - (c[4] * v2) + y = (c[0] * v) + (c[1] * v1) + (c[2] * v2) + + self._v2 = v1 + self._v1 = v + return y + +class PTAGC(): + """Peak Tracking Automatic Gain Control + + In order for the correlation checks to work correctly we must + aggressively reject spikes caused by fast DC steps. Setting a + threshold based on the median is very effective at killing + spikes but needs an extra 1k for sample storage which isn't + really plausible for a microcontroller. + """ + def __init__(self, start, decay, threshold): + self._peak = start + self._decay = decay + self._boost = 1 / decay + self._threshold = threshold + + def step(self, spl): + # peak tracking + peak = self._peak + if abs(spl) > peak: + peak *= self._boost + else: + peak *= self._decay + self._peak = peak + + # rejection filter (clipper) + threshold = self._threshold + if spl > (peak * threshold) or spl < (peak * -threshold): + return 0 + + # booster + spl = 100 * spl / (2 * peak) + + return spl + +class PPG(): + """ + """ + + def __init__(self, spl): + self._offset = spl + self.data = array.array('b') + + self._hpf = Biquad(0.87033078, -1.74066156, 0.87033078, + -1.72377617, 0.75754694) + self._agc = PTAGC(20, 0.971, 2) + self._lpf = Biquad(0.11595249, 0.23190498, 0.11595249, + -0.72168143, 0.18549138) + + def preprocess(self, spl): + """Preprocess a PPG sample. + + Must be called at 24Hz for accurate heart rate calculations. + """ + spl -= self._offset + spl = self._hpf.step(spl) + spl = self._agc.step(spl) + spl = self._lpf.step(spl) + spl = int(spl) + + self.data.append(spl) + return spl + + def _get_heart_rate(self): + def compare(d, shift): + return _compare(d[shift:], d[:-shift], len(d)-shift, shift) + + def trough(d, mn, mx): + z2 = compare(d, mn-2) + z1 = compare(d, mn-1) + for i in range(mn, mx+1): + z = compare(d, i) + if z2 > z1 and z1 < z: + return i + z2 = z1 + z1 = z + + return -1 + + data = memoryview(self.data) + + # Search initially from ~210 to 30 bpm + t0 = trough(data, 7, 48) + if t0 < 0: + return None + + # Check the second cycle ... + t1 = t0 * 2 + t1 = trough(data, t1 - 5, t1 + 5) + if t1 < 0: + return None + + # ... and the third + t2 = (t1 * 3) // 2 + t2 = trough(data, t2 - 5, t2 + 4) + if t2 < 0: + return None + + # If we can find a fourth cycle then use that for the extra + # precision otherwise report whatever we've found + t3 = (t2 * 4) // 3 + t3 = trough(data, t3 - 4, t3 + 4) + if t3 < 0: + return (60 * 24 * 3) // t2 + return (60 * 24 * 4) // t3 + + def get_heart_rate(self): + if len(self.data) < 200: + return None + + hr = self._get_heart_rate() + + # Clear out the accumulated data + self.data = array.array('b') + + return hr