-
Notifications
You must be signed in to change notification settings - Fork 0
/
devices.py
260 lines (222 loc) · 7.34 KB
/
devices.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
from __future__ import print_function
from time import sleep
import numpy as np
import scipy as sp
from scipy import signal
import RPi.GPIO as GPIO
import smbus
import logging
import spidev
import hrcalc
#import board
#import adafruit_mcp3xxx.mcp3008 as MCP
#from adafruit_mcp3xxx.analog_in import AnalogIn
#import digitalio
#import busio
# Normalization divisor for rectified EMG samples. It is only referenced by the
# disabled get_normalized() code kept in a string literal further down, and is
# never assigned a real value in the visible code.
BASELINE = None
# class for the EMG sensor
class EMG:
    """EMG sensor sampled through an ADC on SPI bus 0, device 0.

    The three-byte transfer used by ``read_analog`` matches the single-ended
    read of an MCP3008-style 10-bit converter (presumably an MCP3008, going by
    the commented-out imports at the top of the file -- confirm against the
    wiring).
    """

    # Raw ADC reading that read_percent() reports as 100 %.
    MAX_READING = 600

    # construct the class with the connected pin
    def __init__(self, channel=0):
        """Open the SPI device and prepare the band-pass filter coefficients.

        Args:
            channel: ADC input to sample, 0-7 inclusive.

        Raises:
            AssertionError: if ``channel`` is outside 0-7.
        """
        # Validate before touching the hardware so a bad channel never leaves
        # an SPI handle open. Raised explicitly instead of via `assert` so the
        # check survives `python -O`; the exception type is unchanged for
        # existing callers.
        if not 0 <= channel <= 7:
            raise AssertionError('channel not within bounds')

        logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s] [%(name)s] [%(levelname)s]: %(message)s')
        self.logger = logging.getLogger('EMG')

        # create filter params (high-pass and low-pass): 20 and 450 expressed
        # as fractions of half of 1000 (assumes a 1000 Hz sampling rate with
        # cutoffs in Hz -- TODO confirm)
        high = 20 / (1000 / 2)
        low = 450 / (1000 / 2)
        # create butterworth filter: returns filter coefficients
        self.b, self.a = sp.signal.butter(4, [high, low], btype='bandpass')

        # create spi bus
        self.spi = spidev.SpiDev()
        self.spi.open(0, 0)
        self.spi.max_speed_hz = 1350000
        self.channel = channel
        self.logger.info(f'initialized on channel {channel} at {self.spi.max_speed_hz}hz')

    # calibrate the EMG
    # this will only be necessary if different users get wildly different values
    def calibrate(self):
        """Placeholder; currently does nothing."""
        pass

    def read_analog(self):
        """Run one conversion on ``self.channel`` and return the raw reading.

        The result is assembled from the low two bits of the second reply byte
        and the whole third reply byte, i.e. a 10-bit value (0-1023).
        """
        # [start bit, single-ended mode + channel in the high nibble, padding]
        reply = self.spi.xfer2([1, (8 + self.channel) << 4, 0])
        return ((reply[1] & 3) << 8) + reply[2]

    def read_percent(self):
        """
        returns the emg reading as a percentage of the maximum acceptable
        range (MAX_READING); readings above that range yield more than 100
        """
        return (self.read_analog() / self.MAX_READING) * 100
"""
# get the current value from the EMG
def get_raw(self):
return self.pin.value
# read the emg to a buffer and return it to be normalized
def read_sequential(self, amount=100):
emg_buf = []
for i in range(amount):
emg_buf.append(self.get_raw())
return np.array(emg_buf)
def get_normalized(self):
buf = self.read_sequential()
# filter data (linear filter)
emg_filtered = sp.signal.filtfilt(self.b, self.a, buf)
# rectify the filtered data
emg = abs(emg_filtered) / BASELINE
return emg
"""
### this section is courtesy of https://github.com/doug-burrell/max30102 ###
# register addresses
# These are passed as the register argument of the smbus read/write calls made
# by the MAX30102 class below.
# interrupt status (read in the constructor and before every FIFO read) and
# interrupt enable
REG_INTR_STATUS_1 = 0x00
REG_INTR_STATUS_2 = 0x01
REG_INTR_ENABLE_1 = 0x02
REG_INTR_ENABLE_2 = 0x03
# FIFO write pointer, overflow counter, read pointer and data register
REG_FIFO_WR_PTR = 0x04
REG_OVF_COUNTER = 0x05
REG_FIFO_RD_PTR = 0x06
REG_FIFO_DATA = 0x07
# FIFO / mode / SpO2 configuration (written by setup(), reset() and shutdown())
REG_FIFO_CONFIG = 0x08
REG_MODE_CONFIG = 0x09
REG_SPO2_CONFIG = 0x0A
# LED pulse amplitudes (written by setup())
REG_LED1_PA = 0x0C
REG_LED2_PA = 0x0D
REG_PILOT_PA = 0x10
# the remaining addresses are not referenced by the code visible in this file
REG_MULTI_LED_CTRL1 = 0x11
REG_MULTI_LED_CTRL2 = 0x12
REG_TEMP_INTR = 0x1F
REG_TEMP_FRAC = 0x20
REG_TEMP_CONFIG = 0x21
REG_PROX_INT_THRESH = 0x30
REG_REV_ID = 0xFE
REG_PART_ID = 0xFF
class MAX30102():
    """Driver for a MAX30102 sensor reached over an SMBus (I2C) channel.

    Construction resets the device, waits one second, clears the first
    interrupt status register and applies the default configuration from
    ``setup()``. Register addresses come from the module-level ``REG_*``
    constants.
    """

    # The FIFO pointer difference wraps at this many samples.
    _FIFO_DEPTH = 32

    # by default, this assumes that the device is at 0x57 on channel 1
    def __init__(self, channel=1, address=0x57):
        """Open the bus, reset the device and run the default setup.

        Args:
            channel: SMBus channel number handed to ``smbus.SMBus``.
            address: I2C address of the device.
        """
        logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s] [%(name)s] [%(levelname)s]: %(message)s')
        self.logger = logging.getLogger('MAX30102')
        self.logger.info('Channel: {0}, address: {1}'.format(channel, address))

        self.address = address
        self.channel = channel
        self.bus = smbus.SMBus(self.channel)

        self.reset()
        sleep(1)  # wait 1 sec

        # read & clear interrupt register (read 1 byte); the value is not needed
        self.bus.read_i2c_block_data(self.address, REG_INTR_STATUS_1, 1)
        self.setup()
        self.logger.info('initialized successfully')

    def shutdown(self):
        """
        Shutdown the device.

        Best effort: a failure is logged (with traceback) and not re-raised.
        """
        self.logger.info('shutting down')
        try:
            self.bus.write_i2c_block_data(self.address, REG_MODE_CONFIG, [0x80])
        except Exception:
            # Not a bare `except:` so KeyboardInterrupt/SystemExit still
            # propagate while any ordinary error stays non-fatal.
            self.logger.critical('failed to shutdown', exc_info=True)

    def reset(self):
        """
        Reset the device, this will clear all settings,
        so after running this, run setup() again.
        """
        self.logger.info('reseting')
        self.bus.write_i2c_block_data(self.address, REG_MODE_CONFIG, [0x40])

    def setup(self, led_mode=0x03):
        """
        This will setup the device with the values written in sample Arduino code.

        Args:
            led_mode: value written to REG_MODE_CONFIG (see the note next to
                that write below).
        """
        self.logger.info('perfoming setup routine')
        # INTR setting
        # 0xc0 : A_FULL_EN and PPG_RDY_EN = Interrupt will be triggered when
        # fifo almost full & new fifo data ready
        self.bus.write_i2c_block_data(self.address, REG_INTR_ENABLE_1, [0xc0])
        self.bus.write_i2c_block_data(self.address, REG_INTR_ENABLE_2, [0x00])

        # FIFO_WR_PTR[4:0]
        self.bus.write_i2c_block_data(self.address, REG_FIFO_WR_PTR, [0x00])
        # OVF_COUNTER[4:0]
        self.bus.write_i2c_block_data(self.address, REG_OVF_COUNTER, [0x00])
        # FIFO_RD_PTR[4:0]
        self.bus.write_i2c_block_data(self.address, REG_FIFO_RD_PTR, [0x00])

        # 0b 0100 1111
        # sample avg = 4, fifo rollover = false, fifo almost full = 17
        self.bus.write_i2c_block_data(self.address, REG_FIFO_CONFIG, [0x4f])

        # 0x02 for red only, 0x03 for SpO2 mode, 0x07 multimode LED
        # NOTE(review): the original note said "read-only" for 0x02, presumably
        # a typo for "red only" -- confirm against the datasheet.
        self.bus.write_i2c_block_data(self.address, REG_MODE_CONFIG, [led_mode])

        # 0b 0010 0111
        # SPO2_ADC range = 4096nA, SPO2 sample rate = 100Hz, LED pulse-width = 411uS
        self.bus.write_i2c_block_data(self.address, REG_SPO2_CONFIG, [0x27])

        # choose value for ~7mA for LED1
        self.bus.write_i2c_block_data(self.address, REG_LED1_PA, [0x24])
        # choose value for ~7mA for LED2
        self.bus.write_i2c_block_data(self.address, REG_LED2_PA, [0x24])
        # choose value for ~25mA for Pilot LED
        self.bus.write_i2c_block_data(self.address, REG_PILOT_PA, [0x7f])

    # this won't validate the arguments!
    # use when changing the values from default
    def set_config(self, reg, value):
        """Write ``value`` to register ``reg`` without any validation.

        ``value`` is forwarded unchanged to ``write_i2c_block_data``; the other
        calls in this class pass a list of byte values.
        """
        self.bus.write_i2c_block_data(self.address, reg, value)

    def get_data_present(self):
        """Return how many unread samples the FIFO currently holds.

        The count is the distance from the read pointer to the write pointer,
        wrapped at the FIFO depth. If the pointers cannot be read, the failure
        is logged and 0 is returned so that pollers simply try again instead of
        crashing on unbound pointer values.
        """
        try:
            read_ptr = self.bus.read_byte_data(self.address, REG_FIFO_RD_PTR)
            write_ptr = self.bus.read_byte_data(self.address, REG_FIFO_WR_PTR)
        except OSError:
            # smbus reports bus failures as IOError/OSError
            self.logger.critical('failed to retrieve data to determine if real data is present', exc_info=True)
            return 0
        # account for pointer wrap around
        return (write_ptr - read_ptr) % self._FIFO_DEPTH

    def read_fifo(self):
        """
        This function will read the data register.

        Returns:
            A ``(red_led, ir_led)`` tuple of 18-bit sample values.
        """
        # read 1 byte from each interrupt status register (read & clear);
        # the values themselves are discarded
        self.bus.read_i2c_block_data(self.address, REG_INTR_STATUS_1, 1)
        self.bus.read_i2c_block_data(self.address, REG_INTR_STATUS_2, 1)

        # read 6-byte data from the device: 3 bytes red, then 3 bytes IR
        d = self.bus.read_i2c_block_data(self.address, REG_FIFO_DATA, 6)

        # mask MSB [23:18]
        red_led = (d[0] << 16 | d[1] << 8 | d[2]) & 0x03FFFF
        ir_led = (d[3] << 16 | d[4] << 8 | d[5]) & 0x03FFFF
        return red_led, ir_led

    def read_sequential(self, amount=100):
        """
        This function will read the red-led and ir-led `amount` times.
        This works as blocking function.

        Every sample the FIFO reports is drained on each pass; when more than
        ``amount`` samples have accumulated only the newest ``amount`` are kept.

        Args:
            amount: number of samples to return per LED; must not be negative.

        Returns:
            A ``(red_buf, ir_buf)`` tuple of lists, each ``amount`` long.

        Raises:
            ValueError: if ``amount`` is negative.
        """
        if amount < 0:
            raise ValueError('amount must be non-negative')

        red_buf = []
        ir_buf = []
        while True:
            for _ in range(self.get_data_present()):
                red, ir = self.read_fifo()
                red_buf.append(red)
                ir_buf.append(ir)

            # Drop the oldest samples in one slice rather than popping the
            # front of the list once per surplus element.
            excess = len(ir_buf) - amount
            if excess > 0:
                del red_buf[:excess]
                del ir_buf[:excess]

            if len(ir_buf) == amount:
                return red_buf, ir_buf
"""
count = amount
while count > 0:
num_bytes = self.get_data_present()
if num_bytes > 0:
while num_bytes > 0:
red, ir = self.read_fifo()
red_buf.append(red)
ir_buf.append(ir)
num_bytes -= 1
count -= 1
return red_buf, ir_buf
"""