From 75d272f5e106282feaca225c37bd4e374ce71be4 Mon Sep 17 00:00:00 2001 From: Greg Elin Date: Mon, 17 May 2021 05:39:35 -0500 Subject: [PATCH 1/2] Add management command to export as OSCAL or CSV. --- CHANGELOG.md | 3 + .../commands/exportcomponentlibrary.py | 99 +++++++++++++++++++ controls/views.py | 4 +- 3 files changed, 105 insertions(+), 1 deletion(-) create mode 100644 controls/management/commands/exportcomponentlibrary.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 72357df9a..13a1806d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ GovReady-Q Release Notes v999 (May XX, 2021) --------------------- +**Developer changes** + +* Add management command `exportcomponentlibrary` to export as OSCAL or CSV. v0.9.3.5.3 (May 16, 2021) ------------------------- diff --git a/controls/management/commands/exportcomponentlibrary.py b/controls/management/commands/exportcomponentlibrary.py new file mode 100644 index 000000000..8c92e8428 --- /dev/null +++ b/controls/management/commands/exportcomponentlibrary.py @@ -0,0 +1,99 @@ +import sys +import os.path + +from django.core.management import call_command +from django.core.management.base import BaseCommand, CommandError +from django.db import transaction, models +from django.db.utils import OperationalError +from django.conf import settings +from pathlib import Path +from pathlib import PurePath +from django.utils.text import slugify + +# from siteapp.models import User, Organization, Portfolio +from controls.models import Element, Statement +# from controls.views import system_element_download_oscal_json +from controls.views import OSCALComponentSerializer + +import fs, fs.errors + + +class Command(BaseCommand): + help = 'Export all components.' + + def add_arguments(self, parser): + parser.add_argument('--format', metavar='format', nargs='?', default="oscal", help="File format") + parser.add_argument('--path', metavar='dir_or_pdf', nargs='?', default="local/export/components", help="The directory path to write export file(s) into.") + + def handle(self, *args, **options): + + # Configure + FORMAT = options['format'] + EXPORT_PATH = options['path'] + # Create export directory path + if not os.path.exists(EXPORT_PATH): + Path(EXPORT_PATH).mkdir(parents=True, exist_ok=True) + + # Export the component library + elements = Element.objects.filter(element_type="system_element") + element_cnt = len(elements) + + if FORMAT == 'oscal': + counter = 0 + for element in elements: + counter += 1 + print(f"{counter} id: {element.id}, element: {element.name}") + # Get the impl_smts for component + impl_smts = Statement.objects.filter(producer_element=element) + filename = str(PurePath(slugify(element.name)).with_suffix('.json')) + body = OSCALComponentSerializer(element, impl_smts).as_json() + # Save component OSCAL + with open(os.path.join(EXPORT_PATH,filename), "w") as f: + f.write(body) + elif FORMAT == "csv": + import csv + counter = 0 + for element in elements: + counter += 1 + print(f"{counter} id: {element.id}, element: {element.name}") + # Get the impl_smts for component + impl_smts = Statement.objects.filter(producer_element=element) + filename = str(PurePath(slugify(element.name)).with_suffix('.csv')) + tags = ";".join([f"'{tag.label}'" for tag in element.tags.all()]) + with open(os.path.join(EXPORT_PATH,filename), mode='w') as f: + component_writer = csv.writer(f, delimiter='|', quotechar='"', quoting=csv.QUOTE_MINIMAL) + component_writer.writerow(["component_name", + "component_uuid", + "component_tags", + "control_key", + "control_id", + "control_part", + "statement_uuid", + "statement", + "statement_type", + "remarks", + "version", + "created", + "updated" + ]) + for smt in impl_smts: + component_writer.writerow([element.name, + element.uuid, + tags, + smt.sid_class, + smt.sid, + smt.pid, + smt.uuid, + smt.body, + smt.statement_type, + smt.remarks, + smt.version, + smt.created, + smt.updated + ]) + else: + print(f"Format '{FORMAT}' not yet supported.") + + # Done + print(f"Exported {counter} components in {FORMAT} to folder `{EXPORT_PATH}`.") + diff --git a/controls/views.py b/controls/views.py index eb3b6185a..57b3b06d3 100644 --- a/controls/views.py +++ b/controls/views.py @@ -494,6 +494,7 @@ def as_json(self): # Example: https://github.com/usnistgov/OSCAL/blob/master/src/content/ssp-example/json/example-component.json uuid = str(self.element.uuid) control_implementations = [] + of = { "component-definition": { "uuid": str(uuid4()), @@ -502,7 +503,8 @@ def as_json(self): "published": datetime.now(timezone.utc).replace(microsecond=0).isoformat(), "last-modified": self.element.updated.replace(microsecond=0).isoformat(), "version": "string", - "oscal-version": "1.0.0-rc1" + "oscal-version": "1.0.0-rc1", + "props": [{"name": "tag", "ns": "https://govready.com/ns/oscal", "value": tag.label} for tag in self.element.tags.all()] }, "components": { uuid: { From eff805c7cf861aec2a6b4431972a780e6293bef3 Mon Sep 17 00:00:00 2001 From: Greg Elin Date: Tue, 18 May 2021 20:10:54 -0500 Subject: [PATCH 2/2] Add Django mgt cmd importcomponents to batch import components Updates to generation of components to improve adherence to OSCAL specification by removing certain keys when value for keys is None. Added new parameter `existing_import_record` to importing and creating components to group multiple imports under the same import record. --- CHANGELOG.md | 5 +- .../commands/exportcomponentlibrary.py | 1 + .../management/commands/importcomponents.py | 64 ++++++++++++++++ controls/views.py | 73 ++++++++++++++----- 4 files changed, 124 insertions(+), 19 deletions(-) create mode 100644 controls/management/commands/importcomponents.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 13a1806d6..ecad12e6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,10 @@ v999 (May XX, 2021) **Developer changes** -* Add management command `exportcomponentlibrary` to export as OSCAL or CSV. +* Add management command `exportcomponentlibrary` to batch export components from library as OSCAL or CSV. +* Add management command `importcomponents` to batch import OSCAL components to library. +* Add `existing_import_record` to importing and creating components to group multiple imports under the same import record. +* Improve generation of components in OSCAL model by removing certain keys when values are none as per specification. v0.9.3.5.3 (May 16, 2021) ------------------------- diff --git a/controls/management/commands/exportcomponentlibrary.py b/controls/management/commands/exportcomponentlibrary.py index 8c92e8428..bbd9700bb 100644 --- a/controls/management/commands/exportcomponentlibrary.py +++ b/controls/management/commands/exportcomponentlibrary.py @@ -92,6 +92,7 @@ def handle(self, *args, **options): smt.updated ]) else: + counter = 0 print(f"Format '{FORMAT}' not yet supported.") # Done diff --git a/controls/management/commands/importcomponents.py b/controls/management/commands/importcomponents.py new file mode 100644 index 000000000..3de988d49 --- /dev/null +++ b/controls/management/commands/importcomponents.py @@ -0,0 +1,64 @@ +import sys +import os.path + +from django.core.management import call_command +from django.core.management.base import BaseCommand, CommandError +from django.db import transaction, models +from django.db.utils import OperationalError +from django.conf import settings +from pathlib import Path +from pathlib import PurePath +from django.utils.text import slugify + +# from siteapp.models import User, Organization, Portfolio +from controls.models import Element, Statement +# from controls.views import system_element_download_oscal_json +from controls.views import OSCALComponentSerializer, ComponentImporter + +import fs, fs.errors + + +class Command(BaseCommand): + help = 'Import directory of component files.' + + def add_arguments(self, parser): + parser.add_argument('--format', metavar='format', nargs='?', default="oscal", help="File format") + parser.add_argument('--path', metavar='dir_or_pdf', nargs='?', default="local/export/components", help="The directory path containing component files to import.") + parser.add_argument('--importname', metavar='importname', nargs='?', default="Batch component import", help="Name to identify the batch import") + + def handle(self, *args, **options): + + # Configure + FORMAT = options['format'] + IMPORT_PATH = options['path'] + IMPORT_NAME = options['importname'] + # Check if import directory path exists + if not os.path.exists(IMPORT_PATH): + print(f"Import directory {IMPORT_PATH} not found.") + quit() + + if FORMAT == 'oscal': + counter = 0 + # Get list of files in directory + pathlist = Path(IMPORT_PATH).rglob('*.json') + print(pathlist) + # Import each file + for path in pathlist: + counter += 1 + path_in_str = str(path) + print(path_in_str) + + with open(path_in_str) as f: + oscal_component_json = f.read() + result = ComponentImporter().import_components_as_json(IMPORT_NAME, oscal_component_json, existing_import_record=True) + + elif FORMAT == "csv": + import csv + counter = 0 + pprint(f"Format '{FORMAT}' not yet supported.") + else: + print(f"Format '{FORMAT}' not yet supported.") + + # Done + print(f"Imported {counter} components in {FORMAT} from folder `{IMPORT_PATH}`.") + diff --git a/controls/views.py b/controls/views.py index 57b3b06d3..52aa47a4d 100644 --- a/controls/views.py +++ b/controls/views.py @@ -488,12 +488,12 @@ def statement_id_from_control(control_id, part_id): else: return f"{control_id}_smt" - def as_json(self): # Build OSCAL # Example: https://github.com/usnistgov/OSCAL/blob/master/src/content/ssp-example/json/example-component.json uuid = str(self.element.uuid) control_implementations = [] + props = [] of = { "component-definition": { @@ -504,7 +504,7 @@ def as_json(self): "last-modified": self.element.updated.replace(microsecond=0).isoformat(), "version": "string", "oscal-version": "1.0.0-rc1", - "props": [{"name": "tag", "ns": "https://govready.com/ns/oscal", "value": tag.label} for tag in self.element.tags.all()] + "props": props }, "components": { uuid: { @@ -517,6 +517,14 @@ def as_json(self): }, } + # Add component's tags if they exist + if len(self.element.tags.all()) > 0: + props.extend([{"name": "tag", "ns": "https://govready.com/ns/oscal", "value": tag.label} for tag in self.element.tags.all()]) + + # Remove 'metadata.props' key if no metadata.props exist + if len(props) == 0: + of['component-definition']['metadata'].pop('props', None) + # create requirements and organize by source (sid_class) by_class = defaultdict(list) @@ -544,8 +552,10 @@ def as_json(self): statement = { "uuid": str(smt.uuid), "description": smt.body, - "remarks": smt.remarks + "remarks": smt.remarks or "" } + if smt.remarks is None: + statement.pop('remarks', None) statement_id = self.statement_id_from_control(control_id, smt.pid) requirement["statements"][statement_id] = statement @@ -559,6 +569,9 @@ def as_json(self): "implemented-requirements": [req for req in requirements] } control_implementations.append(control_implementation) + # Remove 'control-implementations' key if no implementations exist + if len(control_implementations) == 0: + of['component-definition']['components'][uuid].pop('control-implementations', None) oscal_string = json.dumps(of, sort_keys=False, indent=2) return oscal_string @@ -594,13 +607,15 @@ def as_yaml(self): class ComponentImporter(object): - def import_components_as_json(self, import_name, json_object, request=None): + def import_components_as_json(self, import_name, json_object, request=None, existing_import_record=False): """Imports Components from a JSON object @type import_name: str @param import_name: Name of import file (if it exists) @type json_object: dict @param json_object: Element attributes from JSON object + @type existing_import_record: boolean + @param existing_import_record: Continue to append imports to an existing import record @rtype: ImportRecord if success, bool (false) if failure @returns: ImportRecord linked to the created components (if success) or False if failure """ @@ -615,43 +630,63 @@ def import_components_as_json(self, import_name, json_object, request=None): return False if self.validate_oscal_json(oscal_json): # Returns list of created components + created_components = self.create_components(oscal_json) - new_import_record = self.create_import_record(import_name, created_components) + new_import_record = self.create_import_record(import_name, created_components, existing_import_record=existing_import_record) return new_import_record else: + if request is not None: messages.add_message(request, messages.ERROR, f"Invalid OSCAL. Component(s) not created.") logger.info(f"Invalid JSON. Component(s) not created.") + else: + logger.info(f"Invalid JSON. Component(s) not created.") + return False - def create_import_record(self, import_name, components): + # def find_import_record_by_name(self, import_name): + # """Returns most recent existing import record by name + + # @type import_name: str + # @param import_name: Name of import file (if it exists) + # """ + + # found_import_record = ImportRecord.objects.filter(name=import_name).last() + + # return found_import_record + + def create_import_record(self, import_name, components, existing_import_record=False): """Associates components and statements to an import record @type import_name: str @param import_name: Name of import file (if it exists) @type components: list @param components: List of components + @type existing_import_record: booleen + @param existing_import_record: Continue to append imports to an existing import record @rtype: ImportRecord @returns: New ImportRecord object with components and statements associated """ - new_import_record = ImportRecord.objects.create(name=import_name) + import_record = ImportRecord.objects.filter(name=import_name).last() + if import_record is None or not existing_import_record: + import_record = ImportRecord.objects.create(name=import_name) for component in components: statements = Statement.objects.filter(producer_element=component) for statement in statements: - statement.import_record = new_import_record + statement.import_record = import_record #statement.save() - component.import_record = new_import_record + component.import_record = import_record component.save() - return new_import_record - + return import_record def validate_oscal_json(self, oscal_json): """Validates the JSON object is valid OSCAL format""" project_root = os.path.abspath(os.path.dirname(__name__)) oscal_schema_path = os.path.join(project_root, "schemas", "oscal_component_schema.json") + with open(oscal_schema_path, "r") as schema_content: oscal_json_schema = json.load(schema_content) try: @@ -663,7 +698,6 @@ def validate_oscal_json(self, oscal_json): def create_components(self, oscal_json): """Creates Elements (Components) from valid OSCAL JSON""" - components_created = [] components = oscal_json['component-definition']['components'] for component in components: @@ -688,18 +722,19 @@ def create_component(self, component_json): new_component = Element.objects.create( name=component_name, - description=component_json['description'] if 'description' in component_json else '', + description=component_json['description'] if 'description' in component_json else 'Description missing', # Components uploaded to the Component Library are all system_element types # TODO: When components can be uploaded by project, set element_type from component-type OSCAL property element_type="system_element" ) logger.info(f"Component {new_component.name} created with UUID {new_component.uuid}.") - control_implementation_statements = component_json['control-implementations'] - for control_element in control_implementation_statements: - catalog = oscalize_catalog_key(control_element['source']) if 'source' in control_element else None - implemented_reqs = control_element['implemented-requirements'] if 'implemented-requirements' in control_element else [] - created_statements = self.create_control_implementation_statements(catalog, implemented_reqs, new_component) + control_implementation_statements = component_json.get('control-implementations', None) + if control_implementation_statements: + for control_element in control_implementation_statements: + catalog = oscalize_catalog_key(control_element['source']) if 'source' in control_element else None + implemented_reqs = control_element['implemented-requirements'] if 'implemented-requirements' in control_element else [] + created_statements = self.create_control_implementation_statements(catalog, implemented_reqs, new_component) return new_component def create_control_implementation_statements(self, catalog_key, implemented_reqs, parent_component): @@ -1526,6 +1561,8 @@ def editor(request, system_id, catalog_key, cl_id): "remarks": smt.remarks }, } + if smt.remarks is None: + my_dict[smt.sid + "{}".format(smt.producer_element.name.replace(" ", "-"))].pop("remarks", None) by_components.update(my_dict) oscal_string = json.dumps(of, sort_keys=False, indent=2)