-
Notifications
You must be signed in to change notification settings - Fork 0
/
multiple.py
160 lines (140 loc) · 5.4 KB
/
multiple.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import platform
import sys
import subprocess
import multiprocessing
import re
import time
import numpy as np
if sys.version_info[0] < 3:
from bitalino import BITalino
else:
from bitalino3X import BITalino
import dataCollection as dc
if platform.system() != 'Linux':
print("Sorry, this script currently can only run linux systems.")
print("Exiting...")
# OS Specific Initializations
#clearCmd = "cls||clear"
#if platform.system() == 'Windows':
# clearCmd = "cls"
# print("Using Windows default console size 80x24")
# columns = 80
# rows = 24
#else:
# clearCmd = "clear"
# rows, columns = os.popen('stty size', 'r').read().split()
defaultMacAddresses = ["20:16:12:21:98:56", "20:16:12:22:01:29"]
# Parsing Arguments
if len(sys.argv) == 1:
print("Run with flag '--help' or '-h' for instructions.")
print("Now using default settings.\n")
macAddresses = defaultMacAddresses
acqChannels = [0, 1, 2, 3, 4, 5]
samplingRate = 100
print("Please type the path to the video here:")
vidPath = sys.stdin.readline().rstrip()
elif any(s in ["--help", "-h"] for s in sys.argv):
print("\nThis script connects to a BITalino device via Bluetooth, get readings for 1 min for baseline, then plays a video and records the readings during the video.")
print("\nRun without flags to use default settings.\n")
print("Flags:\n")
print("\t--mac-addresses [MAC Addresses of the devices, in a comma seperated list]")
print("\t\t If '--mac-addresses' is not set, default MAC Addresses " + str(defaultMacAddresses) + " will be used.\n")
print("\t--channels [Comma seperated list of integers from 0 - 5, representing channels]")
print("\t\t If '--channels' is not set, all channels [0,1,2,3,4,5] will be monitored")
print("\t\t Example: 0,2,3 to monitor channels A1, A3, A4\n")
print("\t--sampling-rate [Sampling Rate in Hz]")
print("\t\tIf '--sampling-rate' is not set, default sampling rate of 100 Hz will be used.")
print("\t\tSampling Rate can be 1, 10, 100 or 100 Hz.\n")
print("\t--video [Path to video file]")
print("\t\tIf '--video' flag is not set, the script will ask you for video during execution\n")
print("\t--help (-h)")
print("\t\tShow this screen.\n")
exit()
else:
# --mac-address flag
try:
i = sys.argv.index("--mac-addresses")
macAddresses = sys.argv[i+1].split(',')
except:
macAddresses = defaultMacAddresses
print("No MAC Address set, using default MAC Address: " + str(macAddresses))
# --channels flag
try:
i = sys.argv.index("--channels")
channels = sys.argv[i+1]
acqChannels = list(map(int, channels.split(',')))
except:
acqChannels = [0, 1, 2, 3, 4, 5]
print("No channels set, monitoring all channels.")
# --sampling-rate flag
try:
i = sys.argv.index("--sampling-rate")
samplingRate = int(sys.argv[i+1])
except:
samplingRate = 100
print("No sampling rate set, using default sampling rate of 100 Hz.")
# --video flag
try:
i = sys.argv.index("--video")
vidPath = sys.argv[i+1]
except:
print("Please type the path to the video here:")
vidPath = sys.stdin.readline().rstrip()
# Setting other attributes
batteryThreshold = 30
# Connecting to the BITalino devices
devices = []
for addr in macAddresses :
print("Connecting to " + addr)
devices.append(BITalino(addr))
print(addr + " connected.")
# Initializing Devices
for i in range(len(devices)) :
devices[i].battery(batteryThreshold)
print(macAddresses[i] + " version: " + str(devices[i].version()))
devices[i].start(samplingRate, acqChannels)
# Sampling for baseline
print("The data collected will be stored in PyBitSignals_<MAC Address>_<date>_<time>.txt")
outputFiles = []
processes = []
# Initialize output files, setup processes
for i in range(len(devices)) :
filename = "PyBitSignals_" + re.sub(':', '', macAddresses[i]) + "_" + time.strftime("%Y-%m-%d_%H-%M-%S") + ".txt"
outputFile = dc.initOutput(filename, macAddresses[i], acqChannels, samplingRate)
outputFiles.append(outputFile)
processes.append(multiprocessing.Process(target=dc.writeOutTimed, args=(outputFile, devices[i], acqChannels, samplingRate, 60)))
# Start baseline sampling for each device
for p in processes:
p.start()
# Exit the completed processes
for p in processes:
p.join()
# Open Video and record data
print("Play video and record data...")
seconds = dc.videoLength(vidPath)
processes = []
for i in range(len(devices)):
processes.append(multiprocessing.Process(target=dc.writeOutTimed, args=(outputFiles[i], devices[i], acqChannels, samplingRate, seconds)))
try:
vidProc = subprocess.Popen(["mplayer","-fs", vidPath])
vidStartTime = time.time()
for p in processes:
p.start()
for p in processes:
p.join()
except OSError:
print("mplayer not found")
print("Would you like to install mplayer? [Y/N]")
option = sys.stdin.readline().rstrip()
if(option == "Y"):
# Install mplayer using apt-get
print("Installing mplayer using apt-get. Will require password for sudo")
subprocess.call(["sudo", "apt-get", "install", "mplayer"])
print("Exiting...")
exit()
print("Video finished.\n")
print("Video started at " + str(vidStartTime) + " or in human language: " + str(time.ctime(vidStartTime)))
print("Done")
for d in devices:
d.stop()
d.close()