-
Notifications
You must be signed in to change notification settings - Fork 0
/
swo_parser.py
220 lines (177 loc) · 7.21 KB
/
swo_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
#
# This program is free software. It comes without any
# warranty, to the extent permitted by applicable law.
#
# Adapted from
import socket
import time
import sys
class Stream:
"""
Stream of messages for one of the 32 ITM channels.
ITM Trace messages can be output on one of 32 channels. The stream class
contains a byte buffer for one of these channels. Once a newline character
is received, the buffer is dumped to stdout. Also, there is an optional
string that is prepended to the start of each line. This is useful for
using different channels for different logging levels.
For example, one might use channel 0 for info messages, channel 1 for
warnings, and channel 2 for errors. One might like to use "INFO: ",
"WARNING: ", and "ERROR: " as the headers for these channels. The debugger
can enable and disable these channels on startup if you want to only see
error messages. This would actually prevent the info and warning messages
from being generated by the processor, which will save time in the code
because ITM routines are blocking.
Each stream also has the option to echo to the GDB console. Simply pass
the socket connected to the Tcl server to the constructor
"""
# Max number of characters for a stream before a newline needs to occur
MAX_LINE_LENGTH = 1024
def __init__(self, id, header="", out=sys.stdout, tcl_socket=None):
self.id = id
self._buffer = []
self._header = header
self.out = out
self.tcl_socket = tcl_socket
def add_char(self, c):
if len(self._buffer) >= self.MAX_LINE_LENGTH:
self._output(
"SWO_PARSER.PY WARNING: stream "
+ str(self.id)
+ " received "
+ str(self.MAX_LINE_LENGTH)
+ " bytes without receiving a newline. Did you forget one?"
)
self._output(self._header + "".join(self._buffer) + c)
self._buffer = []
return
if c == "\n":
self._output(self._header + "".join(self._buffer))
self._buffer = []
return
self._buffer.append(c)
def add_chars(self, s):
for c in s:
self.add_char(c)
def _output(self, s):
print("[Stream ID %s] %s" % (self.id, s), file=self.out, flush=True)
return
# do not print to TCL socket, double output.
if self.tcl_socket is not None:
self.tcl_socket.sendall(b'puts "' + s.encode("utf-8") + b'"\r\n\x1a')
class StreamManager:
"""
Manages up to 32 byte streams.
This class contains a dictionary of streams indexed by their stream id. It
is responsible for parsing the incoming data, and forwarding the bytes to
the correct stream.
"""
def __init__(self):
self.streams = dict()
self._itmbuffer = b""
def add_stream(self, stream):
self.streams[stream.id] = stream
def parse_tcl(self, line):
r"""
When OpenOCD is configured to output the trace data over the Tcl
server, it periodically outputs a string (terminated with \r\n) that
looks something like this:
type target_trace data 01480165016c016c016f0120015401720161016301650121010a
The parse_tcl method turns this into the raw ITM bytes and sends it to
parse_itm_bytes.
"""
if line.startswith(b"type target_trace data ") and line.endswith(b"\r\n"):
itm_bytes = int(line[23:-2], 16).to_bytes(
len(line[23:-2]) // 2, byteorder="big"
)
self.parse_itm_bytes(itm_bytes)
def parse_itm_bytes(self, bstring):
"""
Parses ITM packets based on the format discription from ARM
http://infocenter.arm.com/help/index.jsp?topic=/com.arm.doc.ddi0314h/Chdbicbg.html
"""
bstring = self._itmbuffer + bstring
self._itmbuffer = b""
while len(bstring) > 0:
header = bstring[0]
# The third bit of a header must be zero, and the last two bits
# can't be zero.
if header & 0x04 != 0 or header & 0x03 == 0:
bstring = bstring[1:]
continue
payload_size = 2 ** (header & 0x03 - 1)
stream_id = header >> 3
if payload_size >= len(bstring):
self._itmbuffer = bstring
return
if stream_id in self.streams:
s = bstring[1 : payload_size + 1].decode("ascii")
self.streams[stream_id].add_chars(s)
bstring = bstring[payload_size + 1 :]
#### Main program ####
def swo_parser_main(out=sys.stdout):
# Set up the socket to the OpenOCD Tcl server
HOST = "localhost"
PORT = 6666
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcl_socket:
retries = 0
while retries < 100:
try:
tcl_socket.connect((HOST, PORT))
except socket.error:
retries += 1
time.sleep(0.1)
tcl_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
continue
else:
break
print(
"SWO client successfully connected to OpenOCD on %s:%d" % (HOST, PORT),
file=out,
flush=True,
)
# Create a stream manager and add three streams
streams = StreamManager()
streams.add_stream(Stream(0, "", out, tcl_socket))
streams.add_stream(Stream(1, "WARNING: ", out))
streams.add_stream(Stream(2, "ERROR: ", out, tcl_socket))
# Enable the tcl_trace output
tcl_socket.sendall(b"tcl_trace on\n\x1a")
if len(sys.argv) > 1:
if sys.argv[1] == "--dont-run":
pass
else:
tcl_socket.sendall(b"reset run\n\x1a")
else:
tcl_socket.sendall(b"reset run\n\x1a")
try:
tcl_buf = b""
while True:
# Wait for new data from the socket
data = b""
while len(data) == 0:
try:
data = tcl_socket.recv(1024)
except BlockingIOError:
time.sleep(0.1)
tcl_buf = tcl_buf + data
# Tcl messages are terminated with a 0x1A byte
temp = tcl_buf.split(b"\x1a", 1)
while len(temp) == 2:
# Parse the Tcl message
streams.parse_tcl(temp[0])
# Remove that message from tcl_buf and grab another message from
# the buffer if the is one
tcl_buf = temp[1]
temp = tcl_buf.split(b"\x1a", 1)
except KeyboardInterrupt:
time.sleep(0.1)
print(
"==== Terminating SWO / TCL client program ====", file=out, flush=True
)
time.sleep(0.2)
# Turn off the trace data before closing the port
# XXX: There currently isn't a way for the code to actually reach this line
# tcl_socket.sendall(b'tcl_trace off\n\x1a')
# tcl_socket.sendall(b'shutdown\n\x1a')
if __name__ == "__main__":
swo_parser_main()