-
Notifications
You must be signed in to change notification settings - Fork 1
/
bitblaster.py
131 lines (119 loc) · 4.32 KB
/
bitblaster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""
bitblaster python interface
http://bitbucket.com/martijnj/bitblaster
Copyright 2015 by Martijn Jasperse
"""
import serial
import time
# Device clock rate in cycles per second; used to convert between times in
# seconds and counts of clock cycles.
CLOCK = 40e6
# Baud rate passed to serial.Serial when opening the port.
BAUDRATE = 115200
# Line terminator appended to every command written to the unit.
CRLF = '\r\n'
class BitBlaster:
    """Serial interface to a bitblaster unit.

    Commands are sent as CRLF-terminated ASCII lines and every reply is read
    back as one line of text. Instances can be used as context managers, in
    which case the serial port is closed when the ``with`` block exits.
    """

    # Largest step count sent in a single "set" entry; longer timesteps are
    # split across several consecutive entries.
    _MAX_STEPS = 65535
    # Smallest step count accepted for a timestep.
    _MIN_STEPS = 6

    def __init__(self, port, timeout=1, startup=5):
        """Establish connection with a bitblaster unit at the specified serial port.

        The serial interface timeout is `timeout` and the maximum startup
        delay is `startup` (both in seconds). If the unit does not report
        "ready" or fails the "hello" handshake, the port is closed again
        before the error propagates.
        """
        # connect to device
        self._ser = serial.Serial(port, BAUDRATE, timeout=timeout)
        try:
            self.wait(startup, 'ready')
            # make sure connection works
            self.check('hello', 'hello')
        except BaseException:
            # do not leak the open port when the handshake fails
            self._ser.close()
            raise

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, type, value, traceback):
        """Close the serial port; exceptions are not suppressed."""
        self._ser.close()

    def write(self, cmd):
        """Write the specified command to the unit.

        `cmd` may be text or ASCII bytes; a CRLF terminator is appended when
        missing. The command is sent as ASCII-encoded bytes.
        """
        if isinstance(cmd, bytes):
            cmd = cmd.decode('ascii')
        if not cmd.endswith(CRLF):  # must CRLF terminate
            cmd = cmd + CRLF
        # the serial layer transports bytes, so encode at the I/O boundary
        self._ser.write(cmd.encode('ascii'))

    def read(self):
        """Read one line back from the unit as text, with whitespace trimmed.

        Returns an empty string when nothing arrived before the serial
        timeout expired.
        """
        line = self._ser.readline()
        if isinstance(line, bytes):
            # decode once here so callers can compare against plain strings
            line = line.decode('ascii', 'replace')
        return line.strip()

    def check(self, cmd, expect='ok'):
        """Send the command, get a reply, make sure it matches expectation.

        Raises serial.SerialException if the reply differs from `expect`.
        """
        self.write(cmd)
        resp = self.read()
        if resp != expect:
            raise serial.SerialException('Unexpected response: "%s"' % resp)

    def bitstream(self, vals, dt=None, timesteps=False, adapt=True):
        """Program the unit to output the values in `vals` every `dt` seconds.

        If `timesteps` is True, `dt` is specified in clock cycles not seconds.
        If `adapt` is True, the timesteps are adjusted to minimise cumulative
        timing error. Returns the number of entries programmed, as returned
        by sequence().
        """
        # parse parameters
        if timesteps:
            dt = dt / CLOCK
        # program the sequence
        return self.sequence([(v, dt) for v in vals], adapt=adapt)

    def sequence(self, vals, n0=0, adapt=True):
        """Program the unit with the given sequence.

        The sequence is an iterable of tuples (v, dt) where `v` is the value
        to output and `dt` the time to maintain that output in seconds.
        If `adapt` is True, the rounding error of earlier timesteps is carried
        into later ones so the cumulative timing error stays below one clock
        cycle. Timesteps longer than 65535 cycles are split over several
        entries, each between 6 and 65535 cycles long.

        `n0` is accepted for backward compatibility but is currently ignored:
        programming always starts at entry 0.

        Note that `dt`==0 causes the device to wait for a hardware trigger,
        but this method rejects any timestep shorter than 6 cycles, so such
        an entry cannot be programmed through it.

        Returns the number of entries programmed (excluding the terminator).
        Raises AssertionError if a timestep rounds to fewer than 6 cycles and
        serial.SerialException if the unit rejects a command or reports a
        different sequence length.
        """
        n = 0
        owed = 0.0  # cycles requested so far minus cycles actually programmed
        for v, dt in vals:
            cycles = dt * CLOCK
            # adapt the timestep to the cumulative error?
            if adapt:
                cycles += owed
            # guess closest number of steps
            di = int(round(cycles))
            if di < self._MIN_STEPS:
                # Raised explicitly (not via `assert`) so the check survives
                # `python -O`; AssertionError is kept for compatibility.
                raise AssertionError('Timestep too small')
            # remember what rounding left over for the next timestep
            owed = cycles - di
            # program sequence, allow for long timesteps
            while di > self._MAX_STEPS:
                # keep at least _MIN_STEPS back so the last chunk stays valid
                chunk = min(self._MAX_STEPS, di - self._MIN_STEPS)
                self.check('set %i %x %u' % (n, v, chunk))
                n += 1
                di -= chunk
            self.check('set %i %x %u' % (n, v, di))
            n += 1
        # make sure to terminate the sequence
        self.check('set %i 0 0' % n)
        # check it worked
        self.check('len', str(n))
        return n

    def start(self, hwtrig=False, wait=None):
        """Send the unit the "start" command.

        If `hwtrig` is true, the unit will wait for a hardware trigger.
        If `wait` is None or False the function returns immediately. If it is
        True the function waits forever for the sequence to complete, and any
        other value is used as the timeout in seconds passed to wait().
        """
        # "hwstart" starts on the external trigger, "start" triggers now
        self.check('hwstart' if hwtrig else 'start')
        if wait is None:
            return  # do not wait
        if isinstance(wait, bool):
            if not wait:
                return  # do not wait
            return self.wait()  # wait forever
        return self.wait(wait)  # wait for specified timeout

    def repeat(self, hwtrig=True, block=True):
        """Instruct the unit to continually output the sequence.

        Note that the unit will become unresponsive until reset (e.g. with
        further serial communication).
        If `block` is True then the function will block, checking that the
        unit is executing correctly: each line read must be the next
        iteration number, counting up from 0, otherwise RuntimeError is
        raised.
        """
        self.check('hwrepeat' if hwtrig else 'repeat')
        if not block:
            return
        i = 0
        while True:
            # note that this is an infinite loop by design
            status = self.read()
            if status != str(i):
                raise RuntimeError('Unexpected response at iteration %i: "%s"' % (i, status))
            i += 1

    def wait(self, timeout=None, expect='done'):
        """Wait for the unit to return `expect` ("done" at the end of a sequence).

        A RuntimeError is raised if the reply does not arrive within
        `timeout` seconds. If `timeout` is None, the function waits forever.
        Any other non-empty reply raises serial.SerialException.
        """
        # the deadline is in seconds, the same unit as time.time()
        deadline = None if timeout is None else time.time() + timeout
        while True:
            resp = self.read()
            if resp == expect:
                return
            if resp != '':
                raise serial.SerialException('Unexpected response: "%s"' % resp)
            if deadline is not None and time.time() > deadline:
                raise RuntimeError('Timeout')
            time.sleep(5e-3)