Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add skip_wait_on_job_termination option for dataflow job resources #3868

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions google-beta/resource_dataflow_flex_template_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ func resourceDataflowFlexTemplateJob() *schema.Resource {
Type: schema.TypeString,
Computed: true,
},

"skip_wait_on_job_termination": {
Type: schema.TypeBool,
Optional: true,
Default: false,
Description: `If true, treat DRAINING and CANCELLING as terminal job states and do not wait for further changes before removing from terraform state and moving on. WARNING: this will lead to job name conflicts if you do not make sure that the job names are different, e.g. by using a random_id.`,
},
},
UseJSONNumber: true,
}
Expand Down Expand Up @@ -333,6 +340,12 @@ func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interfac

// Wait for state to reach terminal state (canceled/drained/done)
_, ok := dataflowTerminalStatesMap[d.Get("state").(string)]
if !ok {
if d.Get("skip_wait_on_job_termination").(bool) {
_, ok = dataflowTerminatingStatesMap[d.Get("state").(string)]
}
}

for !ok {
log.Printf("[DEBUG] Waiting for job with job state %q to terminate...", d.Get("state").(string))
time.Sleep(5 * time.Second)
Expand All @@ -342,6 +355,11 @@ func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interfac
return fmt.Errorf("Error while reading job to see if it was properly terminated: %v", err)
}
_, ok = dataflowTerminalStatesMap[d.Get("state").(string)]
if !ok {
if d.Get("skip_wait_on_job_termination").(bool) {
_, ok = dataflowTerminatingStatesMap[d.Get("state").(string)]
}
}
}

// Only remove the job from state if it's actually successfully canceled.
Expand All @@ -350,5 +368,12 @@ func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interfac
d.SetId("")
return nil
}
if d.Get("skip_wait_on_job_termination").(bool) {
if _, ok := dataflowTerminatingStatesMap[d.Get("state").(string)]; ok {
log.Printf("[DEBUG] Removing dataflow job with final state %q", d.Get("state").(string))
d.SetId("")
return nil
}
}
return fmt.Errorf("Unable to cancel the dataflow job '%s' - final state was %q.", d.Id(), d.Get("state").(string))
}
25 changes: 25 additions & 0 deletions google-beta/resource_dataflow_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ func resourceDataflowJob() *schema.Resource {
Optional: true,
Description: `Indicates if the job should use the streaming engine feature.`,
},

"skip_wait_on_job_termination": {
Type: schema.TypeBool,
Optional: true,
Default: false,
Description: `If true, treat DRAINING and CANCELLING as terminal job states and do not wait for further changes before removing from terraform state and moving on. WARNING: this will lead to job name conflicts if you do not make sure that the job names are different, e.g. by using a random_id.`,
},
},
UseJSONNumber: true,
}
Expand Down Expand Up @@ -476,6 +483,12 @@ func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {

// Wait for state to reach terminal state (canceled/drained/done)
_, ok := dataflowTerminalStatesMap[d.Get("state").(string)]
if !ok {
if d.Get("skip_wait_on_job_termination").(bool) {
_, ok = dataflowTerminatingStatesMap[d.Get("state").(string)]
}
}

for !ok {
log.Printf("[DEBUG] Waiting for job with job state %q to terminate...", d.Get("state").(string))
time.Sleep(5 * time.Second)
Expand All @@ -485,6 +498,11 @@ func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
return fmt.Errorf("Error while reading job to see if it was properly terminated: %v", err)
}
_, ok = dataflowTerminalStatesMap[d.Get("state").(string)]
if !ok {
if d.Get("skip_wait_on_job_termination").(bool) {
_, ok = dataflowTerminatingStatesMap[d.Get("state").(string)]
}
}
}

// Only remove the job from state if it's actually successfully canceled.
Expand All @@ -493,6 +511,13 @@ func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
d.SetId("")
return nil
}
if d.Get("skip_wait_on_job_termination").(bool) {
if _, ok := dataflowTerminatingStatesMap[d.Get("state").(string)]; ok {
log.Printf("[DEBUG] Removing dataflow job with final state %q", d.Get("state").(string))
d.SetId("")
return nil
}
}
return fmt.Errorf("Unable to cancel the dataflow job '%s' - final state was %q.", d.Id(), d.Get("state").(string))
}

Expand Down
31 changes: 31 additions & 0 deletions website/docs/r/dataflow_flex_template_job.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,33 @@ is "cancelled", but if a user sets `on_delete` to `"drain"` in the
configuration, you may experience a long wait for your `terraform destroy` to
complete.

You can potentially short-circuit the wait by setting `skip_wait_on_job_termination`
to `true`, but beware that unless you take active steps to ensure that the job
`name` parameter changes between instances, the name will conflict and the launch
of the new job will fail. One way to do this is with a
[random_id](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/id)
resource, for example:

```hcl
resource "random_id" "big_data_job_name_suffix" {
byte_length = 4
keepers = {
region = var.region
}
}

resource "google_dataflow_flex_template_job" "big_data_job" {
provider = google-beta
name = "dataflow-flextemplates-job-${random_id.big_data_job_name_suffix.dec}"
region = var.region
container_spec_gcs_path = "gs://my-bucket/templates/template.json"
skip_wait_on_job_termination = true
parameters = {
inputSubscription = "messages"
}
}
```

## Argument Reference

The following arguments are supported:
Expand All @@ -74,6 +101,10 @@ labels will be ignored to prevent diffs on re-apply.
* `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of
deletion during `terraform destroy`. See above note.

* `skip_wait_on_job_termination` - (Optional) If set to `true`, terraform will
treat `DRAINING` and `CANCELLING` as terminal states when deleting the resource,
and will remove the resource from terraform state and move on. See above note.

* `project` - (Optional) The project in which the resource belongs. If it is not
provided, the provider project is used.

Expand Down
30 changes: 30 additions & 0 deletions website/docs/r/dataflow_job.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,35 @@ The Dataflow resource is considered 'existing' while it is in a nonterminal stat

A Dataflow job which is 'destroyed' may be "cancelled" or "drained". If "cancelled", the job terminates - any data written remains where it is, but no new data will be processed. If "drained", no new data will enter the pipeline, but any data currently in the pipeline will finish being processed. The default is "drain". When `on_delete` is set to `"drain"` in the configuration, you may experience a long wait for your `terraform destroy` to complete.

You can potentially short-circuit the wait by setting `skip_wait_on_job_termination`
to `true`, but beware that unless you take active steps to ensure that the job
`name` parameter changes between instances, the name will conflict and the launch
of the new job will fail. One way to do this is with a
[random_id](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/id)
resource, for example:

```hcl
resource "random_id" "big_data_job_name_suffix" {
byte_length = 4
keepers = {
region = var.region
}
}

resource "google_dataflow_job" "big_data_job" {
name = "dataflow-job-${random_id.big_data_job_name_suffix.dec}"
region = var.region
template_gcs_path = "gs://my-bucket/templates/template.json"
parameters = {
inputSubscription = "messages"
}

on_delete = "drain"
skip_wait_on_job_termination = true
}
```

## Argument Reference

The following arguments are supported:
Expand All @@ -83,6 +112,7 @@ The following arguments are supported:
* `transform_name_mapping` - (Optional) Only applicable when updating a pipeline. Map of transform name prefixes of the job to be replaced with the corresponding name prefixes of the new job. This field is not used outside of update.
* `max_workers` - (Optional) The number of workers permitted to work on the job. More workers may improve processing speed at additional cost.
* `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of deletion during `terraform destroy`. See above note.
* `skip_wait_on_job_termination` - (Optional) If set to `true`, terraform will treat `DRAINING` and `CANCELLING` as terminal states when deleting the resource, and will remove the resource from terraform state and move on. See above note.
* `project` - (Optional) The project in which the resource belongs. If it is not provided, the provider project is used.
* `zone` - (Optional) The zone in which the created job should run. If it is not provided, the provider zone is used.
* `region` - (Optional) The region in which the created job should run.
Expand Down