Skip to content

Commit

Permalink
fix(project_id): ensure project_id exists (#131)
Browse files Browse the repository at this point in the history
* fix(project_id): ensure project_id exists

* fix(node_id): change node_id name

* fix(mapping): allow one src field to map to multiple renamed properties

* fix(comments): address some comments

* fix(joining): id field in joining

* fix(joining): id field in joining
  • Loading branch information
thanh-nguyen-dang authored Jul 30, 2020
1 parent 3619fa3 commit dfcfd51
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 24 deletions.
38 changes: 36 additions & 2 deletions tube/etl/indexers/aggregation/parser.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import re
from tube.utils.dd import (
get_attribute_from_path,
get_edge_table,
get_child_table,
get_multiplicity,
get_node_table_name,
get_properties_types,
object_to_string,
)
from .nodes.aggregated_node import AggregatedNode, Reducer
Expand All @@ -15,6 +13,7 @@
from .nodes.parent_node import ParentChain, ParentNode
from ..base.parser import Parser as BaseParser
from copy import deepcopy
from tube.utils.general import PROJECT_CODE, PROGRAM_NAME


class Path(object):
Expand Down Expand Up @@ -106,7 +105,42 @@ def get_parent_props(self):
)
return list_nodes

def add_program_name_to_parent(self):
    """
    Ensure the program-name field is present in self.mapping["parent_props"].

    When the root node is "project", each document must carry the program
    name so that project_id can be composed downstream. If no parent path
    starting with "program" exists, append a default one; if one exists but
    does not yet select the program-name field, extend its bracketed field
    list with "<PROGRAM_NAME>:name".
    :return: None (self.mapping is mutated in place)
    """
    found_program = -1
    for i, path in enumerate(self.mapping["parent_props"]):
        if path["path"].startswith("program"):
            found_program = i
            break
    if found_program == -1:
        self.mapping["parent_props"].append(
            {"path": "programs[{PROGRAM_N}:name]".format(PROGRAM_N=PROGRAM_NAME)}
        )
        found_program = len(self.mapping["parent_props"]) - 1
    program_path = self.mapping["parent_props"][found_program]
    program_path_val = program_path["path"]
    if program_path_val.find(PROGRAM_NAME) == -1:
        # "programs[]" (empty field list) needs no separator before the new
        # field; a non-empty field list does.
        separator = "" if program_path_val.find("[]") > 0 else ","
        # BUG FIX: slice the path *string*, not the dict that holds it.
        # len(program_path) is the dict's length (always 1), so the old
        # slice [:len(program_path) - 1] wiped out the whole path prefix.
        # We only want to drop the trailing "]" before appending the field.
        program_path["path"] = "{}{SEP}{PROGRAM_N}:name]".format(
            program_path_val[:-1],
            SEP=separator,
            PROGRAM_N=PROGRAM_NAME,
        )

def get_host_props(self):
    """
    Build the property list for the root (host) node of this index.

    For a "project" root, guarantee the fields required to compose
    project_id later on: the project code prop and a program-name
    parent prop (added via add_program_name_to_parent).
    :return: result of create_props_from_json over self.mapping["props"]
    """
    if self.root == "project":
        # Consistency fix: compare against the shared PROJECT_CODE constant
        # ("project_code") instead of duplicating the string literal.
        existing_names = [p.get("name") for p in self.mapping["props"]]
        if PROJECT_CODE not in existing_names:
            self.mapping["props"].append({"name": PROJECT_CODE, "src": "code"})
        # Make sure parent_props exists before amending it.
        self.mapping.setdefault("parent_props", [])
        self.add_program_name_to_parent()

    return self.create_props_from_json(
        self.doc_type, self.mapping["props"], node_label=self.root
    )
Expand Down
67 changes: 60 additions & 7 deletions tube/etl/indexers/aggregation/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,17 @@
get_normal_frame,
get_single_frame_zero_by_func,
)
from tube.etl.indexers.base.prop import PropFactory
from tube.utils.dd import get_node_table_name
from tube.utils.general import (
get_node_id_name,
get_node_id_name_without_prefix,
PROJECT_ID,
PROJECT_CODE,
PROGRAM_NAME,
)
from .parser import Parser
from ..base.lambdas import sort_by_field, swap_property_as_key, make_key_from_property
from tube.etl.indexers.base.prop import PropFactory
from .lambdas import sliding


Expand Down Expand Up @@ -192,6 +199,16 @@ def get_joining_props(self, translator, joining_index):
props_without_fn = []
for r in joining_index.getting_fields:
src_prop = translator.parser.get_prop_by_name(r.prop.src)
# field which is identity of a node is named as _{node}_id now
# before in etl-mapping for joining_props, we use {node}_id
# for backward compatibility, we check first with the value in mapping file.
# if there is not any Prop object like that, we check with new format _{node}_id
if src_prop is None and r.prop.src == get_node_id_name_without_prefix(
translator.parser.doc_type
):
src_prop = translator.parser.get_prop_by_name(
get_node_id_name(translator.parser.doc_type)
)
dst_prop = self.parser.get_prop_by_name(r.prop.name)
if r.fn is None:
props_without_fn.append({"src": src_prop, "dst": dst_prop})
Expand Down Expand Up @@ -239,19 +256,34 @@ def join_to_an_index(self, df, translator, joining_node):
# based on join_on value in the etlMapping, we know what field is used as joining field.
# We swap the index that have name of key field different than the name of joining field
joining_df_key_id = translator.parser.get_key_prop().id
field_id_in_joining_df = translator.parser.get_prop_by_name(
id_field_in_joining_df = translator.parser.get_prop_by_name(
joining_node.joining_field
).id
field_id_in_df = self.parser.get_prop_by_name(joining_node.joining_field).id
# field which is identity of a node is named as _{node}_id now
# before in etl-mapping for joining_props, we use {node}_id
# for backward compatibility, we check first with the value in mapping file.
# if there is not any Prop object like that, we check with new format _{node}_id
id_field_in_df = self.parser.get_prop_by_name(joining_node.joining_field)
if id_field_in_df is None:
id_field_in_df = self.parser.get_prop_by_name(
get_node_id_name(self.parser.doc_type)
)
if id_field_in_df is None:
raise Exception(
"{} field does not exist in index {}".format(
joining_node.joining_field, self.parser.doc_type
)
)
id_field_in_df_id = id_field_in_df.id
df_key_id = self.parser.get_key_prop().id

swap_df = False
if joining_df_key_id != field_id_in_joining_df:
if joining_df_key_id != id_field_in_joining_df:
joining_df = swap_property_as_key(
joining_df, field_id_in_joining_df, joining_df_key_id
joining_df, id_field_in_joining_df, joining_df_key_id
)
else:
df = swap_property_as_key(df, field_id_in_df, df_key_id)
df = swap_property_as_key(df, id_field_in_df_id, df_key_id)
swap_df = True

# Join can be done with or without an aggregation function like max, min, sum, ...
Expand All @@ -265,7 +297,27 @@ def join_to_an_index(self, df, translator, joining_node):
df = self.join_no_aggregate(df, joining_df, props_without_fn)

if swap_df:
df = swap_property_as_key(df, df_key_id, field_id_in_df)
df = swap_property_as_key(df, df_key_id, id_field_in_df_id)
return df

def ensure_project_id_exist(self, df):
    """
    Guarantee every document in *df* carries a project_id value.

    project_id is composed as "<program_name>-<project_code>" from the two
    props already present on the dataframe. A project_id Prop is registered
    with PropFactory on first use so later lookups by name succeed.
    :param df: pair RDD of (key, doc-dict) values.
    :return: RDD with project_id merged into every doc-dict.
    """
    project_id_prop = self.parser.get_prop_by_name(PROJECT_ID)
    if project_id_prop is None:
        project_id_prop = PropFactory.adding_prop(
            self.parser.doc_type, PROJECT_ID, None, [], prop_type=(str,)
        )
    program_name_id = self.parser.get_prop_by_name(PROGRAM_NAME).id
    project_code_id = self.parser.get_prop_by_name(PROJECT_CODE).id

    def attach_project_id(doc):
        # Compose "<program>-<code>" and merge it under the project_id prop.
        composed = "{}-{}".format(doc.get(program_name_id), doc.get(project_code_id))
        return merge_dictionary(doc, {project_id_prop.id: composed})

    return df.mapValues(attach_project_id)

def translate(self):
Expand All @@ -274,6 +326,7 @@ def translate(self):
root_df = self.translate_special(root_df)
root_df = self.translate_parent(root_df)
root_df = self.get_direct_children(root_df)
root_df = self.ensure_project_id_exist(root_df)
if len(self.parser.aggregated_nodes) == 0:
return root_df
return root_df.join(self.aggregate_nested_properties()).mapValues(
Expand Down
8 changes: 5 additions & 3 deletions tube/etl/indexers/base/lambdas.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import ast
import json
import collections
import sys


def extract_metadata(str_value):
Expand Down Expand Up @@ -56,11 +55,14 @@ def swap_key_value(df):

def get_props(names, values):
    """
    Build a row-mapper that renames source fields to property ids and applies
    per-property value mappings.

    :param names: dict mapping a source field name to the *list* of property
        ids derived from it (one src may feed several renamed props),
        e.g. {"gender": [1, 2], "project_name": [3]}.
    :param values: dict mapping src -> {prop_id: {original: final}} value
        translation tables; an empty inner table means "pass value through".
    :return: function(dict) -> dict keyed by property id.
    """
    # collections.Hashable was removed in Python 3.10; import the ABC from
    # its canonical home, with a fallback for old interpreters.
    try:
        from collections.abc import Hashable
    except ImportError:  # pragma: no cover - legacy Python
        from collections import Hashable
    return lambda x: {
        p_id: values[src][p_id][v]
        if isinstance(v, Hashable) and src in values and v in values[src][p_id]
        else v
        for (src, v) in x.items()
        if src in names
        for p_id in names[src]
    }


Expand Down
5 changes: 3 additions & 2 deletions tube/etl/indexers/base/parser.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from tube.utils.general import get_node_id_name
from ..base.prop import PropFactory
from tube.utils.dd import get_properties_types

Expand All @@ -16,7 +17,7 @@ def __init__(self, mapping, model):
self.joining_nodes = []
PropFactory.adding_prop(
self.doc_type,
"{}_id".format(self.doc_type),
get_node_id_name(self.doc_type),
"",
[],
src_node=None,
Expand Down Expand Up @@ -67,7 +68,7 @@ def select_widest_type(self, types):

def get_key_prop(self):
    """Return the Prop acting as this index's key field (named "_<doc_type>_id")."""
    key_name = get_node_id_name(self.doc_type)
    return PropFactory.get_prop_by_name(self.doc_type, key_name)

def get_prop_by_name(self, name):
Expand Down
6 changes: 6 additions & 0 deletions tube/etl/indexers/base/prop.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,11 @@ def __str__(self):
def __repr__(self):
return self.__str__()

def __eq__(self, other):
    # Props compare equal iff they share the same id; name/src/type are
    # ignored. NOTE(review): defining __eq__ normally disables the default
    # __hash__ — confirm the class defines/keeps one if Props go in sets.
    return self.id == other.id

def __ne__(self, other):
    # Explicit inverse of __eq__ (kept explicit for Python 2 semantics).
    return self.id != other.id

def update_type(self, prop_type):
self.type = prop_type
27 changes: 24 additions & 3 deletions tube/etl/indexers/base/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
get_props_empty_values,
get_number,
)
from tube.etl.indexers.base.prop import PropFactory
from tube.utils.spark import save_rds
from .prop import PropFactory


class Translator(object):
Expand Down Expand Up @@ -72,8 +72,29 @@ def write(self, df):
def get_props_from_data_row(self, df, props, to_tuple=False):
    """
    Re-key raw datatable rows from source field names to property ids,
    applying each property's value mapping.

    :param df: pair RDD of (key, {src_field: value}) rows.
    :param props: Prop objects to extract; several props may share one src
        (the same source field can be renamed to multiple properties).
    :param to_tuple: accepted for interface compatibility; not used here.
    :return: RDD whose values are dicts keyed by prop id.
    """
    if df.isEmpty():
        return df.mapValues(get_props_empty_values(props))
    # names maps a source field name to the list of property ids fed by it,
    # e.g. names = {"gender": [1, 2], "project_name": [3]}
    names = {}
    # values holds the per-property value-translation tables (if any), e.g.
    # values = {
    #     "gender": {
    #         1: {"male": "M", "female": "F"},
    #         2: {"male": "Male", "female": "Female"},
    #     },
    #     "project_name": {3: {}},
    # }
    values = {}
    for p in props:
        # setdefault keeps one entry per src while accumulating all prop ids.
        names.setdefault(p.src, []).append(p.id)
        values.setdefault(p.src, {})[p.id] = {
            m.original: m.final for m in p.value_mappings
        }

    return df.mapValues(get_props(names, values))

def get_props_from_df(self, df, props):
Expand Down
7 changes: 5 additions & 2 deletions tube/etl/indexers/injection/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
get_parent_label,
get_node_category,
)
from tube.utils.general import PROJECT_CODE
from tube.utils.general import PROJECT_CODE, PROGRAM_NAME
from tube.etl.indexers.base.parser import Parser as BaseParser
from tube.etl.indexers.injection.nodes.collecting_node import (
CollectingNode,
Expand Down Expand Up @@ -44,6 +44,9 @@ def __repr__(self):
def __eq__(self, other):
    # Equality is delegated to the identity tuple returned by __key__().
    return self.__key__() == other.__key__()

def __ne__(self, other):
    # Explicit inverse of __eq__ (kept explicit for Python 2 semantics).
    return self.__key__() != other.__key__()


class NodePath(object):
def __init__(self, class_name, upper_path):
Expand Down Expand Up @@ -136,7 +139,7 @@ def get_props_for_nodes(self):
)
else:
node_props = v.get("props")
node_props.append({"name": "program_name", "src": "name"})
node_props.append({"name": PROGRAM_NAME, "src": "name"})
roots[k] = RootNode(
k,
get_node_table_name(self.model, k),
Expand Down
8 changes: 4 additions & 4 deletions tube/etl/indexers/injection/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
get_frame_zero,
)
from tube.etl.indexers.injection.nodes.collecting_node import LeafNode
from tube.utils.general import PROJECT_ID, PROJECT_CODE, PROGRAM_NAME
from tube.utils.general import PROJECT_ID, PROJECT_CODE, PROGRAM_NAME, get_node_id_name


class Translator(BaseTranslator):
Expand Down Expand Up @@ -53,7 +53,7 @@ def collect_collecting_child(self, child, edge_df, collected_collecting_dfs):
child.no_parent_to_map -= 1
if len(child.props) > 0:
child_df = self.translate_table(child.tbl_name, props=child.props)
node = self.parser.get_prop_by_name("{}_id".format(child.name))
node = self.parser.get_prop_by_name(get_node_id_name(child.name))
node_id = node.id if node is not None else None
if node_id is not None:
child_df = child_df.map(
Expand Down Expand Up @@ -84,12 +84,12 @@ def merge_project(self, child, edge_df, collected_collecting_dfs):
return
child.no_parent_to_map -= 1
child_df = self.translate_table(child.tbl_name, props=child.props)
project_code_id = self.parser.get_prop_by_name(PROJECT_CODE).id
child_df = child_df.join(edge_df).mapValues(
lambda x: merge_dictionary(x[0], x[1])
)
program_name_id = self.parser.get_prop_by_name(PROGRAM_NAME).id

project_code_id = self.parser.get_prop_by_name(PROJECT_CODE).id
program_name_id = self.parser.get_prop_by_name(PROGRAM_NAME).id
project_id_prop = self.parser.get_prop_by_name(PROJECT_ID)
if project_id_prop is None:
project_id_prop = PropFactory.adding_prop(
Expand Down
2 changes: 2 additions & 0 deletions tube/etl/indexers/interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ def run_transform(translators):

for translator in list(translators.values()):
df = translator.translate()
if df is None:
continue
translator.save_to_hadoop(df)
translator.current_step = 1
if len(translator.parser.joining_nodes) > 0:
Expand Down
3 changes: 2 additions & 1 deletion tube/etl/outputs/es/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from tube.etl.outputs.es.versioning import Versioning
from tube.etl.plugins import post_process_plugins, add_auth_resource_path_mapping
from tube.etl.spark_base import SparkBase
from tube.utils.general import get_node_id_name


def json_export(x, doc_type):
    """
    Prepare one (key, doc) pair for ES export: stamp the id fields onto the
    document and serialize it to JSON.

    :param x: (node_id, doc_dict) tuple.
    :param doc_type: index document type used to name the id field.
    :return: (node_id, json_string) tuple.
    """
    key, doc = x
    doc[get_node_id_name(doc_type)] = key
    doc["node_id"] = key  # redundant field for backward compatibility with arranger
    return (key, json.dumps(doc))

Expand Down
8 changes: 8 additions & 0 deletions tube/utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ def get_resource_path_from_json(results, json_data):
return results


def get_node_id_name(name):
    """Return the internal identity-field name for a node, e.g. "case" -> "_case_id"."""
    return "_" + name + "_id"


def get_node_id_name_without_prefix(name):
    """Return the legacy identity-field name for a node, e.g. "case" -> "case_id"."""
    return name + "_id"


# Canonical field names shared by parsers/translators when composing
# project identity (project_id = "<program_name>-<project_code>").
PROGRAM_NAME = "program_name"
PROJECT_CODE = "project_code"
PROJECT_ID = "project_id"

0 comments on commit dfcfd51

Please sign in to comment.