diff --git a/bigquery_etl/newtab_merino/__init__.py b/bigquery_etl/newtab_merino/__init__.py
new file mode 100644
index 00000000000..d348674ef6c
--- /dev/null
+++ b/bigquery_etl/newtab_merino/__init__.py
@@ -0,0 +1,128 @@
+"""Extract query results and write the combined JSON to a single file."""
+
+import json
+import logging
+from datetime import datetime, timedelta, timezone
+
+import rich_click as click
+from google.cloud import storage  # type: ignore
+from google.cloud import bigquery
+
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger(__name__)
+
+
+@click.command()
+@click.option(
+    "--source-project",
+    required=True,
+    help="Google Cloud Project where the source table is located.",
+)
+@click.option(
+    "--source-dataset",
+    required=True,
+    help="Dataset in BigQuery where the source table is located.",
+)
+@click.option(
+    "--source-table", required=True, help="Name of the source table in BigQuery."
+)
+@click.option(
+    "--destination-bucket",
+    required=True,
+    help="Destination Google Cloud Storage Bucket.",
+)
+@click.option(
+    "--destination-prefix", required=True, help="Prefix of the bucket path in GCS."
+)
+@click.option(
+    "--deletion-days-old",
+    required=True,
+    type=int,
+    help="Number of days after which files in GCS should be deleted.",
+)
+def export_newtab_merino_table_to_gcs(
+    source_project: str,
+    source_dataset: str,
+    source_table: str,
+    destination_bucket: str,
+    destination_prefix: str,
+    deletion_days_old: int,
+):
+    """Use bigquery client to export data from BigQuery to GCS."""
+    client = bigquery.Client(source_project)
+    error_counter = 0
+    threshold = 1
+
+    try:
+        # Generate the current timestamp
+        timestamp = datetime.utcnow().strftime("%Y%m%d%H%M")
+
+        # BigQuery does not export the proper JSON format, so we use a temp file and reformat
+        temp_file = "temp.ndjson"
+
+        job_config = bigquery.job.ExtractJobConfig(
+            destination_format=bigquery.job.DestinationFormat.NEWLINE_DELIMITED_JSON
+        )
+
+        destination_uri = f"gs://{destination_bucket}/{destination_prefix}/{temp_file}"
+
+        extract_job = client.extract_table(
+            source=f"{source_project}.{source_dataset}.{source_table}",
+            destination_uris=[destination_uri],
+            job_config=job_config,
+        )
+
+        extract_job.result()  # Waits for the job to complete.
+
+        # Verify that the job was successful
+        if extract_job.state != "DONE":
+            log.error("Export failed with errors: %s", extract_job.errors)
+            error_counter += 1
+
+        # Initialize the storage client
+        storage_client = storage.Client()
+        bucket = storage_client.bucket(destination_bucket)
+        blob = bucket.blob(f"{destination_prefix}/{temp_file}")
+
+        # Read the temporary JSON file from GCS
+        temp_file_content = blob.download_as_text()
+
+        # Convert the content to a JSON array
+        json_array = [json.loads(line) for line in temp_file_content.splitlines()]
+        json_data = json.dumps(json_array, indent=1)
+
+        # Write the JSON array to the final destination files in GCS:
+        # 1. latest.json is a single file that's easy to reference from Merino.
+        # 2. {timestamp}.json keeps a historical record for debugging purposes.
+        for suffix in ["latest", timestamp]:
+            final_destination_uri = f"{destination_prefix}/{suffix}.json"
+            final_blob = bucket.blob(final_destination_uri)
+            final_blob.upload_from_string(json_data, content_type="application/json")
+
+        # Delete the temporary file from GCS
+        blob.delete()
+
+        # Delete files older than `deletion_days_old` days
+        delete_old_files(bucket, destination_prefix, deletion_days_old)
+
+        log.info("Export successful and temporary file deleted")
+
+    except Exception as err:
+        error_counter += 1
+        log.error(f"An error occurred: {err}")
+
+    if error_counter > threshold:
+        raise Exception(
+            f"More than the accepted threshold of {threshold} operations failed."
+        )
+
+
+def delete_old_files(bucket, prefix, days_old):
+    """Delete files older than `days_old` days from the bucket with the given prefix."""
+    cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_old)
+    blobs = bucket.list_blobs(prefix=prefix)
+
+    for blob in blobs:
+        if blob.updated < cutoff_date:
+            blob.delete()
+            log.info(f"Deleted {blob.name}")
diff --git a/dags.yaml b/dags.yaml
index 135488cd2e0..afebaff8868 100644
--- a/dags.yaml
+++ b/dags.yaml
@@ -1790,6 +1790,27 @@ bqetl_merino_newtab_extract_to_gcs:
   - repo/bigquery-etl
   - impact/tier_1
 
+bqetl_merino_newtab_priors_to_gcs:
+  default_args:
+    depends_on_past: false
+    email:
+    - cbeck@mozilla.com
+    - gkatre@mozilla.com
+    email_on_failure: true
+    email_on_retry: false
+    end_date: null
+    owner: cbeck@mozilla.com
+    retries: 2
+    retry_delay: 5m
+    start_date: '2024-10-08'
+  description: |
+    Aggregates Newtab stats and exports them to a GCS bucket for Merino to derive Thompson sampling priors.
+  repo: bigquery-etl
+  schedule_interval: "0 2 * * *"
+  tags:
+  - repo/bigquery-etl
+  - impact/tier_1
+
 bqetl_dynamic_dau:
   default_args:
     depends_on_past: false
diff --git a/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_extract_to_gcs_v1/metadata.yaml b/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_extract_to_gcs_v1/metadata.yaml
index 8b686fcf7d2..a18b2c78304 100644
--- a/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_extract_to_gcs_v1/metadata.yaml
+++ b/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_extract_to_gcs_v1/metadata.yaml
@@ -15,7 +15,7 @@ scheduling:
   - --source-dataset=telemetry_derived
   - --source-table=newtab_merino_extract_v1
   - --destination-bucket=merino-airflow-data-prodpy
-  - --destination-prefix=newtab-merino-exports
+  - --destination-prefix=newtab-merino-exports/engagement
   - --deletion-days-old=3
 bigquery: null
 references: {}
diff --git a/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_extract_to_gcs_v1/query.py b/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_extract_to_gcs_v1/query.py
index 829530cfd54..8f6af7b4740 100644
--- a/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_extract_to_gcs_v1/query.py
+++ b/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_extract_to_gcs_v1/query.py
@@ -1,128 +1,6 @@
-import json
-import logging
-from datetime import datetime, timedelta, timezone
-
-import rich_click as click
-from google.cloud import storage  # type: ignore
-from google.cloud import bigquery
-
-
-@click.command()
-@click.option(
-    "--source-project",
-    required=True,
-    help="Google Cloud Project where the source table is located.",
-)
-@click.option(
-    "--source-dataset",
-    required=True,
-    help="Dataset in BigQuery where the source table is located.",
-)
-@click.option(
-    "--source-table", required=True, help="Name of the source table in BigQuery."
-) -@click.option( - "--destination-bucket", - required=True, - help="Destination Google Cloud Storage Bucket.", -) -@click.option( - "--destination-prefix", required=True, help="Prefix of the bucket path in GCS." -) - -@click.option( - "--deletion-days-old", - required=True, - type=int, - help="Number of days after which files in GCS should be deleted.", -) - -def export_newtab_merino_extract_to_gcs( - source_project: str, - source_dataset: str, - source_table: str, - destination_bucket: str, - destination_prefix: str, - deletion_days_old: int, -): - """Use bigquery client to export data from BigQuery to GCS.""" - client = bigquery.Client(source_project) - error_counter = 0 - threshold = 1 - - try: - # Generate the current timestamp - timestamp = datetime.utcnow().strftime("%Y%m%d%H%M") - - # BigQuery does not export the proper JSON format, so we use a temp file and reformat - temp_file = "temp.ndjson" - - job_config = bigquery.job.ExtractJobConfig( - destination_format=bigquery.job.DestinationFormat.NEWLINE_DELIMITED_JSON - ) - - destination_uri = f"gs://{destination_bucket}/{destination_prefix}/{temp_file}" - - extract_job = client.extract_table( - source=f"{source_project}.{source_dataset}.{source_table}", - destination_uris=[destination_uri], - job_config=job_config, - ) - - extract_job.result() # Waits for the job to complete. - - # Verify that job was successful - if extract_job.state != "DONE": - logging.error("Export failed with errors:", extract_job.errors) - error_counter += 1 - - # Initialize the storage client - storage_client = storage.Client() - bucket = storage_client.bucket(destination_bucket) - blob = bucket.blob(f"{destination_prefix}/{temp_file}") - - # Read the temporary JSON file from GCS - temp_file_content = blob.download_as_text() - - # Convert the content to a JSON array - json_array = [json.loads(line) for line in temp_file_content.splitlines()] - - # Write the JSON array to the final destination file in GCS - final_destination_uri = f"{destination_prefix}/engagement_{timestamp}.json" - final_blob = bucket.blob(final_destination_uri) - final_blob.upload_from_string( - json.dumps(json_array, indent=1), content_type="application/json" - ) - - # Delete the temporary file from GCS - blob.delete() - - # Delete files older than 3 days - delete_old_files(bucket, destination_prefix, deletion_days_old) - - logging.info("Export successful and temporary file deleted") - - except Exception as err: - error_counter += 1 - logging.error(f"An error occurred: {err}") - - if error_counter > threshold: - raise Exception( - f"More than the accepted threshold of {threshold} operations failed." 
- ) - - -def delete_old_files(bucket, prefix, days_old): - """Delete files older than `days_old` days from the bucket with the given prefix.""" - cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_old) - blobs = bucket.list_blobs(prefix=prefix) - - for blob in blobs: - if blob.time_created < cutoff_date: - blob.delete() - logging.info(f"Deleted {blob.name}") +"""Extract New Tab engagement query results and write the combined JSON to a single file.""" +from bigquery_etl.newtab_merino import export_newtab_merino_table_to_gcs if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - export_newtab_merino_extract_to_gcs() + export_newtab_merino_table_to_gcs() diff --git a/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_to_gcs_v1/metadata.yaml b/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_to_gcs_v1/metadata.yaml new file mode 100644 index 00000000000..3e74a8cbf62 --- /dev/null +++ b/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_to_gcs_v1/metadata.yaml @@ -0,0 +1,22 @@ +friendly_name: Newtab Merino BigQuery Prior stats to Google Cloud Storage (GCS) +description: |- + Newtab stats that inform the Thompson sampling priors are exported to a GCS + bucket for Merino to consume. The table rebuilds daily and aggregates 7 days + of data. +owners: +- cbeck@mozilla.com +- gkatre@mozilla.com +labels: + incremental: false + owner1: cbeck +scheduling: + dag_name: bqetl_merino_newtab_priors_to_gcs + arguments: + - --source-project=moz-fx-data-shared-prod + - --source-dataset=telemetry_derived + - --source-table=newtab_merino_priors_v1 + - --destination-bucket=merino-airflow-data-prodpy + - --destination-prefix=newtab-merino-exports/priors + - --deletion-days-old=3 +bigquery: null +references: {} diff --git a/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_to_gcs_v1/query.py b/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_to_gcs_v1/query.py new file mode 100644 index 00000000000..9664ff1c794 --- /dev/null +++ b/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_to_gcs_v1/query.py @@ -0,0 +1,6 @@ +"""Extract Thompson sampling prior query results and write the combined JSON to a single file.""" + +from bigquery_etl.newtab_merino import export_newtab_merino_table_to_gcs + +if __name__ == "__main__": + export_newtab_merino_table_to_gcs() diff --git a/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_v1/checks.sql b/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_v1/checks.sql new file mode 100644 index 00000000000..9c27811683c --- /dev/null +++ b/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_v1/checks.sql @@ -0,0 +1,10 @@ +-- macro checks + +#fail +{{ not_null(["average_ctr_top2_items"]) }} + +#fail +{{ not_null(["impressions_per_item"]) }} + +#fail +{{ min_row_count(1) }} diff --git a/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_v1/metadata.yaml b/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_v1/metadata.yaml new file mode 100644 index 00000000000..33149aeaf26 --- /dev/null +++ b/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_v1/metadata.yaml @@ -0,0 +1,15 @@ +friendly_name: Newtab Merino Priors +description: |- + Queries New Tab stats used by Merino to calculate Thompson sampling priors. + These determine how new items (without engagement data) are ranked on New Tab. 
+owners: +- cbeck@mozilla.com +- gkatre@mozilla.com +labels: + incremental: false + owner: cbeck +bigquery: + time_partitioning: null +scheduling: + dag_name: bqetl_merino_newtab_priors_to_gcs + date_partition_parameter: null diff --git a/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_v1/query.sql b/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_v1/query.sql new file mode 100644 index 00000000000..4823fdf915c --- /dev/null +++ b/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_v1/query.sql @@ -0,0 +1,199 @@ +WITH +-- Define common parameters +params AS ( + SELECT + TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY) AS end_timestamp, + TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY) - INTERVAL 7 DAY AS start_timestamp +), +-- Flatten events and filter relevant data +flattened_newtab_events AS ( + SELECT + sub.* + FROM + ( + SELECT + submission_timestamp, + normalized_country_code AS region, + event.name AS event_name, + SAFE_CAST( + mozfun.map.get_key(event.extra, 'scheduled_corpus_item_id') AS STRING + ) AS scheduled_corpus_item_id, + SAFE_CAST(mozfun.map.get_key(event.extra, 'recommended_at') AS INT64) AS recommended_at + FROM + `moz-fx-data-shared-prod.firefox_desktop.newtab`, + UNNEST(events) AS event, + params + WHERE + submission_timestamp >= params.start_timestamp + AND submission_timestamp < params.end_timestamp + AND event.category = 'pocket' + AND event.name IN ('impression', 'click') + AND mozfun.map.get_key(event.extra, 'scheduled_corpus_item_id') IS NOT NULL + AND SAFE_CAST(mozfun.map.get_key(event.extra, 'recommended_at') AS INT64) IS NOT NULL + ) AS sub, + params + WHERE + TIMESTAMP_MILLIS(recommended_at) >= params.start_timestamp + AND TIMESTAMP_MILLIS(recommended_at) < params.end_timestamp +), +-- Aggregate events by scheduled_corpus_item_id and region +aggregated_events AS ( + SELECT + scheduled_corpus_item_id, + region, + SUM(IF(event_name = 'impression', 1, 0)) AS impression_count, + SUM(IF(event_name = 'click', 1, 0)) AS click_count + FROM + flattened_newtab_events + GROUP BY + scheduled_corpus_item_id, + region +), +-- Calculate CTR per scheduled_corpus_item_id and region +per_region_ctr AS ( + SELECT + scheduled_corpus_item_id, + region, + SAFE_DIVIDE(click_count, impression_count) AS ctr, + impression_count, + click_count + FROM + aggregated_events + WHERE + impression_count > 0 +), +-- Calculate average impressions per item per region and round to whole number +per_region_impressions_per_item AS ( + SELECT + region, + ROUND(AVG(impression_count)) AS impressions_per_item + FROM + aggregated_events + GROUP BY + region +), +-- Rank items by click_count per region +ranked_per_region AS ( + SELECT + *, + ROW_NUMBER() OVER (PARTITION BY region ORDER BY click_count DESC) AS rank + FROM + per_region_ctr +), +-- Select top 2 items per region +top2_per_region AS ( + SELECT + scheduled_corpus_item_id, + region, + ctr + FROM + ranked_per_region + WHERE + rank <= 2 +), +-- Calculate average CTR of top 2 items per region +per_region_stats AS ( + SELECT + region, + AVG(ctr) AS average_ctr_top2_items + FROM + top2_per_region + GROUP BY + region +), +-- Combine per-region stats with impressions_per_item +per_region_stats_with_impressions AS ( + SELECT + s.region, + s.average_ctr_top2_items, + i.impressions_per_item + FROM + per_region_stats s + JOIN + per_region_impressions_per_item i + USING (region) +), +-- Aggregate events globally +aggregated_events_global AS ( + SELECT + scheduled_corpus_item_id, + SUM(impression_count) AS 
impression_count, + SUM(click_count) AS click_count + FROM + aggregated_events + GROUP BY + scheduled_corpus_item_id +), +-- Calculate CTR per scheduled_corpus_item_id globally +per_global_ctr AS ( + SELECT + scheduled_corpus_item_id, + SAFE_DIVIDE(click_count, impression_count) AS ctr, + impression_count, + click_count + FROM + aggregated_events_global + WHERE + impression_count > 0 +), +-- Calculate average impressions per item globally and round to whole number +global_impressions_per_item AS ( + SELECT + CAST(NULL AS STRING) AS region, + ROUND(AVG(impression_count)) AS impressions_per_item + FROM + aggregated_events_global +), +-- Rank items by click_count globally +ranked_global AS ( + SELECT + *, + ROW_NUMBER() OVER (ORDER BY click_count DESC) AS rank + FROM + per_global_ctr +), +-- Select top 2 items globally +top2_global AS ( + SELECT + scheduled_corpus_item_id, + ctr + FROM + ranked_global + WHERE + rank <= 2 +), +-- Calculate average CTR of top 2 items globally +global_stats AS ( + SELECT + CAST(NULL AS STRING) AS region, + AVG(ctr) AS average_ctr_top2_items + FROM + top2_global +), +-- Combine global stats with impressions_per_item +global_stats_with_impressions AS ( + SELECT + s.region, + s.average_ctr_top2_items, + i.impressions_per_item + FROM + global_stats s + CROSS JOIN + global_impressions_per_item i +) +-- Final output combining per-region and global statistics +SELECT + region, + average_ctr_top2_items, + impressions_per_item +FROM + per_region_stats_with_impressions +UNION ALL +SELECT + region, + average_ctr_top2_items, + impressions_per_item +FROM + global_stats_with_impressions +ORDER BY + impressions_per_item DESC; diff --git a/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_v1/schema.yaml b/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_v1/schema.yaml new file mode 100644 index 00000000000..8de4622411a --- /dev/null +++ b/sql/moz-fx-data-shared-prod/telemetry_derived/newtab_merino_priors_v1/schema.yaml @@ -0,0 +1,10 @@ +fields: +- mode: NULLABLE + name: region + type: STRING +- mode: NULLABLE + name: average_ctr_top2_items + type: FLOAT +- mode: NULLABLE + name: impressions_per_item + type: FLOAT
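For context on the shared exporter introduced above: the BigQuery extract job writes newline-delimited JSON, and export_newtab_merino_table_to_gcs reloads it and republishes a single JSON array as latest.json and {timestamp}.json. The snippet below is a minimal local sketch of that reformatting step, using made-up rows shaped like the newtab_merino_priors_v1 schema (the field values are illustrative only).

import json

# Illustrative NDJSON, shaped like the priors extract; the values are made up.
ndjson = (
    '{"region": "US", "average_ctr_top2_items": 0.031, "impressions_per_item": 1200.0}\n'
    '{"region": null, "average_ctr_top2_items": 0.027, "impressions_per_item": 950.0}\n'
)

# Same transformation the exporter applies: one JSON object per line -> one JSON array.
json_array = [json.loads(line) for line in ndjson.splitlines()]
print(json.dumps(json_array, indent=1))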
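The bucket and prefix for the new export come from newtab_merino_priors_to_gcs_v1/metadata.yaml, so the priors land at gs://merino-airflow-data-prodpy/newtab-merino-exports/priors/latest.json. How Merino actually fetches that file is outside this change; the following is only a sketch of a consumer reading it with the same google-cloud-storage client the exporter uses.

import json

from google.cloud import storage

# Path assembled from the metadata.yaml arguments plus the exporter's latest.json naming;
# this consumer-side read is a sketch, not Merino's actual fetch code.
client = storage.Client()
bucket = client.bucket("merino-airflow-data-prodpy")
blob = bucket.blob("newtab-merino-exports/priors/latest.json")

priors = json.loads(blob.download_as_text())
for row in priors:
    print(row["region"], row["average_ctr_top2_items"], row["impressions_per_item"])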
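The query exports only two aggregates per region: average_ctr_top2_items and impressions_per_item. Merino's prior derivation is not part of this diff; purely as an illustration of how such stats can seed Thompson sampling, one common approach is to convert them into Beta(alpha, beta) pseudo-counts and sample a CTR for items that have no engagement data yet.

import random


def beta_prior(average_ctr_top2_items: float, impressions_per_item: float) -> tuple[float, float]:
    """Illustrative assumption, not Merino's implementation: turn the exported stats into Beta pseudo-counts."""
    alpha = average_ctr_top2_items * impressions_per_item  # pseudo-clicks
    beta = (1.0 - average_ctr_top2_items) * impressions_per_item  # pseudo-impressions without a click
    return alpha, beta


# Made-up stats for one region; rank a brand-new item by a CTR sampled from its prior.
alpha, beta = beta_prior(average_ctr_top2_items=0.03, impressions_per_item=1000.0)
sampled_ctr = random.betavariate(alpha, beta)
print(f"prior=Beta({alpha:.1f}, {beta:.1f}), sampled CTR={sampled_ctr:.4f}")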