This repository has been archived by the owner on Jan 22, 2024. It is now read-only.
forked from lukasjapan/bt-speaker
-
Notifications
You must be signed in to change notification settings - Fork 0
/
codecs.py
204 lines (173 loc) · 7.44 KB
/
codecs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
from __future__ import unicode_literals
from collections import namedtuple
from bt_manager import ffi
import os
# One entry per codec; the value is the numeric codec type for that name.
A2DP_CODECS = {
    'SBC': 0,
    'MPEG12': 1,
    'MPEG24': 2,
    'ATRAC': 3,
}
"""
Mapping of A2DP codec names to their numeric codec type values
"""
SBCCodecConfig = namedtuple(
    'SBCCodecConfig',
    [
        'channel_mode',
        'frequency',
        'allocation_method',
        'subbands',
        'block_length',
        'min_bitpool',
        'max_bitpool',
    ]
)
"""
Named tuple holding the SBC A2DP audio profile properties, in the
order: channel_mode, frequency, allocation_method, subbands,
block_length, min_bitpool, max_bitpool
"""
class SBCSamplingFrequency:
    """Bit flags naming the sampling frequency with which an SBC
    frame has been encoded."""

    FREQ_16KHZ = 0x8
    FREQ_32KHZ = 0x4
    FREQ_44_1KHZ = 0x2
    FREQ_48KHZ = 0x1
    # Every frequency flag set at once.
    ALL = FREQ_16KHZ | FREQ_32KHZ | FREQ_44_1KHZ | FREQ_48KHZ
class SBCBlocks:
    """Bit flags naming the block size with which the stream has
    been encoded"""

    BLOCKS_4 = 0x8
    BLOCKS_8 = 0x4
    BLOCKS_12 = 0x2
    BLOCKS_16 = 0x1
    # Every block-size flag set at once.
    ALL = BLOCKS_4 | BLOCKS_8 | BLOCKS_12 | BLOCKS_16
class SBCChannelMode:
    """Bit flags naming the channel mode with which a frame has been
    encoded. The number of channels depends on this information."""

    CHANNEL_MODE_MONO = 0x8
    CHANNEL_MODE_DUAL = 0x4
    CHANNEL_MODE_STEREO = 0x2
    CHANNEL_MODE_JOINT_STEREO = 0x1
    # Every channel-mode flag set at once.
    ALL = (CHANNEL_MODE_MONO | CHANNEL_MODE_DUAL |
           CHANNEL_MODE_STEREO | CHANNEL_MODE_JOINT_STEREO)
class SBCAllocationMethod:
    """Bit flags naming how the bit pool is allocated to the
    different subbands: based either on the loudness of the subband
    signal or on the signal to noise ratio."""

    SNR = 0x2
    LOUDNESS = 0x1
    # Both allocation-method flags set at once.
    ALL = SNR | LOUDNESS
class SBCSubbands:
    """Bit flags naming the number of subbands with which a frame
    has been encoded"""

    SUBBANDS_4 = 0x2
    SUBBANDS_8 = 0x1
    # Both subband-count flags set at once.
    ALL = SUBBANDS_4 | SUBBANDS_8
class SBCCodec:
    """
    Python class wrapper around CFFI calls into the SBC codec
    implemented in C. The main API is defined by sbc.h with
    additional functions added to encapsulate an SBC payload
    as part of an RTP packet. The API extensions for RTP
    are designed to work directly with bluetooth media
    transport file descriptors for the A2DP profile. So,
    the basic idea is that this class is instantiated as
    part of a media endpoint implementation in order to
    encode or decode data carried on the media transport.

    .. note:: You need two separate instantiations of this
        class if you wish to encode and decode at the same
        time. Although the class implementation is the same,
        the underlying C implementation requires separate
        `sbc_t` instances.

    :param namedtuple config: Media endpoint negotiated
        configuration parameters. These are not used
        directly by the codec here but translated to
        parameters usable by the codec. See
        :py:class:`.SBCCodecConfig`
    :raises Exception: Whatever ``ffi.dlopen`` raised when the
        shared library could not be loaded; it is reported on
        stderr and then re-raised unchanged.
    """

    def __init__(self, config):
        import sys

        try:
            # The path is relative, so the library is looked up from the
            # process's current working directory.
            self.codec = ffi.dlopen('./librtpsbc.so')
        except Exception:
            # Without the library nothing below can work. Swallowing the
            # error here would only surface later as an unrelated
            # AttributeError on self.codec, so report it and propagate.
            sys.stderr.write('Exception: %r\n' % (sys.exc_info()[1],))
            raise

        self.config = ffi.new('sbc_t *')
        # RTP timestamp and sequence counters; passed by pointer to the
        # encoder on every encode() call.
        self.ts = ffi.new('unsigned int *', 0)
        self.seq_num = ffi.new('unsigned int *', 0)
        self._init_sbc_config(config)
        # NOTE(review): BlueZ libsbc's sbc_init() resets the sbc_t to its
        # defaults. If librtpsbc behaves the same way, the settings applied
        # by _init_sbc_config() just above are discarded -- confirm against
        # the C sources before relying on them for encoding.
        self.codec.sbc_init(self.config, 0)

    def _init_sbc_config(self, config):
        """
        Translator from namedtuple config representation to
        the sbc_t type.

        Each negotiated value is mapped to the matching constant of the
        loaded library. A value that matches none of the known flags
        leaves the corresponding `sbc_t` field untouched.

        :param namedtuple config: See :py:class:`.SBCCodecConfig`
        :returns:
        """
        # (sbc_t field, negotiated value, {flag: library constant name}).
        # Constant names are resolved lazily with getattr so that only the
        # constant actually selected is looked up on the library.
        translations = (
            ('mode', config.channel_mode, {
                SBCChannelMode.CHANNEL_MODE_MONO: 'SBC_MODE_MONO',
                SBCChannelMode.CHANNEL_MODE_STEREO: 'SBC_MODE_STEREO',
                SBCChannelMode.CHANNEL_MODE_DUAL: 'SBC_MODE_DUAL_CHANNEL',
                SBCChannelMode.CHANNEL_MODE_JOINT_STEREO:
                    'SBC_MODE_JOINT_STEREO',
            }),
            ('frequency', config.frequency, {
                SBCSamplingFrequency.FREQ_16KHZ: 'SBC_FREQ_16000',
                SBCSamplingFrequency.FREQ_32KHZ: 'SBC_FREQ_32000',
                SBCSamplingFrequency.FREQ_44_1KHZ: 'SBC_FREQ_44100',
                SBCSamplingFrequency.FREQ_48KHZ: 'SBC_FREQ_48000',
            }),
            ('allocation', config.allocation_method, {
                SBCAllocationMethod.LOUDNESS: 'SBC_AM_LOUDNESS',
                SBCAllocationMethod.SNR: 'SBC_AM_SNR',
            }),
            ('subbands', config.subbands, {
                SBCSubbands.SUBBANDS_4: 'SBC_SB_4',
                SBCSubbands.SUBBANDS_8: 'SBC_SB_8',
            }),
            ('blocks', config.block_length, {
                SBCBlocks.BLOCKS_4: 'SBC_BLK_4',
                SBCBlocks.BLOCKS_8: 'SBC_BLK_8',
                SBCBlocks.BLOCKS_12: 'SBC_BLK_12',
                SBCBlocks.BLOCKS_16: 'SBC_BLK_16',
            }),
        )

        for field, negotiated, constant_names in translations:
            constant_name = constant_names.get(negotiated)
            if constant_name is not None:
                setattr(self.config, field,
                        getattr(self.codec, constant_name))

        # The upper bound of the negotiated bitpool range is used;
        # min_bitpool is not consulted.
        self.config.bitpool = config.max_bitpool
        self.config.endian = self.codec.SBC_LE

    def encode(self, fd, mtu, data):
        """
        Encode the supplied data (byte array) and write to
        the media transport file descriptor encapsulated
        as RTP packets. The encoder will calculate the
        required number of SBC frames and encapsulate as
        RTP to fit the MTU size.

        :param int fd: Media transport file descriptor
        :param int mtu: Media transport MTU size as returned
            when the media transport was acquired.
        :param array{byte} data: Data to encode and send
            over the media transport.
        :return:
        """
        # The input is copied into a C char array for the duration of the
        # call; len(data) is passed separately as the payload size.
        self.codec.rtp_sbc_encode_to_fd(self.config,
                                        ffi.new('char[]', data),
                                        len(data),
                                        mtu,
                                        self.ts,
                                        self.seq_num,
                                        fd)

    def decode(self, fd, mtu, max_len=2560):
        """
        Read the media transport descriptor, depay
        the RTP payload and decode the SBC frames into
        a byte array. The maximum number of bytes to
        be returned may be passed as an argument and all
        available bytes are returned to the caller.

        :param int fd: Media transport file descriptor
        :param int mtu: Media transport MTU size as returned
            when the media transport was acquired.
        :param int max_len: Optional. Set maximum number of
            bytes to read.
        :return data: Buffer over the decoded bytes, limited to the
            count returned by the C decoder.
        :rtype: array{byte}
        """
        output_buffer = ffi.new('char[]', max_len)
        sz = self.codec.rtp_sbc_decode_from_fd(self.config,
                                               output_buffer,
                                               max_len,
                                               mtu,
                                               fd)
        # NOTE(review): if the C call can return a negative error code,
        # the slice below is not a valid range -- confirm the return
        # contract of rtp_sbc_decode_from_fd.
        return ffi.buffer(output_buffer[0:sz])