-
Notifications
You must be signed in to change notification settings - Fork 101
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[MC-1458] Add newtab_merino_priors DAG (#6303)
* [MC-1458] Add newtab_merino_priors DAG * Extract shared JSON export function --------- Co-authored-by: Chelsey Beck <64881557+chelseybeck@users.noreply.github.com>
- Loading branch information
1 parent
fe03467
commit 5e08071
Showing
10 changed files
with
418 additions
and
126 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
"""Extract query results and write the combined JSON to a single file.""" | ||
|
||
import json | ||
import logging | ||
from datetime import datetime, timedelta, timezone | ||
|
||
import rich_click as click | ||
from google.cloud import storage # type: ignore | ||
from google.cloud import bigquery | ||
|
||
logging.basicConfig(level=logging.INFO) | ||
log = logging.getLogger(__name__) | ||
|
||
|
||
@click.command()
@click.option(
    "--source-project",
    required=True,
    help="Google Cloud Project where the source table is located.",
)
@click.option(
    "--source-dataset",
    required=True,
    help="Dataset in BigQuery where the source table is located.",
)
@click.option(
    "--source-table", required=True, help="Name of the source table in BigQuery."
)
@click.option(
    "--destination-bucket",
    required=True,
    help="Destination Google Cloud Storage Bucket.",
)
@click.option(
    "--destination-prefix", required=True, help="Prefix of the bucket path in GCS."
)
@click.option(
    "--deletion-days-old",
    required=True,
    type=int,
    help="Number of days after which files in GCS should be deleted.",
)
def export_newtab_merino_table_to_gcs(
    source_project: str,
    source_dataset: str,
    source_table: str,
    destination_bucket: str,
    destination_prefix: str,
    deletion_days_old: int,
):
    """Use bigquery client to export data from BigQuery to GCS.

    The table is extracted as newline-delimited JSON to a temporary GCS
    object, reformatted into a single JSON array, and written to two final
    objects under ``destination_prefix``: ``latest.json`` (stable name for
    Merino to reference) and ``{YYYYMMDDHHMM}.json`` (historical record).
    Objects under the prefix older than ``deletion_days_old`` days are then
    deleted.

    Raises:
        Exception: when more than ``threshold`` operations failed.
    """
    client = bigquery.Client(source_project)
    error_counter = 0
    threshold = 1

    try:
        # Generate the current timestamp. Use an aware UTC datetime:
        # datetime.utcnow() is deprecated and returns a naive value.
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M")

        # BigQuery does not export the proper JSON format, so we use a temp file and reformat
        temp_file = "temp.ndjson"

        job_config = bigquery.job.ExtractJobConfig(
            destination_format=bigquery.job.DestinationFormat.NEWLINE_DELIMITED_JSON
        )

        destination_uri = f"gs://{destination_bucket}/{destination_prefix}/{temp_file}"

        extract_job = client.extract_table(
            source=f"{source_project}.{source_dataset}.{source_table}",
            destination_uris=[destination_uri],
            job_config=job_config,
        )

        extract_job.result()  # Waits for the job to complete.

        # Verify that job was successful
        if extract_job.state != "DONE":
            # Lazy %-formatting: a bare second positional arg without a
            # placeholder in the message would never be rendered.
            log.error("Export failed with errors: %s", extract_job.errors)
            error_counter += 1

        # Initialize the storage client
        storage_client = storage.Client()
        bucket = storage_client.bucket(destination_bucket)
        blob = bucket.blob(f"{destination_prefix}/{temp_file}")

        # Read the temporary JSON file from GCS
        temp_file_content = blob.download_as_text()

        # Convert the content (one JSON object per line) to a JSON array
        json_array = [json.loads(line) for line in temp_file_content.splitlines()]
        json_data = json.dumps(json_array, indent=1)

        # Write the JSON array to the final destination files in GCS:
        # 1. latest.json is a single file, that's easy to reference from Merino.
        # 2. {timestamp}.json keeps a historical record for debugging purposes.
        for suffix in ["latest", timestamp]:
            final_destination_uri = f"{destination_prefix}/{suffix}.json"
            final_blob = bucket.blob(final_destination_uri)
            final_blob.upload_from_string(json_data, content_type="application/json")

        # Delete the temporary file from GCS
        blob.delete()

        # Delete files older than `deletion_days_old` days
        delete_old_files(bucket, destination_prefix, deletion_days_old)

        log.info("Export successful and temporary file deleted")

    except Exception as err:
        error_counter += 1
        log.error("An error occurred: %s", err)

    if error_counter > threshold:
        raise Exception(
            f"More than the accepted threshold of {threshold} operations failed."
        )
|
||
|
||
def delete_old_files(bucket, prefix, days_old):
    """Delete files older than `days_old` days from the bucket with the given prefix.

    Args:
        bucket: google.cloud.storage Bucket to scan.
        prefix: object-name prefix limiting which blobs are considered.
        days_old: age threshold in days; blobs last updated before it are removed.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(days=days_old)
    stale_blobs = (
        candidate
        for candidate in bucket.list_blobs(prefix=prefix)
        if candidate.updated < cutoff
    )
    for stale in stale_blobs:
        stale.delete()
        # Same module logger the file binds to `log` at import time.
        logging.getLogger(__name__).info(f"Deleted {stale.name}")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
128 changes: 3 additions & 125 deletions
128
sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_extract_to_gcs_v1/query.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,128 +1,6 @@ | ||
import json | ||
import logging | ||
from datetime import datetime, timedelta, timezone | ||
|
||
import rich_click as click | ||
from google.cloud import storage # type: ignore | ||
from google.cloud import bigquery | ||
|
||
|
||
@click.command()
@click.option(
    "--source-project",
    required=True,
    help="Google Cloud Project where the source table is located.",
)
@click.option(
    "--source-dataset",
    required=True,
    help="Dataset in BigQuery where the source table is located.",
)
@click.option(
    "--source-table", required=True, help="Name of the source table in BigQuery."
)
@click.option(
    "--destination-bucket",
    required=True,
    help="Destination Google Cloud Storage Bucket.",
)
@click.option(
    "--destination-prefix", required=True, help="Prefix of the bucket path in GCS."
)
@click.option(
    "--deletion-days-old",
    required=True,
    type=int,
    help="Number of days after which files in GCS should be deleted.",
)
def export_newtab_merino_extract_to_gcs(
    source_project: str,
    source_dataset: str,
    source_table: str,
    destination_bucket: str,
    destination_prefix: str,
    deletion_days_old: int,
):
    """Use bigquery client to export data from BigQuery to GCS.

    Legacy entry point (removed by this commit in favour of the shared
    ``export_newtab_merino_table_to_gcs`` helper): extracts the table as
    newline-delimited JSON to a temporary GCS object, reformats it into a
    single JSON array written to ``engagement_{timestamp}.json``, then purges
    old files under the prefix.
    """
    client = bigquery.Client(source_project)
    error_counter = 0
    threshold = 1

    try:
        # Generate the current timestamp
        timestamp = datetime.utcnow().strftime("%Y%m%d%H%M")

        # BigQuery does not export the proper JSON format, so we use a temp file and reformat
        temp_file = "temp.ndjson"

        job_config = bigquery.job.ExtractJobConfig(
            destination_format=bigquery.job.DestinationFormat.NEWLINE_DELIMITED_JSON
        )

        destination_uri = f"gs://{destination_bucket}/{destination_prefix}/{temp_file}"

        extract_job = client.extract_table(
            source=f"{source_project}.{source_dataset}.{source_table}",
            destination_uris=[destination_uri],
            job_config=job_config,
        )

        extract_job.result()  # Waits for the job to complete.

        # Verify that job was successful
        if extract_job.state != "DONE":
            # NOTE(review): the errors arg has no %s placeholder in the
            # message, so it is never rendered by the logging call.
            logging.error("Export failed with errors:", extract_job.errors)
            error_counter += 1

        # Initialize the storage client
        storage_client = storage.Client()
        bucket = storage_client.bucket(destination_bucket)
        blob = bucket.blob(f"{destination_prefix}/{temp_file}")

        # Read the temporary JSON file from GCS
        temp_file_content = blob.download_as_text()

        # Convert the content (one JSON object per line) to a JSON array
        json_array = [json.loads(line) for line in temp_file_content.splitlines()]

        # Write the JSON array to the final destination file in GCS
        final_destination_uri = f"{destination_prefix}/engagement_{timestamp}.json"
        final_blob = bucket.blob(final_destination_uri)
        final_blob.upload_from_string(
            json.dumps(json_array, indent=1), content_type="application/json"
        )

        # Delete the temporary file from GCS
        blob.delete()

        # Delete files older than `deletion_days_old` days
        delete_old_files(bucket, destination_prefix, deletion_days_old)

        logging.info("Export successful and temporary file deleted")

    except Exception as err:
        error_counter += 1
        logging.error(f"An error occurred: {err}")

    if error_counter > threshold:
        raise Exception(
            f"More than the accepted threshold of {threshold} operations failed."
        )
|
||
|
||
def delete_old_files(bucket, prefix, days_old):
    """Delete files older than `days_old` days from the bucket with the given prefix.

    Args:
        bucket: google.cloud.storage Bucket to scan.
        prefix: object-name prefix limiting which blobs are considered.
        days_old: age threshold in days; older blobs are deleted.
    """
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_old)
    blobs = bucket.list_blobs(prefix=prefix)

    for blob in blobs:
        # NOTE(review): this legacy version compares creation time
        # (time_created); the replacement helper compares last-modified
        # (blob.updated) — confirm which semantics are intended.
        if blob.time_created < cutoff_date:
            blob.delete()
            logging.info(f"Deleted {blob.name}")
"""Extract New Tab engagement query results and write the combined JSON to a single file."""

from bigquery_etl.newtab_merino import export_newtab_merino_table_to_gcs

if __name__ == "__main__":
    # Thin wrapper: all behavior lives in the shared Click command; CLI
    # arguments (source/destination options) are parsed by Click itself.
    export_newtab_merino_table_to_gcs()
22 changes: 22 additions & 0 deletions
22
sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_to_gcs_v1/metadata.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
friendly_name: Newtab Merino BigQuery Prior stats to Google Cloud Storage (GCS) | ||
description: |- | ||
Newtab stats that inform the Thompson sampling priors are exported to a GCS | ||
bucket for Merino to consume. The table rebuilds daily and aggregates 7 days | ||
of data. | ||
owners: | ||
- cbeck@mozilla.com | ||
- gkatre@mozilla.com | ||
labels: | ||
incremental: false | ||
owner1: cbeck | ||
scheduling: | ||
dag_name: bqetl_merino_newtab_priors_to_gcs | ||
arguments: | ||
- --source-project=moz-fx-data-shared-prod | ||
- --source-dataset=telemetry_derived | ||
- --source-table=newtab_merino_priors_v1 | ||
- --destination-bucket=merino-airflow-data-prodpy | ||
- --destination-prefix=newtab-merino-exports/priors | ||
- --deletion-days-old=3 | ||
bigquery: null | ||
references: {} |
6 changes: 6 additions & 0 deletions
6
sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_to_gcs_v1/query.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
"""Extract Thompson sampling prior query results and write the combined JSON to a single file."""

from bigquery_etl.newtab_merino import export_newtab_merino_table_to_gcs

if __name__ == "__main__":
    # Thin wrapper: all behavior lives in the shared Click command; CLI
    # arguments (source/destination options) are parsed by Click itself.
    export_newtab_merino_table_to_gcs()
10 changes: 10 additions & 0 deletions
10
sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_v1/checks.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
-- macro checks | ||
|
||
#fail | ||
{{ not_null(["average_ctr_top2_items"]) }} | ||
|
||
#fail | ||
{{ not_null(["impressions_per_item"]) }} | ||
|
||
#fail | ||
{{ min_row_count(1) }} |
15 changes: 15 additions & 0 deletions
15
sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_v1/metadata.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
friendly_name: Newtab Merino Priors | ||
description: |- | ||
Queries New Tab stats used by Merino to calculate Thompson sampling priors. | ||
These determine how new items (without engagement data) are ranked on New Tab. | ||
owners: | ||
- cbeck@mozilla.com | ||
- gkatre@mozilla.com | ||
labels: | ||
incremental: false | ||
owner: cbeck | ||
bigquery: | ||
time_partitioning: null | ||
scheduling: | ||
dag_name: bqetl_merino_newtab_priors_to_gcs | ||
date_partition_parameter: null |
Oops, something went wrong.
5e08071
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Integration report for "[MC-1458] Add newtab_merino_priors DAG (#6303)"
sql.diff
Click to expand!
Link to full diff