From ac92f23610305d6eb6a1395ab74f3d530170fc51 Mon Sep 17 00:00:00 2001 From: Michael Harbarth Date: Thu, 1 Aug 2024 11:25:36 +0200 Subject: [PATCH] feat: Add functionality to extract comments on file level using regex --- capella_ros_tools/__main__.py | 10 +++++- capella_ros_tools/data_model.py | 33 ++++++++++++++----- capella_ros_tools/importer.py | 16 +++++++-- .../package/msg/SampleClassEnum.msg | 32 ++++++++++++++++++ tests/test_import_msgs.py | 22 +++++++++++++ 5 files changed, 101 insertions(+), 12 deletions(-) create mode 100644 tests/data/data_model/description_regex_msgs/package/msg/SampleClassEnum.msg diff --git a/capella_ros_tools/__main__.py b/capella_ros_tools/__main__.py index 2e11894..7f008fd 100644 --- a/capella_ros_tools/__main__.py +++ b/capella_ros_tools/__main__.py @@ -74,6 +74,11 @@ def cli(): type=click.Path(path_type=pathlib.Path, dir_okay=False), help="Ignore the license header from the given file when importing msgs.", ) +@click.option( + "--description-regex", + type=str, + help="Regular expression to extract description from the file .", +) def import_msgs( input: str, model: capellambse.MelodyModel, @@ -83,6 +88,7 @@ def import_msgs( no_deps: bool, output: pathlib.Path, license_header: pathlib.Path | None, + description_regex: str | None, ) -> None: """Import ROS messages into a Capella data package.""" if root: @@ -97,7 +103,9 @@ def import_msgs( else: params = {"types_parent_uuid": model.sa.data_package.uuid} - parsed = importer.Importer(input, no_deps, license_header) + parsed = importer.Importer( + input, no_deps, license_header, description_regex + ) logger.info("Loaded %d packages", len(parsed.messages.packages)) yml = parsed.to_yaml(root_uuid, **params) diff --git a/capella_ros_tools/data_model.py b/capella_ros_tools/data_model.py index d3e360b..989a61e 100644 --- a/capella_ros_tools/data_model.py +++ b/capella_ros_tools/data_model.py @@ -159,11 +159,11 @@ def __eq__(self, other: object) -> bool: def _process_block_comment(line: str) -> str: if comment := _clean_comment(line): return f"{comment} " - return "
" + return "" def _extract_file_level_comments( - msg_string: str, + msg_string: str, regex: re.Pattern | None = None ) -> t.Tuple[str, list[str]]: """Extract comments at the beginning of the message.""" lines = msg_string.lstrip("\n").splitlines() @@ -177,8 +177,14 @@ def _extract_file_level_comments( return "", lines else: break - file_level_comments += _process_block_comment(line) + file_level_comments += _process_block_comment(line) or "\n" + if regex is not None: + if matches := regex.search(file_level_comments): + file_level_comments = "\n".join(matches.groups()) + else: + file_level_comments = "" + file_level_comments = file_level_comments.replace("\n", "
") file_content = lines[i:] return file_level_comments, file_content @@ -220,18 +226,26 @@ def from_file( cls, file: abc.AbstractFilePath | pathlib.Path, license_header: str | None = None, + msg_description_regex: re.Pattern[str] | None = None, ) -> MessageDef: """Create message definition from a .msg file.""" msg_name = file.stem msg_string = file.read_text() license_header = license_header or LICENSE_HEADER msg_string = msg_string.removeprefix(license_header) - return cls.from_string(msg_name, msg_string) + return cls.from_string(msg_name, msg_string, msg_description_regex) @classmethod - def from_string(cls, msg_name: str, msg_string: str) -> MessageDef: + def from_string( + cls, + msg_name: str, + msg_string: str, + msg_description_regex: re.Pattern[str] | None = None, + ) -> MessageDef: """Create message definition from a string.""" - msg_comments, lines = _extract_file_level_comments(msg_string) + msg_comments, lines = _extract_file_level_comments( + msg_string, msg_description_regex + ) msg = cls(msg_name, [], [], msg_comments) last_element: t.Any = None block_comments = "" @@ -256,7 +270,7 @@ def from_string(cls, msg_name: str, msg_string: str) -> MessageDef: if last_index > 0: # block comments were used block_comments = "" - block_comments += _process_block_comment(line) + block_comments += _process_block_comment(line) or "
" continue else: # inline comment @@ -398,6 +412,7 @@ def from_msg_folder( pkg_name: str, msg_path: abc.AbstractFilePath | pathlib.Path, license_header: str | None = None, + msg_description_regex: re.Pattern[str] | None = None, ) -> MessagePkgDef: """Create a message package definition from a folder.""" out = cls(pkg_name, [], []) @@ -406,6 +421,8 @@ def from_msg_folder( msg_path.rglob("*.msg"), ) for msg_file in sorted(files, key=os.fspath): - msg_def = MessageDef.from_file(msg_file, license_header) + msg_def = MessageDef.from_file( + msg_file, license_header, msg_description_regex + ) out.messages.append(msg_def) return out diff --git a/capella_ros_tools/importer.py b/capella_ros_tools/importer.py index e2397be..0a74b94 100644 --- a/capella_ros_tools/importer.py +++ b/capella_ros_tools/importer.py @@ -4,6 +4,7 @@ import os import pathlib +import re import typing as t from capellambse import decl, filehandler, helpers @@ -29,6 +30,7 @@ def __init__( msg_path: str, no_deps: bool, license_header_path: pathlib.Path | None = None, + msg_description_regex: str | None = None, ): self.messages = data_model.MessagePkgDef("root", [], []) self._promise_ids: dict[str, None] = {} @@ -37,19 +39,27 @@ def __init__( if license_header_path is not None: self._license_header = license_header_path.read_text("utf-8") - self._add_packages("ros_msgs", msg_path) + self._add_packages("ros_msgs", msg_path, msg_description_regex) if no_deps: return for interface_name, interface_url in ROS2_INTERFACES.items(): self._add_packages(interface_name, interface_url) - def _add_packages(self, name: str, path: str) -> None: + def _add_packages( + self, name: str, path: str, msg_description_regex: str | None = None + ) -> None: root = filehandler.get_filehandler(path).rootdir + msg_description_pattern = None + if msg_description_regex is not None: + msg_description_pattern = re.compile( + msg_description_regex, re.MULTILINE + ) + for dir in sorted(root.rglob("msg"), key=os.fspath): pkg_name = dir.parent.name or name pkg_def = data_model.MessagePkgDef.from_msg_folder( - pkg_name, dir, self._license_header + pkg_name, dir, self._license_header, msg_description_pattern ) self.messages.packages.append(pkg_def) logger.info("Loaded package %s from %s", pkg_name, dir) diff --git a/tests/data/data_model/description_regex_msgs/package/msg/SampleClassEnum.msg b/tests/data/data_model/description_regex_msgs/package/msg/SampleClassEnum.msg new file mode 100644 index 0000000..2b5a780 --- /dev/null +++ b/tests/data/data_model/description_regex_msgs/package/msg/SampleClassEnum.msg @@ -0,0 +1,32 @@ +# SPDX-FileCopyrightText: Copyright DB InfraGO AG +# SPDX-License-Identifier: Apache-2.0 + +# Decision +# +# Created: 2023-08-29 +# Modified: 2023-08-29 +# Version: 0.1 +# Status: Draft +# +# Description: Message type for providing the made decision. +# An additional description line. +# +# Expect linebreak + +# This block comment is added to the +# enum description of SampleClassEnumStatus. +uint8 OK = 0 +uint8 WARN = 1 +uint8 ERROR = 2 +uint8 STALE = 3 + +# This block comment is added to the +# enum description of Color. +uint8 COLOR_RED = 0 +uint8 COLOR_BLUE = 1 +uint8 COLOR_YELLOW = 2 + +uint8 status # The property status is of type + # SampleClassEnumStatus. +uint8 color # The property color is of type Color. +uint8 field diff --git a/tests/test_import_msgs.py b/tests/test_import_msgs.py index 6fab51d..d8e807c 100644 --- a/tests/test_import_msgs.py +++ b/tests/test_import_msgs.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 import pathlib +import re import pytest from capellambse import decl, helpers @@ -24,6 +25,9 @@ CUSTOM_LICENSE_PACKAGE_PATH = PATH.joinpath( "data/data_model/custom_license_msgs" ) +DESCRIPTION_REGEX_PACKAGE_PATH = PATH.joinpath( + "data/data_model/description_regex_msgs" +) SAMPLE_PACKAGE_YAML = PATH.joinpath("data/data_model/example_msgs.yaml") DUMMY_PATH = PATH.joinpath("data/empty_project_60") CUSTOM_LICENSE_PATH = PATH.joinpath( @@ -34,6 +38,11 @@ "Properties in SampleClassEnum can reference enums in the same file. " ) +EXPECTED_DESCRIPTION_REGEX = ( + "Message type for providing the made decision. " + "An additional description line.
Expect linebreak " +) + ROOT = helpers.UUIDString("00000000-0000-0000-0000-000000000000") SA_ROOT = helpers.UUIDString("00000000-0000-0000-0000-000000000001") @@ -241,3 +250,16 @@ def test_custom_license_header(): importer.messages.packages[0].messages[0].description == EXPECTED_DESCRIPTION_SAMPLE_CLASS_ENUM ) + + +def test_description_regex(): + importer = Importer( + DESCRIPTION_REGEX_PACKAGE_PATH.as_posix(), + True, + msg_description_regex=r"^Description:\s*([\s\S]*)", + ) + + assert ( + importer.messages.packages[0].messages[0].description + == EXPECTED_DESCRIPTION_REGEX + )