diff --git a/pyocd/subcommands/rtt_cmd.py b/pyocd/subcommands/rtt_cmd.py index 6c663d211..4561ddea6 100644 --- a/pyocd/subcommands/rtt_cmd.py +++ b/pyocd/subcommands/rtt_cmd.py @@ -4,6 +4,7 @@ # Copyright (C) 2021 Simon D. Levy # Copyright (C) 2022 Johan Carlsson # Copyright (C) 2022 Samuel Dewan +# Copyright (C) 2022 Zhengji Li # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -22,6 +23,7 @@ import logging import sys from time import sleep +import time from typing import List from pyocd.core.helpers import ConnectHelper @@ -39,7 +41,7 @@ class RTTSubcommand(SubcommandBase): """@brief `pyocd rtt` subcommand.""" NAMES = ["rtt"] - HELP = "SEGGER RTT Viewer." + HELP = "SEGGER RTT Viewer/Logger." @classmethod def get_args(cls) -> List[argparse.ArgumentParser]: @@ -52,6 +54,12 @@ def get_args(cls) -> List[argparse.ArgumentParser]: help="Start address of RTT control block search range.") rtt_options.add_argument("-s", "--size", type=int_base_0, default=None, help="Size of RTT control block search range.") + rtt_options.add_argument("--up-channel-id", type=int, default=0, + help="Up channel ID.") + rtt_options.add_argument("--down-channel-id", type=int, default=0, + help="Down channel ID.") + rtt_options.add_argument("-d", "--log-file", type=str, default=None, + help="Log file name. When specified, logging mode is enabled.") return [cls.CommonOptions.COMMON, cls.CommonOptions.CONNECT, rtt_parser] @@ -91,20 +99,12 @@ def invoke(self) -> int: LOG.error("No up channels.") return 1 - if len(control_block.down_channels) < 1: - LOG.error("No down channels.") - return 1 - LOG.info(f"{len(control_block.up_channels)} up channels and " f"{len(control_block.down_channels)} down channels found") - up_chan: RTTUpChannel = control_block.up_channels[0] - down_chan: RTTDownChannel = control_block.down_channels[0] - + up_chan: RTTUpChannel = control_block.up_channels[self._args.up_channel_id] up_name = up_chan.name if up_chan.name is not None else "" - down_name = down_chan.name if down_chan.name is not None else "" - LOG.info(f"Reading from up channel 0 (\"{up_name}\"), writing to " - f"down channel 0 (\"{down_name}\")") + LOG.info(f"Reading from up channel {self._args.up_channel_id} (\"{up_name}\")") # some targets might need this here #target.reset_and_halt() @@ -114,38 +114,17 @@ def invoke(self) -> int: # set up terminal input kb = KBHit() - # byte array to send via RTT - cmd = bytes() - - while True: - # poll at most 1000 times per second to limit CPU use - sleep(0.001) - - # read data from up buffer 0 (target -> host) and write to - # stdout - up_data: bytes = up_chan.read() - sys.stdout.buffer.write(up_data) - sys.stdout.buffer.flush() + if self._args.log_file is None: + if len(control_block.down_channels) < 1: + LOG.error("No down channels.") + return 1 + down_chan: RTTDownChannel = control_block.down_channels[self._args.down_channel_id] + down_name = down_chan.name if down_chan.name is not None else "" + LOG.info(f"Writing to down channel {self._args.down_channel_id} (\"{down_name}\")") - # try to fetch character - if kb.kbhit(): - c: str = kb.getch() - - if ord(c) == 27: # process ESC - break - elif c.isprintable() or c == '\n': - print(c, end="", flush=True) - - # add char to buffer - cmd += c.encode("utf-8") - - # write buffer to target - if not cmd: - continue - - # write cmd buffer to down buffer 0 (host -> target) - bytes_out = down_chan.write(cmd) - cmd = cmd[bytes_out:] + self.viewer_loop(up_chan, down_chan, kb) + else: + self.logger_loop(up_chan, kb) except KeyboardInterrupt: pass @@ -157,3 +136,67 @@ def invoke(self) -> int: kb.set_normal_term() return 0 + + def logger_loop(self, up_chan, kb): + + LOG.info("start logging ... Press any key to stop") + total_size = 0 + block_size = 0 + last_time = time.time() + + with open(self._args.log_file, 'wb') as log_file: + + while True: + # poll at most 1000 times per second to limit CPU use + sleep(0.001) + + # read data from up buffer + data = up_chan.read() + log_file.write(data) + + s = len(data) + block_size += s + total_size += s + diff = time.time() - last_time + if diff > 1.0: + print(f"Transfer rate: {block_size / 1000:.1f} KByte/s; Bytes written: {total_size / 1000:.0f} KByte", end="\r") + block_size = 0 + last_time = time.time() + + # try to fetch character + if kb.kbhit(): + break + + def viewer_loop(self, up_chan, down_chan, kb): + # byte array to send via RTT + cmd = bytes() + + while True: + # poll at most 1000 times per second to limit CPU use + sleep(0.001) + + # read data from up buffer 0 (target -> host) and write to + # stdout + up_data: bytes = up_chan.read() + sys.stdout.buffer.write(up_data) + sys.stdout.buffer.flush() + + # try to fetch character + if kb.kbhit(): + c: str = kb.getch() + + if ord(c) == 27: # process ESC + break + elif c.isprintable() or c == '\n': + print(c, end="", flush=True) + + # add char to buffer + cmd += c.encode("utf-8") + + # write buffer to target + if not cmd: + continue + + # write cmd buffer to down buffer 0 (host -> target) + bytes_out = down_chan.write(cmd) + cmd = cmd[bytes_out:]