Skip to content

Commit

Permalink
Add skip_wait_on_job_termination option for dataflow job resources
Browse files Browse the repository at this point in the history
  • Loading branch information
n-oden committed Dec 10, 2021
1 parent 4ea6fb2 commit 72dca52
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 0 deletions.
25 changes: 25 additions & 0 deletions google-beta/resource_dataflow_flex_template_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ func resourceDataflowFlexTemplateJob() *schema.Resource {
Type: schema.TypeString,
Computed: true,
},

"skip_wait_on_job_termination": {
Type: schema.TypeBool,
Optional: true,
Default: false,
Description: `If true, treat DRAINING and CANCELLING as terminal job states and do not wait for further changes before removing from terraform state and moving on. WARNING: this will lead to job name conflicts if you do not make sure that the job names are different, e.g. by using a random_id.`,
},
},
UseJSONNumber: true,
}
Expand Down Expand Up @@ -333,6 +340,12 @@ func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interfac

// Wait for state to reach terminal state (canceled/drained/done)
_, ok := dataflowTerminalStatesMap[d.Get("state").(string)]
if !ok {
if d.Get("skip_wait_on_job_termination").(bool) {
_, ok = dataflowTerminatingStatesMap[d.Get("state").(string)]
}
}

for !ok {
log.Printf("[DEBUG] Waiting for job with job state %q to terminate...", d.Get("state").(string))
time.Sleep(5 * time.Second)
Expand All @@ -342,6 +355,11 @@ func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interfac
return fmt.Errorf("Error while reading job to see if it was properly terminated: %v", err)
}
_, ok = dataflowTerminalStatesMap[d.Get("state").(string)]
if !ok {
if d.Get("skip_wait_on_job_termination").(bool) {
_, ok = dataflowTerminatingStatesMap[d.Get("state").(string)]
}
}
}

// Only remove the job from state if it's actually successfully canceled.
Expand All @@ -350,5 +368,12 @@ func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interfac
d.SetId("")
return nil
}
if d.Get("skip_wait_on_job_termination").(bool) {
if _, ok := dataflowTerminatingStatesMap[d.Get("state").(string)]; ok {
log.Printf("[DEBUG] Removing dataflow job with final state %q", d.Get("state").(string))
d.SetId("")
return nil
}
}
return fmt.Errorf("Unable to cancel the dataflow job '%s' - final state was %q.", d.Id(), d.Get("state").(string))
}
25 changes: 25 additions & 0 deletions google-beta/resource_dataflow_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ func resourceDataflowJob() *schema.Resource {
Optional: true,
Description: `Indicates if the job should use the streaming engine feature.`,
},

"skip_wait_on_job_termination": {
Type: schema.TypeBool,
Optional: true,
Default: false,
Description: `If true, treat DRAINING and CANCELLING as terminal job states and do not wait for further changes before removing from terraform state and moving on. WARNING: this will lead to job name conflicts if you do not make sure that the job names are different, e.g. by using a random_id.`,
},
},
UseJSONNumber: true,
}
Expand Down Expand Up @@ -476,6 +483,12 @@ func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {

// Wait for state to reach terminal state (canceled/drained/done)
_, ok := dataflowTerminalStatesMap[d.Get("state").(string)]
if !ok {
if d.Get("skip_wait_on_job_termination").(bool) {
_, ok = dataflowTerminatingStatesMap[d.Get("state").(string)]
}
}

for !ok {
log.Printf("[DEBUG] Waiting for job with job state %q to terminate...", d.Get("state").(string))
time.Sleep(5 * time.Second)
Expand All @@ -485,6 +498,11 @@ func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
return fmt.Errorf("Error while reading job to see if it was properly terminated: %v", err)
}
_, ok = dataflowTerminalStatesMap[d.Get("state").(string)]
if !ok {
if d.Get("skip_wait_on_job_termination").(bool) {
_, ok = dataflowTerminatingStatesMap[d.Get("state").(string)]
}
}
}

// Only remove the job from state if it's actually successfully canceled.
Expand All @@ -493,6 +511,13 @@ func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
d.SetId("")
return nil
}
if d.Get("skip_wait_on_job_termination").(bool) {
if _, ok := dataflowTerminatingStatesMap[d.Get("state").(string)]; ok {
log.Printf("[DEBUG] Removing dataflow job with final state %q", d.Get("state").(string))
d.SetId("")
return nil
}
}
return fmt.Errorf("Unable to cancel the dataflow job '%s' - final state was %q.", d.Id(), d.Get("state").(string))
}

Expand Down
31 changes: 31 additions & 0 deletions website/docs/r/dataflow_flex_template_job.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,33 @@ is "cancelled", but if a user sets `on_delete` to `"drain"` in the
configuration, you may experience a long wait for your `terraform destroy` to
complete.

You can potentially short-circuit the wait by setting `skip_wait_on_job_termination`
to `true`, but beware that unless you take active steps to ensure that the job
`name` parameter changes between instances, the name will conflict and the launch
of the new job will fail. One way to do this is with a
[random_id](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/id)
resource, for example:

```hcl
resource "random_id" "big_data_job_name_suffix" {
byte_length = 4
keepers = {
region = var.region
}
}
resource "google_dataflow_flex_template_job" "big_data_job" {
provider = google-beta
name = "dataflow-flextemplates-job-${random_id.big_data_job_name_suffix.dec}"
region = var.region
container_spec_gcs_path = "gs://my-bucket/templates/template.json"
  skip_wait_on_job_termination = true
parameters = {
inputSubscription = "messages"
}
}
```

## Argument Reference

The following arguments are supported:
Expand All @@ -74,6 +101,10 @@ labels will be ignored to prevent diffs on re-apply.
* `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of
deletion during `terraform destroy`. See above note.

* `skip_wait_on_job_termination` - (Optional) If set to `true`, terraform will
treat `DRAINING` and `CANCELLING` as terminal states when deleting the resource,
and will remove the resource from terraform state and move on. See above note.

* `project` - (Optional) The project in which the resource belongs. If it is not
provided, the provider project is used.

Expand Down
30 changes: 30 additions & 0 deletions website/docs/r/dataflow_job.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,35 @@ The Dataflow resource is considered 'existing' while it is in a nonterminal stat

A Dataflow job which is 'destroyed' may be "cancelled" or "drained". If "cancelled", the job terminates - any data written remains where it is, but no new data will be processed. If "drained", no new data will enter the pipeline, but any data currently in the pipeline will finish being processed. The default is "drain". When `on_delete` is set to `"drain"` in the configuration, you may experience a long wait for your `terraform destroy` to complete.

You can potentially short-circuit the wait by setting `skip_wait_on_job_termination`
to `true`, but beware that unless you take active steps to ensure that the job
`name` parameter changes between instances, the name will conflict and the launch
of the new job will fail. One way to do this is with a
[random_id](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/id)
resource, for example:

```hcl
resource "random_id" "big_data_job_name_suffix" {
byte_length = 4
keepers = {
region = var.region
}
}
resource "google_dataflow_job" "big_data_job" {
name = "dataflow-job-${random_id.big_data_job_name_suffix.dec}"
region = var.region
template_gcs_path = "gs://my-bucket/templates/template.json"
  skip_wait_on_job_termination = true
parameters = {
inputSubscription = "messages"
}
  on_delete = "drain"
}
```

## Argument Reference

The following arguments are supported:
Expand All @@ -83,6 +112,7 @@ The following arguments are supported:
* `transform_name_mapping` - (Optional) Only applicable when updating a pipeline. Map of transform name prefixes of the job to be replaced with the corresponding name prefixes of the new job. This field is not used outside of update.
* `max_workers` - (Optional) The number of workers permitted to work on the job. More workers may improve processing speed at additional cost.
* `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of deletion during `terraform destroy`. See above note.
* `skip_wait_on_job_termination` - (Optional) If set to `true`, terraform will treat `DRAINING` and `CANCELLING` as terminal states when deleting the resource, and will remove the resource from terraform state and move on. See above note.
* `project` - (Optional) The project in which the resource belongs. If it is not provided, the provider project is used.
* `zone` - (Optional) The zone in which the created job should run. If it is not provided, the provider zone is used.
* `region` - (Optional) The region in which the created job should run.
Expand Down

0 comments on commit 72dca52

Please sign in to comment.