Add skip_wait_on_job_termination option for dataflow job resources (#…
n-oden authored Apr 8, 2022
1 parent 93f7ecf commit 5a5fe34
Showing 5 changed files with 181 additions and 21 deletions.
@@ -84,6 +84,13 @@ func resourceDataflowFlexTemplateJob() *schema.Resource {
Type: schema.TypeString,
Computed: true,
},

"skip_wait_on_job_termination": {
Type: schema.TypeBool,
Optional: true,
Default: false,
Description: `If true, treat DRAINING and CANCELLING as terminal job states and do not wait for further changes before removing from terraform state and moving on. WARNING: this will lead to job name conflicts if you do not ensure that the job names are different, e.g. by embedding a release ID or by using a random_id.`,
},
},
UseJSONNumber: true,
}
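
For context, a minimal sketch of the new option in a flex template job configuration (the resource name, bucket, and template path here are placeholders, not part of this commit):

```hcl
resource "google_dataflow_flex_template_job" "example" {
  provider                = google-beta
  name                    = "example-flex-job"
  region                  = "us-central1"
  container_spec_gcs_path = "gs://my-bucket/templates/template.json"

  # New in this commit: treat DRAINING/CANCELLING as terminal on delete.
  skip_wait_on_job_termination = true
}
```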
@@ -167,7 +174,7 @@ func resourceDataflowFlexTemplateJobRead(d *schema.ResourceData, meta interface{
return fmt.Errorf("Error setting labels: %s", err)
}

- if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
+ if ok := shouldStopDataflowJobDeleteQuery(job.CurrentState, d.Get("skip_wait_on_job_termination").(bool)); ok {
log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
d.SetId("")
return nil
@@ -333,21 +340,23 @@ func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interfac
return err
}

- // Wait for state to reach terminal state (canceled/drained/done)
- _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]
- for !ok {
+ // Wait for state to reach terminal state (canceled/drained/done plus cancelling/draining if skipWait)
+ skipWait := d.Get("skip_wait_on_job_termination").(bool)
+ var ok bool
+ ok = shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait)
+ for !ok {
log.Printf("[DEBUG] Waiting for job with job state %q to terminate...", d.Get("state").(string))
time.Sleep(5 * time.Second)

err = resourceDataflowFlexTemplateJobRead(d, meta)
if err != nil {
return fmt.Errorf("Error while reading job to see if it was properly terminated: %v", err)
}
- _, ok = dataflowTerminalStatesMap[d.Get("state").(string)]
+ ok = shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait)
}

- // Only remove the job from state if it's actually successfully canceled.
- if _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; ok {
+ // Only remove the job from state if it has actually reached a final state.
+ if ok = shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait); ok {
log.Printf("[DEBUG] Removing dataflow job with final state %q", d.Get("state").(string))
d.SetId("")
return nil
32 changes: 24 additions & 8 deletions mmv1/third_party/terraform/resources/resource_dataflow_job.go.erb
@@ -19,12 +19,10 @@ import (

const resourceDataflowJobGoogleProvidedLabelPrefix = "labels.goog-dataflow-provided"

- <% unless version == "ga" -%>
var dataflowTerminatingStatesMap = map[string]struct{}{
"JOB_STATE_CANCELLING": {},
"JOB_STATE_DRAINING": {},
}
- <% end -%>

var dataflowTerminalStatesMap = map[string]struct{}{
"JOB_STATE_DONE": {},
@@ -212,6 +210,13 @@ func resourceDataflowJob() *schema.Resource {
Optional: true,
Description: `Indicates if the job should use the streaming engine feature.`,
},

"skip_wait_on_job_termination": {
Type: schema.TypeBool,
Optional: true,
Default: false,
Description: `If true, treat DRAINING and CANCELLING as terminal job states and do not wait for further changes before removing from terraform state and moving on. WARNING: this will lead to job name conflicts if you do not ensure that the job names are different, e.g. by embedding a release ID or by using a random_id.`,
},
},
UseJSONNumber: true,
}
@@ -241,6 +246,16 @@ func resourceDataflowJobTypeCustomizeDiff(_ context.Context, d *schema.ResourceD
return nil
}

// shouldStopDataflowJobDeleteQuery returns true if a job is in a terminal
// state, OR if a job is in a terminating state and skipWait is true.
func shouldStopDataflowJobDeleteQuery(state string, skipWait bool) bool {
_, stopQuery := dataflowTerminalStatesMap[state]
if !stopQuery && skipWait {
_, stopQuery = dataflowTerminatingStatesMap[state]
}
return stopQuery
}

func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)
userAgent, err := generateUserAgentString(d, config.userAgent)
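
For review purposes, the helper's behavior reduces to a small truth table (illustrative only, derived from the two state maps above):

```go
// shouldStopDataflowJobDeleteQuery("JOB_STATE_DONE", false)       // true:  terminal, always stop
// shouldStopDataflowJobDeleteQuery("JOB_STATE_CANCELLING", false) // false: still terminating, keep waiting
// shouldStopDataflowJobDeleteQuery("JOB_STATE_DRAINING", true)    // true:  skipWait treats it as final
// shouldStopDataflowJobDeleteQuery("JOB_STATE_RUNNING", true)     // false: neither terminal nor terminating
```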
@@ -351,7 +366,7 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
return fmt.Errorf("Error setting additional_experiments: %s", err)
}

- if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
+ if ok := shouldStopDataflowJobDeleteQuery(job.CurrentState, d.Get("skip_wait_on_job_termination").(bool)); ok {
log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
d.SetId("")
return nil
@@ -477,8 +492,9 @@ func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
return err
}

- // Wait for state to reach terminal state (canceled/drained/done)
- _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]
+ // Wait for state to reach terminal state (canceled/drained/done plus cancelling/draining if skipWait)
+ skipWait := d.Get("skip_wait_on_job_termination").(bool)
+ ok := shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait)
for !ok {
log.Printf("[DEBUG] Waiting for job with job state %q to terminate...", d.Get("state").(string))
time.Sleep(5 * time.Second)
@@ -487,11 +503,11 @@
if err != nil {
return fmt.Errorf("Error while reading job to see if it was properly terminated: %v", err)
}
- _, ok = dataflowTerminalStatesMap[d.Get("state").(string)]
+ ok = shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait)
}

- // Only remove the job from state if it's actually successfully canceled.
- if _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; ok {
+ // Only remove the job from state if it has actually reached a final state.
+ if ok = shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait); ok {
log.Printf("[DEBUG] Removing dataflow job with final state %q", d.Get("state").(string))
d.SetId("")
return nil
83 changes: 77 additions & 6 deletions mmv1/third_party/terraform/tests/resource_dataflow_job_test.go.erb
@@ -3,6 +3,7 @@ package google

import (
"fmt"
"strconv"
"strings"
"testing"
"time"
@@ -49,6 +50,32 @@
})
}

func TestAccDataflowJobSkipWait_basic(t *testing.T) {
// Dataflow responses include serialized java classes and bash commands
// This makes body comparison infeasible
skipIfVcr(t)
t.Parallel()

randStr := randString(t, 10)
bucket := "tf-test-dataflow-gcs-" + randStr
job := "tf-test-dataflow-job-" + randStr
zone := "us-central1-f"

vcrTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataflowJobSkipWait_zone(bucket, job, zone),
Check: resource.ComposeTestCheckFunc(
testAccDataflowJobExists(t, "google_dataflow_job.big_data"),
),
},
},
})
}

func TestAccDataflowJob_withRegion(t *testing.T) {
// Dataflow responses include serialized java classes and bash commands
// This makes body comparison infeasible
@@ -334,9 +361,18 @@ func testAccCheckDataflowJobDestroyProducer(t *testing.T) func(s *terraform.Stat
config := googleProviderConfig(t)
job, err := config.NewDataflowClient(config.userAgent).Projects.Jobs.Get(config.Project, rs.Primary.ID).Do()
if job != nil {
- if _, ok := dataflowTerminalStatesMap[job.CurrentState]; !ok {
- return fmt.Errorf("Job still present")
- }
+ var ok bool
+ skipWait, err := strconv.ParseBool(rs.Primary.Attributes["skip_wait_on_job_termination"])
+ if err != nil {
+ return fmt.Errorf("could not parse attribute: %v", err)
+ }
+ _, ok = dataflowTerminalStatesMap[job.CurrentState]
+ if !ok && skipWait {
+ _, ok = dataflowTerminatingStatesMap[job.CurrentState]
+ }
+ if !ok {
+ return fmt.Errorf("Job still present")
+ }
} else if err != nil {
return err
}
@@ -356,9 +392,18 @@ func testAccCheckDataflowJobRegionDestroyProducer(t *testing.T) func(s *terrafor
config := googleProviderConfig(t)
job, err := config.NewDataflowClient(config.userAgent).Projects.Locations.Jobs.Get(config.Project, "us-central1", rs.Primary.ID).Do()
if job != nil {
- if _, ok := dataflowTerminalStatesMap[job.CurrentState]; !ok {
- return fmt.Errorf("Job still present")
- }
+ var ok bool
+ skipWait, err := strconv.ParseBool(rs.Primary.Attributes["skip_wait_on_job_termination"])
+ if err != nil {
+ return fmt.Errorf("could not parse attribute: %v", err)
+ }
+ _, ok = dataflowTerminalStatesMap[job.CurrentState]
+ if !ok && skipWait {
+ _, ok = dataflowTerminatingStatesMap[job.CurrentState]
+ }
+ if !ok {
+ return fmt.Errorf("Job still present")
+ }
} else if err != nil {
return err
}
@@ -640,6 +685,32 @@ resource "google_dataflow_job" "big_data" {
`, bucket, job, zone, testDataflowJobTemplateWordCountUrl, testDataflowJobSampleFileUrl)
}

func testAccDataflowJobSkipWait_zone(bucket, job, zone string) string {
return fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
name = "%s"
location = "US"
force_destroy = true
}

resource "google_dataflow_job" "big_data" {
name = "%s"

zone = "%s"

machine_type = "e2-standard-2"
template_gcs_path = "%s"
temp_gcs_location = google_storage_bucket.temp.url
parameters = {
inputFile = "%s"
output = "${google_storage_bucket.temp.url}/output"
}
on_delete = "cancel"
skip_wait_on_job_termination = true
}
`, bucket, job, zone, testDataflowJobTemplateWordCountUrl, testDataflowJobSampleFileUrl)
}

func testAccDataflowJob_region(bucket, job string) string {
return fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
@@ -48,6 +48,38 @@ is "cancelled", but if a user sets `on_delete` to `"drain"` in the
configuration, you may experience a long wait for your `terraform destroy` to
complete.

You can potentially short-circuit the wait by setting `skip_wait_on_job_termination`
to `true`, but beware that unless you take active steps to ensure that the job
`name` parameter changes between instances, the name will conflict and the launch
of the new job will fail. One way to do this is with a
[random_id](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/id)
resource, for example:

```hcl
variable "big_data_job_subscription_id" {
type = string
default = "projects/myproject/subscriptions/messages"
}
resource "random_id" "big_data_job_name_suffix" {
byte_length = 4
keepers = {
region = var.region
subscription_id = var.big_data_job_subscription_id
}
}
resource "google_dataflow_flex_template_job" "big_data_job" {
provider = google-beta
name = "dataflow-flextemplates-job-${random_id.big_data_job_name_suffix.dec}"
region = var.region
container_spec_gcs_path = "gs://my-bucket/templates/template.json"
skip_wait_on_job_termination = true
parameters = {
inputSubscription = var.big_data_job_subscription_id
}
}
```
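
Note that each value in `keepers` forces `random_id` to regenerate its suffix when that value changes, so changing the `region` or the subscription re-rolls the suffix and therefore the job `name`, allowing the replacement job to launch while the old one is still draining or cancelling.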

## Argument Reference

The following arguments are supported:
@@ -74,6 +106,10 @@ labels will be ignored to prevent diffs on re-apply.
* `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of
deletion during `terraform destroy`. See above note.

* `skip_wait_on_job_termination` - (Optional) If set to `true`, Terraform will
treat `DRAINING` and `CANCELLING` as terminal states when deleting the resource,
and will remove the resource from Terraform state and move on. See above note.

* `project` - (Optional) The project in which the resource belongs. If it is not
provided, the provider project is used.

@@ -65,6 +65,33 @@ The Dataflow resource is considered 'existing' while it is in a nonterminal stat

A Dataflow job which is 'destroyed' may be "cancelled" or "drained". If "cancelled", the job terminates - any data written remains where it is, but no new data will be processed. If "drained", no new data will enter the pipeline, but any data currently in the pipeline will finish being processed. The default is "drain". When `on_delete` is set to `"drain"` in the configuration, you may experience a long wait for your `terraform destroy` to complete.

You can potentially short-circuit the wait by setting `skip_wait_on_job_termination` to `true`, but beware that unless you take active steps to ensure that the job `name` parameter changes between instances, the name will conflict and the launch of the new job will fail. One way to do this is with a [random_id](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/id) resource, for example:

```hcl
variable "big_data_job_subscription_id" {
type = string
default = "projects/myproject/subscriptions/messages"
}
resource "random_id" "big_data_job_name_suffix" {
byte_length = 4
keepers = {
region = var.region
subscription_id = var.big_data_job_subscription_id
}
}
resource "google_dataflow_flex_template_job" "big_data_job" {
provider = google-beta
name = "dataflow-flextemplates-job-${random_id.big_data_job_name_suffix.dec}"
region = var.region
container_spec_gcs_path = "gs://my-bucket/templates/template.json"
skip_wait_for_job_termination = true
parameters = {
inputSubscription = var.big_data_job_subscription_id
}
}
```

## Argument Reference

The following arguments are supported:
@@ -83,6 +110,7 @@ The following arguments are supported:
* `transform_name_mapping` - (Optional) Only applicable when updating a pipeline. Map of transform name prefixes of the job to be replaced with the corresponding name prefixes of the new job. This field is not used outside of update.
* `max_workers` - (Optional) The number of workers permitted to work on the job. More workers may improve processing speed at additional cost.
* `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of deletion during `terraform destroy`. See above note.
* `skip_wait_on_job_termination` - (Optional) If set to `true`, Terraform will treat `DRAINING` and `CANCELLING` as terminal states when deleting the resource, and will remove the resource from Terraform state and move on. See above note.
* `project` - (Optional) The project in which the resource belongs. If it is not provided, the provider project is used.
* `zone` - (Optional) The zone in which the created job should run. If it is not provided, the provider zone is used.
* `region` - (Optional) The region in which the created job should run.