Skip to content

Commit

Permalink
changed to do_nothing, download diff
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimir2217 committed Dec 4, 2024
1 parent 801715c commit bc74808
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 23 deletions.
11 changes: 7 additions & 4 deletions dags/annotate_and_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
default_args=default_args,
params=
{
'repository_id': "def_repository_id",
'branch_name': 'def_branch_name',
'commitid_from': 'commit_id_from',
'commitid_to': 'commit_id_to'
"repository_id": None,
"branch_name": None,
"commitid_from": None,
"commitid_to": None
},
schedule_interval=None
) as dag:
Expand All @@ -49,3 +49,6 @@
# . . .

init >> create_pipeline_taskgroup(dag, pipeline_class, config) >> finish



61 changes: 42 additions & 19 deletions dags/roger/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,19 +286,33 @@ def setup_input_data(context, exec_conf):
repo['path'] = '*'
logger.info(f"repos : {repos}")
for r in repos:
logger.info("downloading %s from %s@%s to %s",
r['path'], r['repo'], r['branch'], input_dir)
# create path to download to ...
if not os.path.exists(input_dir + f'/{r["repo"]}'):
os.mkdir(input_dir + f'/{r["repo"]}')
get_files(
local_path=input_dir + f'/{r["repo"]}',
remote_path=r['path'],
branch=r['branch'],
repo=r['repo'],
changes_only=False,
lake_fs_client=client
)

if not r["param_repo"]["repository_id"]:
logger.info("downloading %s from %s@%s to %s", r['path'], r['repo'], r['branch'], input_dir)
get_files(
local_path=input_dir + f'/{r["repo"]}',
remote_path=r['path'],
branch=r['branch'],
repo=r['repo'],
changes_only=False,
lake_fs_client=client
)
else:
logger.info("downloading %s from %s@%s to %s", r['path'], r["param_repo"]["repository_id"], r["param_repo"]["branch_name"], input_dir)
logger.info("from commit %s to commit %s", r["param_repo"]["commitid_from"], r["param_repo"]["commitid_to"])
get_files(
local_path=input_dir + f'/{r["repo"]}',
remote_path=r['path'],
branch=r["param_repo"]["branch_name"],
repo=r["param_repo"]["repository_id"],
commit_from=r["param_repo"]["commitid_from"],
commit_to=r["param_repo"]["commitid_to"],
changes_only=True,
lake_fs_client=client
)


def create_python_task(dag, name, a_callable, func_kwargs=None, external_repos = {}, pass_conf=True, no_output_files=False):
Expand Down Expand Up @@ -343,7 +357,8 @@ def create_python_task(dag, name, a_callable, func_kwargs=None, external_repos =
'repos': [{
'repo': r['name'],
'branch': r['branch'],
'path': r.get('path', '*')
'path': r.get('path', '*'),
'param_repo' : r['param_repo']
} for r in external_repos]
}

Expand Down Expand Up @@ -373,22 +388,30 @@ def create_pipeline_taskgroup(
input_dataset_version = pipeline_class.input_version

with TaskGroup(group_id=f"{name}_dataset_pipeline_task_group") as tg:
do_nothing = lambda *args: None
with pipeline_class(config=configparam, **kwargs) as pipeline:
pipeline: DugPipeline
annotate_task = create_python_task(
dag,
f"annotate_{name}_files",
pipeline.annotate,
do_nothing,
external_repos=[{
'name': getattr(pipeline_class, 'pipeline_name'),
'branch': input_dataset_version
'branch': input_dataset_version,
'param_repo':
{
"repository_id": dag.params.get("repository_id") or None,
"branch_name": dag.params.get("branch_name") or None,
"commitid_from": dag.params.get("commitid_from") or None,
"commitid_to": dag.params.get("commitid_to") or None,
}
}],
pass_conf=False)

index_variables_task = create_python_task(
dag,
f"index_{name}_variables",
pipeline.index_variables,
do_nothing,
pass_conf=False,
# declare that this task will not generate files.
no_output_files=True)
Expand All @@ -397,7 +420,7 @@ def create_pipeline_taskgroup(
validate_index_variables_task = create_python_task(
dag,
f"validate_{name}_index_variables",
pipeline.validate_indexed_variables,
do_nothing,
pass_conf=False,
# declare that this task will not generate files.
no_output_files=True
Expand All @@ -407,21 +430,21 @@ def create_pipeline_taskgroup(
make_kgx_task = create_python_task(
dag,
f"make_kgx_{name}",
pipeline.make_kg_tagged,
do_nothing,
pass_conf=False)
make_kgx_task.set_upstream(annotate_task)

crawl_task = create_python_task(
dag,
f"crawl_{name}",
pipeline.crawl_tranql,
do_nothing,
pass_conf=False)
crawl_task.set_upstream(annotate_task)

index_concepts_task = create_python_task(
dag,
f"index_{name}_concepts",
pipeline.index_concepts,
do_nothing,
pass_conf=False,
# declare that this task will not generate files.
no_output_files=True)
Expand All @@ -430,7 +453,7 @@ def create_pipeline_taskgroup(
validate_index_concepts_task = create_python_task(
dag,
f"validate_{name}_index_concepts",
pipeline.validate_indexed_concepts,
do_nothing,
pass_conf=False,
# declare that this task will not generate files.
no_output_files=True
Expand Down

0 comments on commit bc74808

Please sign in to comment.