Skip to content

Commit

Permalink
Merge pull request #229 from loglabs/shreyashankar/iopmod
Browse files Browse the repository at this point in the history
Modify get io pointer
  • Loading branch information
shreyashankar authored Sep 15, 2021
2 parents b252048 + 3aee768 commit edc782b
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 92 deletions.
54 changes: 32 additions & 22 deletions mltrace/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def clean_db():


def create_component(
name: str, description: str, owner: str, tags: typing.List[str] = []
name: str, description: str, owner: str, tags: typing.List[str] = []
):
"""Creates a component entity in the database."""
store = Store(_db_uri)
Expand All @@ -72,9 +72,9 @@ def tag_component(component_name: str, tags: typing.List[str]):


def log_component_run(
component_run: ComponentRun,
set_dependencies_from_inputs=True,
staleness_threshold: int = (60 * 60 * 24 * 30),
component_run: ComponentRun,
set_dependencies_from_inputs=True,
staleness_threshold: int = (60 * 60 * 24 * 30),
):
"""Takes client-facing ComponentRun object and logs it to the DB."""
store = Store(_db_uri)
Expand Down Expand Up @@ -107,13 +107,17 @@ def log_component_run(
# Add I/O
component_run_sql.add_inputs(
[
store.get_io_pointer(inp.name, inp.pointer_type)
store.get_io_pointer(
inp.name, inp.value, pointer_type=inp.pointer_type
)
for inp in component_run_dict["inputs"]
]
)
component_run_sql.add_outputs(
[
store.get_io_pointer(out.name, out.pointer_type)
store.get_io_pointer(
out.name, out.value, pointer_type=out.pointer_type
)
for out in component_run_dict["outputs"]
]
)
Expand Down Expand Up @@ -144,13 +148,13 @@ def create_random_ids(num_outputs=1) -> typing.List[str]:


def register(
component_name: str,
inputs: typing.List[str] = [],
outputs: typing.List[str] = [],
input_vars: typing.List[str] = [],
output_vars: typing.List[str] = [],
endpoint: bool = False,
staleness_threshold: int = (60 * 60 * 24 * 30),
component_name: str,
inputs: typing.List[str] = [],
outputs: typing.List[str] = [],
input_vars: typing.List[str] = [],
output_vars: typing.List[str] = [],
endpoint: bool = False,
staleness_threshold: int = (60 * 60 * 24 * 30),
):
def actual_decorator(func):
@functools.wraps(func)
Expand Down Expand Up @@ -216,15 +220,17 @@ def trace_helper(_frame, event, arg):
continue
if isinstance(val, list):
output_pointers += (
store.get_io_pointers(val, PointerTypeEnum.ENDPOINT)
store.get_io_pointers(
val, pointer_type=PointerTypeEnum.ENDPOINT
)
if endpoint
else store.get_io_pointers(val)
)
else:
output_pointers += (
[
store.get_io_pointer(
str(val), PointerTypeEnum.ENDPOINT
str(val), pointer_type=PointerTypeEnum.ENDPOINT
)
]
if endpoint
Expand All @@ -238,7 +244,9 @@ def trace_helper(_frame, event, arg):
input_pointers = [store.get_io_pointer(inp) for inp in inputs]
output_pointers = (
[
store.get_io_pointer(out, PointerTypeEnum.ENDPOINT)
store.get_io_pointer(
out, pointer_type=PointerTypeEnum.ENDPOINT
)
for out in outputs
]
if endpoint
Expand Down Expand Up @@ -332,10 +340,10 @@ def unflag_all():


def get_history(
component_name: str,
limit: int = 10,
date_lower: typing.Union[datetime, str] = datetime.min,
date_upper: typing.Union[datetime, str] = datetime.max,
component_name: str,
limit: int = 10,
date_lower: typing.Union[datetime, str] = datetime.min,
date_upper: typing.Union[datetime, str] = datetime.max,
) -> typing.List[ComponentRun]:
"""Returns a list of ComponentRuns that are part of the component's
history."""
Expand Down Expand Up @@ -433,10 +441,12 @@ def get_recent_run_ids(limit: int = 5, last_run_id=None):
return store.get_recent_run_ids(limit, last_run_id)


def get_io_pointer(io_pointer_id: str, create=True):
def get_io_pointer(
io_pointer_id: str, io_pointer_val: typing.Any = None, create=True
):
"""Returns IO Pointer metadata."""
store = Store(_db_uri)
iop = store.get_io_pointer(io_pointer_id, create)
iop = store.get_io_pointer(io_pointer_id, io_pointer_val, create=create)
return IOPointer.from_dictionary(iop.__dict__)


Expand Down
Loading

0 comments on commit edc782b

Please sign in to comment.