"""bandplay.py - stream pre-built track files to the configured audio output devices.

Usage: bandplay.py -d <path-to-config> [-p <playlist-file>]
"""
import signal
import sys
import time
from os.path import exists
import keyboard
import sounddevice as sd
import soundfile as sf
import threading
import os
import getopt
import config
import prebuild
import numpy as np
from scipy.signal import resample
class PlayState:
    """Playback flags; both start out as False."""

    # Set to True while tracks should keep streaming; set to False to stop.
    running: bool = False
    # Set to True while playback is temporarily halted.
    paused: bool = False
def resample_files(entry_path):
    """Convert every ``.wav`` file below *entry_path* to 44100 Hz, in place.

    The directory tree is walked recursively. A file that already has a
    sample rate of 44100 Hz is left untouched. Any other file is resampled,
    written to ``<file>.resample`` and then moved over the original.

    Args:
        entry_path: Root directory to scan.
    """
    target_rate = 44100

    for folder, _dirs, files in os.walk(entry_path, topdown=True):
        for file_name in files:
            if not file_name.endswith(".wav"):
                continue

            sound_file = os.path.join(folder, file_name)
            resample_file = sound_file + ".resample"

            # Open the file exactly once and make sure it is closed again
            # before it gets replaced below.
            with sf.SoundFile(sound_file) as f:
                sr = f.samplerate
                if sr == target_rate:
                    continue
                print("Resample file: '" + sound_file + "' (samplerate=" + str(sr), end=") ", flush=True)
                track_array = f.read(dtype='float32')

            secs = len(track_array) / sr
            samples = int(secs * target_rate) + 1
            rwv = resample(track_array, samples)

            # Write a new resampled file with 44100 Hz
            sf.write(resample_file, rwv, format='WAV', samplerate=target_rate)
            # os.replace overwrites an existing destination on every platform;
            # os.rename raises FileExistsError on Windows in that situation.
            os.replace(resample_file, sound_file)
            print("done!")
def get_path():
    """Return the directory passed with ``-d`` on the command line.

    Only ``sys.argv[1:3]`` is parsed, so ``-d <dir>`` has to be the first
    option. A usage or error message is printed and the process exits with
    status 2 when the option is missing or malformed, or when the given path
    is not an existing directory.

    Returns:
        The value of the ``-d`` option.
    """
    usage = "Usage: bandplay.py -d <path-to-config>"

    # Get the passed args
    try:
        opts, _args = getopt.getopt(sys.argv[1:3], "d:")
    except getopt.GetoptError:
        print(usage)
        sys.exit(2)

    # Check if args are valid
    if not opts or opts[0][0] != "-d":
        print(usage)
        sys.exit(2)

    track_dir = opts[0][1]

    # The path is walked and listed as a directory by the rest of the
    # script, so a regular file must be rejected as well.
    if not os.path.isdir(track_dir):
        print("Could not find dir: " + track_dir)
        sys.exit(2)

    return track_dir
def get_playlist_file():
    """Return the playlist file passed with ``-p``, or ``""`` if there is none.

    Only ``sys.argv[3:]`` is parsed, i.e. the arguments that follow
    ``-d <dir>``. A missing or malformed option is treated as "no playlist".

    Returns:
        The value of the ``-p`` option, or an empty string.
    """
    try:
        opts, _args = getopt.getopt(sys.argv[3:], "p:")
    except getopt.GetoptError:
        # Unknown option or "-p" without a value: behave as if none was given
        return ""

    if not opts:
        return ""
    return opts[0][1]
def choose_track_dir(path):
    """Let the user pick one of the sub-directories of *path*.

    The sub-directories are printed as a numbered list (starting at 1) and
    the user is asked for a number until a valid one is entered. The process
    exits with status 1 when *path* has no sub-directories or when the chosen
    directory is empty.

    Args:
        path: Directory whose sub-directories are offered; a trailing path
            separator is not required.

    Returns:
        The chosen directory, joined onto *path*.
    """
    # Join properly so detection does not depend on a trailing separator
    dir_index = [
        name for name in os.listdir(path)
        if os.path.isdir(os.path.join(path, name))
    ]
    if not dir_index:
        print("No track directories found in: " + path)
        sys.exit(1)

    # List dirs for user
    for number, name in enumerate(dir_index, start=1):
        print(str(number) + ")", name)

    # Ask until the answer is a number that maps to a listed directory
    while True:
        try:
            user_input = int(input("Choose track dir: ")) - 1
        except ValueError:
            print("Didnt chose a track? Try again...")
            continue
        if 0 <= user_input < len(dir_index):
            break
        print("Didnt chose a track? Try again...")

    track_dir = os.path.join(path, dir_index[user_input])

    # check if tracks exist
    if not os.listdir(track_dir):
        print("No files in directory. Exiting.")
        sys.exit(1)

    # return the joined filepath
    return track_dir
def open_stream(device, track_dir):
    """Create, start and return an output stream for *device*.

    The stream is opened on ``device.name`` with ``device.channels``
    channels. ``track_dir`` is accepted but not used by this function.
    """
    multichannel = device.song_ch and device.click_ch
    if not multichannel:
        print("Opened Stream on device: ", device.name)
    else:
        print("Opened Stream on device: " + device.name + " (Multichannel)")

    # Open and start the output stream on the device channels
    stream = sd.OutputStream(device=device.name, channels=device.channels)
    stream.start()
    return stream
def play_track(device, track_dir, state):
    """Stream the pre-built track of *device* to its output stream.

    The file ``<track_dir>/prebuild/<device.name>.wav`` is read in blocks of
    2048 frames and each block is written to ``device.stream``. Playback
    waits while ``state.paused`` is set and ends as soon as ``state.running``
    is cleared or the file is exhausted.

    On exit, including on an error, the stream is closed and
    ``state.running`` is set to False.

    Args:
        device: Device object providing ``name`` and an open ``stream``.
        track_dir: Directory of the track to play.
        state: Shared object with the ``running`` and ``paused`` flags.
    """
    # Set block size
    block_size = 2048
    device_track_path = os.path.join(track_dir, "prebuild", str(device.name + ".wav"))

    try:
        for blocks in sf.blocks(device_track_path, blocksize=block_size, dtype='float32'):
            # Handle Resume/Play: hold here while paused, polled once a second
            while state.paused:
                time.sleep(1)

            if not state.running:
                break

            # Write current block to output stream
            device.stream.write(blocks)
    finally:
        # Always release the device and reset the shared flag, even when
        # reading the file or writing to the stream failed
        device.stream.close()
        state.running = False

    print("stream finished")
def play(track_dir, device_list):
    """Play the track in *track_dir* on every device, one thread per device.

    A stream is opened for each device and stored on ``device.stream``. While
    any player thread is alive, the space key toggles pause/resume. Ctrl+C
    stops playback: the flags are reset, all streams are closed and, if a
    thread is still alive afterwards, SIGINT is sent to the own process.

    Args:
        track_dir: Directory of the track to play.
        device_list: Devices to play on.
    """
    # Create a state object
    state = PlayState()
    threads = []

    for device in device_list:
        # Open device stream
        device.stream = open_stream(device, track_dir)
        # Open Thread for every device
        threads.append(threading.Thread(target=play_track, args=[device, track_dir, state]))

    try:
        print("Press Ctrl+C to stop")
        print("Starting...")

        # The running flag has to be set before any thread starts; a thread
        # that sees it cleared on its first block stops immediately.
        state.running = True

        # Start all threads
        for thread in threads:
            print("thread started")
            thread.start()

        # Handle Play and Resume: one toggle per key press, so a resume can
        # no longer fall through into a pause within the same iteration
        while thread_is_alive(threads):
            if keyboard.is_pressed("space"):
                state.paused = not state.paused
                print("Pause" if state.paused else "Resume")
                time.sleep(0.5)
            time.sleep(0.2)

    except KeyboardInterrupt:
        print("Ctrl+C pressed. Stopping...")
        state.running = False
        state.paused = False
        # Give the player threads time to notice the cleared flags
        time.sleep(1)

        # Close Output Streams
        for device in device_list:
            device.stream.close()
        time.sleep(1)

        # Hard kill threads if still alive
        if thread_is_alive(threads):
            print("hard kill threads")
            os.kill(os.getpid(), signal.SIGINT)
def thread_is_alive(threads):
    """Return True if at least one thread in *threads* is still alive."""
    return any(worker.is_alive() for worker in threads)
def _prebuild_missing_tracks(entry_path, device_list):
    """Build the per-device sound file for each track directory lacking one.

    Only devices with both ``song_ch`` and ``click_ch`` set are considered.
    """
    for track in os.listdir(entry_path):
        path = os.path.join(entry_path, track)
        if not os.path.isdir(path):
            continue

        for device in device_list:
            # Only MULTICHANNEL devices get a prebuild file
            if not (device.song_ch and device.click_ch):
                continue
            if os.path.exists(os.path.join(path, "prebuild", str(device.name + ".wav"))):
                continue

            print("Pre-build files for track '" + track + "' and device '" + device.name + "'...", end="")
            prebuild.build_soundfiles(path, device)
            print("done!")


def main():
    """Run the player.

    Makes sure a default device config exists and reads the device list,
    resamples and pre-builds the tracks below the directory given with
    ``-d``, then plays either every track of the playlist given with ``-p``
    or, without a playlist, tracks chosen interactively in an endless loop.
    Ctrl+C between tracks ends the program in both modes.
    """
    # Create a default device file
    if not config.check_default():
        print("No config file found")
        config.write_default()

    # Read device list from file
    device_list = config.read_default()

    # Get the entry path
    entry_path = get_path()

    # Resample files (if necessary)
    resample_files(entry_path)

    # Write/Prebuild track arrays if MULTICHANNEL device
    _prebuild_missing_tracks(entry_path, device_list)

    # Parse the playlist option only once
    playlist_file = get_playlist_file()

    try:
        if playlist_file:
            # Read playlist file
            playlist = config.read_playlist(entry_path, playlist_file)

            # Go through every track in playlist
            for track in playlist:
                input("Press ENTER to play next track")
                print("playing: " + track)
                play(os.path.join(entry_path, track), device_list)
        else:
            while True:
                print("Press CTRL+C to exit program")
                # Let user choose a track dir and play it
                play(choose_track_dir(entry_path), device_list)
    except KeyboardInterrupt:
        # Leave quietly instead of showing a traceback
        print("")


if __name__ == "__main__":
    main()