From dfcfd512f5cbe69e0a0ead84b0151417c88ebc40 Mon Sep 17 00:00:00 2001 From: Thanh Dang Nguyen Date: Thu, 30 Jul 2020 15:12:59 -0500 Subject: [PATCH] fix(project_id): ensure project_id exists (#131) * fix(project_id): ensure project_id exists * fix(node_id): change node_id name * fix(mapping): mapping src to multiple renaming name * fix(comments): address some comments * fix(joining): id field in joining * fix(joining): id field in joining --- tube/etl/indexers/aggregation/parser.py | 38 +++++++++++- tube/etl/indexers/aggregation/translator.py | 67 ++++++++++++++++++--- tube/etl/indexers/base/lambdas.py | 8 ++- tube/etl/indexers/base/parser.py | 5 +- tube/etl/indexers/base/prop.py | 6 ++ tube/etl/indexers/base/translator.py | 27 ++++++++- tube/etl/indexers/injection/parser.py | 7 ++- tube/etl/indexers/injection/translator.py | 8 +-- tube/etl/indexers/interpreter.py | 2 + tube/etl/outputs/es/writer.py | 3 +- tube/utils/general.py | 8 +++ 11 files changed, 155 insertions(+), 24 deletions(-) diff --git a/tube/etl/indexers/aggregation/parser.py b/tube/etl/indexers/aggregation/parser.py index 05f6dcb6..70f3d2e0 100644 --- a/tube/etl/indexers/aggregation/parser.py +++ b/tube/etl/indexers/aggregation/parser.py @@ -1,11 +1,9 @@ import re from tube.utils.dd import ( - get_attribute_from_path, get_edge_table, get_child_table, get_multiplicity, get_node_table_name, - get_properties_types, object_to_string, ) from .nodes.aggregated_node import AggregatedNode, Reducer @@ -15,6 +13,7 @@ from .nodes.parent_node import ParentChain, ParentNode from ..base.parser import Parser as BaseParser from copy import deepcopy +from tube.utils.general import PROJECT_CODE, PROGRAM_NAME class Path(object): @@ -106,7 +105,42 @@ def get_parent_props(self): ) return list_nodes + def add_program_name_to_parent(self): + """ + In case program name is not in self.mapping["parent_props"] while the root node is project. 
We must add that field + :return: + """ + found_program = -1 + i = -1 + for path in self.mapping["parent_props"]: + p = path["path"] + i += 1 + if p.startswith("program"): + found_program = i + break + if found_program == -1: + self.mapping["parent_props"].append( + {"path": "programs[{PROGRAM_N}:name]".format(PROGRAM_N=PROGRAM_NAME)} + ) + found_program = len(self.mapping["parent_props"]) - 1 + program_path = self.mapping["parent_props"][found_program] + program_path_val = program_path["path"] + if program_path_val.find(PROGRAM_NAME) == -1: + separator = "" if program_path_val.find("[]") > 0 else "," + program_path["path"] = "{}{SEP}{PROGRAM_N}:name]".format( + program_path_val[: len(program_path_val) - 1], + SEP=separator, + PROGRAM_N=PROGRAM_NAME, + ) + def get_host_props(self): + if self.root == "project": + if "project_code" not in [p.get("name") for p in self.mapping["props"]]: + self.mapping["props"].append({"name": PROJECT_CODE, "src": "code"}) + if "parent_props" not in self.mapping: + self.mapping["parent_props"] = [] + self.add_program_name_to_parent() + return self.create_props_from_json( + self.doc_type, self.mapping["props"], node_label=self.root + ) diff --git a/tube/etl/indexers/aggregation/translator.py b/tube/etl/indexers/aggregation/translator.py index 9bc438cf..9c089365 100644 --- a/tube/etl/indexers/aggregation/translator.py +++ b/tube/etl/indexers/aggregation/translator.py @@ -13,10 +13,17 @@ get_normal_frame, get_single_frame_zero_by_func, ) +from tube.etl.indexers.base.prop import PropFactory from tube.utils.dd import get_node_table_name +from tube.utils.general import ( + get_node_id_name, + get_node_id_name_without_prefix, + PROJECT_ID, + PROJECT_CODE, + PROGRAM_NAME, +) from .parser import Parser from ..base.lambdas import sort_by_field, swap_property_as_key, make_key_from_property -from tube.etl.indexers.base.prop import PropFactory from .lambdas import sliding @@ -192,6 +199,16 @@ def get_joining_props(self, translator, joining_index): 
props_without_fn = [] for r in joining_index.getting_fields: src_prop = translator.parser.get_prop_by_name(r.prop.src) + # field which is identity of a node is named as _{node}_id now + # before in etl-mapping for joining_props, we use {node}_id + # for backward compatibility, we check first with the value in mapping file. + # if there is not any Prop object like that, we check with new format _{node}_id + if src_prop is None and r.prop.src == get_node_id_name_without_prefix( + translator.parser.doc_type + ): + src_prop = translator.parser.get_prop_by_name( + get_node_id_name(translator.parser.doc_type) + ) dst_prop = self.parser.get_prop_by_name(r.prop.name) if r.fn is None: props_without_fn.append({"src": src_prop, "dst": dst_prop}) @@ -239,19 +256,34 @@ def join_to_an_index(self, df, translator, joining_node): # based on join_on value in the etlMapping, we know what field is used as joining field. # We swap the index that have name of key field different than the name of joining field joining_df_key_id = translator.parser.get_key_prop().id - field_id_in_joining_df = translator.parser.get_prop_by_name( + id_field_in_joining_df = translator.parser.get_prop_by_name( joining_node.joining_field ).id - field_id_in_df = self.parser.get_prop_by_name(joining_node.joining_field).id + # field which is identity of a node is named as _{node}_id now + # before in etl-mapping for joining_props, we use {node}_id + # for backward compatibility, we check first with the value in mapping file. 
+ # if there is not any Prop object like that, we check with new format _{node}_id + id_field_in_df = self.parser.get_prop_by_name(joining_node.joining_field) + if id_field_in_df is None: + id_field_in_df = self.parser.get_prop_by_name( + get_node_id_name(self.parser.doc_type) + ) + if id_field_in_df is None: + raise Exception( + "{} field does not exist in index {}".format( + joining_node.joining_field, self.parser.doc_type + ) + ) + id_field_in_df_id = id_field_in_df.id df_key_id = self.parser.get_key_prop().id swap_df = False - if joining_df_key_id != field_id_in_joining_df: + if joining_df_key_id != id_field_in_joining_df: joining_df = swap_property_as_key( - joining_df, field_id_in_joining_df, joining_df_key_id + joining_df, id_field_in_joining_df, joining_df_key_id ) else: - df = swap_property_as_key(df, field_id_in_df, df_key_id) + df = swap_property_as_key(df, id_field_in_df_id, df_key_id) swap_df = True # Join can be done with or without an aggregation function like max, min, sum, ... 
@@ -265,7 +297,27 @@ def join_to_an_index(self, df, translator, joining_node): df = self.join_no_aggregate(df, joining_df, props_without_fn) if swap_df: - df = swap_property_as_key(df, df_key_id, field_id_in_df) + df = swap_property_as_key(df, df_key_id, id_field_in_df_id) + return df + + def ensure_project_id_exist(self, df): + project_id_prop = self.parser.get_prop_by_name(PROJECT_ID) + if project_id_prop is None: + project_id_prop = PropFactory.adding_prop( + self.parser.doc_type, PROJECT_ID, None, [], prop_type=(str,) + ) + project_code_id = self.parser.get_prop_by_name(PROJECT_CODE).id + program_name_id = self.parser.get_prop_by_name(PROGRAM_NAME).id + df = df.mapValues( + lambda x: merge_dictionary( + x, + { + project_id_prop.id: "{}-{}".format( + x.get(program_name_id), x.get(project_code_id) + ) + }, + ) + ) return df def translate(self): @@ -274,6 +326,7 @@ def translate(self): root_df = self.translate_special(root_df) root_df = self.translate_parent(root_df) root_df = self.get_direct_children(root_df) + root_df = self.ensure_project_id_exist(root_df) if len(self.parser.aggregated_nodes) == 0: return root_df return root_df.join(self.aggregate_nested_properties()).mapValues( diff --git a/tube/etl/indexers/base/lambdas.py b/tube/etl/indexers/base/lambdas.py index c65ed637..2794a3e8 100644 --- a/tube/etl/indexers/base/lambdas.py +++ b/tube/etl/indexers/base/lambdas.py @@ -1,7 +1,6 @@ import ast import json import collections -import sys def extract_metadata(str_value): @@ -56,11 +55,14 @@ def swap_key_value(df): def get_props(names, values): return lambda x: { - names[src]: values[src][v] - if isinstance(v, collections.Hashable) and src in values and v in values[src] + p_id: values[src][p_id][v] + if isinstance(v, collections.Hashable) + and src in values + and v in values[src][p_id] else v for (src, v) in list(x.items()) if src in list(names.keys()) + for p_id in names[src] } diff --git a/tube/etl/indexers/base/parser.py b/tube/etl/indexers/base/parser.py 
index 0c1468d7..b286faa3 100644 --- a/tube/etl/indexers/base/parser.py +++ b/tube/etl/indexers/base/parser.py @@ -1,3 +1,4 @@ +from tube.utils.general import get_node_id_name from ..base.prop import PropFactory from tube.utils.dd import get_properties_types @@ -16,7 +17,7 @@ def __init__(self, mapping, model): self.joining_nodes = [] PropFactory.adding_prop( self.doc_type, - "{}_id".format(self.doc_type), + get_node_id_name(self.doc_type), "", [], src_node=None, @@ -67,7 +68,7 @@ def select_widest_type(self, types): def get_key_prop(self): return PropFactory.get_prop_by_name( - self.doc_type, "{}_id".format(self.doc_type) + self.doc_type, get_node_id_name(self.doc_type) ) def get_prop_by_name(self, name): diff --git a/tube/etl/indexers/base/prop.py b/tube/etl/indexers/base/prop.py index b0c36d34..c1995042 100644 --- a/tube/etl/indexers/base/prop.py +++ b/tube/etl/indexers/base/prop.py @@ -110,5 +110,11 @@ def __str__(self): def __repr__(self): return self.__str__() + def __eq__(self, other): + return self.id == other.id + + def __ne__(self, other): + return self.id != other.id + def update_type(self, prop_type): self.type = prop_type diff --git a/tube/etl/indexers/base/translator.py b/tube/etl/indexers/base/translator.py index b556ed3d..3ce01f22 100644 --- a/tube/etl/indexers/base/translator.py +++ b/tube/etl/indexers/base/translator.py @@ -8,8 +8,8 @@ get_props_empty_values, get_number, ) +from tube.etl.indexers.base.prop import PropFactory from tube.utils.spark import save_rds -from .prop import PropFactory class Translator(object): @@ -72,8 +72,29 @@ def write(self, df): def get_props_from_data_row(self, df, props, to_tuple=False): if df.isEmpty(): return df.mapValues(get_props_empty_values(props)) - names = {p.src: p.id for p in props} - values = {p.src: {m.original: m.final for m in p.value_mappings} for p in props} + # names is dictionary which maps from the name of source fields in datatable to the list of ids + # of properties in dataframe + # example: 
names = {"gender": [1, 2], "project_name": [3]} + names = {} + # values is a dictionary which defines the mapping values (if exist) for each field. + # values = { + # "gender": { + # 1: {"male": "M", "female": "F"}, + # 2: {"male": "Male", "female": "Female"} + # }, + # "project_name": {3: {}} + # } + values = {} + for p in props: + n = names.get(p.src, []) + n.append(p.id) + names[p.src] = n + v = values.get(p.src, {}) + v[p.id] = {} + for m in p.value_mappings: + v[p.id][m.original] = m.final + values[p.src] = v + return df.mapValues(get_props(names, values)) def get_props_from_df(self, df, props): diff --git a/tube/etl/indexers/injection/parser.py b/tube/etl/indexers/injection/parser.py index c1c0ff2f..2c1007d1 100644 --- a/tube/etl/indexers/injection/parser.py +++ b/tube/etl/indexers/injection/parser.py @@ -10,7 +10,7 @@ get_parent_label, get_node_category, ) -from tube.utils.general import PROJECT_CODE +from tube.utils.general import PROJECT_CODE, PROGRAM_NAME from tube.etl.indexers.base.parser import Parser as BaseParser from tube.etl.indexers.injection.nodes.collecting_node import ( CollectingNode, @@ -44,6 +44,9 @@ def __repr__(self): def __eq__(self, other): return self.__key__() == other.__key__() + def __ne__(self, other): + return self.__key__() != other.__key__() + class NodePath(object): def __init__(self, class_name, upper_path): @@ -136,7 +139,7 @@ def get_props_for_nodes(self): ) else: node_props = v.get("props") - node_props.append({"name": "program_name", "src": "name"}) + node_props.append({"name": PROGRAM_NAME, "src": "name"}) roots[k] = RootNode( k, get_node_table_name(self.model, k), diff --git a/tube/etl/indexers/injection/translator.py b/tube/etl/indexers/injection/translator.py index 09be3912..0f7067b8 100644 --- a/tube/etl/indexers/injection/translator.py +++ b/tube/etl/indexers/injection/translator.py @@ -16,7 +16,7 @@ get_frame_zero, ) from tube.etl.indexers.injection.nodes.collecting_node import LeafNode -from tube.utils.general import 
PROJECT_ID, PROJECT_CODE, PROGRAM_NAME +from tube.utils.general import PROJECT_ID, PROJECT_CODE, PROGRAM_NAME, get_node_id_name class Translator(BaseTranslator): @@ -53,7 +53,7 @@ def collect_collecting_child(self, child, edge_df, collected_collecting_dfs): child.no_parent_to_map -= 1 if len(child.props) > 0: child_df = self.translate_table(child.tbl_name, props=child.props) - node = self.parser.get_prop_by_name("{}_id".format(child.name)) + node = self.parser.get_prop_by_name(get_node_id_name(child.name)) node_id = node.id if node is not None else None if node_id is not None: child_df = child_df.map( @@ -84,12 +84,12 @@ def merge_project(self, child, edge_df, collected_collecting_dfs): return child.no_parent_to_map -= 1 child_df = self.translate_table(child.tbl_name, props=child.props) - project_code_id = self.parser.get_prop_by_name(PROJECT_CODE).id child_df = child_df.join(edge_df).mapValues( lambda x: merge_dictionary(x[0], x[1]) ) - program_name_id = self.parser.get_prop_by_name(PROGRAM_NAME).id + project_code_id = self.parser.get_prop_by_name(PROJECT_CODE).id + program_name_id = self.parser.get_prop_by_name(PROGRAM_NAME).id project_id_prop = self.parser.get_prop_by_name(PROJECT_ID) if project_id_prop is None: project_id_prop = PropFactory.adding_prop( diff --git a/tube/etl/indexers/interpreter.py b/tube/etl/indexers/interpreter.py index cdfd3686..8b20124b 100644 --- a/tube/etl/indexers/interpreter.py +++ b/tube/etl/indexers/interpreter.py @@ -35,6 +35,8 @@ def run_transform(translators): for translator in list(translators.values()): df = translator.translate() + if df is None: + continue translator.save_to_hadoop(df) translator.current_step = 1 if len(translator.parser.joining_nodes) > 0: diff --git a/tube/etl/outputs/es/writer.py b/tube/etl/outputs/es/writer.py index 9ab4bd0f..bb57351e 100644 --- a/tube/etl/outputs/es/writer.py +++ b/tube/etl/outputs/es/writer.py @@ -9,10 +9,11 @@ from tube.etl.outputs.es.versioning import Versioning from tube.etl.plugins 
import post_process_plugins, add_auth_resource_path_mapping from tube.etl.spark_base import SparkBase +from tube.utils.general import get_node_id_name def json_export(x, doc_type): - x[1]["{}_id".format(doc_type)] = x[0] + x[1][get_node_id_name(doc_type)] = x[0] x[1]["node_id"] = x[0] # redundant field for backward compatibility with arranger return (x[0], json.dumps(x[1])) diff --git a/tube/utils/general.py b/tube/utils/general.py index 9b9e5047..5254fe9a 100644 --- a/tube/utils/general.py +++ b/tube/utils/general.py @@ -55,6 +55,14 @@ def get_resource_path_from_json(results, json_data): return results +def get_node_id_name(name): + return "_{}_id".format(name) + + +def get_node_id_name_without_prefix(name): + return "{}_id".format(name) + + PROGRAM_NAME = "program_name" PROJECT_CODE = "project_code" PROJECT_ID = "project_id"