Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add logging function to rtt sub-command. #1527

Merged
merged 2 commits into from
Apr 16, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 85 additions & 42 deletions pyocd/subcommands/rtt_cmd.py
zjli-2019 marked this conversation as resolved.
Show resolved Hide resolved
zjli-2019 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# Copyright (C) 2021 Simon D. Levy <simon.d.levy@gmail.com>
# Copyright (C) 2022 Johan Carlsson <johan.carlsson@teenage.engineering>
# Copyright (C) 2022 Samuel Dewan
# Copyright (C) 2022 Zhengji Li
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -22,6 +23,7 @@
import logging
import sys
from time import sleep
import time
from typing import List

from pyocd.core.helpers import ConnectHelper
Expand All @@ -39,7 +41,7 @@ class RTTSubcommand(SubcommandBase):
"""@brief `pyocd rtt` subcommand."""

NAMES = ["rtt"]
HELP = "SEGGER RTT Viewer."
HELP = "SEGGER RTT Viewer/Logger."

@classmethod
def get_args(cls) -> List[argparse.ArgumentParser]:
Expand All @@ -52,6 +54,12 @@ def get_args(cls) -> List[argparse.ArgumentParser]:
help="Start address of RTT control block search range.")
rtt_options.add_argument("-s", "--size", type=int_base_0, default=None,
help="Size of RTT control block search range.")
rtt_options.add_argument("--up-channel-id", type=int, default=0,
help="Up channel ID.")
rtt_options.add_argument("--down-channel-id", type=int, default=0,
help="Down channel ID.")
rtt_options.add_argument("-d", "--log-file", type=str, default=None,
help="Log file name. When specified, logging mode is enabled.")

return [cls.CommonOptions.COMMON, cls.CommonOptions.CONNECT, rtt_parser]

Expand Down Expand Up @@ -91,20 +99,12 @@ def invoke(self) -> int:
LOG.error("No up channels.")
return 1

if len(control_block.down_channels) < 1:
LOG.error("No down channels.")
return 1

LOG.info(f"{len(control_block.up_channels)} up channels and "
f"{len(control_block.down_channels)} down channels found")

up_chan: RTTUpChannel = control_block.up_channels[0]
down_chan: RTTDownChannel = control_block.down_channels[0]

up_chan: RTTUpChannel = control_block.up_channels[self._args.up_channel_id]
up_name = up_chan.name if up_chan.name is not None else ""
down_name = down_chan.name if down_chan.name is not None else ""
LOG.info(f"Reading from up channel 0 (\"{up_name}\"), writing to "
f"down channel 0 (\"{down_name}\")")
LOG.info(f"Reading from up channel {self._args.up_channel_id} (\"{up_name}\")")

# some targets might need this here
#target.reset_and_halt()
Expand All @@ -114,38 +114,17 @@ def invoke(self) -> int:
# set up terminal input
kb = KBHit()

# byte array to send via RTT
cmd = bytes()

while True:
# poll at most 1000 times per second to limit CPU use
sleep(0.001)

# read data from up buffer 0 (target -> host) and write to
# stdout
up_data: bytes = up_chan.read()
sys.stdout.buffer.write(up_data)
sys.stdout.buffer.flush()
if self._args.log_file is None:
if len(control_block.down_channels) < 1:
LOG.error("No down channels.")
return 1
down_chan: RTTDownChannel = control_block.down_channels[self._args.down_channel_id]
down_name = down_chan.name if down_chan.name is not None else ""
LOG.info(f"Writing to down channel {self._args.down_channel_id} (\"{down_name}\")")

# try to fetch character
if kb.kbhit():
c: str = kb.getch()

if ord(c) == 27: # process ESC
break
elif c.isprintable() or c == '\n':
print(c, end="", flush=True)

# add char to buffer
cmd += c.encode("utf-8")

# write buffer to target
if not cmd:
continue

# write cmd buffer to down buffer 0 (host -> target)
bytes_out = down_chan.write(cmd)
cmd = cmd[bytes_out:]
self.viewer_loop(up_chan, down_chan, kb)
else:
self.logger_loop(up_chan, kb)

except KeyboardInterrupt:
pass
Expand All @@ -157,3 +136,67 @@ def invoke(self) -> int:
kb.set_normal_term()

return 0

def logger_loop(self, up_chan, kb):

LOG.info("start logging ... Press any key to stop")
total_size = 0
block_size = 0
last_time = time.time()

with open(self._args.log_file, 'wb') as log_file:

while True:
# poll at most 1000 times per second to limit CPU use
sleep(0.001)

# read data from up buffer
data = up_chan.read()
log_file.write(data)

s = len(data)
block_size += s
total_size += s
diff = time.time() - last_time
if diff > 1.0:
print(f"Transfer rate: {block_size / 1000:.1f} KByte/s; Bytes written: {total_size / 1000:.0f} KByte", end="\r")
block_size = 0
last_time = time.time()

# try to fetch character
if kb.kbhit():
break

def viewer_loop(self, up_chan, down_chan, kb):
# byte array to send via RTT
cmd = bytes()

while True:
# poll at most 1000 times per second to limit CPU use
sleep(0.001)

# read data from up buffer 0 (target -> host) and write to
# stdout
up_data: bytes = up_chan.read()
sys.stdout.buffer.write(up_data)
sys.stdout.buffer.flush()

# try to fetch character
if kb.kbhit():
c: str = kb.getch()

if ord(c) == 27: # process ESC
break
elif c.isprintable() or c == '\n':
print(c, end="", flush=True)

# add char to buffer
cmd += c.encode("utf-8")

# write buffer to target
if not cmd:
continue

# write cmd buffer to down buffer 0 (host -> target)
bytes_out = down_chan.write(cmd)
cmd = cmd[bytes_out:]