feat(dag): add data transfer task group for release process #528
Conversation
chore(l2g): update inclusion features list with newer QTLs
Thank you!
Changes here mostly concern adding a group of tasks that places all necessary files in the release bucket prior to the ETL run. Therefore, I'd suggest changing the PR title to something like feat(dag): add data transfer task group for release process
This is the ETL DAG now:
I made some comments, let me know your thoughts. Overall, I like the implementation, and the idea of introducing the ShortCircuitOperator to safeguard ourselves from rerunning unnecessary processes is very nice.
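For context, a minimal sketch of the ShortCircuitOperator idea discussed above: the operator skips all downstream tasks when its python_callable returns a falsy value. The gating decision itself is runnable without Airflow; the DAG wiring is shown only as a comment, with a hypothetical task id and helper name.

```python
# ShortCircuitOperator semantics: a falsy return from python_callable marks
# every downstream task as skipped; a truthy return lets the DAG continue.
# In a DAG this would be wired roughly as (hypothetical task id/helper):
#
#   ShortCircuitOperator(
#       task_id="release_folder_absent",
#       python_callable=lambda: not check_gcp_folder_exists(bucket, path),
#   )

def gate(folder_exists: bool) -> bool:
    """Continue the DAG only when the release folder does not exist yet."""
    return not folder_exists

assert gate(True) is False   # release already there -> skip the rerun
assert gate(False) is True   # not there yet -> proceed with the ETL
```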
```diff
@@ -36,9 +36,9 @@
 anderson: ${datasets.static_assets}/andersson2014/enhancer_tss_associations.bed
 javierre: ${datasets.static_assets}/javierre_2016_preprocessed
 jung: ${datasets.static_assets}/jung2019_pchic_tableS3.csv
 thurman: ${datasets.static_assets}/thurman2012/genomewideCorrs_above0.7_promoterPlusMinus500kb_withGeneNames_32celltypeCategories.bed8.gz
-target_index: ${datasets.release_folder}/targets # OTP 23.12 data
+target_index: ${datasets.static_assets}/targets # OTP 23.12 data
```
Would it make sense to just point to our OTP release bucket?
```diff
@@ -36,9 +36,9 @@
 anderson: ${datasets.static_assets}/andersson2014/enhancer_tss_associations.bed
 javierre: ${datasets.static_assets}/javierre_2016_preprocessed
 jung: ${datasets.static_assets}/jung2019_pchic_tableS3.csv
 thurman: ${datasets.static_assets}/thurman2012/genomewideCorrs_above0.7_promoterPlusMinus500kb_withGeneNames_32celltypeCategories.bed8.gz
-target_index: ${datasets.release_folder}/targets # OTP 23.12 data
+target_index: ${datasets.static_assets}/targets # OTP 23.12 data
+gene_interactions: ${datasets.static_assets}/interaction # OTP 23.12 data
```
Would it make sense to just point to our OTP release bucket?
It might make sense; however, the service account can't read that bucket, and I don't see a reason to extend its permissions to other buckets. In a way, as long as the platform and the genetics ETLs are separated, this solution is fine.
src/airflow/dags/test_DAG.py
Outdated
```python
# Datasource paths:
GWAS_CATALOG_BUCKET_NAME = "gwas_catalog_data"
EQTL_BUCKET_NAME = "eqtl_catalog_data"
```
It doesn't concern this PR and it's incredibly minor, but it annoys me a bit that the source name is incorrect. Do you think we should rename it to eqtl_catalogue_data?
Yes, totally makes sense. It must be that the US form was derived from the GWAS Catalog.
Because of funding, as far as I understand.
src/airflow/dags/test_DAG.py
Outdated
```python
# Test if release folder exists:
def test_release_folder_exists() -> bool:
```
This util might be useful as a sensor in other instances. You could parametrise it and move it to common_airflow.py:
```python
def test_release_folder_exists(bucket: str, path: str) -> bool:
    """This function tests if the release folder exists.

    Args:
        bucket (str): Name of the bucket in GCS
        path (str): Object name in GCS to check

    Returns:
        bool: False if the folder exists, True otherwise.
    """
    hook = GCSHook(gcp_conn_id="google_cloud_default")
    return not hook.exists(bucket, path)
```
Minor, but the GCSHook connector is interesting.
Agree, it makes sense to generalise.
Co-authored-by: Irene López <45119610+ireneisdoomed@users.noreply.github.com>
Thank you for addressing the comments!
src/airflow/dags/common_airflow.py
Outdated
```python
@@ -59,6 +60,24 @@
}

# Test if release folder exists:
```
```python
# Test if release folder exists:
```
Unfortunately we are still not there yet (that's why I haven't asked for a re-review). For some reason the gcshook.exists() stopped working. Now I'm trying to find out what the problem is. I'll keep you posted.
I see, thanks for testing it.
```python
python_callable=lambda bucket, path: not common.check_gcp_folder_exists(
    bucket, path
),
```
It might be controversial, but I'm up for defending this decision: I don't like writing separate functions for testing the presence and for testing the absence of something. I like the idea of testing something positive, because that's easier to understand, e.g. the tested folder is there -> returns True.
However, this implies that if we need to test the absence, we have to flip the boolean value. We can either flip the return value at the call site or make the function flexible enough to decide whether we want the presence or the absence; I don't like the latter solution. This means that if I'm using an Airflow operator on a Python callable, I need to use a lambda. It might not be the nicest, but I think it's the right choice here.
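The flip-at-the-call-site pattern can be sketched end to end with a stubbed hook (FakeGCSHook below is a stand-in for illustration only, not the real GCSHook, so the logic runs without Airflow or GCS access):

```python
class FakeGCSHook:
    """Stand-in for GCSHook so the flip logic runs without Airflow/GCS."""

    def __init__(self, existing: set[tuple[str, str]]) -> None:
        self._existing = existing

    def exists(self, bucket: str, path: str) -> bool:
        return (bucket, path) in self._existing


def check_gcp_folder_exists(hook: FakeGCSHook, bucket: str, path: str) -> bool:
    """Positive check: True if the object exists (the easy-to-read form)."""
    return hook.exists(bucket, path)


hook = FakeGCSHook({("releases", "24.01/_SUCCESS")})

# Flip at the call site when *absence* should let the DAG proceed,
# mirroring the lambda passed to the operator above:
should_run = lambda bucket, path: not check_gcp_folder_exists(hook, bucket, path)

assert should_run("releases", "24.01/_SUCCESS") is False  # already released
assert should_run("releases", "24.02/_SUCCESS") is True   # safe to run
```

The positive helper stays reusable elsewhere, and only the one call site that cares about absence pays the cost of the negation.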
Not controversial to me. Functions are meant to be reused. Thanks!
Thank you for the changes! Was the hook failing, or did it simply stop working?
Apparently the
✨ Context
Some updates were made for the next data release + minor improvements in the Airflow layer. Scope defined under #3238.
What does this PR implement
Before submitting
- Is the PR made against the dev branch?
- Did you run the tests locally (make test)?
- Did you run pre-commit (poetry run pre-commit run --all-files)?