Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move Catalog data into database, faster control select autocomplete #1673

Merged
merged 10 commits into from
Aug 8, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ v0.9.8-dev (August xx, 2021)
**Developer changes**

* Support autostart of a project, taking user to first question of a project when starting a new project.
* Retrieve Catalog data from database instead of file system with new controls.models.CatalogData model.


v0.9.7 (August 06, 2021)
Expand Down
6 changes: 5 additions & 1 deletion controls/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from django.contrib import admin
from django.http import HttpResponse
from .models import ImportRecord, Statement, Element, ElementControl, ElementRole, System, CommonControlProvider, CommonControl, ElementCommonControl, Poam, Deployment, SystemAssessmentResult
from .oscal import CatalogData
from guardian.admin import GuardedModelAdmin
from simple_history.admin import SimpleHistoryAdmin

Expand Down Expand Up @@ -89,6 +90,9 @@ class SystemAssessmentResultAdmin(admin.ModelAdmin):
list_display = ('id', 'name', 'system', 'deployment', 'uuid')
search_fields = ('id', 'name', 'system', 'deployment', 'uuid')

class CatalogDataAdmin(admin.ModelAdmin):
list_display = ('catalog_key',)
search_fields = ('catalog_key',)

admin.site.register(ImportRecord, ImportRecordAdmin)
admin.site.register(Statement, StatementAdmin)
Expand All @@ -102,4 +106,4 @@ class SystemAssessmentResultAdmin(admin.ModelAdmin):
admin.site.register(Poam, PoamAdmin)
admin.site.register(Deployment, DeploymentAdmin)
admin.site.register(SystemAssessmentResult, SystemAssessmentResultAdmin)

admin.site.register(CatalogData, CatalogDataAdmin)
33 changes: 33 additions & 0 deletions controls/migrations/0057_catalogdata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Generated by Django 3.2.4 on 2021-08-05 02:12

from django.db import migrations, models
import django.db.models.manager
import jsonfield.fields


class Migration(migrations.Migration):

dependencies = [
('controls', '0056_element_oscal_version'),
]

operations = [
migrations.CreateModel(
name='CatalogData',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('catalog_key', models.CharField(help_text='Unique key for catalog', max_length=100, unique=True)),
('catalog_json', jsonfield.fields.JSONField(blank=True, help_text='JSON object representing the OSCAL-formatted control catalog.', null=True)),
('created', models.DateTimeField(auto_now_add=True, db_index=True)),
('updated', models.DateTimeField(auto_now=True, db_index=True)),
],
options={
'abstract': False,
'base_manager_name': 'prefetch_manager',
},
managers=[
('objects', django.db.models.manager.Manager()),
('prefetch_manager', django.db.models.manager.Manager()),
],
),
]
19 changes: 19 additions & 0 deletions controls/migrations/0058_catalogdata_baselines_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Generated by Django 3.2.4 on 2021-08-05 15:19

from django.db import migrations
import jsonfield.fields


class Migration(migrations.Migration):

dependencies = [
('controls', '0057_catalogdata'),
]

operations = [
migrations.AddField(
model_name='catalogdata',
name='baselines_json',
field=jsonfield.fields.JSONField(blank=True, help_text='JSON object representing the baselines for the catalog.', null=True),
),
]
65 changes: 11 additions & 54 deletions controls/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
from controls.enums.components import ComponentTypeEnum, ComponentStateEnum
from siteapp.model_mixins.tags import TagModelMixin
from controls.enums.statements import StatementTypeEnum
from controls.oscal import Catalogs, Catalog, check_and_extend
from controls.oscal import Catalogs, Catalog, CatalogData
import uuid
import tools.diff_match_patch.python3 as dmp_module
from copy import deepcopy
from django.db import transaction

BASELINE_PATH = os.path.join(os.path.dirname(__file__),'data','baselines')
EXTERNAL_BASELINE_PATH = os.path.join(f"{os.getcwd()}",'local', 'controls', 'data', 'baselines')
ORGPARAM_PATH = os.path.join(os.path.dirname(__file__),'data','org_defined_parameters')

class ImportRecord(models.Model):
Expand Down Expand Up @@ -783,9 +782,7 @@ class Baselines (object):
def __init__(self):

self.file_path = BASELINE_PATH
self.external_file_path = EXTERNAL_BASELINE_PATH
self.baselines_keys = self._list_keys()
# self.index = self._build_index()

# Usage
# from controls.models import Baselines
Expand All @@ -796,44 +793,18 @@ def __init__(self):
# # Returns ['ac-1', 'ac-2', 'ac-2.1', 'ac-2.2', ...]
# bs.get_baseline_controls('NIST_SP-800-53_rev4', 'moderate')

def _list_files(self):
return self.extend_external_baselines([
'NIST_SP-800-53_rev4_baselines.json',
# 'NIST_SP-800-53_rev5_baselines.json',
'NIST_SP-800-171_rev1_baselines.json',
'CMMC_ver1_baselines.json'
], "files")


def _list_keys(self):
return self.extend_external_baselines([
'NIST_SP-800-53_rev4',
# 'NIST_SP-800-53_rev5',
'NIST_SP-800-171_rev1',
'CMMC_ver1'
], "keys")

# TODO: only return keys for records that have baselines?
return list(CatalogData.objects.order_by('catalog_key').values_list('catalog_key', flat=True).distinct())

def _load_json(self, baselines_key):
"""Read baseline file - JSON"""
# TODO Escape baselines_key
self.data_file = baselines_key + "_baselines.json"
data_file = os.path.join(self.file_path, self.data_file)
# Does file exist?
if not os.path.isfile(data_file):
# Check if there any external oscal baseline files
try:
data_file = os.path.join(self.external_file_path, self.data_file)
except:
print("ERROR: {} does not exist".format(data_file))
return False
# Load file as json
try:
with open(data_file, 'r') as json_file:
data = json.load(json_file)
return data
except:
print("ERROR: {} could not be read or could not be read as json".format(data_file))

catalog_record = CatalogData.objects.get(catalog_key=baselines_key)
baselines = catalog_record.baselines_json
if baselines:
return baselines
else:
return False

def get_baseline_controls(self, baselines_key, baseline_name):
Expand All @@ -849,22 +820,6 @@ def get_baseline_controls(self, baselines_key, baseline_name):
print("Requested baseline name not found in baselines_key data file")
return False

@property
def body(self):
return self.legacy_imp_smt


def extend_external_baselines(self, baseline_info, extendtype):
"""
Add external baselines to list of baselines
"""
os.makedirs(EXTERNAL_BASELINE_PATH, exist_ok=True)
external_baselines = [file for file in os.listdir(EXTERNAL_BASELINE_PATH) if
file.endswith('.json')]

baseline_info = check_and_extend(baseline_info, external_baselines, extendtype, "_baselines")
return baseline_info

class OrgParams(object):
"""
Represent list of organizational defined parameters. Temporary
Expand Down Expand Up @@ -1016,3 +971,5 @@ def __repr__(self):
# # For debugging.
# return "<AssesmentResult %s id=%d>" % (self.statement, self.id)



99 changes: 33 additions & 66 deletions controls/oscal.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,56 +6,47 @@
from pathlib import Path
import sys

import auto_prefetch
from django.db import models
from django.utils.functional import cached_property
from jsonfield import JSONField


CATALOG_PATH = os.path.join(os.path.dirname(__file__), 'data', 'catalogs')
EXTERNAL_CATALOG_PATH = os.path.join(f"{os.getcwd()}",'local', 'controls', 'data', 'catalogs')
BASELINE_PATH = os.path.join(os.path.dirname(__file__),'data','baselines')

class CatalogData(auto_prefetch.Model):
catalog_key = models.CharField(max_length=100, help_text="Unique key for catalog", unique=True, blank=False, null=False)
catalog_json = JSONField(blank=True, null=True, help_text="JSON object representing the OSCAL-formatted control catalog.")
baselines_json = JSONField(blank=True, null=True, help_text="JSON object representing the baselines for the catalog.")
created = models.DateTimeField(auto_now_add=True, db_index=True)
updated = models.DateTimeField(auto_now=True, db_index=True)

def __str__(self):
return "'%s id=%d'" % (self.catalog_key, self.id)

def __repr__(self):
# For debugging.
return "'%s id=%d'" % (self.catalog_key, self.id)

class Catalogs(object):
"""Represent list of catalogs"""

# well known catalog identifiers

NIST_SP_800_53_rev4 = 'NIST_SP-800-53_rev4'
NIST_SP_800_53_rev5 = 'NIST_SP-800-53_rev5'
NIST_SP_800_171_rev1 = 'NIST_SP-800-171_rev1'
CMMC_ver1 = 'CMMC_ver1'

def __init__(self):
self.catalog_path = CATALOG_PATH
# self.catalog = None
self.catalog_keys = self._list_catalog_keys()
self.index = self._build_index()

def extend_external_catalogs(self, catalog_info, extendtype):
"""
Add external catalogs to list of catalogs
"""
os.makedirs(EXTERNAL_CATALOG_PATH, exist_ok=True)
external_catalogs = [file for file in os.listdir(EXTERNAL_CATALOG_PATH) if
file.endswith('.json')]
catalog_info = check_and_extend(catalog_info, external_catalogs, extendtype, "_catalog")

return catalog_info

def _list_catalog_files(self):
return self.extend_external_catalogs([
'NIST_SP-800-53_rev4_catalog.json',
'NIST_SP-800-53_rev5_catalog.json',
'NIST_SP-800-171_rev1_catalog.json',
'CMMC_ver1_catalog.json'
], "files")

def _list_catalog_keys(self):

return self.extend_external_catalogs([
Catalogs.NIST_SP_800_53_rev4,
Catalogs.NIST_SP_800_53_rev5,
Catalogs.NIST_SP_800_171_rev1,
Catalogs.CMMC_ver1
], "keys")
return list(CatalogData.objects.order_by('catalog_key').values_list('catalog_key', flat=True).distinct())

def _load_catalog_json(self, catalog_key):
catalog = Catalog(catalog_key)
#print(catalog_key, catalog._load_catalog_json())
return catalog._load_catalog_json()

def _build_index(self):
Expand All @@ -76,26 +67,13 @@ def list_catalogs(self):
"""
List catalog objects
"""
return [Catalog(key) for key in Catalogs()._list_catalog_keys()]

return [Catalog.GetInstance(catalog_key=key) for key in Catalogs()._list_catalog_keys()]

def uhash(obj):
"""Return a positive hash code"""
h = hash(obj)
return h + sys.maxsize + 1

def check_and_extend(values, external_values, extendtype, splitter):
"""
Modularize value to extend
"""
if extendtype == "keys":
keys = [key.split(f'{splitter}.json')[0] for key in external_values]
values.extend(keys)
elif extendtype == "files":
files = [file for file in external_values]
values.extend(files)
return values

def de_oscalize_control(control_id):
"""
Returns the regular control formatting from an oscalized version of the control number.
Expand All @@ -108,10 +86,10 @@ class Catalog(object):

# Create a singleton instance of this class per catalog. GetInstance returns
# that singleton instance. Instead of doing
# `cg = Catalog(catalog_key=Catalogs.NIST_SP_800_53_rev4)`,
# do `cg = Catalog.GetInstance(catalog_key=Catalogs.NIST_SP_800_53_rev4')`.
# `cg = Catalog(catalog_key='NIST_SP-800-53_rev4')`,
# do `cg = Catalog.GetInstance(catalog_key='NIST_SP-800-53_rev4')`.
@classmethod
def GetInstance(cls, catalog_key=Catalogs.NIST_SP_800_53_rev4, parameter_values=dict()):
def GetInstance(cls, catalog_key='NIST_SP-800-53_rev4', parameter_values=dict()):
# Create a new instance of Catalog() the first time for each
# catalog key / parameter combo
# this method is called. Keep it in memory indefinitely.
Expand All @@ -132,11 +110,10 @@ def _catalog_instance_key(catalog_key, parameter_values):
catalog_instance_key += '_' + str(parameter_values_hash)
return catalog_instance_key.replace('-', '_')

def __init__(self, catalog_key=Catalogs.NIST_SP_800_53_rev4, parameter_values=dict()):
def __init__(self, catalog_key='NIST_SP-800-53_rev4', parameter_values=dict()):
self.catalog_key = catalog_key
self.catalog_key_display = catalog_key.replace("_", " ")
self.catalog_path = CATALOG_PATH
self.external_catalog_path = EXTERNAL_CATALOG_PATH
self.catalog_file = catalog_key + "_catalog.json"
try:
self.oscal = self._load_catalog_json()
Expand All @@ -163,22 +140,12 @@ def __init__(self, catalog_key=Catalogs.NIST_SP_800_53_rev4, parameter_values=di

def _load_catalog_json(self):
"""Read catalog file - JSON"""
catalog_file = os.path.join(self.catalog_path, self.catalog_file)
catalog_file_external = os.path.join(self.external_catalog_path, self.catalog_file)
# Get catalog file from internal or "external" catalog files
if os.path.isfile(catalog_file):
with open(catalog_file, 'r') as json_file:
data = json.load(json_file)
oscal = data['catalog']
return oscal
elif os.path.isfile(catalog_file_external):
with open(catalog_file_external, 'r') as json_file:
data = json.load(json_file)
oscal = data['catalog']
return oscal
else:
# Catalog file doesn't exist
return False

# Get catalog from database
# TODO: check for DB miss
catalog_record = CatalogData.objects.get(catalog_key=self.catalog_key)
oscal = catalog_record.catalog_json['catalog']
return oscal

def find_dict_by_value(self, search_array, search_key, search_value):
"""Return the dictionary in an array of dictionaries with a key matching a value"""
Expand Down
Loading