Skip to content

Commit

Permalink
feat: create dataflow flex template module (#168)
Browse files Browse the repository at this point in the history
* dataflow flex template module

* add suport to set network tags

* add value validation to job_language input variable

* fix python pipeline options

* use new dataflow template module in examples

* update variables descriptions

* fix code review issues
  • Loading branch information
daniel-cit authored Oct 13, 2021
1 parent 1abe7c1 commit c0398be
Show file tree
Hide file tree
Showing 8 changed files with 352 additions and 41 deletions.
39 changes: 18 additions & 21 deletions examples/bigquery_confidential_data/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,19 @@ resource "google_artifact_registry_repository_iam_member" "confidential_docker_r
member = "serviceAccount:${module.secured_data_warehouse.confidential_dataflow_controller_service_account_email}"
}

resource "google_dataflow_flex_template_job" "regional_deid" {
provider = google-beta
module "regional_deid" {
source = "../../modules/dataflow-flex-job"

project = var.data_ingestion_project_id
project_id = var.data_ingestion_project_id
name = "regional-flex-java-gcs-dlp-bq"
container_spec_gcs_path = var.java_de_identify_template_gs_path
region = local.location
service_account_email = module.secured_data_warehouse.dataflow_controller_service_account_email
subnetwork_self_link = var.data_ingestion_subnets_self_link
kms_key_name = module.secured_data_warehouse.cmek_ingestion_crypto_key
temp_location = "gs://${module.secured_data_warehouse.data_ingest_dataflow_bucket_name}/tmp/"
staging_location = "gs://${module.secured_data_warehouse.data_ingest_dataflow_bucket_name}/staging/"
max_workers = 5

parameters = {
inputFilePattern = "gs://${module.secured_data_warehouse.data_ingest_bucket_name}/${local.cc_file_name}"
Expand All @@ -130,13 +136,6 @@ resource "google_dataflow_flex_template_job" "regional_deid" {
dlpProjectId = var.data_governance_project_id
dlpLocation = local.location
deidentifyTemplateName = module.de_identification_template.template_full_path
serviceAccount = module.secured_data_warehouse.dataflow_controller_service_account_email
subnetwork = var.data_ingestion_subnets_self_link
dataflowKmsKey = module.secured_data_warehouse.cmek_ingestion_crypto_key
tempLocation = "gs://${module.secured_data_warehouse.data_ingest_dataflow_bucket_name}/tmp/"
stagingLocation = "gs://${module.secured_data_warehouse.data_ingest_dataflow_bucket_name}/staging/"
maxNumWorkers = 5
usePublicIps = false
}

depends_on = [
Expand All @@ -149,17 +148,22 @@ resource "time_sleep" "wait_de_identify_job_execution" {
create_duration = "600s"

depends_on = [
google_dataflow_flex_template_job.regional_deid
module.regional_deid
]
}

resource "google_dataflow_flex_template_job" "regional_reid" {
provider = google-beta
module "regional_reid" {
source = "../../modules/dataflow-flex-job"

project = var.confidential_data_project_id
project_id = var.confidential_data_project_id
name = "dataflow-flex-regional-dlp-reid-job"
container_spec_gcs_path = var.java_re_identify_template_gs_path
region = local.location
service_account_email = module.secured_data_warehouse.confidential_dataflow_controller_service_account_email
subnetwork_self_link = var.confidential_subnets_self_link
kms_key_name = module.secured_data_warehouse.cmek_reidentification_crypto_key
temp_location = "gs://${module.secured_data_warehouse.confidential_data_dataflow_bucket_name}/tmp/"
staging_location = "gs://${module.secured_data_warehouse.confidential_data_dataflow_bucket_name}/staging/"

parameters = {
inputBigQueryTable = "${var.non_confidential_project_id}:${local.non_confidential_dataset_id}.${trimsuffix(local.cc_file_name, ".csv")}"
Expand All @@ -168,13 +172,6 @@ resource "google_dataflow_flex_template_job" "regional_reid" {
dlpLocation = local.location
dlpProjectId = var.data_governance_project_id
confidentialDataProjectId = var.confidential_data_project_id
serviceAccount = module.secured_data_warehouse.confidential_dataflow_controller_service_account_email
subnetwork = var.confidential_subnets_self_link
dataflowKmsKey = module.secured_data_warehouse.cmek_reidentification_crypto_key
tempLocation = "gs://${module.secured_data_warehouse.confidential_data_dataflow_bucket_name}/tmp/"
stagingLocation = "gs://${module.secured_data_warehouse.confidential_data_dataflow_bucket_name}/staging/"
usePublicIps = false
enableStreamingEngine = true
}

depends_on = [
Expand Down
20 changes: 10 additions & 10 deletions examples/dataflow-with-dlp/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,19 @@ resource "google_artifact_registry_repository_iam_member" "docker_reader" {
]
}

resource "google_dataflow_flex_template_job" "regional_dlp" {
provider = google-beta
module "regional_dlp" {
source = "../../modules/dataflow-flex-job"

project = var.data_ingestion_project_id
project_id = var.data_ingestion_project_id
name = "regional-flex-java-gcs-dlp-bq"
container_spec_gcs_path = var.de_identify_template_gs_path
region = local.region
service_account_email = module.data_ingestion.dataflow_controller_service_account_email
subnetwork_self_link = var.subnetwork_self_link
kms_key_name = module.data_ingestion.cmek_ingestion_crypto_key
temp_location = "gs://${module.data_ingestion.data_ingest_dataflow_bucket_name}/tmp/"
staging_location = "gs://${module.data_ingestion.data_ingest_dataflow_bucket_name}/staging/"
max_workers = 5

parameters = {
inputFilePattern = "gs://${module.data_ingestion.data_ingest_bucket_name}/cc_records.csv"
Expand All @@ -111,13 +117,7 @@ resource "google_dataflow_flex_template_job" "regional_dlp" {
dlpProjectId = var.data_governance_project_id
dlpLocation = local.region
deidentifyTemplateName = module.de_identification_template.template_full_path
serviceAccount = module.data_ingestion.dataflow_controller_service_account_email
subnetwork = var.subnetwork_self_link
dataflowKmsKey = module.data_ingestion.cmek_ingestion_crypto_key
tempLocation = "gs://${module.data_ingestion.data_ingest_dataflow_bucket_name}/tmp/"
stagingLocation = "gs://${module.data_ingestion.data_ingest_dataflow_bucket_name}/staging/"
maxNumWorkers = 5
usePublicIps = "false"

}

depends_on = [
Expand Down
20 changes: 10 additions & 10 deletions examples/regional-dlp/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,20 @@ resource "google_artifact_registry_repository_iam_member" "python_reader" {
]
}

module "regional_dlp" {
source = "../../modules/dataflow-flex-job"

resource "google_dataflow_flex_template_job" "regional_dlp" {
provider = google-beta

project = var.data_ingestion_project_id
project_id = var.data_ingestion_project_id
name = "regional-flex-python-pubsub-dlp-bq"
container_spec_gcs_path = var.flex_template_gs_path
job_language = "PYTHON"
region = var.location
service_account_email = module.data_ingestion.dataflow_controller_service_account_email
subnetwork_self_link = var.subnetwork_self_link
kms_key_name = module.data_ingestion.cmek_ingestion_crypto_key
temp_location = "gs://${module.data_ingestion.data_ingest_dataflow_bucket_name}/tmp/"
staging_location = "gs://${module.data_ingestion.data_ingest_dataflow_bucket_name}/staging/"
enable_streaming_engine = false

parameters = {
input_topic = "projects/${var.data_ingestion_project_id}/topics/${module.data_ingestion.data_ingest_topic_name}"
Expand All @@ -100,12 +106,6 @@ resource "google_dataflow_flex_template_job" "regional_dlp" {
dlp_project = var.data_governance_project_id
bq_schema = local.bq_schema
output_table = "${var.datalake_project_id}:${module.data_ingestion.data_ingest_bigquery_dataset.dataset_id}.classical_books"
service_account_email = module.data_ingestion.dataflow_controller_service_account_email
subnetwork = var.subnetwork_self_link
dataflow_kms_key = module.data_ingestion.cmek_ingestion_crypto_key
temp_location = "gs://${module.data_ingestion.data_ingest_dataflow_bucket_name}/tmp/"
staging_location = "gs://${module.data_ingestion.data_ingest_dataflow_bucket_name}/staging/"
no_use_public_ips = "true"
}

depends_on = [
Expand Down
68 changes: 68 additions & 0 deletions modules/dataflow-flex-job/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Dataflow Flex Template Job Module

This module handles opinionated Dataflow flex template job configuration and deployments.

## Usage

Before using this module, one should get familiar with the `google_dataflow_flex_template_job`’s [Note on "destroy"/"apply"](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/dataflow_flex_template_job#note-on-destroy--apply) as the behavior is atypical when compared to other resources.

### Assumption

One assumption is that, before using this module, you already have a working Dataflow flex job template(s) in a GCS location.
If you are not using public IPs, you need to [Configure Private Google Access](https://cloud.google.com/vpc/docs/configure-private-google-access)
on the VPC used by Dataflow.

This is a simple usage:

```hcl
module "dataflow-flex-job" {
source = "terraform-google-modules/secured-data-warehouse/google//modules/dataflow-flex-job"
version = "~> 0.1"
project_id = "<project_id>"
region = "us-east4"
name = "dataflow-flex-job-00001"
container_spec_gcs_path = "gs://<path-to-template>"
staging_location = "gs://<gcs_path_staging_data_bucket>"
temp_location = "gs://<gcs_path_temp_data_bucket>"
subnetwork_self_link = "<subnetwork-self-link>"
kms_key_name = "<fully-qualified-kms-key-id>"
service_account_email = "<dataflow-controller-service-account-email>"
  parameters = {
    firstParameter  = "ONE"
    secondParameter = "TWO"
  }
}
```

<!-- BEGINNING OF PRE-COMMIT-TERRAFORM DOCS HOOK -->
## Inputs

| Name | Description | Type | Default | Required |
|------|-------------|------|---------|:--------:|
| container\_spec\_gcs\_path | The GCS path to the Dataflow job flex template. | `string` | n/a | yes |
| enable\_streaming\_engine | Enable/disable the use of Streaming Engine for the job. Note that Streaming Engine is enabled by default for pipelines developed against the Beam SDK for Python v2.21.0 or later when using Python 3. | `bool` | `true` | no |
| job\_language | Language of the flex template code. Options are 'JAVA' or 'PYTHON'. | `string` | `"JAVA"` | no |
| kms\_key\_name | The name for the Cloud KMS key for the job. Key format is: `projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY`. | `string` | n/a | yes |
| max\_workers | The number of workers permitted to work on the job. More workers may improve processing speed at additional cost. | `number` | `1` | no |
| name | The name of the dataflow flex job. | `string` | n/a | yes |
| network\_tags | Network TAGs to be added to the VM instances. Python flex template jobs are only able to set network tags for the launcher VM. For the harness VM it is necessary to configure your firewall rule to use the network tag 'dataflow'. | `list(string)` | `[]` | no |
| on\_delete | One of drain or cancel. Specifies behavior of deletion during terraform destroy. The default is cancel. See https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline . | `string` | `"cancel"` | no |
| parameters | Key/Value pairs to be passed to the Dataflow job (as used in the template). | `map(any)` | `{}` | no |
| project\_id | The project in which the resource belongs. If it is not provided, the provider project is used. | `string` | n/a | yes |
| region | The region in which the created job should run. | `string` | n/a | yes |
| service\_account\_email | The Service Account email that will be used to identify the VMs in which the jobs are running. See https://cloud.google.com/dataflow/docs/concepts/security-and-permissions#worker_service_account . | `string` | n/a | yes |
| staging\_location | GCS path for staging code packages needed by workers. | `string` | n/a | yes |
| subnetwork\_self\_link | The subnetwork self link to which VMs will be assigned. | `string` | n/a | yes |
| temp\_location | GCS path for saving temporary workflow jobs. | `string` | n/a | yes |
| use\_public\_ips | Whether VM instances should use public IPs. | `string` | `false` | no |

## Outputs

| Name | Description |
|------|-------------|
| job\_id | The unique ID of this job. |
| state | The current state of the resource, selected from the JobState enum. See https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState . |

<!-- END OF PRE-COMMIT-TERRAFORM DOCS HOOK -->
61 changes: 61 additions & 0 deletions modules/dataflow-flex-job/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

locals {
  # Java flex templates read pipeline options as camelCase keys.
  java_pipeline_options = {
    serviceAccount        = var.service_account_email
    subnetwork            = var.subnetwork_self_link
    dataflowKmsKey        = var.kms_key_name
    tempLocation          = var.temp_location
    stagingLocation       = var.staging_location
    maxNumWorkers         = var.max_workers
    usePublicIps          = var.use_public_ips
    enableStreamingEngine = var.enable_streaming_engine
  }

  # Python flex templates read the same options as snake_case keys; note the
  # inverted public-IP flag (no_use_public_ips) relative to the Java spelling.
  python_pipeline_options = {
    service_account_email   = var.service_account_email
    subnetwork              = var.subnetwork_self_link
    dataflow_kms_key        = var.kms_key_name
    temp_location           = var.temp_location
    staging_location        = var.staging_location
    max_num_workers         = var.max_workers
    no_use_public_ips       = !var.use_public_ips
    enable_streaming_engine = var.enable_streaming_engine
  }

  # Select the option map that matches the template's SDK language.
  pipeline_options = var.job_language == "JAVA" ? local.java_pipeline_options : local.python_pipeline_options

  # Dataflow expects multiple network tags as a single ';'-separated value.
  network_tags = join(";", var.network_tags)

  # Java jobs can tag both launcher and harness VMs; Python flex template
  # jobs only honor tags on the launcher VM (see variable description).
  network_tags_experiment_java   = local.network_tags != "" ? "use_network_tags=${local.network_tags},use_network_tags_for_flex_templates=${local.network_tags}" : ""
  network_tags_experiment_python = local.network_tags != "" ? "use_network_tags_for_flex_templates=${local.network_tags}" : ""
  network_tags_experiment        = var.job_language == "JAVA" ? local.network_tags_experiment_java : local.network_tags_experiment_python

  # CMEK with Streaming Engine requires an explicit service experiment.
  kms_on_streaming_engine_experiment = var.kms_key_name != null && var.enable_streaming_engine ? "enable_kms_on_streaming_engine" : ""

  # compact() drops empty entries, so this is "" when no experiment applies —
  # the original guarding ternary was redundant.
  experiment_options = join(",", compact([local.kms_on_streaming_engine_experiment, local.network_tags_experiment]))

  # Only emit the "experiments" parameter when at least one experiment is set.
  experiments = local.experiment_options != "" ? { experiments = local.experiment_options } : {}
}

# Launches the flex template job. The google-beta provider is required:
# google_dataflow_flex_template_job is only available in the beta provider.
resource "google_dataflow_flex_template_job" "dataflow_flex_template_job" {
  provider = google-beta

  project                 = var.project_id
  name                    = var.name
  container_spec_gcs_path = var.container_spec_gcs_path
  region                  = var.region
  on_delete               = var.on_delete

  # Flex templates receive pipeline options and experiments through the
  # parameters map. Later merge() arguments win, so the module-computed
  # options and experiments override any colliding user-supplied keys.
  parameters = merge(var.parameters, local.pipeline_options, local.experiments)
}
26 changes: 26 additions & 0 deletions modules/dataflow-flex-job/outputs.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

output "job_id" {
  description = "The unique ID of this job."
  value       = google_dataflow_flex_template_job.dataflow_flex_template_job.job_id
}

output "state" {
  description = "The current state of the resource, selected from the JobState enum. See https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState ."
  value       = google_dataflow_flex_template_job.dataflow_flex_template_job.state
}
Loading

0 comments on commit c0398be

Please sign in to comment.