Skip to content

Commit

Permalink
Merge pull request Azure#29 from jddarby/jl/refactor-deploy-params
Browse files Browse the repository at this point in the history
Jl/refactor deploy params
  • Loading branch information
jordlay authored Jun 19, 2023
2 parents 93840e8 + 3adcb80 commit 1c0dbe1
Showing 1 changed file with 61 additions and 80 deletions.
141 changes: 61 additions & 80 deletions src/aosm/azext_aosm/generate_nfd/cnf_nfd_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def copy_to_output_folder(self) -> None:
self._tmp_folder_name, CNF_MANIFEST_BICEP_TEMPLATE
)
shutil.copy(tmp_manifest_bicep_path, self.output_folder_name)

# Copy any generated values mappings YAML files to the corresponding folder in
# the output directory so that the user can edit them and re-run the build if
# required
Expand Down Expand Up @@ -475,95 +475,77 @@ def get_chart_mapping_schema(
values_data = yaml.load(stream, Loader=yaml.SafeLoader)

with open(values_schema, "r", encoding="utf-8") as f:
data = json.load(f)
schema_data = data["properties"]
schema_data = json.load(f)

try:
final_schema = self.find_deploy_params(values_data, schema_data, {})
deploy_params_dict = self.traverse_dict(values_data, DEPLOYMENT_PARAMETER_MAPPING_REGEX)
new_schema = self.search_schema(deploy_params_dict, schema_data)
except KeyError as e:
raise InvalidTemplateError(
f"ERROR: Your schema and values for the helm package '{helm_package.name}' do not match. \
f"ERROR: There is a problem with your schema or values for the helm package '{helm_package.name}'. \
Please fix this and run the command again."
) from e

logger.debug("Generated chart mapping schema for %s", helm_package.name)
return final_schema
return new_schema

def find_deploy_params(
self, nested_dict, schema_nested_dict, final_schema
) -> Dict[Any, Any]:
def traverse_dict(self, d, target):
"""
Create a schema of types of only those values in the values.mappings.yaml file which have a deployParameters mapping.
Finds the relevant part of the full schema of the values file and finds the
type of the parameter name, then adds that to the final schema, with no nesting.
Returns a schema of form:
{
"$schema": "https://json-schema.org/draft-07/schema#",
"title": "DeployParametersSchema",
"type": "object",
"properties": {
"<parameter1>": {
"type": "<type>"
},
"<parameter2>": {
"type": "<type>"
},
nested_dict: the dictionary of the values mappings yaml which contains
deployParameters mapping placeholders
schema_nested_dict: the properties section of the full schema (or sub-object in
schema)
final_schema: Blank dictionary if this is the top level starting point,
otherwise the final_schema as far as we have got.
Traverse the dictionary that is loaded from the file provided by path_to_mappings in the input.json.
Returns a dictionary of all the values that match the target regex,
with the key being the deploy parameter and the value being the path to the value.
e.g. {"foo": ["global", "foo", "bar"]}
param d: The dictionary to traverse.
param target: The regex to search for.
"""
original_schema_nested_dict = schema_nested_dict
# if given a blank mapping file, return empty schema
if nested_dict is None:
return {}
for k, v in nested_dict.items():
# if value is a string and contains deployParameters.
if isinstance(v, str) and re.search(DEPLOYMENT_PARAMETER_MAPPING_REGEX, v):
logger.debug(
"Found string deploy parameter for key %s, value %s. Find schema type",
k,
v,
)
# only add the parameter name (e.g. from {deployParameter.zone} only
# param = zone)
param = v.split(".", 1)[1]
param = param.split("}", 1)[0]

# add the schema for k (from the full schema) to the (new) schema
if "properties" in schema_nested_dict.keys():
# Occurs if top level item in schema properties is an object with
# properties itself
final_schema.update(
{param: {"type": schema_nested_dict["properties"][k]["type"]}}
)
else:
# Occurs if top level schema item in schema properties are objects
# with no "properties" - but can have "type".
final_schema.update(
{param: {"type": schema_nested_dict[k]["type"]}}
)
# else if value is a (non-empty) dictionary (i.e another layer of nesting)
elif hasattr(v, "items") and v.items():
logger.debug("Found dict value for key %s. Find schema type", k)
# handling schema having properties which doesn't map directly to the
# values file nesting
if "properties" in schema_nested_dict.keys():
schema_nested_dict = schema_nested_dict["properties"][k]
else:
schema_nested_dict = schema_nested_dict[k]
# recursively call function with values (i.e the nested dictionary)
self.find_deploy_params(v, schema_nested_dict, final_schema)
# reset the schema dict to its original value (once finished with that
# level of recursion)
schema_nested_dict = original_schema_nested_dict
stack = [(d, [])] # Initialize the stack with the dictionary and an empty path
result = {} # Initialize empty dictionary to store the results
while stack: # While there are still items in the stack
# Pop the last item from the stack and unpack it into node (the dictionary) and path
(node, path) = stack.pop()
for k, v in node.items(): # For each key-value pair in the popped item
if isinstance(v, dict): # If the value is a dictionary
stack.append((v, path + [k])) # Add the dictionary to the stack with the path
elif isinstance(v, str) and re.search(target, v): # If the value is a string + matches target regex
match = re.search(target, v) # Take the match i.e, foo from {deployParameter.foo}
result[match.group(1)] = path + [k] # Add it to the result dictionary with its path as the value
elif isinstance(v, list):
for i in v:
if isinstance(i, str) and re.search(target, i):
match = re.search(target, i)
result[match.group(1)] = path + [k]
return result

def search_schema(self, result, full_schema):
"""
Search through provided schema for the types of the deployment parameters.
This assumes that the type of the key will be the type of the deployment parameter.
e.g. if foo: {deployParameter.bar} and foo is type string, then bar is type string.
return final_schema
Returns a dictionary of the deployment parameters in the format:
{"foo": {"type": "string"}, "bar": {"type": "string"}}
param result: The result of the traverse_dict function.
param full_schema: The schema to search through.
"""
new_schema = {}
no_schema_list = []
print(result)
for deploy_param in result:
node = full_schema
for path_list in result[deploy_param]:
if "properties" in node.keys():
node = node["properties"][path_list]
else:
no_schema_list.append(deploy_param)
new_schema.update({deploy_param: {"type": "string"}})
if deploy_param not in new_schema:
new_schema.update({deploy_param: {"type": node.get('type', None)}})
if no_schema_list:
print("No schema found for deployment parameter(s):", no_schema_list)
print("We default these parameters to type string")
return new_schema

def _replace_values_with_deploy_params(
self,
Expand Down Expand Up @@ -595,14 +577,13 @@ def _replace_values_with_deploy_params(
# add the schema for k (from the big schema) to the (smaller) schema
final_values_mapping_dict.update({k: replacement_value})
elif isinstance(v, dict):

final_values_mapping_dict[k] = self._replace_values_with_deploy_params(
v, param_name
)
elif isinstance(v, list):
final_values_mapping_dict[k] = []
for index, item in enumerate(v):
param_name = f"{param_prefix}_{k}_{index}" if param_prefix else f"{k})_{index}"
param_name = f"{param_prefix}_{k}_{index}" if param_prefix else f"{k})_{index}"
if isinstance(item, dict):
final_values_mapping_dict[k].append(
self._replace_values_with_deploy_params(
Expand Down

0 comments on commit 1c0dbe1

Please sign in to comment.