Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add management commands to batch export components as OSCAL or CSV files and import components from OSCAL files #1596

Merged
merged 2 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ GovReady-Q Release Notes
v999 (May XX, 2021)
---------------------

**Developer changes**

* Add management command `exportcomponentlibrary` to batch export components from library as OSCAL or CSV.
* Add management command `importcomponents` to batch import OSCAL components to library.
* Add `existing_import_record` to importing and creating components to group multiple imports under the same import record.
* Improve generation of components in OSCAL model by removing certain keys when values are none as per specification.

v0.9.3.5.3 (May 16, 2021)
-------------------------
Expand Down
100 changes: 100 additions & 0 deletions controls/management/commands/exportcomponentlibrary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import sys
import os.path

from django.core.management import call_command
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction, models
from django.db.utils import OperationalError
from django.conf import settings
from pathlib import Path
from pathlib import PurePath
from django.utils.text import slugify

# from siteapp.models import User, Organization, Portfolio
from controls.models import Element, Statement
# from controls.views import system_element_download_oscal_json
from controls.views import OSCALComponentSerializer

import fs, fs.errors


class Command(BaseCommand):
help = 'Export all components.'

def add_arguments(self, parser):
parser.add_argument('--format', metavar='format', nargs='?', default="oscal", help="File format")
parser.add_argument('--path', metavar='dir_or_pdf', nargs='?', default="local/export/components", help="The directory path to write export file(s) into.")

def handle(self, *args, **options):

# Configure
FORMAT = options['format']
EXPORT_PATH = options['path']
# Create export directory path
if not os.path.exists(EXPORT_PATH):
Path(EXPORT_PATH).mkdir(parents=True, exist_ok=True)

# Export the component library
elements = Element.objects.filter(element_type="system_element")
element_cnt = len(elements)

if FORMAT == 'oscal':
counter = 0
for element in elements:
counter += 1
print(f"{counter} id: {element.id}, element: {element.name}")
# Get the impl_smts for component
impl_smts = Statement.objects.filter(producer_element=element)
filename = str(PurePath(slugify(element.name)).with_suffix('.json'))
body = OSCALComponentSerializer(element, impl_smts).as_json()
# Save component OSCAL
with open(os.path.join(EXPORT_PATH,filename), "w") as f:
f.write(body)
gregelin marked this conversation as resolved.
Show resolved Hide resolved
elif FORMAT == "csv":
import csv
counter = 0
for element in elements:
counter += 1
print(f"{counter} id: {element.id}, element: {element.name}")
# Get the impl_smts for component
impl_smts = Statement.objects.filter(producer_element=element)
filename = str(PurePath(slugify(element.name)).with_suffix('.csv'))
tags = ";".join([f"'{tag.label}'" for tag in element.tags.all()])
with open(os.path.join(EXPORT_PATH,filename), mode='w') as f:
component_writer = csv.writer(f, delimiter='|', quotechar='"', quoting=csv.QUOTE_MINIMAL)
component_writer.writerow(["component_name",
"component_uuid",
"component_tags",
"control_key",
"control_id",
"control_part",
"statement_uuid",
"statement",
"statement_type",
"remarks",
"version",
"created",
"updated"
])
for smt in impl_smts:
component_writer.writerow([element.name,
element.uuid,
tags,
smt.sid_class,
smt.sid,
smt.pid,
smt.uuid,
smt.body,
smt.statement_type,
smt.remarks,
smt.version,
smt.created,
smt.updated
])
else:
counter = 0
print(f"Format '{FORMAT}' not yet supported.")

# Done
print(f"Exported {counter} components in {FORMAT} to folder `{EXPORT_PATH}`.")

64 changes: 64 additions & 0 deletions controls/management/commands/importcomponents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import sys
import os.path

from django.core.management import call_command
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction, models
from django.db.utils import OperationalError
from django.conf import settings
from pathlib import Path
from pathlib import PurePath
from django.utils.text import slugify

# from siteapp.models import User, Organization, Portfolio
from controls.models import Element, Statement
# from controls.views import system_element_download_oscal_json
from controls.views import OSCALComponentSerializer, ComponentImporter

import fs, fs.errors


class Command(BaseCommand):
help = 'Import directory of component files.'

def add_arguments(self, parser):
parser.add_argument('--format', metavar='format', nargs='?', default="oscal", help="File format")
parser.add_argument('--path', metavar='dir_or_pdf', nargs='?', default="local/export/components", help="The directory path containing component files to import.")
parser.add_argument('--importname', metavar='importname', nargs='?', default="Batch component import", help="Name to identify the batch import")

def handle(self, *args, **options):

# Configure
FORMAT = options['format']
IMPORT_PATH = options['path']
IMPORT_NAME = options['importname']
# Check if import directory path exists
if not os.path.exists(IMPORT_PATH):
print(f"Import directory {IMPORT_PATH} not found.")
quit()

if FORMAT == 'oscal':
counter = 0
# Get list of files in directory
pathlist = Path(IMPORT_PATH).rglob('*.json')
print(pathlist)
# Import each file
for path in pathlist:
counter += 1
path_in_str = str(path)
print(path_in_str)

with open(path_in_str) as f:
oscal_component_json = f.read()
gregelin marked this conversation as resolved.
Show resolved Hide resolved
result = ComponentImporter().import_components_as_json(IMPORT_NAME, oscal_component_json, existing_import_record=True)

elif FORMAT == "csv":
import csv
counter = 0
pprint(f"Format '{FORMAT}' not yet supported.")
else:
print(f"Format '{FORMAT}' not yet supported.")

# Done
print(f"Imported {counter} components in {FORMAT} from folder `{IMPORT_PATH}`.")

75 changes: 57 additions & 18 deletions controls/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,12 +488,13 @@ def statement_id_from_control(control_id, part_id):
else:
return f"{control_id}_smt"


def as_json(self):
# Build OSCAL
# Example: https://github.com/usnistgov/OSCAL/blob/master/src/content/ssp-example/json/example-component.json
uuid = str(self.element.uuid)
control_implementations = []
props = []

of = {
"component-definition": {
"uuid": str(uuid4()),
Expand All @@ -502,7 +503,8 @@ def as_json(self):
"published": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
"last-modified": self.element.updated.replace(microsecond=0).isoformat(),
"version": "string",
"oscal-version": "1.0.0-rc1"
"oscal-version": "1.0.0-rc1",
"props": props
},
"components": {
uuid: {
Expand All @@ -515,6 +517,14 @@ def as_json(self):
},
}

# Add component's tags if they exist
if len(self.element.tags.all()) > 0:
gregelin marked this conversation as resolved.
Show resolved Hide resolved
props.extend([{"name": "tag", "ns": "https://govready.com/ns/oscal", "value": tag.label} for tag in self.element.tags.all()])

# Remove 'metadata.props' key if no metadata.props exist
if len(props) == 0:
of['component-definition']['metadata'].pop('props', None)

# create requirements and organize by source (sid_class)

by_class = defaultdict(list)
Expand Down Expand Up @@ -542,8 +552,10 @@ def as_json(self):
statement = {
"uuid": str(smt.uuid),
"description": smt.body,
"remarks": smt.remarks
"remarks": smt.remarks or ""
}
if smt.remarks is None:
statement.pop('remarks', None)
statement_id = self.statement_id_from_control(control_id, smt.pid)
requirement["statements"][statement_id] = statement

Expand All @@ -557,6 +569,9 @@ def as_json(self):
"implemented-requirements": [req for req in requirements]
}
control_implementations.append(control_implementation)
# Remove 'control-implementations' key if no implementations exist
if len(control_implementations) == 0:
of['component-definition']['components'][uuid].pop('control-implementations', None)

oscal_string = json.dumps(of, sort_keys=False, indent=2)
return oscal_string
Expand Down Expand Up @@ -592,13 +607,15 @@ def as_yaml(self):

class ComponentImporter(object):

def import_components_as_json(self, import_name, json_object, request=None):
def import_components_as_json(self, import_name, json_object, request=None, existing_import_record=False):
"""Imports Components from a JSON object

@type import_name: str
@param import_name: Name of import file (if it exists)
@type json_object: dict
@param json_object: Element attributes from JSON object
@type existing_import_record: boolean
@param existing_import_record: Continue to append imports to an existing import record
@rtype: ImportRecord if success, bool (false) if failure
@returns: ImportRecord linked to the created components (if success) or False if failure
"""
Expand All @@ -613,43 +630,63 @@ def import_components_as_json(self, import_name, json_object, request=None):
return False
if self.validate_oscal_json(oscal_json):
# Returns list of created components

created_components = self.create_components(oscal_json)
new_import_record = self.create_import_record(import_name, created_components)
new_import_record = self.create_import_record(import_name, created_components, existing_import_record=existing_import_record)
return new_import_record
else:

if request is not None:
messages.add_message(request, messages.ERROR, f"Invalid OSCAL. Component(s) not created.")
logger.info(f"Invalid JSON. Component(s) not created.")
else:
logger.info(f"Invalid JSON. Component(s) not created.")

return False

def create_import_record(self, import_name, components):
# def find_import_record_by_name(self, import_name):
# """Returns most recent existing import record by name

# @type import_name: str
# @param import_name: Name of import file (if it exists)
# """

# found_import_record = ImportRecord.objects.filter(name=import_name).last()

# return found_import_record

def create_import_record(self, import_name, components, existing_import_record=False):
"""Associates components and statements to an import record

@type import_name: str
@param import_name: Name of import file (if it exists)
@type components: list
@param components: List of components
@type existing_import_record: booleen
@param existing_import_record: Continue to append imports to an existing import record
@rtype: ImportRecord
@returns: New ImportRecord object with components and statements associated
"""

new_import_record = ImportRecord.objects.create(name=import_name)
import_record = ImportRecord.objects.filter(name=import_name).last()
if import_record is None or not existing_import_record:
import_record = ImportRecord.objects.create(name=import_name)
for component in components:
statements = Statement.objects.filter(producer_element=component)
for statement in statements:
statement.import_record = new_import_record
statement.import_record = import_record
#statement.save()
component.import_record = new_import_record
component.import_record = import_record
component.save()

return new_import_record

return import_record

def validate_oscal_json(self, oscal_json):
"""Validates the JSON object is valid OSCAL format"""

project_root = os.path.abspath(os.path.dirname(__name__))
oscal_schema_path = os.path.join(project_root, "schemas", "oscal_component_schema.json")

with open(oscal_schema_path, "r") as schema_content:
oscal_json_schema = json.load(schema_content)
try:
Expand All @@ -661,7 +698,6 @@ def validate_oscal_json(self, oscal_json):

def create_components(self, oscal_json):
"""Creates Elements (Components) from valid OSCAL JSON"""

components_created = []
components = oscal_json['component-definition']['components']
for component in components:
Expand All @@ -686,18 +722,19 @@ def create_component(self, component_json):

new_component = Element.objects.create(
name=component_name,
description=component_json['description'] if 'description' in component_json else '',
description=component_json['description'] if 'description' in component_json else 'Description missing',
# Components uploaded to the Component Library are all system_element types
# TODO: When components can be uploaded by project, set element_type from component-type OSCAL property
element_type="system_element"
)

logger.info(f"Component {new_component.name} created with UUID {new_component.uuid}.")
control_implementation_statements = component_json['control-implementations']
for control_element in control_implementation_statements:
catalog = oscalize_catalog_key(control_element['source']) if 'source' in control_element else None
implemented_reqs = control_element['implemented-requirements'] if 'implemented-requirements' in control_element else []
created_statements = self.create_control_implementation_statements(catalog, implemented_reqs, new_component)
control_implementation_statements = component_json.get('control-implementations', None)
if control_implementation_statements:
for control_element in control_implementation_statements:
catalog = oscalize_catalog_key(control_element['source']) if 'source' in control_element else None
implemented_reqs = control_element['implemented-requirements'] if 'implemented-requirements' in control_element else []
created_statements = self.create_control_implementation_statements(catalog, implemented_reqs, new_component)
return new_component

def create_control_implementation_statements(self, catalog_key, implemented_reqs, parent_component):
Expand Down Expand Up @@ -1524,6 +1561,8 @@ def editor(request, system_id, catalog_key, cl_id):
"remarks": smt.remarks
},
}
if smt.remarks is None:
my_dict[smt.sid + "{}".format(smt.producer_element.name.replace(" ", "-"))].pop("remarks", None)
by_components.update(my_dict)
oscal_string = json.dumps(of, sort_keys=False, indent=2)

Expand Down