diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index e1c509d0e490..b897df2d32ab 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -400,6 +400,7 @@ def chain_after(result):
 from apache_beam.io.iobase import SourceBundle
 from apache_beam.io.textio import _TextSource as TextSource
 from apache_beam.metrics import Metrics
+from apache_beam.metrics.metric import Lineage
 from apache_beam.options import value_provider as vp
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import GoogleCloudOptions
@@ -809,6 +810,11 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
           self.table_reference.get(), project=self._get_project())
     elif not self.table_reference.projectId:
       self.table_reference.projectId = self._get_project()
+    Lineage.sources().add(
+        'bigquery',
+        self.table_reference.projectId,
+        self.table_reference.datasetId,
+        self.table_reference.tableId)
 
     schema, metadata_list = self._export_files(bq)
     self.export_result = _BigQueryExportResult(
@@ -1157,6 +1163,11 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
           self.table_reference.projectId,
           self.table_reference.datasetId,
           self.table_reference.tableId)
+      Lineage.sources().add(
+          "bigquery",
+          self.table_reference.projectId,
+          self.table_reference.datasetId,
+          self.table_reference.tableId)
 
     if self.use_native_datetime:
       requested_session.data_format = bq_storage.types.DataFormat.ARROW
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 3203c21a8e64..a7311ad6d063 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -40,6 +40,7 @@
 from apache_beam.io import filesystems as fs
 from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
+from apache_beam.metrics.metric import Lineage
 from apache_beam.options import value_provider as vp
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.transforms import trigger
@@ -564,6 +565,11 @@ def process_one(self, element, job_name_prefix):
       write_disposition = self.write_disposition
       wait_for_job = True
       self._observed_tables.add(copy_to_reference.tableId)
+      Lineage.sinks().add(
+          'bigquery',
+          copy_to_reference.projectId,
+          copy_to_reference.datasetId,
+          copy_to_reference.tableId)
     else:
       wait_for_job = False
       write_disposition = 'WRITE_APPEND'
@@ -735,6 +741,12 @@ def process(
       yield pvalue.TaggedOutput(
           TriggerLoadJobs.TEMP_TABLES,
          bigquery_tools.get_hashable_destination(table_reference))
+    else:
+      Lineage.sinks().add(
+          'bigquery',
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId)
 
     _LOGGER.info(
         'Triggering job %s to load data to BigQuery table %s.'
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index f27c7899f9f3..e4c0e34d9c1f 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -42,6 +42,7 @@
 from apache_beam.io.gcp.internal.clients import bigquery as bigquery_api
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
+from apache_beam.metrics.metric import Lineage
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner
@@ -510,6 +511,9 @@ def test_load_job_id_used(self):
           | "GetJobs" >> beam.Map(lambda x: x[1])
 
       assert_that(jobs, equal_to([job_reference]), label='CheckJobProjectIds')
+    self.assertSetEqual(
+        Lineage.query(p.result.metrics(), Lineage.SINK),
+        set(["bigquery:project1.dataset1.table1"]))
 
   def test_load_job_id_use_for_copy_job(self):
     destination = 'project1:dataset1.table1'
@@ -563,6 +567,9 @@ def test_load_job_id_use_for_copy_job(self):
               job_reference
           ]),
           label='CheckCopyJobProjectIds')
+    self.assertSetEqual(
+        Lineage.query(p.result.metrics(), Lineage.SINK),
+        set(["bigquery:project1.dataset1.table1"]))
 
   @mock.patch('time.sleep')
   def test_wait_for_load_job_completion(self, sleep_mock):
@@ -725,6 +732,9 @@ def test_multiple_partition_files(self):
           copy_jobs | "CountCopyJobs" >> combiners.Count.Globally(),
           equal_to([6]),
           label='CheckCopyJobCount')
+    self.assertSetEqual(
+        Lineage.query(p.result.metrics(), Lineage.SINK),
+        set(["bigquery:project1.dataset1.table1"]))
 
   @parameterized.expand([
       param(write_disposition=BigQueryDisposition.WRITE_TRUNCATE),
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
index f3881ed261ae..f038b48e04d5 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
@@ -44,6 +44,7 @@
 from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
 from apache_beam.io.iobase import BoundedSource
 from apache_beam.io.textio import _TextSource
+from apache_beam.metrics.metric import Lineage
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.value_provider import ValueProvider
@@ -261,6 +262,12 @@ def process(self,
     for metadata in metadata_list:
       yield self._create_source(metadata.path, schema)
 
+    Lineage.sources().add(
+        'bigquery',
+        table_reference.projectId,
+        table_reference.datasetId,
+        table_reference.tableId)
+
     if element.query is not None:
       self.bq._delete_table(
           table_reference.projectId,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
index 72697e29c4d5..7ae49dff205d 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
@@ -136,12 +136,11 @@ def test_bad_schema_public_api_export(self, get_table):
     with self.assertRaisesRegex(ValueError,
                                 "Encountered an unsupported type: 'DOUBLE'"):
       p = apache_beam.Pipeline()
-      pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
+      _ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table="dataset.sample_table", method="EXPORT", project="project", output_type='BEAM_ROW') - pipeline @mock.patch.object(BigQueryWrapper, 'get_table') def test_bad_schema_public_api_direct_read(self, get_table): @@ -159,21 +158,19 @@ def test_bad_schema_public_api_direct_read(self, get_table): with self.assertRaisesRegex(ValueError, "Encountered an unsupported type: 'DOUBLE'"): p = apache_beam.Pipeline() - pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery( + _ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery( table="dataset.sample_table", method="DIRECT_READ", project="project", output_type='BEAM_ROW') - pipeline def test_unsupported_value_provider(self): with self.assertRaisesRegex(TypeError, 'ReadFromBigQuery: table must be of type string' '; got ValueProvider instead'): p = apache_beam.Pipeline() - pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery( + _ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery( table=value_provider.ValueProvider(), output_type='BEAM_ROW') - pipeline def test_unsupported_callable(self): def filterTable(table): @@ -185,9 +182,8 @@ def filterTable(table): 'ReadFromBigQuery: table must be of type string' '; got a callable instead'): p = apache_beam.Pipeline() - pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery( + _ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery( table=res, output_type='BEAM_ROW') - pipeline def test_unsupported_query_export(self): with self.assertRaisesRegex( @@ -195,12 +191,11 @@ def test_unsupported_query_export(self): "Both a query and an output type of 'BEAM_ROW' were specified. " "'BEAM_ROW' is not currently supported with queries."): p = apache_beam.Pipeline() - pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery( + _ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery( table="project:dataset.sample_table", method="EXPORT", query='SELECT name FROM dataset.sample_table', output_type='BEAM_ROW') - pipeline def test_unsupported_query_direct_read(self): with self.assertRaisesRegex( @@ -208,12 +203,11 @@ def test_unsupported_query_direct_read(self): "Both a query and an output type of 'BEAM_ROW' were specified. 
" "'BEAM_ROW' is not currently supported with queries."): p = apache_beam.Pipeline() - pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery( + _ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery( table="project:dataset.sample_table", method="DIRECT_READ", query='SELECT name FROM dataset.sample_table', output_type='BEAM_ROW') - pipeline if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index e53204a5ebc6..c263b636b57a 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -50,6 +50,7 @@ from apache_beam.io.gcp.bigquery import TableRowJsonCoder from apache_beam.io.gcp.bigquery import WriteToBigQuery from apache_beam.io.gcp.bigquery import _StreamToBigQuery +from apache_beam.io.gcp.bigquery_read_internal import _BigQueryReadSplit from apache_beam.io.gcp.bigquery_read_internal import _JsonToDictCoder from apache_beam.io.gcp.bigquery_read_internal import bigquery_export_destination_uri from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR @@ -61,6 +62,7 @@ from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher from apache_beam.io.gcp.tests.bigquery_matcher import BigQueryTableMatcher +from apache_beam.metrics.metric import Lineage from apache_beam.options import value_provider from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions @@ -85,9 +87,11 @@ from apitools.base.py.exceptions import HttpError from apitools.base.py.exceptions import HttpForbiddenError from google.cloud import bigquery as gcp_bigquery + from google.cloud import bigquery_storage_v1 as bq_storage from google.api_core import exceptions except ImportError: gcp_bigquery = None + bq_storage = None HttpError = None HttpForbiddenError = None exceptions = None @@ -460,6 +464,8 @@ def test_create_temp_dataset_exception(self, exception_type, error_message): self.assertIn(error_message, exc.exception.args[0]) @parameterized.expand([ + # read without exception + param(responses=[], expected_retries=0), # first attempt returns a Http 500 blank error and retries # second attempt returns a Http 408 blank error and retries, # third attempt passes @@ -540,6 +546,9 @@ def store_callback(unused_request): # metadata (numBytes), and once to retrieve the table's schema # Any additional calls are retries self.assertEqual(expected_retries, mock_get_table.call_count - 2) + self.assertSetEqual( + Lineage.query(p.result.metrics(), Lineage.SOURCE), + set(["bigquery:project.dataset.table"])) @parameterized.expand([ # first attempt returns a Http 429 with transient reason and retries @@ -719,6 +728,40 @@ def test_read_export_exception(self, exception_type, error_message): mock_query_job.assert_called() self.assertIn(error_message, exc.exception.args[0]) + def test_read_direct_lineage(self): + with mock.patch.object(bigquery_tools.BigQueryWrapper, + '_bigquery_client'),\ + mock.patch.object(bq_storage.BigQueryReadClient, + 'create_read_session'),\ + beam.Pipeline() as p: + + _ = p | ReadFromBigQuery( + method=ReadFromBigQuery.Method.DIRECT_READ, + table='project:dataset.table') + self.assertSetEqual( + Lineage.query(p.result.metrics(), Lineage.SOURCE), + set(["bigquery:project.dataset.table"])) + + def test_read_all_lineage(self): + with 
+    with mock.patch.object(_BigQueryReadSplit, '_export_files') as export, \
+        beam.Pipeline() as p:
+
+      export.return_value = (None, [])
+
+      _ = (
+          p
+          | beam.Create([
+              beam.io.ReadFromBigQueryRequest(table='project1:dataset1.table1'),
+              beam.io.ReadFromBigQueryRequest(table='project2:dataset2.table2')
+          ])
+          | beam.io.ReadAllFromBigQuery(gcs_location='gs://bucket/tmp'))
+    self.assertSetEqual(
+        Lineage.query(p.result.metrics(), Lineage.SOURCE),
+        set([
+            'bigquery:project1.dataset1.table1',
+            'bigquery:project2.dataset2.table2'
+        ]))
+
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQuerySink(unittest.TestCase):
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
index 77cafb8bd64b..6b8e4754a79c 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -28,6 +28,7 @@
 # mypy: disallow-untyped-defs
 
 import logging
+import re
 from typing import TYPE_CHECKING
 from typing import Dict
 from typing import FrozenSet
@@ -39,6 +40,7 @@
 from typing import Union
 
 from apache_beam.metrics import cells
+from apache_beam.metrics.execution import MetricResult
 from apache_beam.metrics.execution import MetricUpdater
 from apache_beam.metrics.metricbase import Counter
 from apache_beam.metrics.metricbase import Distribution
@@ -50,7 +52,7 @@
   from apache_beam.metrics.execution import MetricKey
   from apache_beam.metrics.metricbase import Metric
 
-__all__ = ['Metrics', 'MetricsFilter']
+__all__ = ['Metrics', 'MetricsFilter', 'Lineage']
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -223,7 +225,7 @@ def matches(
   def query(
       self,
       filter: Optional['MetricsFilter'] = None
-  ) -> Dict[str, List['MetricResults']]:
+  ) -> Dict[str, List['MetricResult']]:
     """Queries the runner for existing user metrics that match the filter.
 
     It should return a dictionary, with lists of each kind of metric, and
@@ -305,3 +307,80 @@ def with_steps(self, steps: Iterable[str]) -> 'MetricsFilter':
     self._steps.update(steps)
 
     return self
+
+
+class Lineage:
+  """Standard collection of metrics used to record source and sink
+  information for lineage tracking."""
+
+  LINEAGE_NAMESPACE = "lineage"
+  SOURCE = "sources"
+  SINK = "sinks"
+
+  _METRICS = {
+      SOURCE: Metrics.string_set(LINEAGE_NAMESPACE, SOURCE),
+      SINK: Metrics.string_set(LINEAGE_NAMESPACE, SINK)
+  }
+
+  def __init__(self, label: str) -> None:
+    """Create a Lineage with a valid label (:data:`~Lineage.SOURCE` or
+    :data:`~Lineage.SINK`)
+    """
+    self.metric = Lineage._METRICS[label]
+
+  @classmethod
+  def sources(cls) -> 'Lineage':
+    return cls(Lineage.SOURCE)
+
+  @classmethod
+  def sinks(cls) -> 'Lineage':
+    return cls(Lineage.SINK)
+
+  _RESERVED_CHARS = re.compile(r'[:\s.]')
+
+  @staticmethod
+  def wrap_segment(segment: str) -> str:
+    """Wrap segment to a valid segment name.
+
+    Specifically, if there are reserved chars (colon, whitespace, dot), escape
+    with backtick. If the segment is already wrapped, return the original.
+    """
+    if segment.startswith("`") and segment.endswith("`"): return segment
+    if Lineage._RESERVED_CHARS.search(segment):
+      return "`" + segment + "`"
+    return segment
+
+  @staticmethod
+  def get_fq_name(
+      system: str, *segments: str, route: Optional[str] = None) -> str:
+    """Assemble fully qualified name
+    (`FQN `_).
+    Format:
+
+    - `system:segment1.segment2`
+    - `system:routine:segment1.segment2`
+    - `system:`segment1.with.dots:colons`.segment2`
+
+    This helper method is for internal and testing usage only.
+    """
+    segs = '.'.join(map(Lineage.wrap_segment, segments))
+    if route:
+      return ':'.join((system, route, segs))
+    return ':'.join((system, segs))
+
+  def add(
+      self, system: str, *segments: str, route: Optional[str] = None) -> None:
+    self.metric.add(self.get_fq_name(system, *segments, route=route))
+
+  @staticmethod
+  def query(results: MetricResults, label: str) -> Set[str]:
+    if label not in Lineage._METRICS:
+      raise ValueError("Label {} does not exist for Lineage".format(label))
+    response = results.query(
+        MetricsFilter().with_namespace(Lineage.LINEAGE_NAMESPACE).with_name(
+            label))[MetricResults.STRINGSETS]
+    result = set()
+    for metric in response:
+      result.update(metric.committed)
+      result.update(metric.attempted)
+    return result
diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py
index e3701228feec..3a8da021101e 100644
--- a/sdks/python/apache_beam/metrics/metric_test.py
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -28,6 +28,7 @@
 from apache_beam.metrics.execution import MetricKey
 from apache_beam.metrics.execution import MetricsContainer
 from apache_beam.metrics.execution import MetricsEnvironment
+from apache_beam.metrics.metric import Lineage
 from apache_beam.metrics.metric import MetricResults
 from apache_beam.metrics.metric import Metrics
 from apache_beam.metrics.metric import MetricsFilter
@@ -248,5 +249,26 @@ def test_create_counter_distribution(self):
     sampler.stop()
 
 
+class LineageTest(unittest.TestCase):
+  def test_fq_name(self):
+    test_cases = {
+        "apache-beam": "apache-beam",
+        "`apache-beam`": "`apache-beam`",
+        "apache.beam": "`apache.beam`",
+        "apache:beam": "`apache:beam`",
+        "apache beam": "`apache beam`",
+        "`apache beam`": "`apache beam`",
+        "apache\tbeam": "`apache\tbeam`",
+        "apache\nbeam": "`apache\nbeam`"
+    }
+    for k, v in test_cases.items():
+      self.assertEqual("apache:" + v, Lineage.get_fq_name("apache", k))
+      self.assertEqual(
+          "apache:beam:" + v, Lineage.get_fq_name("apache", k, route="beam"))
+      self.assertEqual(
+          "apache:beam:" + v + '.' + v,
+          Lineage.get_fq_name("apache", k, k, route="beam"))
+
+
 if __name__ == '__main__':
   unittest.main()
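Usage note (not part of the patch): a minimal sketch of how the pieces added above fit together, mirroring the test pattern in this PR — report lineage from inside pipeline code, then query it from the pipeline result. The `ReportLineage` DoFn, the table segments, and the use of the DirectRunner via `beam.Pipeline()` are illustrative assumptions, not code from the patch.

```python
# Illustrative only: a hypothetical DoFn that records a BigQuery table as a
# lineage source, similar to what _BigQueryReadSplit does in this patch.
import apache_beam as beam
from apache_beam.metrics.metric import Lineage


class ReportLineage(beam.DoFn):
  def process(self, element):
    # Segments containing reserved chars (colon, whitespace, dot) are
    # backtick-escaped by Lineage.wrap_segment, so this records the FQN
    # 'bigquery:my-project.`my.dataset`.table'.
    Lineage.sources().add('bigquery', 'my-project', 'my.dataset', 'table')
    yield element


with beam.Pipeline() as p:
  _ = p | beam.Create([1, 2, 3]) | beam.ParDo(ReportLineage())

# After the pipeline exits, lineage is available from the pipeline result as
# a set of FQN strings (the union of committed and attempted metrics).
assert Lineage.query(p.result.metrics(), Lineage.SOURCE) == {
    'bigquery:my-project.`my.dataset`.table'
}
```

Because the underlying metric is a string set, repeated adds of the same FQN deduplicate, which is why reporting per element here (or per split, as in the IO changes above) stays cheap.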