#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Created on 7 mars 2016
@author: Gaetan Hadjeres
"""
import pickle
from tqdm import tqdm
import numpy as np
from music21 import corpus, converter, stream, note, duration
NUM_VOICES = 4
SUBDIVISION = 4 # quarter note subdivision
BITS_FERMATA = 2 # number of bits needed to encode fermata
RANGE_FERMATA = 3 # 3 beats before fermatas
SPACING_FERMATAS = 12 # in beats
FERMATAS_LENGTH = 1 # in beats
P_INDEX = 0 # pitch index in representation
A_INDEX = 1 # articulation index in representation
F_INDEX = 2 # fermata index in representation
BEAT_SIZE = 4
OCTAVE = 12
RAW_DATASET = 'datasets/raw_dataset/bach_chorales.pickle'
RAW_DATASET_NO_TRANSPOSITION = 'datasets/' \
'raw_dataset/' \
'bach_chorales_no_transposition.pickle'
voice_ids = list(range(NUM_VOICES)) # soprano, alto, tenor, bass
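# Illustrative sketch: a single timestep in this representation is a
# (pitch, articulation, fermata) triple indexed by the constants above;
# the values below are arbitrary examples, not taken from the corpus.
def _example_timestep_representation():
    timestep = [65, 1, 0]  # MIDI pitch 65, note onset (articulation == 1), no fermata
    assert timestep[P_INDEX] == 65
    assert timestep[A_INDEX] == 1
    assert timestep[F_INDEX] == 0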
def filter_file_list(file_list):
"""
    Only retain 4-voice chorales that contain fermatas
"""
l = []
for file_name in file_list:
c = converter.parse(file_name)
if len(c.parts) == NUM_VOICES and c.parts[0].id == 'Soprano':
            # Retain only chorales with fermatas
if len(list(filter(lambda n: len(n.expressions) == 1, c.parts[0].flat.notes))) > 0:
l.append(file_name)
return l
def compute_min_max_pitches(file_list, voices=[0]):
"""
Removes wrong chorales
:param file_list:
:type voices: list containing voices ids
:returns: two lists min_p, max_p containing min and max pitches for each voice
"""
min_p, max_p = [128] * len(voices), [0] * len(voices)
to_remove = []
for file_name in file_list:
choral = converter.parse(file_name)
for k, voice_id in enumerate(voices):
try:
c = choral.parts[voice_id] # Retain only voice_id voice
l = list(map(lambda n: n.pitch.midi, c.flat.notes))
min_p[k] = min(min_p[k], min(l))
max_p[k] = max(max_p[k], max(l))
except AttributeError:
to_remove.append(file_name)
for file_name in set(to_remove):
file_list.remove(file_name)
return np.array(min_p), np.array(max_p)
def part_to_list(part):
"""
:rtype: np.ndarray
Returns (part_length, 2) matrix
t[0] = (pitch, articulation)
"""
length = int(part.duration.quarterLength * SUBDIVISION) # in 16th notes
list_notes = part.flat.notes
num_notes = len(list_notes)
j = 0
i = 0
t = np.zeros((length, 2))
is_articulated = True
while i < length:
if j < num_notes - 1:
if list_notes[j + 1].offset > i / SUBDIVISION:
t[i, :] = [list_notes[j].pitch.midi, is_articulated]
i += 1
is_articulated = False
else:
j += 1
is_articulated = True
else:
t[i, :] = [list_notes[j].pitch.midi, is_articulated]
i += 1
is_articulated = False
return t
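# Illustrative sketch (using the music21 objects imported above): how part_to_list
# discretizes a part at sixteenth-note resolution. The single half note is an
# arbitrary example, not taken from the corpus.
def _example_part_to_list():
    p = stream.Part()
    n = note.Note('C4')
    n.quarterLength = 2.0  # a half note spans 8 sixteenth-note steps
    p.append(n)
    t = part_to_list(p)
    assert t.shape == (8, 2)
    assert t[0, 0] == 60 and t[0, 1] == 1  # onset: MIDI pitch 60, articulated
    assert t[1, 0] == 60 and t[1, 1] == 0  # held: same pitch, continuation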
def part_to_list_with_fermata(part):
"""
:rtype: np.ndarray
Returns (part_length, 3) matrix
t[0] = (pitch, articulation, fermata)
"""
length = int(part.duration.quarterLength * SUBDIVISION) # in 16th notes
list_notes = part.flat.notes
num_notes = len(list_notes)
j = 0
i = 0
t = np.zeros((length, 3))
is_articulated = True
fermata = False
while i < length:
if j < num_notes - 1:
if list_notes[j + 1].offset > i / SUBDIVISION:
if len(list_notes[j].expressions) == 1:
fermata = True
else:
fermata = False
# fermata = fermata and is_articulated
t[i, :] = [list_notes[j].pitch.midi, is_articulated, fermata]
i += 1
is_articulated = False
else:
j += 1
is_articulated = True
else:
if len(list_notes[j].expressions) == 1:
fermata = True
else:
fermata = False
# fermata = fermata and is_articulated
t[i, :] = [list_notes[j].pitch.midi, is_articulated, fermata]
i += 1
is_articulated = False
return t
def chorale_to_input(chorale_file, voice_id=0):
"""
Returns a list of [pitch, articulation]
"""
part = converter.parse(chorale_file).parts[voice_id]
# assert part.id == 'Soprano'
return part_to_list(part)
def chorale_to_inputs(chorale_file, num_voices=None):
"""
Returns a numpy array [voices, time, (pitch, articulation)]
:param chorale_file:
:param num_voices:
:return:
"""
mat = []
for voice_id in range(num_voices):
mat.append(chorale_to_input(chorale_file, voice_id=voice_id))
return np.array(mat)
def chorale_to_inputs_with_fermata(chorale_file, num_voices=None):
"""
Returns a numpy array [voices, time, (pitch, articulation, fermata)]
:param chorale_file:
:param num_voices:
:return:
"""
mat = []
for voice_id in range(num_voices):
mat.append(chorale_to_input_with_fermata(chorale_file, voice_id=voice_id))
return np.array(mat)
def chorale_to_input_with_fermata(chorale_file, voice_id=0):
"""
Returns a list of [pitch, articulation, fermata]
"""
choral = converter.parse(chorale_file)
part = choral.parts[voice_id]
sop = choral.parts[0]
assert sop.id == 'Soprano'
sop = part_to_list_with_fermata(sop)
part = part_to_list_with_fermata(part)
# copy fermatas
for k, e in enumerate(sop):
part[k] = [part[k][0], part[k][1], e[2]]
return part
def pitch_column_to_one_hot(col, MIN_PITCH, MAX_PITCH):
"""
:param col:
:param MIN_PITCH: scalar !
:param MAX_PITCH: scalar !
:return:
"""
return np.vectorize(lambda x: x in col)(np.arange(MIN_PITCH, MAX_PITCH + 1))
def to_beat(time, timesteps=None):
"""
    time is given in sixteenth notes
    if timesteps is None, only the current beat is returned
    Returns the metrical position, one-hot encoded
IMPORTANT, right_beats is REVERSED
"""
beat = [0] * 4
beat[time % 4] = 1
if timesteps is None:
return beat
left_beats = np.array(list(map(lambda x: p_to_onehot(x, 0, 3),
np.arange(time - timesteps, time) % 4)))
right_beats = np.array(list(map(lambda x: p_to_onehot(x, 0, 3),
np.arange(time + timesteps, time, -1) % 4)))
return left_beats, np.array(beat), right_beats
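# Illustrative sketch: to_beat one-hot encodes the position within the beat
# (SUBDIVISION == 4 sixteenths per beat); timesteps adds left/right context.
def _example_to_beat():
    assert to_beat(6) == [0, 0, 1, 0]  # time 6 is the third sixteenth of its beat
    left, center, right = to_beat(6, timesteps=2)
    assert left.shape == (2, 4) and right.shape == (2, 4)  # right_beats is reversed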
def is_fermata(time):
"""
Returns a boolean
custom function
:param time:
:return:
"""
# evenly spaced fermatas
return (time // SUBDIVISION) % SPACING_FERMATAS < FERMATAS_LENGTH
# for god save the queen
# must add timesteps when
# fermatas_god = np.concatenate((np.arange(5 * 12, 6 * 12) + 16,
# np.arange(13 * 12, 14 * 12) + 16))
# return time in fermatas_god
# no fermata
# return 0
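# Illustrative sketch: with SPACING_FERMATAS == 12 and FERMATAS_LENGTH == 1,
# a one-beat fermata region recurs every 12 beats (time is in sixteenth notes).
def _example_is_fermata():
    assert is_fermata(0)       # beat 0 starts a fermata region
    assert not is_fermata(4)   # beat 1 does not
    assert is_fermata(48)      # beat 12 starts the next region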
def fermata_melody_to_fermata(time, timesteps=None, fermatas_melody=None):
"""
time is given in 16th notes
put timesteps=None only returns the current fermata
one hot encoded
:param time:
:param timesteps:
:return:
"""
# custom formula for fermatas
if fermatas_melody is None:
print('Error in fermata_melody_to_fermata, fermatas_melody is None')
central_fermata = p_to_onehot(fermatas_melody[time], min_pitch=0, max_pitch=1)
if timesteps is None:
return central_fermata
fermatas_left = np.array(list(map(lambda f: p_to_onehot(f,
min_pitch=0,
max_pitch=1),
fermatas_melody[time - timesteps: time])))
fermatas_right = np.array(list(map(lambda f: p_to_onehot(f,
min_pitch=0,
max_pitch=1),
fermatas_melody[time + timesteps: time: -1])))
return fermatas_left, central_fermata, fermatas_right
def to_fermata(time, timesteps=None):
"""
    time is given in sixteenth notes
    if timesteps is None, only the current fermata is returned (one-hot encoded)
:param time:
:param timesteps:
:return:
"""
# custom formula for fermatas
central_fermata = p_to_onehot(is_fermata(time), min_pitch=0, max_pitch=1)
if timesteps is None:
return central_fermata
fermatas_left = np.array(list(map(lambda f: p_to_onehot(is_fermata(f),
min_pitch=0,
max_pitch=1),
np.arange(time - timesteps, time))))
fermatas_right = np.array(list(map(lambda f: p_to_onehot(is_fermata(f),
min_pitch=0,
max_pitch=1),
np.arange(time + timesteps, time, -1))))
return fermatas_left, central_fermata, fermatas_right
def inputs_to_feature(inputs, voice_id, initial_beat=0):
"""
Arguments: inputs list of input
Returns: features for voice voice_id
features : previous_pitch * simultaneous_above_pitch * articulation * beat
:param voice_id: so that a voice depends only on the preceding voices
"""
beat_length = len(to_beat(0))
feature = np.zeros((inputs[voice_id].shape[0], inputs[voice_id].shape[1] + beat_length))
for k, pitch_and_articulation in enumerate(inputs[voice_id]):
feature[k, :] = np.concatenate((pitch_and_articulation, to_beat(k + initial_beat)))
return feature
def inputs_to_feature_with_fermata(inputs, voice_index, initial_beat=0):
"""
Arguments: inputs list of input containing fermatas
Returns: features for voice voice_index in inputs
features : previous_pitch * articulation * beat
"""
beat_length = len(to_beat(0))
feature = np.zeros((inputs[voice_index].shape[0],
inputs[voice_index].shape[1] - 1 + BITS_FERMATA + beat_length))
for k, pitch_and_articulation_and_fermata in enumerate(inputs[voice_index]):
feature[k, :] = np.concatenate((pitch_and_articulation_and_fermata[:2],
next_fermata_within(inputs, voice_index, k),
to_beat(k + initial_beat)))
return feature
def next_fermata_within(inputs, voice_id, index, range_fermata=RANGE_FERMATA):
"""
:param range_fermata:
:param inputs:
:param voice_id:
:param index:
:return:
"""
# if fermata
num = 0
if inputs[voice_id][index][2]:
num = 0
else:
for k in range(index, len(inputs[voice_id])):
if inputs[voice_id][k][2]:
num = ((k - k % SUBDIVISION) - (index - index % SUBDIVISION)) // SUBDIVISION
break
if num <= range_fermata:
return np.array([0, 1], dtype=np.int32)
else:
return np.array([1, 0], dtype=np.int32)
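# Illustrative sketch: next_fermata_within flags whether a fermata occurs within
# RANGE_FERMATA beats. The toy voice below is made up for illustration.
def _example_next_fermata_within():
    voice = np.zeros((32, 3))          # 8 beats of a single voice
    voice[16:20, F_INDEX] = 1          # fermata on beat 4
    inputs = [voice]
    assert np.array_equal(next_fermata_within(inputs, 0, 8), [0, 1])  # 2 beats away
    assert np.array_equal(next_fermata_within(inputs, 0, 0), [1, 0])  # 4 beats away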
def next_fermata_in(inputs, voice_id, index):
# if fermata
num = 0
if inputs[voice_id][index][2]:
num = 0
else:
for k in range(index, len(inputs[voice_id])):
if inputs[voice_id][k][2]:
num = ((k - k % SUBDIVISION) - (index - index % SUBDIVISION)) / SUBDIVISION
break
return np.array(list(map(lambda x: x == num, range(BITS_FERMATA))), dtype=np.int32)
def input_to_feature_with_fermata(input, initial_beat=0):
return inputs_to_feature_with_fermata([input], voice_index=0, initial_beat=initial_beat)
def input_to_feature(input, initial_beat=0):
return inputs_to_feature([input], voice_id=0, initial_beat=initial_beat)
def feature_to_onehot_feature(feature, NUM_PITCHES, MIN_PITCH, MAX_PITCH):
"""
must only apply onehot encoding to first column
"""
onehot = np.zeros((feature.shape[0], NUM_PITCHES + feature.shape[1] - 1))
onehot[:, 0:NUM_PITCHES] = np.array(list(map(lambda col:
pitch_column_to_one_hot(col, MIN_PITCH, MAX_PITCH),
feature[:, 0][:, None])), dtype=np.int32)
onehot[:, NUM_PITCHES:] = feature[:, 1:]
return onehot
def fusion_features(Xs, voice_index, file_index=None):
"""
    Fuses the features of all voices into a single feature vector per timestep,
    from the point of view of voice voice_index. Covers both indexing conventions:
    if file_index is None, an element of Xs is indexed as
        Xs[voice_index][t, features]
    otherwise as
        Xs[voice_index][file_index][t, features]
    Returns sequences shorter by one timestep.
"""
if file_index is not None:
total_features = 0
# X[voice_index][file_index] [t, features]
for X in Xs:
# WARNING 4 STANDS FOR THE NUMBER OF BITS USED BY THE BEAT REPRESENTATION
total_features += X[file_index].shape[1] - 4
total_features += 4 # Because we keep one beat
fusion = np.zeros((Xs[voice_index][file_index].shape[0] - 1, total_features))
for k, vect in enumerate(Xs[voice_index][file_index][:-1, :]):
i = 0
for var_voice_index, X in enumerate(Xs):
feature = X[file_index]
if var_voice_index < voice_index:
fusion[k, i:i + feature.shape[1] - 4] = feature[k + 1, :-4]
i += feature.shape[1] - 4
elif var_voice_index > voice_index:
fusion[k, i:i + feature.shape[1] - 4] = feature[k, :-4]
i += feature.shape[1] - 4
# original features at the end
fusion[k, i:] = vect
assert i + len(vect) == total_features
# print(fusion.shape, voice_index)
return fusion
else:
total_features = 0
# X[voice_index][file_index] [t, features]
for X in Xs:
### WARNING 4 STANDS FOR THE NUMBER OF BITS USED BY THE BEAT REPRESENTATION
total_features += X.shape[1] - 4
# print(X[file_index].shape[1])
total_features += 4 # Because we keep one beat
fusion = np.zeros((Xs[voice_index].shape[0] - 1, total_features))
for k, vect in enumerate(Xs[voice_index][:-1, :]):
i = 0
for var_voice_index, X in enumerate(Xs):
feature = X
if var_voice_index < voice_index:
fusion[k, i:i + feature.shape[1] - 4] = feature[k + 1, :-4]
i += feature.shape[1] - 4
elif var_voice_index > voice_index:
fusion[k, i:i + feature.shape[1] - 4] = feature[k, :-4]
i += feature.shape[1] - 4
# original features at the end
fusion[k, i:] = vect
assert i + len(vect) == total_features
# print(fusion.shape, voice_index)
return fusion
def fusion_features_with_fermata(Xs, voice_index, file_index=None):
"""
"""
num_last_bits_removed = BEAT_SIZE + BITS_FERMATA
if file_index is not None:
total_features = 0
# X[voice_index][file_index] [t, features]
for X in Xs:
total_features += X[file_index].shape[1] - num_last_bits_removed
total_features += num_last_bits_removed # Because we keep one beat
fusion = np.zeros((Xs[voice_index][file_index].shape[0] - 1, total_features))
for k, vect in enumerate(Xs[voice_index][file_index][:-1, :]):
i = 0
for var_voice_index, X in enumerate(Xs):
feature = X[file_index]
if var_voice_index < voice_index:
fusion[k, i:i + feature.shape[1] - num_last_bits_removed] = feature[k + 1, :-num_last_bits_removed]
i += feature.shape[1] - num_last_bits_removed
elif var_voice_index > voice_index:
fusion[k, i:i + feature.shape[1] - num_last_bits_removed] = feature[k, :-num_last_bits_removed]
i += feature.shape[1] - num_last_bits_removed
# original features at the end
fusion[k, i:] = vect
assert i + len(vect) == total_features
# print(fusion.shape, voice_index)
return fusion
else:
total_features = 0
# X[voice_index][file_index] [t, features]
for X in Xs:
total_features += X.shape[1] - num_last_bits_removed
# print(X[file_index].shape[1])
total_features += num_last_bits_removed # Because we keep one beat
fusion = np.zeros((Xs[voice_index].shape[0] - 1, total_features))
for k, vect in enumerate(Xs[voice_index][:-1, :]):
i = 0
for var_voice_index, X in enumerate(Xs):
feature = X
if var_voice_index < voice_index:
fusion[k, i:i + feature.shape[1] - num_last_bits_removed] = feature[k + 1, :-num_last_bits_removed]
i += feature.shape[1] - num_last_bits_removed
elif var_voice_index > voice_index:
fusion[k, i:i + feature.shape[1] - num_last_bits_removed] = feature[k, :-num_last_bits_removed]
i += feature.shape[1] - num_last_bits_removed
# original features at the end
fusion[k, i:] = vect
assert i + len(vect) == total_features
# print(fusion.shape, voice_index)
return fusion
def list_to_array(X):
return np.concatenate(X).reshape((len(X),) + X[0].shape)
def make_raw_dataset(file_list, dataset_name, num_voices=4, transpose=False, min_pitches=None, max_pitches=None,
voice_ids=None):
"""
:param file_list:
:param dataset_name:
:param num_voices:
:param transpose:
:return: tuple (X, min_pitches, max_pitches, num_voices) where X is list of (num_voices, time, 3)
"""
if min_pitches is None or max_pitches is None:
raise Exception('min_pitches and max_pitches must be provided as arguments. See compute_min_max_pitches')
# Choose whether to extract fermatas or not
to_inputs = chorale_to_inputs_with_fermata
X = []
for chorale_file in tqdm(file_list):
try:
inputs = to_inputs(chorale_file, num_voices=num_voices)
if not transpose:
X.append(inputs)
else:
# same transposition for every voice
max_transposition = 128
min_transposition = -128
for k, _ in enumerate(voice_ids):
max_transposition = min(max_transposition,
int(max_pitches[k] - max(inputs[k, :, P_INDEX])))
min_transposition = max(min_transposition,
min_pitches[k] - int(min(inputs[k, :, P_INDEX])))
for num_semitones in range(min_transposition, max_transposition + 1):
transposed_input = inputs.copy()
transposed_input[:, :, P_INDEX] += num_semitones
X.append(transposed_input)
except AttributeError:
pass
dataset = (X, min_pitches, max_pitches, num_voices)
pickle.dump(dataset, open(dataset_name, 'wb'), pickle.HIGHEST_PROTOCOL)
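# Hedged usage sketch: a plausible end-to-end call sequence for building the raw
# dataset. The corpus query is an assumption; the original project may obtain its
# chorale file list differently.
def _example_make_raw_dataset():
    file_list = filter_file_list(corpus.getComposer('bach'))  # assumed file source
    min_pitches, max_pitches = compute_min_max_pitches(file_list, voices=voice_ids)
    make_raw_dataset(file_list, RAW_DATASET, num_voices=NUM_VOICES, transpose=True,
                     min_pitches=min_pitches, max_pitches=max_pitches,
                     voice_ids=voice_ids)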
def make_datasets(file_list, dataset_names, voice_ids, MIN_PITCH, MAX_PITCH,
transpose=False, export_matlab=False):
"""
MIN_PITCH and MAX_PITCH are arrays !
pickles a dataset
simple version, only returns list of chorals
"""
assert len(dataset_names) == len(voice_ids)
print("Making datasets " + str(dataset_names))
inputs = []
X = []
y = []
for _ in range(len(dataset_names)):
inputs.append([])
X.append([])
y.append([])
for voice_index, voice_id in enumerate(voice_ids):
for file_index, file_name in enumerate(file_list):
inputs[voice_index].append(chorale_to_input(file_name, voice_id=voice_id))
if export_matlab:
import scipy.io as sio
sio.savemat('corpus4voices.mat', {'corpus': np.array(inputs)})
for voice_index, voice_id in enumerate(voice_ids):
for file_index, _ in enumerate(file_list):
try:
if not transpose:
y[voice_index].append(inputs[voice_index][file_index][1:, :])
X[voice_index].append(feature_to_onehot_feature(
input_to_feature(inputs[voice_index][file_index])[:-1, :],
MAX_PITCH[voice_index] - MIN_PITCH[voice_index] + 1,
MIN_PITCH[voice_index], MAX_PITCH[voice_index]))
else:
# same transposition for every voice
max_transposition = 128
min_transposition = -128
for k, _ in enumerate(voice_ids):
max_transposition = min(max_transposition,
int(MAX_PITCH[k] - max(inputs[k][file_index][:, 0])))
min_transposition = max(min_transposition,
MIN_PITCH[k] - int(min(inputs[k][file_index][:, 0])))
for num_semitones in range(min_transposition, max_transposition + 1):
transposed_input = translate_y([inputs[voice_index][file_index]], num_semitones)[0]
y[voice_index].append(transposed_input[1:, :])
X[voice_index].append(feature_to_onehot_feature(
inputs_to_feature([transposed_input], 0)[:-1, :],
MAX_PITCH[voice_index] - MIN_PITCH[voice_index] + 1,
MIN_PITCH[voice_index], MAX_PITCH[voice_index]))
except AttributeError:
                print('AttributeError: skipping chorale %d' % file_index)
# Fusion features
num_chorals = len(X[voice_index])
XX = []
yy = []
for voice_index in range(len(voice_ids)):
XX.append([])
yy.append([])
for _ in range(num_chorals):
XX[voice_index].append([])
yy[voice_index].append([])
for voice_index, _ in enumerate(voice_ids):
for choral_index in range(num_chorals):
XX[voice_index][choral_index] = fusion_features(X, voice_index, choral_index)
yy[voice_index][choral_index] = y[voice_index][choral_index][:-1]
for voice_index, dataset_name in enumerate(dataset_names):
dataset = {'X': XX[voice_index], 'y': yy[voice_index],
'min_pitch': MIN_PITCH[voice_index], 'max_pitch': MAX_PITCH[voice_index]
}
pickle.dump(dataset, open(dataset_name, 'wb'), pickle.HIGHEST_PROTOCOL)
def make_datasets_with_fermata(file_list, dataset_names, voice_ids, MIN_PITCH, MAX_PITCH,
transpose=False, export_matlab=False):
"""
MIN_PITCH and MAX_PITCH are arrays !
pickles a dataset
simple version, only returns list of chorals
:rtype: object
"""
assert len(dataset_names) == len(voice_ids)
print("Making datasets " + str(dataset_names))
inputs = []
X = []
y = []
for _ in range(len(dataset_names)):
inputs.append([])
X.append([])
y.append([])
for voice_index, voice_id in enumerate(voice_ids):
for file_index, file_name in enumerate(file_list):
inputs[voice_index].append(chorale_to_input_with_fermata(file_name, voice_id=voice_id))
if export_matlab:
import scipy.io as sio
sio.savemat('corpus4voices.mat', {'corpus': np.array(inputs)})
for voice_index, voice_id in enumerate(voice_ids):
for file_index, _ in enumerate(file_list):
try:
if not transpose:
# Discard fermatas for labels ( [: 2])
y[voice_index].append(inputs[voice_index][file_index][1:, :2])
X[voice_index].append(feature_to_onehot_feature(
input_to_feature_with_fermata(inputs[voice_index][file_index], voice_index)[:-1, :],
MAX_PITCH[voice_index] - MIN_PITCH[voice_index] + 1,
MIN_PITCH[voice_index], MAX_PITCH[voice_index]))
else:
# same transposition for every voice
max_transposition = 128
min_transposition = -128
for k, _ in enumerate(voice_ids):
max_transposition = min(max_transposition,
int(MAX_PITCH[k] - max(inputs[k][file_index][:, 0])))
min_transposition = max(min_transposition,
MIN_PITCH[k] - int(min(inputs[k][file_index][:, 0])))
for num_semitones in range(min_transposition, max_transposition + 1):
transposed_input = translate_y([inputs[voice_index][file_index]], num_semitones)[0]
# Discard fermatas for labels ( [: 2])
y[voice_index].append(transposed_input[1:, :2])
X[voice_index].append(feature_to_onehot_feature(
inputs_to_feature_with_fermata([transposed_input], 0)[:-1, :],
MAX_PITCH[voice_index] - MIN_PITCH[voice_index] + 1,
MIN_PITCH[voice_index], MAX_PITCH[voice_index]))
except AttributeError:
                print('AttributeError: skipping chorale %d' % file_index)
# Fusion features
num_chorals = len(X[voice_index])
XX = []
yy = []
for voice_index in range(len(voice_ids)):
XX.append([])
yy.append([])
for _ in range(num_chorals):
XX[voice_index].append([])
yy[voice_index].append([])
for voice_index, _ in enumerate(voice_ids):
for choral_index in range(num_chorals):
XX[voice_index][choral_index] = fusion_features_with_fermata(X, voice_index, choral_index)
yy[voice_index][choral_index] = y[voice_index][choral_index][:-1]
for voice_index, dataset_name in enumerate(dataset_names):
dataset = {'X': XX[voice_index], 'y': yy[voice_index],
'min_pitch': MIN_PITCH[voice_index], 'max_pitch': MAX_PITCH[voice_index]
}
pickle.dump(dataset, open(dataset_name, 'wb'), pickle.HIGHEST_PROTOCOL)
def zero_padding(mat, size=16):
m = np.array(mat)
zeros_shape = (size,) + m.shape[1:]
return np.concatenate((np.zeros(zeros_shape),
m,
np.zeros(zeros_shape)))
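# Illustrative sketch: zero_padding prepends and appends `size` silent frames.
def _example_zero_padding():
    m = np.ones((10, 3))
    padded = zero_padding(m)  # default size=16
    assert padded.shape == (10 + 2 * 16, 3)
    assert padded[:16].sum() == 0 and padded[-16:].sum() == 0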
def unpickle(pickle_file):
with open(pickle_file, 'rb') as f:
save = pickle.load(f)
X = save['X']
y = save['y']
min_pitch = save['min_pitch']
max_pitch = save['max_pitch']
del save # hint to help gc free up memory
print('Number of chorales: ', len(X))
print('X, y:', X[0].shape, y[0].shape)
return X, y, min_pitch, max_pitch
def convert_list_to_X(pitch_and_articulation_list, NUM_PITCHES, MIN_PITCH, MAX_PITCH, offset=0):
"""
list of pitches, starting on offset
returns matrix length * num_features
"""
return feature_to_onehot_feature(
input_to_feature(
pitch_and_articulation_list, initial_beat=offset), NUM_PITCHES, MIN_PITCH, MAX_PITCH)
# does not take articulation into account for the moment !
def create_consecutive_batch_generator(X, y, BATCH_SIZE, TIMESTEPS, NUM_PITCHES, MIN_PITCH, MAX_PITCH):
"""
    X is a list of chorales
Returns input (BATCH_SIZE, TIMESTEPS, NUM_FEATURES),
labels (BATCH_SIZE, TIMESTEPS, NUM_PITCHES) (one hot encoded)
"""
input = []
labels = []
batch = 0
time_index = 0
choral_index = np.random.randint(len(X))
while True:
if time_index + TIMESTEPS < y[choral_index].shape[0]:
# if there is a full batch
input.append(X[choral_index][time_index:time_index + TIMESTEPS, :])
labels.append(feature_to_onehot_feature(y[choral_index][time_index:time_index + TIMESTEPS, 0][:, None],
NUM_PITCHES, MIN_PITCH, MAX_PITCH)) # Retain pitch
time_index += 1
batch += 1
else:
choral_index = np.random.randint(len(X))
time_index = 0
if batch == BATCH_SIZE:
yield np.array(input, dtype=np.float32), np.array(labels, dtype=np.int32)
batch = 0
input = []
labels = []
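# Hedged usage sketch: consuming the consecutive batch generator. The pickle path
# and the batch/timestep sizes are assumptions, not values from the original project.
def _example_consecutive_batch_generator():
    X, y, min_pitch, max_pitch = unpickle('datasets/soprano.pickle')  # assumed path
    num_pitches = max_pitch - min_pitch + 1
    gen = create_consecutive_batch_generator(X, y, BATCH_SIZE=32, TIMESTEPS=16,
                                             NUM_PITCHES=num_pitches,
                                             MIN_PITCH=min_pitch, MAX_PITCH=max_pitch)
    batch_input, batch_labels = next(gen)
    assert batch_input.shape[:2] == (32, 16)
    assert batch_labels.shape == (32, 16, num_pitches)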
def pa_to_onehot(pa, min_pitch, max_pitch):
"""
    pitch and articulation tuple to onehot
    returns a vector with a single one: either the pitch slot (note onset)
    or the trailing slur/continuation slot (held note)
"""
# continuation
if pa[A_INDEX] == 0:
return np.concatenate((np.zeros((max_pitch + 1 - min_pitch,)),
np.array([1]))
)
else:
return np.concatenate((
np.array(pa[P_INDEX] == np.arange(min_pitch, max_pitch + 1),
dtype=np.float32),
np.array([0]))
)
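# Illustrative sketch: over the (assumed) pitch range 59..62 there are 4 pitch slots
# plus one trailing slot for the continuation (slur) symbol.
def _example_pa_to_onehot():
    assert np.array_equal(pa_to_onehot([60, 1], 59, 62), [0, 1, 0, 0, 0])  # onset
    assert np.array_equal(pa_to_onehot([60, 0], 59, 62), [0, 0, 0, 0, 1])  # held note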
def paf_to_onehot(paf, min_pitch, max_pitch):
"""
    pitch, articulation and fermata tuple to onehot
    returns the pa_to_onehot encoding with the fermata bit appended
    (at most two entries are set)
"""
# continuation
if paf[A_INDEX] == 0:
return np.concatenate((np.zeros((max_pitch + 1 - min_pitch,)),
np.array([1]), np.array(paf[F_INDEX]))
)
else:
return np.concatenate((
np.array(paf[P_INDEX] == np.arange(min_pitch, max_pitch + 1),
dtype=np.float32),
np.array([0]), np.array(paf[F_INDEX]))
)
def p_to_onehot(p, min_pitch, max_pitch):
"""
pitch to one hot
:param p:
:param min_pitch:
:param max_pitch: included !
:return: np.array of shape (max_pitch - min_pitch + 1)
"""
return np.array(p == np.arange(min_pitch, max_pitch + 1),
dtype=np.float32)
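# Illustrative sketch: the range is inclusive, so pitches 59..62 give 4 slots.
def _example_p_to_onehot():
    assert np.array_equal(p_to_onehot(60, 59, 62), [0, 1, 0, 0])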
def ps_to_onehot(ps, min_pitches, max_pitches):
"""
list of pitches to one hot representation
:param ps: list of pitches
:param min_pitch:
:param max_pitch:
:return: np.array of shape (sum_len(ps), max_pitches - min_pitches + 1)
"""
vects = []
for k, p in enumerate(ps):
vects.append(p_to_onehot(p, min_pitch=min_pitches[k], max_pitch=max_pitches[k]))
return np.concatenate(vects)
def pas_to_onehot(pas, min_pitches, max_pitches):
"""
list of (pitch, articulation) tuple (one for each voice) to ONE onehot-encoded vector
:param pas:
:param min_pitches:
:param max_pitches:
:return:
"""
vects = []
for k, pa in enumerate(pas):
vects.append(pa_to_onehot(pa, min_pitch=min_pitches[k], max_pitch=max_pitches[k]))
return np.concatenate(vects)
def pafs_to_onehot(pafs, min_pitches, max_pitches):
"""
list of (pitch, articulation, fermata) tuple (one for each voice) to ONE onehot-encoded vector
:param pafs:
:param min_pitches:
:param max_pitches:
:return:
"""
vects = []
for k, paf in enumerate(pafs):
vects.append(paf_to_onehot(paf, min_pitch=min_pitches[k], max_pitch=max_pitches[k]))
return np.concatenate(vects)
def as_pas_to_as_ps(chorale_as_pas, min_pitches, max_pitches):
"""
convert chorale (num_voices, time, 2) to chorale (num_voices, time) by adding a slur symbol
:return:
"""
chorale_as_ps = np.zeros(chorale_as_pas.shape[:2])
for voice_index, voice in enumerate(chorale_as_pas):
for time_index, pa in enumerate(voice):
# continuation
if pa[A_INDEX] == 0:
chorale_as_ps[voice_index, time_index] = max_pitches[voice_index] + 1
else:
chorale_as_ps[voice_index, time_index] = pa[P_INDEX]
return chorale_as_ps
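# Illustrative sketch: held notes become the slur symbol max_pitch + 1.
# The toy chorale below (one voice, three timesteps) is made up for illustration.
def _example_as_pas_to_as_ps():
    chorale_as_pas = np.array([[[60, 1], [60, 0], [62, 1]]])
    ps = as_pas_to_as_ps(chorale_as_pas, min_pitches=[55], max_pitches=[70])
    assert np.array_equal(ps, [[60, 71, 62]])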
def as_ps_to_as_pas(chorale_as_ps, min_pitches, max_pitches):
"""
convert chorale (num_voices, time) to chorale (num_voices, time, 2) by removing slur symbol
max_pitches DO NOT INCLUDE slur_symbol_pitch
:return:
"""
chorale_as_pas = np.zeros(chorale_as_ps.shape + (2,))
previous_pitch = 0
for voice_index, voice in enumerate(chorale_as_ps):
SLUR_SYMBOL_PITCH = max_pitches[voice_index] + 1
for time_index, p in enumerate(voice):
if not p == SLUR_SYMBOL_PITCH:
previous_pitch = p
# continuation
if p == SLUR_SYMBOL_PITCH:
chorale_as_pas[voice_index, time_index, :] = [previous_pitch, 0]
else:
chorale_as_pas[voice_index, time_index, :] = [previous_pitch, 1]
return chorale_as_pas
def chorale_to_onehot(chorale, min_pitches, max_pitches, chorale_as_pas=True, fermatas=True):
"""
chorale must be of shape (time, num_voices, 3) if chorale_as_pas
else (time, num_voices)
:param min_pitches: number of different pitches in chorale for each voice
"""
mat = []
if chorale_as_pas:
if fermatas:
print('must never go there')
for pafs in chorale:
mat.append(pafs_to_onehot(pafs, min_pitches, max_pitches))
else:
for pas in chorale:
mat.append(pas_to_onehot(pas, min_pitches, max_pitches))
return np.array(mat)
# if chorale is given with only pitches + slur_symbol
else:
for ps in chorale:
# Add slur_symbol
mat.append(ps_to_onehot(ps, min_pitches, max_pitches))
return np.array(mat)
def all_features(chorale, voice_index, time_index, timesteps, min_pitches, max_pitches, num_voices,
chorale_as_pas=True):
"""
Returns all features for voice voice_index at time time_index from chorale
shapes:
    (time, voice, 3) if chorale_as_pas
(time, voice) if not chorale_as_pas
"""
if chorale_as_pas:
return all_features_from_pa_chorale_with_fermatas(chorale, voice_index, time_index, timesteps, min_pitches,
max_pitches, num_voices)
else:
# when chorale already contains slur_symbols
return all_features_from_slur_chorale(chorale, voice_index, time_index, timesteps,
min_pitches, max_pitches,
num_voices)
def all_features_from_slur_chorale(chorale, voice_index, time_index, timesteps,
min_pitches, max_pitches, num_voices):
"""
:param max_pitches: TRUE max_pitches from chorale_datasets
:param min_pitches: TRUE min_pitches from chorale_datasets
:param chorale: (time, num_voices) numpy array
:return:
(left_feature,
central_feature,
right_feature,
beat,
label)
"""
    mask = np.arange(num_voices) != voice_index
left_feature = chorale_to_onehot(chorale[time_index - timesteps:time_index, :], min_pitches=min_pitches,
max_pitches=max_pitches + 1, chorale_as_pas=False)
right_feature = chorale_to_onehot(chorale[time_index + timesteps: time_index: -1, :], min_pitches=min_pitches,
max_pitches=max_pitches + 1, chorale_as_pas=False)
central_feature = ps_to_onehot(chorale[time_index, mask],
min_pitches=min_pitches[mask],
max_pitches=max_pitches[mask] + 1)
# put timesteps=None to only have the current beat
beat = to_beat(time_index, timesteps=timesteps)
# if slur_symbol:
label = p_to_onehot(chorale[time_index, voice_index], min_pitch=min_pitches[voice_index],
max_pitch=max_pitches[voice_index] + 1)
return (np.array(left_feature),
np.array(central_feature),
np.array(right_feature),
beat,
np.array(label)
)
def all_features_from_pa_chorale(chorale, voice_index, time_index, timesteps,
min_pitches, max_pitches, num_voices):
"""
:param num_voices:
:param max_pitches: