-
Notifications
You must be signed in to change notification settings - Fork 1
/
dsp.py
155 lines (126 loc) · 4.76 KB
/
dsp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""Digital Signal Processing capabilities for amodem."""
import functools
import numpy as np
import common
import exceptions
from kivy.logger import Logger as log
class FIR:
    """Streaming finite-impulse-response filter.

    The filter keeps a delay line of the most recent input samples
    (newest first) in ``x_state`` so that consecutive calls continue
    where the previous one stopped.
    """

    def __init__(self, h):
        """Store the tap coefficients ``h`` and start from an all-zero delay line."""
        self.h = np.array(h)
        self.x_state = [0] * len(self.h)

    def __call__(self, x):
        """Lazily yield one filtered output value per input sample in ``x``.

        The saved delay line is only written back once the generator has
        been consumed to the end.
        """
        taps = self.h
        delay_line = self.x_state
        for sample in x:
            # Shift in the newest sample at the front, drop the oldest one.
            delay_line = [sample] + delay_line[:-1]
            yield np.dot(delay_line, taps)
        self.x_state = delay_line
class Demux:
    """Iterator that splits a sample stream into per-carrier symbols.

    For every angular frequency ``w`` in ``omegas`` a correlation filter
    ``exp(-1j * w * k) / (0.5 * Nsym)`` (``k = 0 .. Nsym-1``) is built.
    Each iteration step takes ``Nsym`` samples from ``sampler`` and returns
    the dot product of every filter with that frame, i.e. one complex
    value per carrier.

    ``sampler`` must provide ``take(size=...)`` returning a sized sequence
    of samples; a frame shorter than ``Nsym`` ends the iteration.
    """

    def __init__(self, sampler, omegas, Nsym):
        self.Nsym = Nsym
        self.filters = np.array(
            [exp_iwt(-w, Nsym) / (0.5 * self.Nsym) for w in omegas]
        )
        self.sampler = sampler

    def __iter__(self):
        return self

    def __next__(self):
        """Return the demultiplexed symbols of the next full frame.

        Raises:
            StopIteration: when the sampler can no longer supply a
                complete frame of ``Nsym`` samples.
        """
        frame = self.sampler.take(size=self.Nsym)
        if len(frame) != self.Nsym:
            raise StopIteration
        return np.dot(self.filters, frame)

    # Python 2 style spelling kept for existing callers.
    next = __next__
@functools.lru_cache(maxsize=64, typed=False)
def exp_iwt(omega, n):
    """Return ``exp(1j * omega * k)`` for ``k = 0 .. n-1`` as a complex array.

    Results are memoised per ``(omega, n)`` pair, so both arguments must be
    hashable.  Every cache hit returns the very same array object; it is
    therefore marked read-only so that an in-place modification by one
    caller cannot silently corrupt the values seen by later callers.
    Derive a new array (``result * 2``, ``result.copy()``) to modify it.
    """
    samples = np.exp(1j * omega * np.arange(n))
    samples.setflags(write=False)
    return samples
def norm(x):
    """Return the square root of the real part of ``dot(conj(x), x)``.

    For a 1-D array this is its Euclidean length.  ``x`` must offer a
    ``conj()`` method (e.g. a NumPy array).
    """
    energy = np.dot(x.conj(), x)
    return np.sqrt(energy.real)
def rms(x):
    """Return the root-mean-square magnitude of ``x``, averaged along axis 0."""
    power = np.abs(x) ** 2
    mean_power = np.mean(power, axis=0)
    return mean_power ** 0.5
def coherence(x, omega):
    """Correlate ``x`` with a complex exponential of angular frequency ``omega``.

    The reference is ``exp(-1j * omega * k) / sqrt(0.5 * len(x))``; its dot
    product with ``x`` is divided by ``norm(x)``.  When ``norm(x)`` is zero
    the result is ``0.0``.
    """
    length = len(x)
    reference = exp_iwt(-omega, length) / np.sqrt(0.5 * length)
    energy = norm(x)
    if energy:
        return np.dot(reference, x) / energy
    return 0.0
def linear_regression(x, y):
    """Fit the line ``y = a*x + b`` to the points and return ``(a, b)``.

    The slope is the ratio of the centred cross product of ``x`` and ``y``
    to the centred sum of squares of ``x``; the intercept makes the line
    pass through the mean point.
    """
    xs, ys = np.array(x), np.array(y)
    x_mean, y_mean = np.mean(xs), np.mean(ys)
    dx = xs - x_mean
    dy = ys - y_mean
    slope = np.dot(dy, dx) / np.dot(dx, dx)
    intercept = y_mean - slope * x_mean
    return slope, intercept
class MODEM:
    """Mapping between groups of bits and constellation symbols.

    The symbol at position ``i`` of ``symbols`` represents the binary
    expansion of ``i``, least-significant bit first, as a tuple of
    ``bits_per_symbol`` ints.

    Attributes set by the constructor:
        symbols: NumPy array of the constellation points.
        bits_per_symbol: ``log2(len(symbols))`` as an int.
        encode_map: dict from bit tuple to symbol.
        decode_list: list of ``(symbol, bit tuple)`` in ``symbols`` order.
    """

    def __init__(self, symbols):
        """Build the encode/decode tables.

        Raises:
            AssertionError: if the number of symbols is zero or not a
                power of two.
        """
        symbols = np.array(list(symbols))
        count = len(symbols)
        # Raised explicitly (instead of using an ``assert`` statement) so the
        # check is still performed under ``python -O``; the exception type
        # is the one the assert statement used to produce.
        if count == 0 or count & (count - 1):
            raise AssertionError(
                "number of symbols must be a power of two, got %d" % count
            )
        bits_per_symbol = count.bit_length() - 1

        # Symbol i carries the bits of i, least-significant bit first.
        self.encode_map = {}
        for index, symbol in enumerate(symbols):
            bits = tuple((index >> j) & 1 for j in range(bits_per_symbol))
            self.encode_map[bits] = symbol

        self.symbols = symbols
        self.bits_per_symbol = bits_per_symbol

        bits_map = {symbol: bits for bits, symbol in self.encode_map.items()}
        self.decode_list = [(s, bits_map[s]) for s in self.symbols]

    def encode(self, bits):
        """Yield one symbol per group of ``bits_per_symbol`` bits.

        Grouping is delegated to ``common.iterate``, which is expected to
        produce tuples usable as keys of ``encode_map``.
        """
        for bits_tuple in common.iterate(bits, self.bits_per_symbol, tuple):
            yield self.encode_map[bits_tuple]

    def decode(self, symbols, error_handler=None):
        """Maximum-likelihood decoding, using naive nearest-neighbour.

        Takes a stream of received symbols and yields, for each one, the
        bit tuple of the closest constellation point.  If ``error_handler``
        is given it is called as
        ``error_handler(received=..., decoded=...)`` before each yield.
        """
        symbols_vec = self.symbols
        _dec = self.decode_list
        for received in symbols:
            error = np.abs(symbols_vec - received)
            index = np.argmin(error)
            decoded, bits = _dec[index]
            if error_handler:
                error_handler(received=received, decoded=decoded)
            yield bits
def bits2baseband(bits, L, prev_bit=0):
    """Convert bits to a differentially encoded, upsampled NRZ baseband signal.

    Each output bit is ``(bits[i] + previous output bit) % 2``; an encoded
    0 becomes the level -1 and anything else becomes +1, and every level
    is repeated ``L`` times.

    Args:
        bits: sequence of 0s and 1s.
        L: upsampling factor (samples per bit).
        prev_bit: last differentially encoded bit from the previous call,
            for when the stream is processed in several calls.

    Returns:
        A tuple ``(signal, t, prev_bit)``: the float array of +/-1 levels
        with ``len(bits) * L`` samples, the matching integer sample index
        array, and the last encoded bit to pass into the next call (the
        ``prev_bit`` argument itself when ``bits`` is empty).
    """
    # The recurrence d[i] = (bits[i] + d[i-1]) % 2 seeded with prev_bit
    # unrolls to a running sum of the input taken modulo 2.
    encoded = ((np.cumsum(bits) + prev_bit) % 2).astype(float)
    if encoded.size:
        prev_bit = encoded[-1]
    # NRZ mapping followed by sample-and-hold upsampling.
    levels = np.where(encoded == 0, -1.0, 1.0)
    signal = np.repeat(levels, L)
    t = np.arange(start=0, stop=encoded.size * L)
    return (signal, t, prev_bit)
def prbs(reg, poly, bits):
    """Endless pseudo-random generator built on a shift register.

    Starting from the state ``reg``, each step yields the lowest ``bits``
    bits of the state, shifts the state left by one and XORs it with
    ``poly`` whenever a bit at or above ``poly``'s top bit position is set.
    """
    # Position of the highest set bit of poly (effective register size).
    width = 0
    while (poly >> width) > 1:
        width += 1

    output_mask = (1 << bits) - 1
    state = reg
    while True:
        yield state & output_mask
        state = state << 1
        if state >> width:
            state = state ^ poly