diff --git a/odxtools/admindata.py b/odxtools/admindata.py index b538298f..eadf9621 100644 --- a/odxtools/admindata.py +++ b/odxtools/admindata.py @@ -5,7 +5,9 @@ from .companydata import CompanyData, TeamMember from .odxlink import OdxLinkId, OdxLinkRef, OdxLinkDatabase, OdxDocFragment from .utils import read_description_from_odx +from .specialdata import SpecialDataGroup, read_sdgs_from_odx +from xml.etree import ElementTree from dataclasses import dataclass, field from typing import Optional, Any, Dict, List @@ -14,14 +16,30 @@ class CompanyDocInfo: company_data_ref: OdxLinkRef company_data: Optional[CompanyData] = None team_member_ref: Optional[OdxLinkRef] = None - team_member: Optional[TeamMember] = None doc_label: Optional[str] = None + sdgs: List[SpecialDataGroup] = field(default_factory=list) + + _team_member: Optional[TeamMember] = None + @property + def team_member(self) -> Optional[TeamMember]: + return self._team_member + + def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: + result = { } + + for sdg in self.sdgs: + result.update(sdg._build_odxlinks()) + + return result def _resolve_references(self, odxlinks: OdxLinkDatabase): self.company_data = odxlinks.resolve(self.company_data_ref) if self.team_member_ref is not None: - self.team_member = odxlinks.resolve(self.team_member_ref) + self._team_member = odxlinks.resolve(self.team_member_ref) + + for sdg in self.sdgs: + sdg._resolve_references(odxlinks) @dataclass() class Modification: @@ -48,12 +66,15 @@ def _resolve_references(self, odxlinks: OdxLinkDatabase): @dataclass() class AdminData: language: Optional[str] = None - company_doc_infos: Optional[List[CompanyDocInfo]] = None + company_doc_infos: List[CompanyDocInfo] = field(default_factory=list) doc_revisions: Optional[List[DocRevision]] = None def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: result: Dict[OdxLinkId, Any] = {} + for cdi in self.company_doc_infos: + result.update(cdi._build_odxlinks()) + return result def _resolve_references(self, odxlinks: OdxLinkDatabase): @@ -65,38 +86,41 @@ def _resolve_references(self, odxlinks: OdxLinkDatabase): for dr in self.doc_revisions: dr._resolve_references(odxlinks) -def read_admin_data_from_odx(et_element, doc_frags: List[OdxDocFragment]): + for cdi in self.company_doc_infos: + cdi._resolve_references(odxlinks) + +def read_admin_data_from_odx(et_element: Optional[ElementTree.Element], + doc_frags: List[OdxDocFragment]) \ + -> Optional[AdminData]: + if et_element is None: return None language = et_element.findtext("LANGUAGE") - company_doc_infos = et_element.find("COMPANY-DOC-INFOS") - if company_doc_infos is not None: - cdilist = list() - for cdi in company_doc_infos.iterfind("COMPANY-DOC-INFO"): + company_doc_infos = list() + if cdis_elem := et_element.find("COMPANY-DOC-INFOS"): + for cdi in cdis_elem.iterfind("COMPANY-DOC-INFO"): # the company data reference is mandatory company_data_ref = OdxLinkRef.from_et(cdi.find("COMPANY-DATA-REF"), doc_frags) assert company_data_ref is not None team_member_ref = OdxLinkRef.from_et(cdi.find("TEAM-MEMBER-REF"), doc_frags) - assert team_member_ref is not None - doc_label = cdi.findtext("DOC-LABEL") + sdgs = read_sdgs_from_odx(cdi.find("SDGS"), doc_frags) - cdilist.append(CompanyDocInfo(company_data_ref=company_data_ref, - team_member_ref=team_member_ref, - doc_label=doc_label)) + company_doc_infos.append(CompanyDocInfo(company_data_ref=company_data_ref, + team_member_ref=team_member_ref, + doc_label=doc_label, + sdgs=sdgs)) - company_doc_infos = cdilist - - doc_revisions = et_element.find("DOC-REVISIONS") - if doc_revisions is not None: - drlist = list() - for dr in doc_revisions.iterfind("DOC-REVISION"): + doc_revisions = [] + if drs_elem := et_element.find("DOC-REVISIONS"): + for dr in drs_elem.iterfind("DOC-REVISION"): team_member_ref = OdxLinkRef.from_et(dr.find("TEAM-MEMBER-REF"), doc_frags) revision_label = dr.findtext("REVISION-LABEL") state = dr.findtext("STATE") date = dr.findtext("DATE") + assert date is not None tool = dr.findtext("TOOL") modlist = None @@ -110,19 +134,13 @@ def read_admin_data_from_odx(et_element, doc_frags: List[OdxDocFragment]): modlist.append(Modification(change=m_change, reason=m_reason)) - drlist.append(DocRevision(team_member_ref=team_member_ref, + doc_revisions.append(DocRevision(team_member_ref=team_member_ref, revision_label=revision_label, state=state, date=date, tool=tool, modifications=modlist)) - - doc_revisions = drlist - - else: - doc_revisions = [] - return AdminData(language=language, company_doc_infos=company_doc_infos, doc_revisions=doc_revisions) diff --git a/odxtools/audience.py b/odxtools/audience.py index 88a1905c..fd6bb083 100644 --- a/odxtools/audience.py +++ b/odxtools/audience.py @@ -2,7 +2,7 @@ # Copyright (c) 2022 MBition GmbH from dataclasses import dataclass, field -from typing import Optional, List +from typing import Optional, List, Dict, Any from odxtools.utils import read_description_from_odx from .odxlink import OdxLinkRef, OdxLinkId, OdxDocFragment, OdxLinkDatabase @@ -44,6 +44,12 @@ class AdditionalAudience: long_name: Optional[str] = None description: Optional[str] = None + def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: + return { self.odx_id: self } + + def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: + pass + def read_audience_from_odx(et_element, doc_frags: List[OdxDocFragment]): enabled_audience_refs = [OdxLinkRef.from_et(ref, doc_frags) diff --git a/odxtools/companydata.py b/odxtools/companydata.py index 15d4a076..d8e71091 100644 --- a/odxtools/companydata.py +++ b/odxtools/companydata.py @@ -5,6 +5,7 @@ from .utils import read_description_from_odx from .odxlink import OdxLinkId, OdxLinkDatabase, OdxDocFragment from .utils import short_name_as_id +from .specialdata import SpecialDataGroup, read_sdgs_from_odx from dataclasses import dataclass, field from typing import Optional, Any, Dict, List @@ -29,6 +30,19 @@ class RelatedDoc: @dataclass() class CompanySpecificInfo: related_docs: Optional[List[RelatedDoc]] + sdgs: List[SpecialDataGroup] = field(default_factory=list) + + def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: + result = { } + + for sdg in self.sdgs: + result.update(sdg._build_odxlinks()) + + return result + + def _resolve_references(self, odxlinks: OdxLinkDatabase): + for sdg in self.sdgs: + sdg._resolve_references(odxlinks) @dataclass() class TeamMember: @@ -63,10 +77,14 @@ def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: for tm in self.team_members: result[tm.odx_id] = tm + if self.company_specific_info: + result.update(self.company_specific_info._build_odxlinks()) + return result def _resolve_references(self, odxlinks: OdxLinkDatabase): - pass + if self.company_specific_info: + self.company_specific_info._resolve_references(odxlinks) def read_xdoc_from_odx(xdoc): short_name = xdoc.findtext("SHORT-NAME") @@ -170,7 +188,10 @@ def read_company_datas_from_odx(et_element, doc_frags: List[OdxDocFragment]) \ related_docs = rdlist - company_specific_info = CompanySpecificInfo(related_docs=related_docs) + sdgs = read_sdgs_from_odx(company_specific_info.find("SDGS"), doc_frags) + + company_specific_info = CompanySpecificInfo(related_docs=related_docs, + sdgs=sdgs) cdl.append(CompanyData(odx_id=odx_id, short_name=short_name, diff --git a/odxtools/comparam_subset.py b/odxtools/comparam_subset.py index 152949f5..9395d9f1 100644 --- a/odxtools/comparam_subset.py +++ b/odxtools/comparam_subset.py @@ -11,7 +11,7 @@ from .admindata import AdminData, read_admin_data_from_odx from .companydata import CompanyData, read_company_datas_from_odx from .utils import read_description_from_odx, short_name_as_id - +from .specialdata import SpecialDataGroup, read_sdgs_from_odx StandardizationLevel = Literal[ "STANDARD", @@ -102,6 +102,7 @@ class ComparamSubset: description: Optional[str] = None admin_data: Optional[AdminData] = None company_datas: Optional[NamedItemList[CompanyData]] = None + sdgs: List[SpecialDataGroup] = field(default_factory=list) def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: odxlinks: Dict[OdxLinkId, Any] = {} @@ -117,6 +118,16 @@ def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: if self.unit_spec: odxlinks.update(self.unit_spec._build_odxlinks()) + if self.admin_data is not None: + odxlinks.update(self.admin_data._build_odxlinks()) + + if self.company_datas is not None: + for cd in self.company_datas: + odxlinks.update(cd._build_odxlinks()) + + for sdg in self.sdgs: + odxlinks.update(sdg._build_odxlinks()) + return odxlinks def _resolve_references(self, odxlinks: OdxLinkDatabase): @@ -129,6 +140,15 @@ def _resolve_references(self, odxlinks: OdxLinkDatabase): if self.unit_spec: self.unit_spec._resolve_references(odxlinks) + if self.admin_data is not None: + self.admin_data._resolve_references(odxlinks) + + if self.company_datas is not None: + for cd in self.company_datas: + cd._resolve_references(odxlinks) + + for sdg in self.sdgs: + sdg._resolve_references(odxlinks) def read_comparam_from_odx(et_element, doc_frags: List[OdxDocFragment]) -> BaseComparam: odx_id = OdxLinkId.from_et(et_element, doc_frags) @@ -217,6 +237,8 @@ def read_comparam_subset_from_odx(et_element: Element) -> ComparamSubset: else: unit_spec = None + sdgs = read_sdgs_from_odx(et_element.find("SDGS"), doc_frags) + return ComparamSubset( odx_id=odx_id, category=category, @@ -228,4 +250,5 @@ def read_comparam_subset_from_odx(et_element: Element) -> ComparamSubset: data_object_props=NamedItemList(short_name_as_id, data_object_props), comparams=NamedItemList(short_name_as_id, comparams), unit_spec=unit_spec, + sdgs=sdgs, ) diff --git a/odxtools/database.py b/odxtools/database.py index 141ebbee..e1ee9218 100644 --- a/odxtools/database.py +++ b/odxtools/database.py @@ -95,24 +95,24 @@ def finalize_init(self) -> None: self._odxlinks = OdxLinkDatabase() for subset in self.comparam_subsets: - self.odxlinks.update(subset._build_odxlinks()) + self._odxlinks.update(subset._build_odxlinks()) for dlc in self.diag_layer_containers: - self.odxlinks.update(dlc._build_odxlinks()) + self._odxlinks.update(dlc._build_odxlinks()) for dl in self.diag_layers: - self.odxlinks.update(dl._build_odxlinks()) + self._odxlinks.update(dl._build_odxlinks()) # Resolve references for subset in self.comparam_subsets: - subset._resolve_references(self.odxlinks) + subset._resolve_references(self._odxlinks) for dlc in self.diag_layer_containers: - dlc._resolve_references(self.odxlinks) + dlc._resolve_references(self._odxlinks) for dl_type_name in ["ECU-SHARED-DATA", "PROTOCOL", "FUNCTIONAL-GROUP", "BASE-VARIANT", "ECU-VARIANT"]: for dl in self.diag_layers: if dl.variant_type == dl_type_name: - dl._resolve_references(self.odxlinks) + dl._resolve_references(self._odxlinks) @property def odxlinks(self) -> OdxLinkDatabase: diff --git a/odxtools/dataobjectproperty.py b/odxtools/dataobjectproperty.py index 90da3908..a045f5b8 100644 --- a/odxtools/dataobjectproperty.py +++ b/odxtools/dataobjectproperty.py @@ -3,7 +3,7 @@ import abc from typing import cast, List, Dict, Optional, Any, Union -from dataclasses import dataclass +from dataclasses import dataclass, field from .utils import read_description_from_odx from .physicaltype import PhysicalType, read_physical_type_from_odx @@ -14,10 +14,11 @@ from .decodestate import DecodeState from .encodestate import EncodeState from .exceptions import DecodeError, EncodeError +from .specialdata import SpecialDataGroup, read_sdgs_from_odx from .odxlink import OdxLinkRef, OdxLinkId, OdxDocFragment, OdxLinkDatabase class DopBase(abc.ABC): - """ Base class for all DOPs. + """Base class for all DOPs. Any class that a parameter can reference via a DOP-REF should inherit from this class. """ @@ -27,12 +28,26 @@ def __init__(self, short_name, long_name=None, description=None, - is_visible=True): + is_visible=True, + sdgs = []): self.odx_id = odx_id self.short_name = short_name self.long_name = long_name self.description = description self.is_visible = is_visible + self.sdgs = sdgs + + def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: + result = {} + + for sdg in self.sdgs: + result.update(sdg._build_odxlinks()) + + return result + + def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: + for sdg in self.sdgs: + sdg._resolve_references(odxlinks) @abc.abstractmethod def convert_physical_to_bytes(self, physical_value, encode_state: EncodeState, bit_position: int) -> bytes: @@ -56,12 +71,14 @@ def __init__(self, compu_method: CompuMethod, unit_ref: Optional[OdxLinkRef] = None, long_name: Optional[str] = None, - description: Optional[str] = None + description: Optional[str] = None, + sdgs: List[SpecialDataGroup] = [], ): super().__init__(odx_id=odx_id, short_name=short_name, long_name=long_name, - description=description) + description=description, + sdgs=sdgs) self.diag_coded_type = diag_coded_type self.physical_type = physical_type self.compu_method = compu_method @@ -120,6 +137,8 @@ def get_valid_physical_values(self): def _resolve_references(self, odxlinks: OdxLinkDatabase): """Resolves the reference to the unit""" + super()._resolve_references(odxlinks) + if self.unit_ref: self._unit = odxlinks.resolve(self.unit_ref) @@ -155,7 +174,19 @@ class DiagnosticTroubleCode: display_trouble_code: Optional[str] = None level: Union[bytes, bytearray, None] = None is_temporary: bool = False + sdgs: List[SpecialDataGroup] = field(default_factory=list) + + def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: + result = {} + + for sdg in self.sdgs: + result.update(sdg._build_odxlinks()) + return result + + def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: + for sdg in self.sdgs: + sdg._resolve_references(odxlinks) class DtcRef: """A proxy for DiagnosticTroubleCode. @@ -196,10 +227,11 @@ def is_temporary(self): return self.dtc.is_temporary def _resolve_references(self, odxlinks: OdxLinkDatabase): - self.dtc: Optional[DiagnosticTroubleCode] = odxlinks.resolve(self.dtc_ref) # type: ignore - - assert isinstance(self.dtc, DiagnosticTroubleCode),\ - f"DTC-REF {self.dtc_ref} does not reference a DTC but a {type(self.dtc)}." + dtc = odxlinks.resolve(self.dtc_ref) + assert isinstance(dtc, DiagnosticTroubleCode),\ + f"DTC-REF {self.dtc_ref} references an object of type {type(dtc)} " \ + f"instead of a DiagnosticTroubleCode." + self.dtc = dtc class DtcDop(DataObjectProperty): @@ -215,7 +247,8 @@ def __init__(self, is_visible: bool = False, linked_dtc_dops: bool = False, long_name: Optional[str] = None, - description: Optional[str] = None + description: Optional[str] = None, + sdgs: List[SpecialDataGroup] = [], ): super().__init__(odx_id=odx_id, short_name=short_name, @@ -223,7 +256,8 @@ def __init__(self, description=description, diag_coded_type=diag_coded_type, physical_type=physical_type, - compu_method=compu_method) + compu_method=compu_method, + sdgs=sdgs) self.dtcs = dtcs self.is_visible = is_visible self.linked_dtc_dops = linked_dtc_dops @@ -268,19 +302,21 @@ def convert_physical_to_bytes(self, physical_value, encode_state, bit_position): return super().convert_physical_to_bytes(trouble_code, encode_state, bit_position) def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: - odxlinks: Dict[OdxLinkId, Any] = {} + odxlinks = super()._build_odxlinks() odxlinks[self.odx_id] = self for dtc in self.dtcs: if isinstance(dtc, DiagnosticTroubleCode): assert dtc.odx_id is not None odxlinks[dtc.odx_id] = dtc + odxlinks.update(dtc._build_odxlinks()) + return odxlinks def _resolve_references(self, odxlinks: OdxLinkDatabase): - for dtc in self.dtcs: - if isinstance(dtc, DtcRef): - dtc._resolve_references(odxlinks) + super()._resolve_references(odxlinks) + for dtc in self.dtcs: + dtc._resolve_references(odxlinks) def read_dtc_from_odx(et_element, doc_frags: List[OdxDocFragment]): if et_element.find("DISPLAY-TROUBLE-CODE") is not None: @@ -293,13 +329,16 @@ def read_dtc_from_odx(et_element, doc_frags: List[OdxDocFragment]): else: level = None + sdgs = read_sdgs_from_odx(et_element.find("SDGS"), doc_frags) + return DiagnosticTroubleCode(odx_id=OdxLinkId.from_et(et_element, doc_frags), short_name=et_element.findtext("SHORT-NAME"), trouble_code=int( et_element.findtext("TROUBLE-CODE")), text=et_element.findtext("TEXT"), display_trouble_code=display_trouble_code, - level=level) + level=level, + sdgs=sdgs) def read_data_object_property_from_odx(et_element, doc_frags: List[OdxDocFragment]) \ @@ -310,7 +349,7 @@ def read_data_object_property_from_odx(et_element, doc_frags: List[OdxDocFragmen short_name = et_element.findtext("SHORT-NAME") long_name = et_element.findtext("LONG-NAME") description = read_description_from_odx(et_element.find("DESC")) - logger.debug('Parsing DOP ' + short_name) + sdgs = read_sdgs_from_odx(et_element.find("SDGS"), doc_frags) diag_coded_type = read_diag_coded_type_from_odx( et_element.find("DIAG-CODED-TYPE"), doc_frags) @@ -329,7 +368,8 @@ def read_data_object_property_from_odx(et_element, doc_frags: List[OdxDocFragmen compu_method, unit_ref=unit_ref, long_name=long_name, - description=description) + description=description, + sdgs=sdgs) else: dtcs = [read_dtc_from_odx(el, doc_frags) for el in et_element.iterfind("DTCS/DTC")] @@ -345,5 +385,6 @@ def read_data_object_property_from_odx(et_element, doc_frags: List[OdxDocFragmen dtcs, is_visible=is_visible, long_name=long_name, - description=description) + description=description, + sdgs=sdgs) return dop diff --git a/odxtools/diagdatadictionaryspec.py b/odxtools/diagdatadictionaryspec.py index cd3df2a1..609f2878 100644 --- a/odxtools/diagdatadictionaryspec.py +++ b/odxtools/diagdatadictionaryspec.py @@ -21,6 +21,7 @@ from .table import read_table_from_odx, Table from .units import read_unit_spec_from_odx, UnitSpec from .odxlink import OdxLinkId, OdxLinkDatabase, OdxDocFragment +from .specialdata import SpecialDataGroup, read_sdgs_from_odx if TYPE_CHECKING: from .diaglayer import DiagLayer @@ -56,6 +57,7 @@ class DiagDataDictionarySpec: default_factory=lambda: _construct_named_item_list([]) ) unit_spec: Optional[UnitSpec] = None + sdgs: List[SpecialDataGroup] = field(default_factory=list) def __post_init__(self): self._all_data_object_properties = _construct_named_item_list( @@ -96,27 +98,26 @@ def __post_init__(self): def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: odxlinks = {} - for obj in chain( - self.data_object_props, self.structures, self.end_of_pdu_fields, self.tables - ): + for obj in chain(self.data_object_props, + self.structures, + self.end_of_pdu_fields, + self.tables, + ): odxlinks[obj.odx_id] = obj - for table in self.tables: - odxlinks.update(table._build_odxlinks()) - - for env_data_desc in self.env_data_descs: - odxlinks.update(env_data_desc._build_odxlinks()) - - for env_data in self.env_datas: - odxlinks.update(env_data._build_odxlinks()) - - for mux in self.muxs: - odxlinks.update(mux._build_odxlinks()) - - for obj in self.dtc_dops: + for obj in chain(self.data_object_props, + self.dtc_dops, + self.env_data_descs, + self.env_datas, + self.muxs, + self.sdgs, + self.structures, + self.end_of_pdu_fields, + self.tables, + ): odxlinks.update(obj._build_odxlinks()) - if self.unit_spec: + if self.unit_spec is not None: odxlinks.update(self.unit_spec._build_odxlinks()) return odxlinks @@ -124,25 +125,25 @@ def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: def _resolve_references(self, parent_dl: "DiagLayer", odxlinks: OdxLinkDatabase): - for dop in chain( - self.dtc_dops, - self.data_object_props, - self.tables, - ): - dop._resolve_references(odxlinks) - - for struct in chain(self.structures, self.end_of_pdu_fields): - struct._resolve_references(parent_dl, odxlinks) - - for env_data in chain( - self.env_datas, - ): - env_data._resolve_references(parent_dl, odxlinks) - - for mux in chain( - self.muxs, - ): - mux._resolve_references(odxlinks) + + for obj in self.data_object_props: + obj._resolve_references(odxlinks) + for obj in self.dtc_dops: + obj._resolve_references(odxlinks) + for obj in self.end_of_pdu_fields: + obj._resolve_references(parent_dl, odxlinks) + for obj in self.env_data_descs: + obj._resolve_references(odxlinks) + for obj in self.env_datas: + obj._resolve_references(parent_dl, odxlinks) + for obj in self.muxs: + obj._resolve_references(odxlinks) + for obj in self.sdgs: + obj._resolve_references(odxlinks) + for obj in self.structures: + obj._resolve_references(parent_dl, odxlinks) + for obj in self.tables: + obj._resolve_references(odxlinks) if self.unit_spec: self.unit_spec._resolve_references(odxlinks) @@ -219,6 +220,8 @@ def read_diag_data_dictionary_spec_from_odx(et_element, doc_frags: List[OdxDocFr if num > 0: logger.info(f"Not implemented: Did not parse {num} {name}.") + sdgs = read_sdgs_from_odx(et_element.find("SDGS"), doc_frags) + return DiagDataDictionarySpec( data_object_props=NamedItemList(short_name_as_id, data_object_props), structures=NamedItemList(short_name_as_id, structures), @@ -229,4 +232,5 @@ def read_diag_data_dictionary_spec_from_odx(et_element, doc_frags: List[OdxDocFr env_data_descs=NamedItemList(short_name_as_id, env_data_descs), env_datas=NamedItemList(short_name_as_id, env_datas), muxs=NamedItemList(short_name_as_id, muxs), + sdgs=sdgs, ) diff --git a/odxtools/diaglayer.py b/odxtools/diaglayer.py index 021a6e57..1900fba9 100644 --- a/odxtools/diaglayer.py +++ b/odxtools/diaglayer.py @@ -29,6 +29,7 @@ from .service import DiagService, read_diag_service_from_odx from .singleecujob import SingleEcuJob, read_single_ecu_job_from_odx from .structures import Request, Response, read_structure_from_odx +from .specialdata import SpecialDataGroup, read_sdgs_from_odx # Defines priority of overriding objects PRIORITY_OF_DIAG_LAYER_TYPE = { @@ -116,6 +117,7 @@ def __init__(self, states=[], state_transitions=[], import_refs=[], + sdgs=[], ): logger.info(f"Initializing variant type {variant_type}") self.variant_type = variant_type @@ -124,6 +126,7 @@ def __init__(self, self.short_name = short_name self.long_name = long_name self.description = description + self.sdgs = sdgs # Requests and Responses self.requests = requests @@ -203,7 +206,7 @@ def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: """Construct a mapping from IDs to all objects that are contained in this diagnostic layer.""" logger.info(f"Adding {self.odx_id} to odxlinks.") - odxlinks = {} + odxlinks = { self.odx_id: self } for obj in chain(self._local_services, self._local_single_ecu_jobs, @@ -216,12 +219,21 @@ def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: self.state_transitions): odxlinks[obj.odx_id] = obj + for obj in chain(self._local_services, + self._local_single_ecu_jobs, + self.requests, + self.positive_responses, + self.negative_responses, + self.additional_audiences, + self.functional_classes, + self.sdgs, + self.states, + self.state_transitions): + odxlinks.update(obj._build_odxlinks()) + if self.local_diag_data_dictionary_spec: - odxlinks.update( - self.local_diag_data_dictionary_spec._build_odxlinks() - ) + odxlinks.update(self.local_diag_data_dictionary_spec._build_odxlinks()) - odxlinks[self.odx_id] = self return odxlinks def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: @@ -230,6 +242,9 @@ def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: for pr in self.parent_refs: pr._resolve_references(odxlinks) + for sdg in self.sdgs: + sdg._resolve_references(odxlinks) + services = sorted(self._compute_available_services_by_name(odxlinks).values(), key=short_name_as_id) self._services = NamedItemList[Union[DiagService, SingleEcuJob]]( @@ -256,14 +271,15 @@ def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: struct._resolve_references(self, odxlinks) local_diag_comms: Iterable[Union[DiagService, SingleEcuJob]] \ - = (*self._local_services, *self._local_single_ecu_jobs) - for service in local_diag_comms: - service._resolve_references(odxlinks) + = [*self._local_services, *self._local_single_ecu_jobs] + for ldc in local_diag_comms: + ldc._resolve_references(odxlinks) if self.local_diag_data_dictionary_spec: self.local_diag_data_dictionary_spec._resolve_references(self, odxlinks) + def __local_services_by_name(self, odxlinks: OdxLinkDatabase) -> Dict[str, Union[DiagService, SingleEcuJob]]: services_by_name: Dict[str, Union[DiagService, SingleEcuJob]] = {} @@ -735,31 +751,30 @@ def read_diag_layer_from_odx(et_element: ElementTree.Element, import_refs = [OdxLinkRef.from_et(ref, doc_frags) for ref in et_element.iterfind("IMPORT-REFS/IMPORT-REF")] - # TODO: Are UNIT-SPEC and SDGS needed? + sdgs = read_sdgs_from_odx(et_element.find("SDGS"), doc_frags) # Create DiagLayer - dl = DiagLayer(variant_type, - odx_id, - short_name, - long_name=long_name, - description=description, - requests=requests, - positive_responses=positive_responses, - negative_responses=negative_responses, - services=services, - diag_comm_refs=diag_comm_refs, - single_ecu_jobs=single_ecu_jobs, - parent_refs=parent_refs, - diag_data_dictionary_spec=diag_data_dictionary_spec, - communication_parameters=com_params, - additional_audiences=additional_audiences, - functional_classes=functional_classes, - states=states, - state_transitions=state_transitions, - import_refs=import_refs, - ) - - return dl + return DiagLayer(variant_type, + odx_id, + short_name, + long_name=long_name, + description=description, + requests=requests, + positive_responses=positive_responses, + negative_responses=negative_responses, + services=services, + diag_comm_refs=diag_comm_refs, + single_ecu_jobs=single_ecu_jobs, + parent_refs=parent_refs, + diag_data_dictionary_spec=diag_data_dictionary_spec, + communication_parameters=com_params, + additional_audiences=additional_audiences, + functional_classes=functional_classes, + states=states, + state_transitions=state_transitions, + import_refs=import_refs, + sdgs=sdgs, + ) class DiagLayerContainer: def __init__(self, @@ -773,7 +788,8 @@ def __init__(self, protocols: List[DiagLayer] = [], functional_groups: List[DiagLayer] = [], base_variants: List[DiagLayer] = [], - ecu_variants: List[DiagLayer] = [] + ecu_variants: List[DiagLayer] = [], + sdgs: List[SpecialDataGroup] = [], ) -> None: self.odx_id = odx_id self.short_name = short_name @@ -787,6 +803,7 @@ def __init__(self, self.functional_groups = functional_groups self.base_variants = base_variants self.ecu_variants = ecu_variants + self.sdgs = sdgs self._diag_layers = NamedItemList[DiagLayer](short_name_as_id, list( chain(self.ecu_shared_datas, self.protocols, self.functional_groups, self.base_variants, self.ecu_variants))) @@ -802,6 +819,16 @@ def _build_odxlinks(self): for cd in self.company_datas: result.update(cd._build_odxlinks()) + for dl in chain(self.ecu_shared_datas, + self.protocols, + self.functional_groups, + self.base_variants, + self.ecu_variants): + result.update(dl._build_odxlinks()) + + for sdg in self.sdgs: + result.update(sdg._build_odxlinks()) + return result def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: @@ -812,6 +839,16 @@ def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: for cd in self.company_datas: cd._resolve_references(odxlinks) + for dl in chain(self.ecu_shared_datas, + self.protocols, + self.functional_groups, + self.base_variants, + self.ecu_variants): + dl._resolve_references(odxlinks) + + for sdg in self.sdgs: + sdg._resolve_references(odxlinks) + @property def diag_layers(self): return self._diag_layers diff --git a/odxtools/endofpdufield.py b/odxtools/endofpdufield.py index fff2d10e..5381a75f 100644 --- a/odxtools/endofpdufield.py +++ b/odxtools/endofpdufield.py @@ -88,10 +88,10 @@ def convert_bytes_to_physical(self, decode_state: DecodeState, bit_position: int return value, next_byte_position - def _resolve_references(self, + def _resolve_references(self, # type: ignore[override] parent_dl: "DiagLayer", odxlinks: OdxLinkDatabase) \ - -> None: + -> None: """Recursively resolve any references (odxlinks or sn-refs) """ if self.structure_ref is not None: diff --git a/odxtools/envdata.py b/odxtools/envdata.py index 332c236c..ade2204d 100644 --- a/odxtools/envdata.py +++ b/odxtools/envdata.py @@ -2,15 +2,18 @@ # Copyright (c) 2022 MBition GmbH from dataclasses import dataclass -from typing import Optional, Any, Dict, List +from typing import TYPE_CHECKING, Optional, Any, Dict, List from .parameters import read_parameter_from_odx from .utils import read_description_from_odx -from .odxlink import OdxLinkId, OdxDocFragment +from .odxlink import OdxLinkId, OdxDocFragment, OdxLinkDatabase from .structures import BasicStructure from .parameters.parameterbase import Parameter from .globals import logger +if TYPE_CHECKING: + from .diaglayer import DiagLayer + @dataclass class EnvironmentData(BasicStructure): """This class represents Environment Data that describes the circumstances in which the error occurred.""" @@ -30,9 +33,18 @@ def __init__(self, self.dtc_values = dtc_values def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: - odxlinks = {self.odx_id: self} + odxlinks = super()._build_odxlinks() + + odxlinks[self.odx_id] = self + return odxlinks + def _resolve_references(self, # type: ignore[override] + parent_dl: "DiagLayer", + odxlinks: OdxLinkDatabase) \ + -> None: + super()._resolve_references(parent_dl, odxlinks) + def __repr__(self) -> str: return ( f"EnvironmentData('{self.short_name}', " diff --git a/odxtools/functionalclass.py b/odxtools/functionalclass.py index 905cc072..a5f20cfc 100644 --- a/odxtools/functionalclass.py +++ b/odxtools/functionalclass.py @@ -2,10 +2,10 @@ # Copyright (c) 2022 MBition GmbH from dataclasses import dataclass -from typing import Optional, List +from typing import Optional, Dict, List, Any from .utils import read_description_from_odx -from .odxlink import OdxLinkId, OdxDocFragment +from .odxlink import OdxLinkId, OdxDocFragment, OdxLinkDatabase @dataclass() @@ -18,6 +18,11 @@ class FunctionalClass: long_name: Optional[str] = None description: Optional[str] = None + def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: + return { self.odx_id: self } + + def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: + pass def read_functional_class_from_odx(et_element, doc_frags: List[OdxDocFragment]): short_name = et_element.findtext("SHORT-NAME") diff --git a/odxtools/parameters/parameterbase.py b/odxtools/parameters/parameterbase.py index a4796ddc..5aca4322 100644 --- a/odxtools/parameters/parameterbase.py +++ b/odxtools/parameters/parameterbase.py @@ -3,13 +3,15 @@ import abc -from typing import Optional, Union +from typing import Optional, Union, List import warnings from ..decodestate import DecodeState from ..encodestate import EncodeState from ..exceptions import OdxWarning +from ..odxlink import OdxLinkDatabase from ..globals import logger +from ..specialdata import SpecialDataGroup, read_sdgs_from_odx class Parameter(abc.ABC): def __init__(self, @@ -19,7 +21,8 @@ def __init__(self, byte_position: Optional[int] = None, bit_position: Optional[int] = None, semantic: Optional[str] = None, - description: Optional[str] = None) -> None: + description: Optional[str] = None, + sdgs: List[SpecialDataGroup] = []) -> None: self.short_name: str = short_name self.long_name: Optional[str] = long_name self.byte_position: Optional[int] = byte_position @@ -27,6 +30,19 @@ def __init__(self, self.parameter_type: str = parameter_type self.semantic: Optional[str] = semantic self.description: Optional[str] = description + self.sdgs = sdgs + + def _build_odxlinks(self): + result = {} + + for sdg in self.sdgs: + result.update(sdg._build_odxlinks()) + + return result + + def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: + for sdg in self.sdgs: + sdg._resolve_references(odxlinks) @property def bit_length(self) -> Optional[int]: diff --git a/odxtools/parameters/parameterwithdop.py b/odxtools/parameters/parameterwithdop.py index 3144d425..35d417b2 100644 --- a/odxtools/parameters/parameterwithdop.py +++ b/odxtools/parameters/parameterwithdop.py @@ -45,6 +45,8 @@ def dop(self, dop: DopBase): self.dop_snref = dop.short_name def resolve_references(self, parent_dl, odxlinks: OdxLinkDatabase): + super()._resolve_references(odxlinks) + dop = self.dop if self.dop_snref: dop = parent_dl.data_object_properties.get(self.dop_snref) diff --git a/odxtools/parameters/readparameter.py b/odxtools/parameters/readparameter.py index fce1525d..84e5c86a 100644 --- a/odxtools/parameters/readparameter.py +++ b/odxtools/parameters/readparameter.py @@ -7,6 +7,7 @@ from ..globals import xsi from ..utils import read_description_from_odx from ..odxlink import OdxLinkRef, OdxLinkId, OdxDocFragment +from ..specialdata import SpecialDataGroup, read_sdgs_from_odx from .codedconstparameter import CodedConstParameter from .dynamicparameter import DynamicParameter @@ -35,6 +36,8 @@ def read_parameter_from_odx(et_element, doc_frags): bit_position = int(bit_position_str) parameter_type = et_element.get(f"{xsi}type") + sdgs = read_sdgs_from_odx(et_element.find("SDGS"), doc_frags) + # Which attributes are set depends on the type of the parameter. if parameter_type in ["VALUE", "PHYS-CONST", "SYSTEM", "LENGTH-KEY"]: dop_ref = OdxLinkRef.from_et(et_element.find("DOP-REF"), doc_frags) @@ -57,7 +60,8 @@ def read_parameter_from_odx(et_element, doc_frags): dop_ref=dop_ref, dop_snref=dop_snref, physical_default_value=physical_default_value, - description=description) + description=description, + sdgs=sdgs) elif parameter_type == "PHYS-CONST": physical_constant_value = et_element.findtext( @@ -71,7 +75,8 @@ def read_parameter_from_odx(et_element, doc_frags): dop_ref=dop_ref, dop_snref=dop_snref, physical_constant_value=physical_constant_value, - description=description) + description=description, + sdgs=sdgs) elif parameter_type == "CODED-CONST": diag_coded_type = read_diag_coded_type_from_odx( @@ -86,7 +91,8 @@ def read_parameter_from_odx(et_element, doc_frags): coded_value=coded_value, byte_position=byte_position, bit_position=bit_position, - description=description) + description=description, + sdgs=sdgs) elif parameter_type == "NRC-CONST": diag_coded_type = read_diag_coded_type_from_odx( @@ -101,7 +107,8 @@ def read_parameter_from_odx(et_element, doc_frags): coded_values=coded_values, byte_position=byte_position, bit_position=bit_position, - description=description) + description=description, + sdgs=sdgs) elif parameter_type == "RESERVED": bit_length = int(et_element.findtext("BIT-LENGTH")) @@ -112,7 +119,8 @@ def read_parameter_from_odx(et_element, doc_frags): byte_position=byte_position, bit_position=bit_position, bit_length=bit_length, - description=description) + description=description, + sdgs=sdgs) elif parameter_type == "MATCHING-REQUEST-PARAM": byte_length = int(et_element.findtext("BYTE-LENGTH")) @@ -124,7 +132,8 @@ def read_parameter_from_odx(et_element, doc_frags): semantic=semantic, byte_position=byte_position, bit_position=bit_position, request_byte_position=request_byte_pos, byte_length=byte_length, - description=description) + description=description, + sdgs=sdgs) elif parameter_type == "SYSTEM": sysparam = et_element.get("SYSPARAM") @@ -137,7 +146,8 @@ def read_parameter_from_odx(et_element, doc_frags): bit_position=bit_position, dop_ref=dop_ref, dop_snref=dop_snref, - description=description) + description=description, + sdgs=sdgs) elif parameter_type == "LENGTH-KEY": odx_id = OdxLinkId.from_et(et_element, doc_frags) @@ -150,7 +160,8 @@ def read_parameter_from_odx(et_element, doc_frags): bit_position=bit_position, dop_ref=dop_ref, dop_snref=dop_snref, - description=description) + description=description, + sdgs=sdgs) elif parameter_type == "DYNAMIC": @@ -159,7 +170,8 @@ def read_parameter_from_odx(et_element, doc_frags): semantic=semantic, byte_position=byte_position, bit_position=bit_position, - description=description) + description=description, + sdgs=sdgs) elif parameter_type == "TABLE-STRUCT": key_ref = OdxLinkRef.from_et(et_element.find("TABLE-KEY-REF"), doc_frags) @@ -173,7 +185,8 @@ def read_parameter_from_odx(et_element, doc_frags): semantic=semantic, byte_position=byte_position, bit_position=bit_position, - description=description) + description=description, + sdgs=sdgs) elif parameter_type == "TABLE-KEY": @@ -195,7 +208,8 @@ def read_parameter_from_odx(et_element, doc_frags): byte_position=byte_position, bit_position=bit_position, semantic=semantic, - description=description) + description=description, + sdgs=sdgs) elif parameter_type == "TABLE-ENTRY": target = et_element.findtext("TARGET") @@ -208,6 +222,7 @@ def read_parameter_from_odx(et_element, doc_frags): byte_position=byte_position, bit_position=bit_position, semantic=semantic, - description=description) + description=description, + sdgs=sdgs) - raise NotImplementedError(f"I don't know the type {parameter_type}") + raise NotImplementedError(f"I don't know about parameters of type {parameter_type}") diff --git a/odxtools/parameters/tablekeyparameter.py b/odxtools/parameters/tablekeyparameter.py index b3a6ed91..68e43643 100644 --- a/odxtools/parameters/tablekeyparameter.py +++ b/odxtools/parameters/tablekeyparameter.py @@ -21,7 +21,8 @@ def __init__(self, byte_position=None, bit_position=None, semantic=None, - description=None): + description=None, + **kwargs): super().__init__( short_name=short_name, long_name=long_name, @@ -29,7 +30,8 @@ def __init__(self, bit_position=bit_position, parameter_type="TABLE-KEY", semantic=semantic, - description=description + description=description, + **kwargs ) self.table_ref = None self.table_snref = None @@ -71,6 +73,7 @@ def decode_from_pdu(self, coded_message, default_byte_position=None): def resolve_references(self, parent_dl: "DiagLayer", odxlinks: OdxLinkDatabase): + super()._resolve_references(odxlinks) self.table = None if self.table_snref: self.table = parent_dl.local_diag_data_dictionary_spec.tables[self.table_snref] diff --git a/odxtools/parameters/tablestructparameter.py b/odxtools/parameters/tablestructparameter.py index d06f2ab0..7bc43123 100644 --- a/odxtools/parameters/tablestructparameter.py +++ b/odxtools/parameters/tablestructparameter.py @@ -14,7 +14,8 @@ def __init__(self, byte_position=None, bit_position=None, semantic=None, - description=None): + description=None, + **kwargs): super().__init__( short_name=short_name, long_name=long_name, @@ -22,7 +23,8 @@ def __init__(self, bit_position=bit_position, parameter_type="TABLE-STRUCT", semantic=semantic, - description=description + description=description, + **kwargs ) if table_key_ref: self.table_key_ref = table_key_ref diff --git a/odxtools/service.py b/odxtools/service.py index 4ff8adfa..455429e7 100644 --- a/odxtools/service.py +++ b/odxtools/service.py @@ -2,6 +2,7 @@ # Copyright (c) 2022 MBition GmbH from typing import List, Iterable, Optional, Union +from xml.etree import ElementTree from .utils import short_name_as_id from .audience import Audience, read_audience_from_odx @@ -15,7 +16,8 @@ from .structures import Request, Response from .nameditemlist import NamedItemList from .message import Message - +from .specialdata import SpecialDataGroup, read_sdgs_from_odx +from .admindata import read_admin_data_from_odx, AdminData class DiagService: def __init__(self, @@ -25,12 +27,14 @@ def __init__(self, positive_responses: Union[Iterable[OdxLinkRef], Iterable[Response]], negative_responses: Union[Iterable[OdxLinkRef], Iterable[Response]], long_name: Optional[str] = None, + admin_data: Optional[AdminData] = None, description: Optional[str] = None, semantic: Optional[str] = None, audience: Optional[Audience] = None, functional_class_refs: Iterable[OdxLinkRef] = [], pre_condition_state_refs: Iterable[OdxLinkRef] = [], - state_transition_refs: Iterable[OdxLinkRef] = []): + state_transition_refs: Iterable[OdxLinkRef] = [], + sdgs: List[SpecialDataGroup] = []): """Constructs the service. Parameters: @@ -103,6 +107,8 @@ def __init__(self, raise TypeError( "negative_responses must be of type Union[List[str], List[Response], None]") + self.sdgs = sdgs + @property def request(self) -> Optional[Request]: return self._request @@ -143,6 +149,14 @@ def pre_condition_states(self): def state_transitions(self): return self._state_transitions + def _build_odxlinks(self): + result = {} + + for sdg in self.sdgs: + result.update(sdg._build_odxlinks()) + + return result + def _resolve_references(self, odxlinks: OdxLinkDatabase): self._request = odxlinks.resolve(self.request_ref) self._positive_responses = \ @@ -168,6 +182,10 @@ def _resolve_references(self, odxlinks: OdxLinkDatabase): if self.audience: self.audience._resolve_references(odxlinks) + for sdg in self.sdgs: + sdg._resolve_references(odxlinks) + + def decode_message(self, message: Union[bytes, bytearray]) -> Message: # Check if message is a request or positive or negative response @@ -281,6 +299,7 @@ def read_diag_service_from_odx(et_element, doc_frags: List[OdxDocFragment]): long_name = et_element.findtext("LONG-NAME") description = read_description_from_odx(et_element.find("DESC")) + admin_data = read_admin_data_from_odx(et_element.find("ADMIN-DATA"), doc_frags) semantic = et_element.get("SEMANTIC") audience = None @@ -288,16 +307,19 @@ def read_diag_service_from_odx(et_element, doc_frags: List[OdxDocFragment]): audience = read_audience_from_odx(et_element.find( "AUDIENCE"), doc_frags) - diag_service = DiagService(odx_id, - short_name, - request_ref, - pos_res_refs, - neg_res_refs, - long_name=long_name, - description=description, - semantic=semantic, - audience=audience, - functional_class_refs=functional_class_refs, - pre_condition_state_refs=pre_condition_state_refs, - state_transition_refs=state_transition_refs) - return diag_service + sdgs = read_sdgs_from_odx(et_element.find("SDGS"), doc_frags) + + return DiagService(odx_id, + short_name, + request_ref, + pos_res_refs, + neg_res_refs, + long_name=long_name, + description=description, + admin_data=admin_data, + semantic=semantic, + audience=audience, + functional_class_refs=functional_class_refs, + pre_condition_state_refs=pre_condition_state_refs, + state_transition_refs=state_transition_refs, + sdgs=sdgs) diff --git a/odxtools/singleecujob.py b/odxtools/singleecujob.py index 3c7908a1..94d25bd8 100644 --- a/odxtools/singleecujob.py +++ b/odxtools/singleecujob.py @@ -6,6 +6,7 @@ from .utils import short_name_as_id from .dataobjectproperty import DopBase +from .admindata import read_admin_data_from_odx, AdminData from .audience import Audience, read_audience_from_odx from .functionalclass import FunctionalClass from .utils import read_description_from_odx @@ -14,6 +15,7 @@ from .globals import logger from .exceptions import EncodeError, DecodeError from .message import Message +from .specialdata import SpecialDataGroup, read_sdgs_from_odx DiagClassType = Literal["STARTCOMM", "STOPCOMM", @@ -119,7 +121,7 @@ class SingleEcuJob: Single ECU jobs are defined in section 7.3.5.7 of the ASAM MCD-2 standard. TODO: The following xml attributes are not internalized yet: - ADMIN-DATA, SDGS, PROTOCOL-SNREFS, RELATED-DIAG-COMM-REFS, PRE-CONDITION-STATE-REFS, STATE-TRANSITION-REFS + PROTOCOL-SNREFS, RELATED-DIAG-COMM-REFS, PRE-CONDITION-STATE-REFS, STATE-TRANSITION-REFS """ odx_id: OdxLinkId short_name: str @@ -129,6 +131,7 @@ class SingleEcuJob: oid: Optional[str] = None long_name: Optional[str] = None description: Optional[str] = None + admin_data: Optional[AdminData] = None functional_class_refs: List[OdxLinkRef] = field(default_factory=list) audience: Optional[Audience] = None # optional xsd:elements specific to SINGLE-ECU-JOB @@ -145,6 +148,7 @@ class SingleEcuJob: is_mandatory: bool = False is_executable: bool = True is_final: bool = False + sdgs: List[SpecialDataGroup] = field(default_factory=list) def __post_init__(self) -> None: if not self.functional_class_refs: @@ -166,6 +170,14 @@ def functional_classes(self) -> Optional[NamedItemList[FunctionalClass]]: """ return self._functional_classes + def _build_odxlinks(self): + result = {} + + for sdg in self.sdgs: + result.update(sdg._build_odxlinks()) + + return result + def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: # Resolve references to functional classes self._functional_classes = NamedItemList[FunctionalClass]( @@ -191,6 +203,9 @@ def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: for code in self.prog_codes: code._resolve_references(odxlinks) + for sdg in self.sdgs: + sdg._resolve_references(odxlinks) + def decode_message(self, message: Union[bytes, bytearray]) -> Message: """This function's signature matches `DiagService.decode_message` and only raises an informative error. @@ -325,6 +340,7 @@ def read_single_ecu_job_from_odx(et_element, doc_frags: List[OdxDocFragment]): assert short_name is not None long_name = et_element.findtext("LONG-NAME") description = read_description_from_odx(et_element.find("DESC")) + admin_data = read_admin_data_from_odx(et_element.find("ADMIN-DATA"), doc_frags) semantic = et_element.get("SEMANTIC") functional_class_refs = [] @@ -358,10 +374,13 @@ def read_single_ecu_job_from_odx(et_element, doc_frags: List[OdxDocFragment]): else True) is_final = True if et_element.get("IS-FINAL") == "true" else False + sdgs = read_sdgs_from_odx(et_element.find("SDGS"), doc_frags) + diag_service = SingleEcuJob(odx_id=odx_id, short_name=short_name, long_name=long_name, description=description, + admin_data=admin_data, prog_codes=prog_codes, semantic=semantic, audience=audience, @@ -371,5 +390,6 @@ def read_single_ecu_job_from_odx(et_element, doc_frags: List[OdxDocFragment]): neg_output_params=neg_output_params, is_mandatory=is_mandatory, is_executable=is_executable, - is_final=is_final) + is_final=is_final, + sdgs=sdgs) return diag_service diff --git a/odxtools/specialdata.py b/odxtools/specialdata.py new file mode 100644 index 00000000..b8add6af --- /dev/null +++ b/odxtools/specialdata.py @@ -0,0 +1,136 @@ +# SPDX-License-Identifier: MIT +# Copyright (c) 2022 MBition GmbH + +from typing import Optional, Any, Dict, List, Union +import warnings +from xml.etree import ElementTree +from dataclasses import dataclass, field + +from .utils import short_name_as_id +from .odxlink import OdxLinkRef, OdxLinkId, OdxLinkDatabase, OdxDocFragment +from .utils import read_description_from_odx + +@dataclass +class SpecialDataGroupCaption: + odx_id: OdxLinkId + short_name: str + long_name: Optional[str] + description: Optional[str] + + @staticmethod + def from_et(et_element: ElementTree.Element, + doc_frags: List[OdxDocFragment]) \ + -> "SpecialDataGroupCaption": + odx_id = OdxLinkId.from_et(et_element, doc_frags) + assert odx_id is not None + + short_name = et_element.findtext("SHORT-NAME") + assert short_name is not None + long_name = et_element.findtext("LONG-NAME") + description = read_description_from_odx(et_element.find("DESC")) + + return SpecialDataGroupCaption(odx_id=odx_id, + short_name=short_name, + long_name=long_name, + description=description) + + def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: + result = {} + + result[self.odx_id] = self + + return result + +@dataclass +class SpecialData: + semantic_info: Optional[str] # the "SI" attribute + text_identifier: Optional[str] # the "TI" attribute, specifies the language used + value: str + + def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: + return {} + + def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: + pass + + @staticmethod + def from_et(et_element: ElementTree.Element, + doc_frags: List[OdxDocFragment]) \ + -> "SpecialData": + semantic_info = et_element.get("SI") + text_identifier = et_element.get("TI") + value = et_element.text or "" + + return SpecialData(semantic_info=semantic_info, + text_identifier=text_identifier, + value=value) + +@dataclass +class SpecialDataGroup: + sdg_caption: Optional[SpecialDataGroupCaption] + sdg_caption_ref: Optional[OdxLinkRef] + semantic_info: Optional[str] # the "SI" attribute + values: List[Union["SpecialDataGroup", SpecialData]] + + @staticmethod + def from_et(et_element: ElementTree.Element, + doc_frags: List[OdxDocFragment]) \ + -> "SpecialDataGroup": + sdg_caption = None + if caption_elem := et_element.find("SDG-CAPTION"): + sdg_caption = SpecialDataGroupCaption.from_et(caption_elem, doc_frags) + + sdg_caption_ref = None + if (caption_ref_elem := et_element.find("SDG-CAPTION-REF")) is not None: + sdg_caption_ref = OdxLinkRef.from_et(caption_ref_elem, doc_frags) + + semantic_info = et_element.get("SI") + + values: List[Union[SpecialData, SpecialDataGroup]] = [] + for value_elem in et_element: + next_entry: Optional[Union[SpecialData, SpecialDataGroup]] = None + if value_elem.tag == "SDG": + next_entry = SpecialDataGroup.from_et(value_elem, doc_frags) + elif value_elem.tag == "SD": + next_entry = SpecialData.from_et(value_elem, doc_frags) + + if next_entry is not None: + values.append(next_entry) + + return SpecialDataGroup(sdg_caption=sdg_caption, + sdg_caption_ref=sdg_caption_ref, + semantic_info=semantic_info, + values=values) + + def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: + result = {} + + if self.sdg_caption is not None: + result.update(self.sdg_caption._build_odxlinks()) + + for val in self.values: + result.update(val._build_odxlinks()) + + return result + + def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: + if self.sdg_caption_ref is not None: + caption = odxlinks.resolve(self.sdg_caption_ref) + assert isinstance(caption, SpecialDataGroupCaption) + self.sdg_caption = caption + + for val in self.values: + val._resolve_references(odxlinks) + +def read_sdgs_from_odx(et_element: Optional[ElementTree.Element], + doc_frags: List[OdxDocFragment]) \ + -> List[SpecialDataGroup]: + + if not et_element: + return [] + + result = [] + for sdg_elem in et_element.iterfind("SDG"): + result.append(SpecialDataGroup.from_et(sdg_elem, doc_frags)) + + return result diff --git a/odxtools/state.py b/odxtools/state.py index 344b4d39..79d5b6f0 100644 --- a/odxtools/state.py +++ b/odxtools/state.py @@ -2,10 +2,10 @@ # Copyright (c) 2022 MBition GmbH from dataclasses import dataclass -from typing import Optional, List +from typing import Optional, List, Dict, Any from .utils import read_description_from_odx -from .odxlink import OdxLinkId, OdxDocFragment +from .odxlink import OdxLinkId, OdxDocFragment, OdxLinkDatabase @dataclass() class State: @@ -17,6 +17,11 @@ class State: long_name: Optional[str] = None description: Optional[str] = None + def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: + return { self.odx_id: self } + + def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: + pass def read_state_from_odx(et_element, doc_frags: List[OdxDocFragment]): short_name = et_element.findtext("SHORT-NAME") diff --git a/odxtools/state_transition.py b/odxtools/state_transition.py index 415a4993..f2621713 100644 --- a/odxtools/state_transition.py +++ b/odxtools/state_transition.py @@ -2,9 +2,9 @@ # Copyright (c) 2022 MBition GmbH from dataclasses import dataclass -from typing import Optional, List +from typing import Optional, List, Dict, Any -from .odxlink import OdxLinkId, OdxDocFragment +from .odxlink import OdxLinkId, OdxDocFragment, OdxLinkDatabase @dataclass() class StateTransition: @@ -17,6 +17,11 @@ class StateTransition: source_short_name: Optional[str] = None target_short_name: Optional[str] = None + def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: + return { self.odx_id: self } + + def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: + pass def read_state_transition_from_odx(et_element, doc_frags: List[OdxDocFragment]): short_name = et_element.findtext("SHORT-NAME") diff --git a/odxtools/structures.py b/odxtools/structures.py index 023f9a1d..104555be 100644 --- a/odxtools/structures.py +++ b/odxtools/structures.py @@ -18,6 +18,7 @@ from .parameters import CodedConstParameter, MatchingRequestParameter, ValueParameter from .utils import read_description_from_odx from .odxlink import OdxLinkId, OdxDocFragment, OdxLinkDatabase +from .specialdata import SpecialDataGroup, read_sdgs_from_odx if TYPE_CHECKING: from .diaglayer import DiagLayer @@ -32,8 +33,13 @@ def __init__(self, parameters: Iterable[Union[Parameter, "EndOfPduField"]], long_name=None, byte_size=None, - description=None): - super().__init__(odx_id, short_name, long_name=long_name, description=description) + description=None, + sdgs=[]): + super().__init__(odx_id, + short_name, + long_name=long_name, + description=description, + sdgs=sdgs) self.parameters : NamedItemList[Union[Parameter, "EndOfPduField"]] = NamedItemList(short_name_as_id, parameters) self._byte_size = byte_size @@ -181,7 +187,7 @@ def convert_physical_to_internal(self, def _validate_coded_rpc( self, coded_rpc: bytearray): - + if self._byte_size is not None: # We definitely broke something if we didn't respect the explicit byte_size assert len(coded_rpc) == self._byte_size, self._get_encode_error_str('was', coded_rpc, self._byte_size * 8) @@ -301,16 +307,29 @@ def parameter_dict(self) -> ParameterDict: }) return param_dict - def _resolve_references(self, + def _build_odxlinks(self): + result = super()._build_odxlinks() + + for param in self.parameters: + result.update(param._build_odxlinks()) + + return result + + def _resolve_references(self, # type: ignore[override] parent_dl: "DiagLayer", - odxlinks: OdxLinkDatabase): + odxlinks: OdxLinkDatabase) \ + -> None: """Recursively resolve any references (odxlinks or sn-refs) """ + super()._resolve_references(odxlinks) + for p in self.parameters: if isinstance(p, ParameterWithDOP): p.resolve_references(parent_dl, odxlinks) - if isinstance(p, TableKeyParameter): + elif isinstance(p, TableKeyParameter): p.resolve_references(parent_dl, odxlinks) + else: + p._resolve_references(odxlinks) def __message_format_lines(self, allow_unknown_lengths: bool = False) \ -> List[str]: @@ -465,13 +484,22 @@ def print_message_format(self, indent: int = 5, allow_unknown_lengths=False): class Structure(BasicStructure): - def __init__(self, odx_id, short_name, parameters, long_name=None, byte_size=None, description=None): - super().__init__(odx_id, short_name, parameters, - long_name=long_name, description=description) + def __init__(self, + odx_id, + short_name, + parameters, + long_name=None, + byte_size=None, + description=None, + sdgs=[]): + super().__init__(odx_id, + short_name, + parameters, + long_name=long_name, + description=description, + sdgs=sdgs) self.parameters = parameters - self.basic_structure = BasicStructure(odx_id, short_name, parameters, - long_name=long_name, byte_size=byte_size, description=description) def __repr__(self) -> str: return f"Structure('{self.short_name}', byte_size={self._byte_size})" @@ -487,9 +515,19 @@ def __str__(self) -> str: class Request(BasicStructure): - def __init__(self, odx_id, short_name, parameters, long_name=None, description=None): - super().__init__(odx_id, short_name, parameters, - long_name=long_name, description=description) + def __init__(self, + odx_id, + short_name, + parameters, + long_name=None, + description=None, + sdgs=[]): + super().__init__(odx_id, + short_name, + parameters, + long_name=long_name, + description=description, + sdgs=sdgs) def __repr__(self) -> str: return f"Request('{self.short_name}')" @@ -499,9 +537,20 @@ def __str__(self) -> str: class Response(BasicStructure): - def __init__(self, odx_id, short_name, parameters, long_name=None, response_type=None, description=None): - super().__init__(odx_id, short_name, parameters, - long_name=long_name, description=description) + def __init__(self, + odx_id, + short_name, + parameters, + long_name=None, + response_type=None, + description=None, + sdgs=[]): + super().__init__(odx_id, + short_name, + parameters, + long_name=long_name, + description=description, + sdgs=sdgs) self.response_type = "POS-RESPONSE" if response_type == "POS-RESPONSE" else "NEG-RESPONSE" def encode(self, coded_request: Optional[ByteString] = None, **params) -> ByteString: @@ -534,6 +583,7 @@ def read_structure_from_odx(et_element, doc_frags: List[OdxDocFragment]) -> Unio description = read_description_from_odx(et_element.find("DESC")) parameters = [read_parameter_from_odx(et_parameter, doc_frags) for et_parameter in et_element.iterfind("PARAMS/PARAM")] + sdgs = read_sdgs_from_odx(et_element.find("SDGS"), doc_frags) res: Union[Structure, Request, Response, None] if et_element.tag == "REQUEST": @@ -542,7 +592,8 @@ def read_structure_from_odx(et_element, doc_frags: List[OdxDocFragment]) -> Unio short_name, parameters=parameters, long_name=long_name, - description=description + description=description, + sdgs=sdgs, ) elif et_element.tag in ["POS-RESPONSE", "NEG-RESPONSE"]: res = Response( @@ -551,7 +602,8 @@ def read_structure_from_odx(et_element, doc_frags: List[OdxDocFragment]) -> Unio response_type=et_element.tag, parameters=parameters, long_name=long_name, - description=description + description=description, + sdgs=sdgs, ) elif et_element.tag == "STRUCTURE": byte_size_text = et_element.findtext("BYTE-SIZE") @@ -562,7 +614,8 @@ def read_structure_from_odx(et_element, doc_frags: List[OdxDocFragment]) -> Unio parameters=parameters, byte_size=byte_size, long_name=long_name, - description=description + description=description, + sdgs=sdgs, ) else: res = None diff --git a/odxtools/table.py b/odxtools/table.py index bdee76ad..78a7e33e 100644 --- a/odxtools/table.py +++ b/odxtools/table.py @@ -2,7 +2,7 @@ # Copyright (c) 2022 MBition GmbH import abc -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Optional, List, Dict, Any, Iterable from .utils import read_description_from_odx @@ -10,16 +10,33 @@ from .dataobjectproperty import DopBase from .globals import logger +from .specialdata import SpecialDataGroup, read_sdgs_from_odx class TableBase(abc.ABC): """ Base class for all Tables.""" - def __init__(self, odx_id: OdxLinkId, short_name: str, long_name=None): + def __init__(self, + odx_id: OdxLinkId, + short_name: str, + long_name=None, + sdgs: List[SpecialDataGroup] = []): self.odx_id = odx_id self.short_name = short_name self.long_name = long_name + self.sdgs = sdgs + def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: + result = {} + + for sdg in self.sdgs: + result.update(sdg._build_odxlinks()) + + return result + + def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: + for sdg in self.sdgs: + sdg._resolve_references(odxlinks) @dataclass class TableRow: @@ -33,17 +50,29 @@ class TableRow: dop_ref: Optional[OdxLinkRef] = None description: Optional[str] = None semantic: Optional[str] = None + sdgs: List[SpecialDataGroup] = field(default_factory=list) def __post_init__(self) -> None: self._structure: Optional[DopBase] = None self._dop: Optional[DopBase] = None + def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: + result = {} + + for sdg in self.sdgs: + result.update(sdg._build_odxlinks()) + + return result + def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: if self.structure_ref is not None: self._structure = odxlinks.resolve(self.structure_ref) if self.dop_ref is not None: self._dop = odxlinks.resolve(self.dop_ref) + for sdg in self.sdgs: + sdg._resolve_references(odxlinks) + @property def structure(self) -> Optional[DopBase]: """The structure associated with this table row.""" @@ -80,10 +109,13 @@ def __init__( key_dop_ref: Optional[OdxLinkRef] = None, description: Optional[str] = None, semantic: Optional[str] = None, + sdgs: List[SpecialDataGroup] = [], ): - super().__init__( - odx_id=odx_id, short_name=short_name, long_name=long_name - ) + super().__init__(odx_id=odx_id, + short_name=short_name, + long_name=long_name, + sdgs = sdgs, + ) self._local_table_rows = table_rows self._ref_table_rows: List[TableRow] = [] self._table_row_refs = table_row_refs or [] @@ -103,11 +135,16 @@ def table_rows(self) -> Iterable[TableRow]: return self._local_table_rows + self._ref_table_rows def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: - odxlinks = {} - odxlinks.update({table_row.odx_id: table_row for table_row in self.table_rows}) - return odxlinks + result = super()._build_odxlinks() + + for tr in self._local_table_rows: + result.update(tr._build_odxlinks()) + + return result def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: + super()._resolve_references(odxlinks) + if self.key_dop_ref is not None: self._key_dop = odxlinks.resolve(self.key_dop_ref) @@ -117,8 +154,8 @@ def _resolve_references(self, odxlinks: OdxLinkDatabase) -> None: self._ref_table_rows = [] for ref in self._table_row_refs: tr = odxlinks.resolve(ref) - if isinstance(tr, TableRow): - self._ref_table_rows.append(tr) + assert isinstance(tr, TableRow) + self._ref_table_rows.append(tr) def __repr__(self) -> str: return ( @@ -151,12 +188,14 @@ def read_table_row_from_odx(et_element, doc_frags: List[OdxDocFragment]) \ dop_ref = None if et_element.find("DATA-OBJECT-PROP-REF") is not None: dop_ref = OdxLinkRef.from_et(et_element.find("DATA-OBJECT-PROP-REF"), doc_frags) + sdgs = read_sdgs_from_odx(et_element.find("SDGS"), doc_frags) return TableRow( key=key, structure_ref=structure_ref, dop_ref=dop_ref, - **_get_common_props(et_element, doc_frags) + sdgs=sdgs, + **_get_common_props(et_element, doc_frags), ) @@ -178,10 +217,12 @@ def read_table_from_odx(et_element, doc_frags: List[OdxDocFragment]) \ ref = OdxLinkRef.from_et(el, doc_frags) assert ref is not None table_row_refs.append(ref) + sdgs = read_sdgs_from_odx(et_element.find("SDGS"), doc_frags) return Table( key_dop_ref=key_dop_ref, table_rows=table_rows, table_row_refs=table_row_refs, + sdgs=sdgs, **_get_common_props(et_element, doc_frags) ) diff --git a/odxtools/templates/comparam-subset.odx-cs.xml.jinja2 b/odxtools/templates/comparam-subset.odx-cs.xml.jinja2 index 12a189b8..815db56c 100644 --- a/odxtools/templates/comparam-subset.odx-cs.xml.jinja2 +++ b/odxtools/templates/comparam-subset.odx-cs.xml.jinja2 @@ -12,11 +12,12 @@ {%- import('macros/printCompanyData.xml.jinja2') as pcd -%} {%- import('macros/printDOP.xml.jinja2') as pdop %} {%- import('macros/printUnitSpec.xml.jinja2') as pus %} +{%- import('macros/printSpecialData.xml.jinja2') as psd %} {#- -#} - + {{comparam_subset.short_name}} @@ -38,6 +39,7 @@ {%- endfor %} {%- endif %} + {{- psd.printSpecialDataGroups(comparam_subset.sdgs)|indent(3, first=True) }} {%- if simple_comparams | length > 0 %} {%- for cp in simple_comparams %} diff --git a/odxtools/templates/diag_layer_container.odx-d.xml.jinja2 b/odxtools/templates/diag_layer_container.odx-d.xml.jinja2 index e71aae07..c6302836 100644 --- a/odxtools/templates/diag_layer_container.odx-d.xml.jinja2 +++ b/odxtools/templates/diag_layer_container.odx-d.xml.jinja2 @@ -6,6 +6,7 @@ {%- import('macros/printVariant.xml.jinja2') as pv -%} {%- import('macros/printAdminData.xml.jinja2') as pad -%} {%- import('macros/printCompanyData.xml.jinja2') as pcd -%} +{%- import('macros/printSpecialData.xml.jinja2') as psd %} {#- -#} @@ -31,6 +32,7 @@ {%- endfor %} {%- endif %} + {{- psd.printSpecialDataGroups(dlc.sdgs)|indent(2, first=True) }} {%- if dlc.protocols %} {%- for dl in dlc.protocols %} diff --git a/odxtools/templates/macros/printAdminData.xml.jinja2 b/odxtools/templates/macros/printAdminData.xml.jinja2 index 804cdc53..353c9d2d 100644 --- a/odxtools/templates/macros/printAdminData.xml.jinja2 +++ b/odxtools/templates/macros/printAdminData.xml.jinja2 @@ -4,6 +4,8 @@ # Copyright (c) 2022 MBition GmbH -#} +{%- import('macros/printSpecialData.xml.jinja2') as psd %} + {%- macro printAdminData(admin_data) %} {%- if admin_data.language is not none %} @@ -16,10 +18,11 @@ {%- if cdi.team_member_ref is not none %} - {%- endif %} + {%- endif %} {%- if cdi.doc_label is not none %} {{cdi.doc_label|e}} {%- endif %} + {{- psd.printSpecialDataGroups(cdi.sdgs)|indent(1, first=True) }} {%- endfor %} diff --git a/odxtools/templates/macros/printCompanyData.xml.jinja2 b/odxtools/templates/macros/printCompanyData.xml.jinja2 index 851ff670..62d65f31 100644 --- a/odxtools/templates/macros/printCompanyData.xml.jinja2 +++ b/odxtools/templates/macros/printCompanyData.xml.jinja2 @@ -4,6 +4,8 @@ # Copyright (c) 2022 MBition GmbH -#} +{%- import('macros/printSpecialData.xml.jinja2') as psd %} + {%- macro printCompanyData(company_data) -%} {{company_data.short_name|e}} @@ -111,6 +113,7 @@ {%- endfor %} + {{- psd.printSpecialDataGroups(company_data.company_specific_info.sdgs)|indent(2, first=True) }} {%- endif %} diff --git a/odxtools/templates/macros/printDOP.xml.jinja2 b/odxtools/templates/macros/printDOP.xml.jinja2 index f5ad9031..c7f1d508 100644 --- a/odxtools/templates/macros/printDOP.xml.jinja2 +++ b/odxtools/templates/macros/printDOP.xml.jinja2 @@ -4,6 +4,8 @@ # Copyright (c) 2022 MBition GmbH -#} +{%- import('macros/printSpecialData.xml.jinja2') as psd %} + {%- macro printDiagCodedType(dct) -%} {%- endmacro -%} - {%- macro printDOP(dop, tag_name) %} <{{tag_name}} ID="{{dop.odx_id.local_id}}"> {{dop.short_name}} {{dop.long_name|e}} -{%- if dop.compu_method is defined %} + {{- psd.printSpecialDataGroups(dop.sdgs)|indent(2, first=True) }} + {%- if dop.compu_method is defined %} {{ printCompuMethod(dop.compu_method)|indent(1) }} -{%- endif %} -{%- if dop.diag_coded_type is defined %} + {%- endif %} + {%- if dop.diag_coded_type is defined %} {{ printDiagCodedType(dop.diag_coded_type)|indent(1) -}} -{%- endif -%} + {%- endif -%} {{ printPhysicalType(dop.physical_type)|indent(1) }} -{%- if dop.unit_ref %} + {%- if dop.unit_ref %} -{%- endif %} + {%- endif %} {%- endmacro -%} - {%- macro printDTCDOP(dop) %} {{dop.short_name}} - {{dop.long_name}} - {{ printDiagCodedType(dop.diag_coded_type)|indent(1) -}} - {{ printPhysicalType(dop.physical_type)|indent(1) }} - {{ printCompuMethod(dop.compu_method)|indent(1) }} + {{dop.long_name|e}} + {{- psd.printSpecialDataGroups(dop.sdgs)|indent(2, first=True) }} + {{- printDiagCodedType(dop.diag_coded_type)|indent(1, first=True) }} + {{- printPhysicalType(dop.physical_type)|indent(1, first=True) }} + {{- printCompuMethod(dop.compu_method)|indent(1, first=True) }} {%- for dtc in dop.dtcs %} {%- if hasattr(dtc, "dtc_ref") %} @@ -203,6 +205,7 @@ {%- if not dtc.level is none %} {{dtc.level}} {%- endif %} + {{- psd.printSpecialDataGroups(dtc.sdgs)|indent(3, first=True) }} {%- endif %} {%- endfor %} diff --git a/odxtools/templates/macros/printDiagnosticTroubleCode.xml.jinja2 b/odxtools/templates/macros/printDiagnosticTroubleCode.xml.jinja2 deleted file mode 100644 index 4a905f53..00000000 --- a/odxtools/templates/macros/printDiagnosticTroubleCode.xml.jinja2 +++ /dev/null @@ -1,16 +0,0 @@ -{#- -*- mode: sgml; tab-width: 1; indent-tabs-mode: nil -*- - # - # SPDX-License-Identifier: MIT - # Copyright (c) 2022 MBition GmbH --#} - -{#- TODO: function classes - -{%- macro printDTC_DOP(dop) -%} - - {{dop.short_name}} - {{dop.long_name|e}} - -{%- endmacro -%} - -#} diff --git a/odxtools/templates/macros/printParam.xml.jinja2 b/odxtools/templates/macros/printParam.xml.jinja2 index 0b170e9e..13693526 100644 --- a/odxtools/templates/macros/printParam.xml.jinja2 +++ b/odxtools/templates/macros/printParam.xml.jinja2 @@ -5,6 +5,7 @@ -#} {%- import('macros/printDOP.xml.jinja2') as pdop %} +{%- import('macros/printSpecialData.xml.jinja2') as psd %} {%- macro printParam(param) -%} {%- if param.semantic is not none %} @@ -23,6 +24,7 @@ {%- if param.long_name %} {{param.long_name|e}} {%- endif %} + {{- psd.printSpecialDataGroups(param.sdgs)|indent(1, first=True) }} {%- if param.byte_position is not none %} {{param.byte_position}} {%- endif %} diff --git a/odxtools/templates/macros/printRequest.xml.jinja2 b/odxtools/templates/macros/printRequest.xml.jinja2 index 5c4864f5..ba48e7a0 100644 --- a/odxtools/templates/macros/printRequest.xml.jinja2 +++ b/odxtools/templates/macros/printRequest.xml.jinja2 @@ -6,6 +6,7 @@ {%- import('macros/printDOP.xml.jinja2') as pdop %} {%- import('macros/printParam.xml.jinja2') as pp %} +{%- import('macros/printSpecialData.xml.jinja2') as psd %} {%- macro printRequest(request) -%} @@ -25,5 +26,6 @@ {%- endfor %} {%- endif %} + {{- psd.printSpecialDataGroups(request.sdgs)|indent(1, first=True) }} {%- endmacro -%} diff --git a/odxtools/templates/macros/printResponse.xml.jinja2 b/odxtools/templates/macros/printResponse.xml.jinja2 index 953e6164..88d3aff1 100644 --- a/odxtools/templates/macros/printResponse.xml.jinja2 +++ b/odxtools/templates/macros/printResponse.xml.jinja2 @@ -5,6 +5,7 @@ -#} {%- import('macros/printParam.xml.jinja2') as pp %} +{%- import('macros/printSpecialData.xml.jinja2') as psd %} {%- macro printResponse(resp, tag_name="POS-RESPONSE") -%} <{{tag_name}} ID="{{resp.odx_id.local_id}}"> @@ -22,5 +23,6 @@ {%- endfor %} {%- endif %} + {{- psd.printSpecialDataGroups(resp.sdgs)|indent(1, first=True) }} {%- endmacro -%} diff --git a/odxtools/templates/macros/printService.xml.jinja2 b/odxtools/templates/macros/printService.xml.jinja2 index 074993a2..7b0ac9ab 100644 --- a/odxtools/templates/macros/printService.xml.jinja2 +++ b/odxtools/templates/macros/printService.xml.jinja2 @@ -5,6 +5,7 @@ -#} {%- import('macros/printAudience.xml.jinja2') as paud %} +{%- import('macros/printSpecialData.xml.jinja2') as psd %} {%- macro printService(service) -%} {%- if service.semantic is not none %} @@ -22,6 +23,7 @@ {{service.description}} {%- endif %} + {{- psd.printSpecialDataGroups(service.sdgs)|indent(1, first=True) }} {%- if service.functional_class_refs %} {%- for ref in service.functional_class_refs %} diff --git a/odxtools/templates/macros/printSingleEcuJob.xml.jinja2 b/odxtools/templates/macros/printSingleEcuJob.xml.jinja2 index 3d2c3191..d9dbe096 100644 --- a/odxtools/templates/macros/printSingleEcuJob.xml.jinja2 +++ b/odxtools/templates/macros/printSingleEcuJob.xml.jinja2 @@ -4,7 +4,9 @@ # Copyright (c) 2022 MBition GmbH -#} +{%- import('macros/printElementID.xml.jinja2') as peid %} {%- import('macros/printAudience.xml.jinja2') as paud %} +{%- import('macros/printSpecialData.xml.jinja2') as psd %} {%- macro printSingleEcuJob(job) -%} - {{ printElementID(job)|indent(1) }} + {{ peid.printElementID(job)|indent(1) }} + {{- psd.printSpecialDataGroups(job.sdgs)|indent(1, first=True) }} {%- if job.functional_class_refs %} {%- for ref in job.functional_class_refs %} @@ -115,7 +118,7 @@ {%- endif %} {%- endfilter -%} > - {{ printElementID(param)|indent(1) }} + {{ peid.printElementID(param)|indent(1) }} {%- if param.physical_default_value %} {{param.physical_default_value}} {%- endif %} @@ -136,7 +139,7 @@ {%- endif %} {%- endfilter -%} > - {{ printElementID(param)|indent(1) }} + {{ peid.printElementID(param)|indent(1) }} {%- endmacro -%} @@ -144,20 +147,7 @@ {%- macro printNegOutputParam(param) -%} - {{ printElementID(param)|indent(1) }} + {{ peid.printElementID(param)|indent(1) }} {%- endmacro -%} - - -{%- macro printElementID(element) -%} - {{element.short_name}} -{%- if element.long_name and element.long_name.strip() %} - {{element.long_name|e}} -{%- endif %} -{%- if element.description and element.description.strip() %} - - {{element.description}} - -{%- endif %} -{%- endmacro -%} diff --git a/odxtools/templates/macros/printSpecialData.xml.jinja2 b/odxtools/templates/macros/printSpecialData.xml.jinja2 new file mode 100644 index 00000000..8d2071cf --- /dev/null +++ b/odxtools/templates/macros/printSpecialData.xml.jinja2 @@ -0,0 +1,57 @@ +{#- -*- mode: sgml; tab-width: 1; sgml-basic-offset: 1; indent-tabs-mode: nil -*- + # + # SPDX-License-Identifier: MIT + # Copyright (c) 2022 MBition GmbH +-#} + +{%- import('macros/printElementID.xml.jinja2') as peid %} + +{%- macro printSpecialData(sd) %} +{{sd.value|e}} +{%- endmacro %} + +{%- macro printSdgCaption(sdg_caption) %} + + {{ peid.printElementID(sdg_caption) | indent(1) }} + +{%- endmacro %} + +{%- macro printSpecialDataGroup(sdg) %} + + {%- if sdg.sdg_caption_ref %} + + {%- elif sdg.sdg_caption %} + {{- printSdgCaption(sdg.sdg_caption) | indent(1, first=True) }} + {%- endif %} + {%- for sd in sdg.values %} + {%- if hasattr(sd, "values") %} + {#- -> nested SDG #} + {{- printSpecialDataGroup(sd) | indent(1, first=True) }} + {%- else %} + {{- printSpecialData(sd) | indent(1, first=True) }} + {%- endif %} + {%- endfor %} + +{%- endmacro %} + + +{%- macro printSpecialDataGroups(sdgs) %} +{%- if sdgs %} + + {%- for sdg in sdgs %} + {{- printSpecialDataGroup(sdg) | indent(1, first=True) }} + {%- endfor %} + +{%- endif %} +{%- endmacro %} diff --git a/odxtools/templates/macros/printTable.xml.jinja2 b/odxtools/templates/macros/printTable.xml.jinja2 index 51817ab8..eb7af070 100644 --- a/odxtools/templates/macros/printTable.xml.jinja2 +++ b/odxtools/templates/macros/printTable.xml.jinja2 @@ -4,6 +4,8 @@ # Copyright (c) 2022 MBition GmbH -#} +{%- import('macros/printSpecialData.xml.jinja2') as psd %} + {%- macro printTable(table) %} {%- endif %} + {{- psd.printSpecialDataGroups(table_row.sdgs)|indent(2, first=True) }} {%- endfor %} + {{- psd.printSpecialDataGroups(table.sdgs)|indent(1, first=True) }}
{%- endmacro -%} diff --git a/odxtools/templates/macros/printUnitSpec.xml.jinja2 b/odxtools/templates/macros/printUnitSpec.xml.jinja2 index 28d26233..49957a9a 100644 --- a/odxtools/templates/macros/printUnitSpec.xml.jinja2 +++ b/odxtools/templates/macros/printUnitSpec.xml.jinja2 @@ -5,6 +5,7 @@ -#} {%- import('macros/printElementID.xml.jinja2') as peid %} +{%- import('macros/printSpecialData.xml.jinja2') as psd %} {%- macro printUnitSpec(spec) -%} @@ -29,6 +30,7 @@ {%- endfor %} {%- endif %} + {{- psd.printSpecialDataGroups(spec.sdgs)|indent(1, first=True) }} {%- endmacro -%} diff --git a/odxtools/templates/macros/printVariant.xml.jinja2 b/odxtools/templates/macros/printVariant.xml.jinja2 index 64debcf5..b932a752 100644 --- a/odxtools/templates/macros/printVariant.xml.jinja2 +++ b/odxtools/templates/macros/printVariant.xml.jinja2 @@ -20,6 +20,7 @@ {%- import('macros/printComparamRef.xml.jinja2') as pcom %} {%- import('macros/printParentRef.xml.jinja2') as pparref %} {%- import('macros/printAudience.xml.jinja2') as paud %} +{%- import('macros/printSpecialData.xml.jinja2') as psd %} {%- macro printVariant(dl, variant_tag) -%} <{{variant_tag}} ID="{{dl.odx_id.local_id}}"> @@ -99,6 +100,7 @@ {{ pt.printTable(table)|indent(3) }} {%- endfor %} + {{- psd.printSpecialDataGroups(dl.local_diag_data_dictionary_spec.sdgs)|indent(2, first=True) }} {%- endif %} {%- endif %} @@ -160,5 +162,6 @@ {%- endfor %} {%- endif %} + {{- psd.printSpecialDataGroups(dl.sdgs)|indent(1, first=True) }} {%- endmacro -%} diff --git a/odxtools/units.py b/odxtools/units.py index 5c8f872c..4b779b73 100644 --- a/odxtools/units.py +++ b/odxtools/units.py @@ -8,6 +8,7 @@ from .nameditemlist import NamedItemList from .utils import read_description_from_odx from .odxlink import OdxLinkRef, OdxLinkId, OdxLinkDatabase, OdxDocFragment +from .specialdata import SpecialDataGroup, read_sdgs_from_odx UnitGroupCategory = Literal["COUNTRY", "EQUIV-UNITS"] @@ -171,6 +172,7 @@ class UnitSpec: default_factory=list) # type: ignore physical_dimensions: Union[NamedItemList[PhysicalDimension], List[PhysicalDimension]] = field(default_factory=list) # type: ignore + sdgs: List[SpecialDataGroup] = field(default_factory=list) def __post_init__(self): self.unit_groups = NamedItemList(short_name_as_id, self.unit_groups) @@ -185,6 +187,10 @@ def _build_odxlinks(self) -> Dict[OdxLinkId, Any]: odxlinks.update({ dim.odx_id: dim for dim in self.physical_dimensions }) + + for sdg in self.sdgs: + odxlinks.update(sdg._build_odxlinks()) + return odxlinks def _resolve_references(self, odxlinks: OdxLinkDatabase): @@ -192,7 +198,8 @@ def _resolve_references(self, odxlinks: OdxLinkDatabase): unit._resolve_references(odxlinks) for group in self.unit_groups: group._resolve_references(odxlinks) - + for sdg in self.sdgs: + sdg._resolve_references(odxlinks) def read_unit_from_odx(et_element, doc_frags: List[OdxDocFragment]): odx_id = OdxLinkId.from_et(et_element, doc_frags) @@ -221,7 +228,7 @@ def read_optional_float(element, name): description=description, factor_si_to_unit=factor_si_to_unit, offset_si_to_unit=offset_si_to_unit, - physical_dimension_ref=physical_dimension_ref + physical_dimension_ref=physical_dimension_ref, ) @@ -297,8 +304,11 @@ def read_unit_spec_from_odx(et_element, doc_frags: List[OdxDocFragment]): for el in et_element.iterfind("UNITS/UNIT")] physical_dimensions = [read_physical_dimension_from_odx(el, doc_frags) for el in et_element.iterfind("PHYSICAL-DIMENSIONS/PHYSICAL-DIMENSION")] + sdgs = read_sdgs_from_odx(et_element.find("SDGS"), doc_frags) + return UnitSpec( unit_groups=unit_groups, units=units, - physical_dimensions=physical_dimensions + physical_dimensions=physical_dimensions, + sdgs=sdgs, )