Commit 29079d6
feat: Make Python Flex template bq-to-bq de-identify and re-identify data (#257)

* Make the flex template flex-templates/python/regional_dlp_re_identification (bq-to-bq) do both: de-identify and re-identify data

* Add a mutually exclusive group with the query and input_table parameters

* Apply some of the requested review changes

* Remove deid_config and reid_config attributes from the UnmaskDetectedDetails and MaskDetectedDetails classes

* Convert all items in the table to strings

* Change the Write to BQ step to use the FILE_LOADS method

* Add needed space to error message
vfigueiredo-cit authored Jan 6, 2022
1 parent 77be7ef commit 29079d6
Showing 7 changed files with 199 additions and 50 deletions.
@@ -1,4 +1,4 @@
# Regional DLP de-identification Pub/Sub to BigQuery (Streaming) flex template
# Regional DLP transform BigQuery to BigQuery (Streaming) flex template

## Build the flex template

@@ -41,3 +41,11 @@ gcloud dataflow flex-template build "$TEMPLATE_GS_PATH" \
--project=$PROJECT \
--metadata-file "./metadata.json"
```

## Using the flex template

This flex template can process either an entire BigQuery table or just a query result. If you want a table to be processed in full, use the `input_table` parameter. If you only want to read a subset of a dataset, use the `query` parameter.

For example, when processing public datasets it is recommended to use the `query` parameter instead of the `input_table` parameter; when processing all of the data in your own dataset, use the `input_table` parameter.

Only one of these parameters can be used at a time, as the run sketch below illustrates.
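As a rough sketch only (the job name, region, output table, schema, and template ID below are placeholders rather than values from this repository), a run that reads a query result and de-identifies it could look like this:

```
gcloud dataflow flex-template run "regional-dlp-transform-`date +%Y%m%d-%H%M%S`" \
    --template-file-gcs-location "$TEMPLATE_GS_PATH" \
    --project=$PROJECT \
    --region=us-east4 \
    --parameters query="SELECT * FROM PROJECT:DATASET.TABLE LIMIT 100" \
    --parameters output_table="PROJECT:DATASET.OUTPUT_TABLE" \
    --parameters bq_schema="FIELD_1:STRING" \
    --parameters dlp_project=$PROJECT \
    --parameters dlp_location=us-east4 \
    --parameters deidentification_template_name="projects/<PROJECT>/locations/<LOCATION>/deidentifyTemplates/<TEMPLATE_ID>" \
    --parameters dlp_transform=DE-IDENTIFY
```

To process a whole table instead, replace the `query` parameter with `input_table="PROJECT:DATASET.TABLE"`, and use `dlp_transform=RE-IDENTIFY` for the reverse transformation. Because `--parameters` values are parsed as a comma-separated list, a multi-field `bq_schema` such as `FIELD_1:STRING,FIELD_2:STRING` may need the alternative delimiter syntax described in `gcloud topic escaping`.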
@@ -21,52 +21,67 @@
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import (GoogleCloudOptions,
PipelineOptions)
from apache_beam.transforms import DoFn, ParDo, PTransform, BatchElements
from apache_beam.transforms import BatchElements, DoFn, ParDo, PTransform
from apache_beam.utils.annotations import experimental
from google.cloud import dlp_v2


def run(argv=None, save_main_session=True):
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
group = parser.add_argument_group()
group_exclusive = parser.add_mutually_exclusive_group(required=True)
group_exclusive.add_argument(
'--query',
help=(
'Input query to retrieve data from Dataset. '
'Example: `SELECT * FROM PROJECT:DATASET.TABLE LIMIT 100`. '
'You need to specify either an input_table or query. '
'It is recommended to use query '
'when you want to use a public dataset.'
)
)
group_exclusive.add_argument(
'--input_table',
required=True,
help=(
'Input BigQuery table for results specified as: '
'PROJECT:DATASET.TABLE or DATASET.TABLE. '
'You need to specify either an input_table or query. '
'It is recommended to use input_table '
'when you have your own dataset.'
)
)
parser.add_argument(
group.add_argument(
'--output_table',
required=True,
help=(
'Output BigQuery table for results specified as: '
'PROJECT:DATASET.TABLE or DATASET.TABLE.'
)
)
parser.add_argument(
group.add_argument(
'--bq_schema',
required=True,
help=(
'Output BigQuery table schema specified as string with format: '
'FIELD_1:STRING,FIELD_2:STRING,...'
)
)
parser.add_argument(
group.add_argument(
'--dlp_project',
required=True,
help=(
'ID of the project that holds the DLP template.'
)
)
parser.add_argument(
group.add_argument(
'--dlp_location',
required=False,
help=(
'The Location of the DLP template resource.'
)
)
parser.add_argument(
group.add_argument(
'--deidentification_template_name',
required=True,
help=(
@@ -75,15 +90,15 @@ def run(argv=None, save_main_session=True):
'/deidentifyTemplates/<TEMPLATE_ID>"'
)
)
parser.add_argument(
group.add_argument(
"--window_interval_sec",
default=30,
type=int,
help=(
'Window interval in seconds for grouping incoming messages.'
)
)
parser.add_argument(
group.add_argument(
"--batch_size",
default=1000,
type=int,
@@ -92,6 +107,14 @@ def run(argv=None, save_main_session=True):
'the call to the Data Loss Prevention (DLP) API.'
)
)
group.add_argument(
"--dlp_transform",
default='RE-IDENTIFY',
required=True,
help=(
'DLP transformation type.'
)
)
known_args, pipeline_args = parser.parse_known_args(argv)

options = PipelineOptions(
@@ -103,39 +126,77 @@ def run(argv=None, save_main_session=True):
with beam.Pipeline(options=options) as p:

# Read from BigQuery into a PCollection.
messages = (
p
| 'Read from BigQuery Table' >>
beam.io.ReadFromBigQuery(
table=known_args.input_table
if known_args.input_table is not None:
messages = (
p
| 'Read from BigQuery Table' >>
beam.io.ReadFromBigQuery(
table=known_args.input_table
)
| 'Apply window' >> beam.WindowInto(
window.FixedWindows(known_args.window_interval_sec, 0)
)
)
| 'Apply window' >> beam.WindowInto(
window.FixedWindows(known_args.window_interval_sec, 0)
else:
if 'LIMIT' not in known_args.query:
logging.warning('The query has no LIMIT clause set. '
'This can lead to the pipeline processing '
'taking more time.')

messages = (
p
| 'Run SQL query to read data from BigQuery Table.' >>
beam.io.ReadFromBigQuery(
query=known_args.query
)
| 'Apply window' >> beam.WindowInto(
window.FixedWindows(known_args.window_interval_sec, 0)
)
)
)

re_identified_messages = (
messages
| "Batching" >> BatchElements(
min_batch_size=known_args.batch_size,
max_batch_size=known_args.batch_size
if known_args.dlp_transform == 'RE-IDENTIFY':
transformed_messages = (
messages
| "Batching" >> BatchElements(
min_batch_size=known_args.batch_size,
max_batch_size=known_args.batch_size
)
| 'Convert dicts to table' >>
beam.Map(from_list_dicts_to_table)
| 'Call DLP re-identification' >>
UnmaskDetectedDetails(
project=known_args.dlp_project,
location=known_args.dlp_location,
template_name=known_args.deidentification_template_name
)
| 'Convert table to dicts' >>
beam.FlatMap(from_table_to_list_dict)
)
| 'Convert dicts to table' >>
beam.Map(from_list_dicts_to_table)
| 'Call DLP re-identification' >>
UnmaskDetectedDetails(
project=known_args.dlp_project,
location=known_args.dlp_location,
template_name=known_args.deidentification_template_name
else:
transformed_messages = (
messages
| "Batching" >> BatchElements(
min_batch_size=known_args.batch_size,
max_batch_size=known_args.batch_size
)
| 'Convert dicts to table' >>
beam.Map(from_list_dicts_to_table)
| 'Call DLP de-identification' >>
MaskDetectedDetails(
project=known_args.dlp_project,
location=known_args.dlp_location,
template_name=known_args.deidentification_template_name
)
| 'Convert table to dicts' >>
beam.FlatMap(from_table_to_list_dict)
)
| 'Convert table to dicts' >>
beam.FlatMap(from_table_to_list_dict)
)

# Write to BigQuery.
re_identified_messages | 'Write to BQ' >> beam.io.WriteToBigQuery(
transformed_messages | 'Write to BQ' >> beam.io.WriteToBigQuery(
known_args.output_table,
schema=known_args.bq_schema,
method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
triggering_frequency=300,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)

@@ -168,7 +229,7 @@ def from_list_dicts_to_table(list_item):
for item in list_item:
row = {"values": []}
for item_key in sorted(item):
row["values"].append({"string_value": item[item_key]})
row["values"].append({"string_value": str(item[item_key])})
rows.append(row)
table_item = {"table": {"headers": headers, "rows": rows}}
return table_item
@@ -197,20 +258,15 @@ class UnmaskDetectedDetails(PTransform):
def __init__(
self,
project=None,
location="global",
location='us-east4',
template_name=None,
reidentification_config=None,
timeout=None):

self.config = {}
self.project = project
self.timeout = timeout
self.location = location

if template_name is not None:
self.config['reidentify_template_name'] = template_name
else:
self.config['reidentify_config'] = reidentification_config
self.config['reidentify_template_name'] = template_name

def expand(self, pcoll):
if self.project is None:
@@ -219,7 +275,8 @@ def expand(self, pcoll):
if self.project is None:
raise ValueError(
'GCP project name needs to be specified '
'in "project" pipeline option')
'in "dlp_project" pipeline option.'
)
return (
pcoll
| ParDo(_ReidentifyFn(
@@ -248,7 +305,6 @@ def __init__(
self.params = {}

def setup(self):
from google.cloud import dlp_v2
if self.client is None:
self.client = dlp_v2.DlpServiceClient()
self.params = {
@@ -266,6 +322,76 @@ def process(self, element, **kwargs):
yield operation.item


@experimental()
class MaskDetectedDetails(PTransform):

def __init__(
self,
project=None,
location='us-east4',
template_name=None,
timeout=None):

self.config = {}
self.project = project
self.timeout = timeout
self.location = location
self.config['deidentify_template_name'] = template_name

def expand(self, pcoll):
if self.project is None:
self.project = pcoll.pipeline.options.view_as(
GoogleCloudOptions).project
if self.project is None:
raise ValueError(
'GCP project name needs to be specified '
'in "dlp_project" pipeline option.'
)
return (
pcoll
| ParDo(_DeidentifyFn(
self.config,
self.timeout,
self.project,
self.location
)))


class _DeidentifyFn(DoFn):

def __init__(
self,
config=None,
timeout=None,
project=None,
location=None,
client=None
):
self.config = config
self.timeout = timeout
self.client = client
self.project = project
self.location = location
self.params = {}

def setup(self):
if self.client is None:
self.client = dlp_v2.DlpServiceClient()
self.params = {
'timeout': self.timeout,
'parent': "projects/{}/locations/{}".format(
self.project,
self.location
)
}
self.params.update(self.config)

def process(self, element, **kwargs):
operation = self.client.deidentify_content(
item=element, **self.params)
yield operation.item


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
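For reference, below is a minimal sketch of the DLP table item that `from_list_dicts_to_table` builds for each batch; the sample rows are hypothetical, and the header order is assumed to mirror the sorted dictionary keys used when the row values are stringified:

```
# Hypothetical rows read from BigQuery (each element is a dict).
bq_rows = [
    {"name": "Alice", "age": 34},
    {"name": "Bob", "age": 27},
]

# Table shape handed to the DLP API: headers (assumed to follow the sorted
# keys) plus rows whose values are all converted to strings via str().
table_item = {
    "table": {
        "headers": [{"name": "age"}, {"name": "name"}],
        "rows": [
            {"values": [{"string_value": "34"}, {"string_value": "Alice"}]},
            {"values": [{"string_value": "27"}, {"string_value": "Bob"}]},
        ],
    }
}
```

After the DLP call, `from_table_to_list_dict` converts the returned table back into per-row dictionaries before the pipeline writes them to BigQuery.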
@@ -1,11 +1,18 @@
{
"name": "Streaming BiqQuery data through DLP to reidentify data flex template",
"description": "Python flex template for streaming BiqQuery data through regional DLP to reidentify the data.",
"name": "Streaming BiqQuery data through DLP to transform data flex template",
"description": "Python flex template for streaming BiqQuery data through regional DLP to transform the data.",
"parameters": [
{
"name": "query",
"label": "SQL query to retrieve input data.",
"helpText": "Input query to retrieve data from Dataset. Example: `SELECT * FROM PROJECT.DATASET.TABLE LIMIT 100`. You need to specify either an input-table or query. It is recommended to use query when you want to use a public dataset.",
"isOptional": true
},
{
"name": "input_table",
"label": "BigQuery input table name.",
"helpText": "Input BigQuery table for results specified as: PROJECT:DATASET.TABLE or DATASET.TABLE."
"helpText": "Input BigQuery table for results specified as: PROJECT:DATASET.TABLE or DATASET.TABLE. You need to specify either an input-table or query. It is recommended to use input-table for when you have your own dataset.",
"isOptional": true
},
{
"name": "deidentification_template_name",
@@ -46,6 +53,14 @@
"name": "output_table",
"label": "BigQuery output table name.",
"helpText": "Output BigQuery table with reidentified data as: PROJECT:DATASET.TABLE or DATASET.TABLE."
},
{
"name": "dlp_transform",
"label": "DLP transformation type.",
"helpText": "Choose between DLP DE-IDENTIFY or RE-IDENTIFY.",
"regexes": [
"RE-IDENTIFY|DE-IDENTIFY"
]
}
]
}