Skip to content

Commit

Permalink
Generate BigConfig files for views
Browse files (browse the repository at this point in the history)
  • Loading branch information
scholtzan committed Oct 8, 2024
1 parent 815a075 commit dbe36f6
Showing 1 changed file with 141 additions and 50 deletions.
191 changes: 141 additions & 50 deletions bigquery_etl/cli/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,15 @@
from bigeye_sdk.client.enum import Method
from bigeye_sdk.controller.metric_suite_controller import MetricSuiteController
from bigeye_sdk.exceptions.exceptions import FileLoadException
from bigeye_sdk.model.big_config import BigConfig, TableDeployment, TableDeploymentSuite
from bigeye_sdk.model.big_config import (
BigConfig,
ColumnSelector,
RowCreationTimes,
TableDeployment,
TableDeploymentSuite,
TagDeployment,
TagDeploymentSuite,
)
from bigeye_sdk.model.protobuf_message_facade import (
SimpleCollection,
SimpleMetricDefinition,
Expand Down Expand Up @@ -130,6 +138,122 @@ def deploy(
print("No metadata file for: {}.{}.{}".format(project, dataset, table))


def _update_table_bigconfig(
    bigconfig,
    metadata,
    project,
    dataset,
    table,
):
    """Make sure the BigConfig monitors a table with the default metrics.

    The freshness and volume metrics that no existing table deployment in
    ``bigconfig`` defines yet are added as one new deployment suite. Existing
    suites that have no collection are assigned the collection named in
    ``metadata.monitoring.collection`` (when one is configured).

    ``bigconfig`` is modified in place; nothing is returned.
    """
    # Metrics still to be deployed. An entry is dropped as soon as an existing
    # deployment is found that already defines it.
    missing_metrics = [
        SimplePredefinedMetricName.FRESHNESS,
        SimplePredefinedMetricName.VOLUME,
    ]

    for suite in bigconfig.table_deployments:
        for table_deployment in suite.deployments:
            for existing in table_deployment.table_metrics:
                predefined = existing.metric_type.predefined_metric
                if predefined in missing_metrics:
                    missing_metrics.remove(predefined)

        # Back-fill the collection on suites that were created without one.
        if metadata.monitoring.collection and suite.collection is None:
            suite.collection = SimpleCollection(
                name=metadata.monitoring.collection
            )

    if not missing_metrics:
        # Every default metric is already configured.
        return

    metric_definitions = [
        SimpleMetricDefinition(
            metric_type=SimplePredefinedMetric(
                type="PREDEFINED", predefined_metric=metric_name
            ),
            metric_schedule=SimpleMetricSchedule(
                named_schedule=SimpleNamedSchedule(
                    name="Default Schedule - 13:00 UTC"
                )
            ),
        )
        for metric_name in missing_metrics
    ]

    # NOTE(review): the project appears twice in the fully-qualified name;
    # presumably the first segment is the Bigeye source name - confirm.
    new_deployment = TableDeployment(
        fq_table_name=f"{project}.{project}.{dataset}.{table}",
        table_metrics=metric_definitions,
    )

    if metadata.monitoring.collection:
        suite_collection = SimpleCollection(name=metadata.monitoring.collection)
    else:
        suite_collection = None

    bigconfig.table_deployments += [
        TableDeploymentSuite(
            deployments=[new_deployment], collection=suite_collection
        )
    ]


def _update_view_bigconfig(
    bigconfig,
    metadata,
    project,
    dataset,
    table,
):
    """Update the BigConfig file to monitor a view.

    Views are monitored through tag deployments that select all columns of
    the view. The default data freshness and data volume metrics that no
    existing tag deployment in ``bigconfig`` defines yet are added as one new
    tag deployment suite. Existing suites that have no collection are assigned
    the collection named in ``metadata.monitoring.collection`` (when set).

    The row creation time column is only configured when
    ``metadata.monitoring.partition_column`` is set, so that a selector ending
    in ``.None`` is never written to the BigConfig.

    ``bigconfig`` is modified in place; nothing is returned.
    """
    default_metrics = [
        SimplePredefinedMetricName.FRESHNESS_DATA,
        SimplePredefinedMetricName.VOLUME_DATA,
    ]

    # NOTE(review): the project appears twice in the fully-qualified name;
    # presumably the first segment is the Bigeye source name - confirm.
    fq_view_name = f"{project}.{project}.{dataset}.{table}"

    for collection in bigconfig.tag_deployments:
        # Drop every default metric that is already deployed.
        for deployment in collection.deployments:
            for metric in deployment.metrics:
                if metric.metric_type.predefined_metric in default_metrics:
                    default_metrics.remove(metric.metric_type.predefined_metric)

        # Back-fill the collection on suites that were created without one.
        if metadata.monitoring.collection and collection.collection is None:
            collection.collection = SimpleCollection(
                name=metadata.monitoring.collection
            )

    if default_metrics:
        deployments = [
            TagDeployment(
                column_selectors=[ColumnSelector(name=f"{fq_view_name}.*")],
                metrics=[
                    SimpleMetricDefinition(
                        metric_type=SimplePredefinedMetric(
                            type="PREDEFINED", predefined_metric=metric
                        ),
                        metric_schedule=SimpleMetricSchedule(
                            named_schedule=SimpleNamedSchedule(
                                name="Default Schedule - 13:00 UTC"
                            )
                        ),
                    )
                    for metric in default_metrics
                ],
            )
        ]

        collection = None
        if metadata.monitoring.collection:
            collection = SimpleCollection(name=metadata.monitoring.collection)

        bigconfig.tag_deployments += [
            TagDeploymentSuite(deployments=deployments, collection=collection)
        ]

    # Without a partition column there is nothing meaningful to select; leave
    # any existing row creation time configuration untouched in that case.
    partition_column = metadata.monitoring.partition_column
    if partition_column:
        bigconfig.row_creation_times = RowCreationTimes(
            column_selectors=[
                ColumnSelector(name=f"{fq_view_name}.{partition_column}")
            ]
        )


@monitoring.command(
help="""
Update BigConfig files based on monitoring metadata.
Expand Down Expand Up @@ -158,55 +282,22 @@ def update(name: str, sql_dir: Optional[str], project_id: Optional[str]) -> None
else:
bigconfig = BigConfig(type="BIGCONFIG_FILE")

default_metrics = [
SimplePredefinedMetricName.FRESHNESS,
SimplePredefinedMetricName.VOLUME,
]

for collection in bigconfig.table_deployments:
for deployment in collection.deployments:
for metric in deployment.table_metrics:
if metric.metric_type.predefined_metric in default_metrics:
default_metrics.remove(
metric.metric_type.predefined_metric
)

if metadata.monitoring.collection and collection.collection is None:
collection.collection = SimpleCollection(
name=metadata.monitoring.collection
)

if len(default_metrics) > 0:
deployments = [
TableDeployment(
fq_table_name=f"{project}.{project}.{dataset}.{table}",
table_metrics=[
SimpleMetricDefinition(
metric_type=SimplePredefinedMetric(
type="PREDEFINED", predefined_metric=metric
),
metric_schedule=SimpleMetricSchedule(
named_schedule=SimpleNamedSchedule(
name="Default Schedule - 17:00 UTC"
)
),
)
for metric in default_metrics
],
)
]

collection = None
if metadata.monitoring.collection:
collection = SimpleCollection(
name=metadata.monitoring.collection
)

bigconfig.table_deployments += [
TableDeploymentSuite(
deployments=deployments, collection=collection
)
]
if (metadata_file.parent / VIEW_FILE).exists():
_update_view_bigconfig(
bigconfig=bigconfig,
metadata=metadata,
project=project,
dataset=dataset,
table=table,
)
else:
_update_table_bigconfig(
bigconfig=bigconfig,
metadata=metadata,
project=project,
dataset=dataset,
table=table,
)

bigconfig.save(
output_path=bigconfig_file.parent,
Expand Down

0 comments on commit dbe36f6

Please sign in to comment.