-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_read_opus.py
142 lines (121 loc) · 4.44 KB
/
test_read_opus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# Purpose of script:
# Encodes a WAV file to OPUS, applies some channel-related disturbances
# to the bytes via a channel model, then decodes it back to WAV.
#
# (c) Paul Didier, Mohammad Bokaei, José Miguel Cadavid Tobón (SOUNDS ETN 2023)
import os
import sys
import pydub # requires `ffmpeg`
import numpy as np
from pathlib import Path
from bitstring import BitArray
import commpy_trial_analog as cta
from sys import platform as PLATFORM
# Global variables (user-tunable settings read by `main` and the helpers below)
# NOTE(review): the path constants use Windows-style `\\` separators -- confirm
# whether other platforms are meant to be supported.
SEED = 0 # seed passed to `np.random.seed` for reproducibility
SNR = 10 # AWGN channel SNR [dB]
SIZE_OF_QAM_CONSTELLATION = 64 # 4, 16, 64, 256, 1024, 4096, 16384 (cf. `cta.analog_modeller`)
FRAME_SIZE = 2.5 # OPUS encoding frame size [ms] (passed to ffmpeg as `-frame_duration`)
BITRATE = 32 # OPUS encoding bitrate [kbits/s] (passed to ffmpeg as `-b:a <BITRATE>K`)
INPUT_WAV_FILENAME = '.\\input\\test_short.wav' # input WAV file (default argument of `main`)
OUTPUT_FOLDER = '.\\output' # folder where output files will be saved (e.g., OPUS & WAV files)
OPUS_DIRECTORY = '.\\opus' # directory where opus encoder/decoder is located (default `opus_dir` of `build_opus_path`)
# Probably don't change these
AUDIO_CODEC = 'opus' # 'opus' or something else (e.g., 'mp3'); used as the encoded file's suffix and in output file names
def main(inWavFilename=INPUT_WAV_FILENAME):
    """Main function (called by default when running script).

    Pipeline:
      1. Encode `inWavFilename` with ffmpeg (constant bitrate `BITRATE`,
         frame duration `FRAME_SIZE`) into `OUTPUT_FOLDER`.
      2. Load the encoded file with pydub and take its `raw_data` bytes.
      3. Turn the bytes into an array of bits and pass them through
         `cta.analog_modeller` (channel model, `SNR` and
         `SIZE_OF_QAM_CONSTELLATION`).
      4. Pack the returned bits back into bytes and export them as a WAV
         file in `OUTPUT_FOLDER`.

    Parameters
    ----------
    inWavFilename : str
        Path to the input WAV file.

    Raises
    ------
    RuntimeError
        If ffmpeg cannot be launched or exits with a non-zero status.
    """
    # Local import so that this function does not depend on a change to the
    # file-level import block.
    import subprocess

    # Set seed for reproducibility
    np.random.seed(SEED)

    # Encode WAV file to OPUS
    print(f'Calling {AUDIO_CODEC.capitalize()} encoder...')
    encOutFilename = f'{OUTPUT_FOLDER}\\{Path(inWavFilename).stem}_to_{AUDIO_CODEC}.{AUDIO_CODEC}'
    # Remove any stale output first: ffmpeg would otherwise ask for an
    # interactive overwrite confirmation.
    if Path(encOutFilename).exists():
        os.remove(encOutFilename)
    # Argument list instead of a shell string: no quoting problems with
    # paths that contain spaces or shell metacharacters.
    ffmpegCommand = [
        'ffmpeg',
        '-i', str(inWavFilename),
        '-frame_duration', str(FRAME_SIZE),
        '-b:a', f'{BITRATE}K',
        '-vbr', 'off',
        encOutFilename,
    ]
    try:
        encodingResult = subprocess.run(ffmpegCommand, check=False)
    except OSError as err:
        raise RuntimeError(
            'Could not launch ffmpeg (is it installed and on the PATH?).'
        ) from err
    # Fail here rather than later with an obscure error when reading a
    # file that was never produced.
    if encodingResult.returncode != 0:
        raise RuntimeError(
            f'ffmpeg encoding failed (exit code {encodingResult.returncode}).'
        )
    print(f'Exported OPUS file ("{encOutFilename}").')

    # Read OPUS file
    print(f'Reading {AUDIO_CODEC.capitalize()} file...')
    audioEncoded: pydub.AudioSegment = pydub.AudioSegment.from_file(
        encOutFilename,
        codec='opus'
    )
    # NOTE(review): `raw_data` presumably holds the samples as decoded by
    # pydub rather than the compressed Opus bitstream -- confirm against
    # the pydub documentation.
    data = audioEncoded.raw_data

    # Convert to `BitArray` object
    print('Converting to BitArray object...')
    bitArray = BitArray(bytes=data)
    # Extract actual bits as `np.ndarray` (one integer 0/1 per bit)
    bitsList = np.array([int(b) for b in bitArray.bin])

    # Modify bits as we want
    print('----- Modifying bits (channel effect) -----')
    decodedBits = cta.analog_modeller(
        bitsList,
        snr=SNR,
        plot=False,
        sizeOfQamConstellation=SIZE_OF_QAM_CONSTELLATION
    )
    backToBytes = BitArray(decodedBits).bytes
    print('----- Done modifying bits -----')

    # Write modified bits to WAV file, reusing the audio format (sample
    # width, frame rate, channel count) of the segment read above
    backToWavFilename = f'{OUTPUT_FOLDER}\\{Path(inWavFilename).stem}_{AUDIO_CODEC}_codec_{SNR}dB_noise_QAM{SIZE_OF_QAM_CONSTELLATION}.wav'
    print('Writing WAV file...')
    audioForOutput = pydub.AudioSegment(
        data=backToBytes,
        sample_width=audioEncoded.sample_width,
        frame_rate=audioEncoded.frame_rate,
        channels=audioEncoded.channels
    )
    audioForOutput.export(
        backToWavFilename,
        format="wav",
    )
    print(f'Exported WAV file ("{backToWavFilename}").')
    print('Done.')
def read_opus(fileName) -> bytes:
    """Read an opus file and return the data.

    Parameters
    ----------
    fileName : path-like
        Path of the file to read.

    Returns
    -------
    bytes
        The complete, unmodified content of the file.
    """
    # Context manager: the handle is closed even if `read()` raises.
    with open(fileName, 'rb') as f:
        return f.read()
def write_opus(fileName, data) -> None:
    """Writes an opus file.

    Parameters
    ----------
    fileName : path-like
        Path of the file to create (an existing file is overwritten).
    data : bytes-like
        Content to write, in binary mode.
    """
    # Context manager: the handle is flushed and closed even if `write()`
    # raises.
    with open(fileName, 'wb') as f:
        f.write(data)
def call_opus_encoder(
    wav_filename,
    opus_out_name,
    encoder_name='opusenc',
    bitrate=160
):
    """Call the opus encoder with the given parameters.

    Parameters
    ----------
    wav_filename : str
        Path of the input WAV file.
    opus_out_name : str
        Path of the OPUS file to produce.
    encoder_name : str
        Name of the encoder executable, resolved via `build_opus_path`.
    bitrate : int or float
        Value passed to the encoder's `--bitrate` option.

    Raises
    ------
    Exception
        If the encoder cannot be launched or exits with a non-zero status.
    """
    # Local import so that this function does not depend on a change to the
    # file-level import block.
    import subprocess

    fullPathOpus = build_opus_path(encoder_name)
    # Argument list instead of a shell string: file names with spaces or
    # shell metacharacters are passed through untouched.
    command = [
        fullPathOpus,
        '--bitrate', str(bitrate),
        str(wav_filename),
        str(opus_out_name),
    ]
    try:
        completed = subprocess.run(command, check=False)
    except OSError as err:
        raise Exception("Encoding failed !!") from err
    # Any non-zero exit status is a failure, not only status 1.
    if completed.returncode != 0:
        raise Exception(
            f"Encoding failed !! (exit code {completed.returncode})"
        )
def call_opus_decoder(
    opus_filename,
    wav_out_name,
    decoder_name='opusdec'
):
    """Call the opus decoder with the given parameters.

    Parameters
    ----------
    opus_filename : str
        Path of the input OPUS file.
    wav_out_name : str
        Path of the WAV file to produce.
    decoder_name : str
        Name of the decoder executable, resolved via `build_opus_path`.

    Raises
    ------
    Exception
        If the decoder cannot be launched or exits with a non-zero status.
    """
    # Local import so that this function does not depend on a change to the
    # file-level import block.
    import subprocess

    fullPathOpus = build_opus_path(decoder_name)
    # Argument list instead of a shell string: file names with spaces or
    # shell metacharacters are passed through untouched.
    command = [fullPathOpus, str(opus_filename), str(wav_out_name)]
    try:
        completed = subprocess.run(command, check=False)
    except OSError as err:
        raise Exception("Decoding failed !!") from err
    # Any non-zero exit status is a failure, not only status 1.
    if completed.returncode != 0:
        raise Exception(
            f"Decoding failed !! (exit code {completed.returncode})"
        )
def build_opus_path(encoder_name, opus_dir=OPUS_DIRECTORY):
    """Return the path of an opus executable for the current platform.

    When `PLATFORM` contains 'darwin' the result is
    `<opus_dir>/<encoder_name>`; on every other platform it is
    `<opus_dir>\\win\\<encoder_name>`.
    """
    on_darwin = 'darwin' in PLATFORM
    sep = '/' if on_darwin else '\\'
    # Only the non-darwin layout has the extra `win` sub-folder.
    base = opus_dir if on_darwin else opus_dir + f'{sep}win'
    return base + f'{sep}{encoder_name}'
# Script entry point: run `main` and use its return value as the process
# exit status (`None` means success).
if __name__ == '__main__':
    raise SystemExit(main())