Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dataflow job update-by-replacement #6257

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changelog/3387.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
```release-note:enhancement
dataflow: Added support for update-by-replacement to `google_dataflow_job`
```
```release-note:enhancement
dataflow: Added drift detection for `google_dataflow_job` `template_gcs_path` and `temp_gcs_location` fields
```
9 changes: 9 additions & 0 deletions google/error_retry_predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,12 @@ func isStoragePreconditionError(err error) (bool, string) {
}
return false, ""
}

// isDataflowJobUpdateRetryableError reports whether err is a transient error
// returned while a Dataflow job transitions between states during an
// update-by-replacement: a 404 whose body mentions the job not being
// "in RUNNING OR DRAINING state". The second return value is the retry reason.
func isDataflowJobUpdateRetryableError(err error) (bool, string) {
	gerr, ok := err.(*googleapi.Error)
	if !ok {
		return false, ""
	}
	if gerr.Code != 404 || !strings.Contains(gerr.Body, "in RUNNING OR DRAINING state") {
		return false, ""
	}
	return true, "Waiting for job to be in a valid state"
}
156 changes: 124 additions & 32 deletions google/resource_dataflow_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"time"

"github.com/hashicorp/terraform-plugin-sdk/helper/customdiff"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/helper/validation"

Expand Down Expand Up @@ -44,54 +45,56 @@ func resourceDataflowJob() *schema.Resource {
return &schema.Resource{
Create: resourceDataflowJobCreate,
Read: resourceDataflowJobRead,
Update: resourceDataflowJobUpdateByReplacement,
Delete: resourceDataflowJobDelete,
CustomizeDiff: customdiff.All(
resourceDataflowJobTypeCustomizeDiff,
),
Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Required: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

"template_gcs_path": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},

"temp_gcs_location": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},

"zone": {
Type: schema.TypeString,
Optional: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

"region": {
Type: schema.TypeString,
Optional: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

"max_workers": {
Type: schema.TypeInt,
Optional: true,
ForceNew: true,
},

"parameters": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
},

"labels": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
DiffSuppressFunc: resourceDataflowJobLabelDiffSuppress,
},

Expand All @@ -100,13 +103,13 @@ func resourceDataflowJob() *schema.Resource {
ValidateFunc: validation.StringInSlice([]string{"cancel", "drain"}, false),
Optional: true,
Default: "drain",
ForceNew: true,
},

"project": {
Type: schema.TypeString,
Optional: true,
Computed: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

Expand All @@ -121,40 +124,34 @@ func resourceDataflowJob() *schema.Resource {
"service_account_email": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},

"network": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
DiffSuppressFunc: compareSelfLinkOrResourceName,
},

"subnetwork": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
DiffSuppressFunc: compareSelfLinkOrResourceName,
},

"machine_type": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},

"ip_configuration": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
ValidateFunc: validation.StringInSlice([]string{"WORKER_IP_PUBLIC", "WORKER_IP_PRIVATE", ""}, false),
},

"additional_experiments": {
Type: schema.TypeSet,
Optional: true,
ForceNew: true,
Elem: &schema.Schema{
Type: schema.TypeString,
},
Expand All @@ -168,6 +165,23 @@ func resourceDataflowJob() *schema.Resource {
}
}

// resourceDataflowJobTypeCustomizeDiff forces recreation for every changed
// field when the job is a batch job. Batch jobs cannot be updated in place,
// so only streaming jobs take the update-by-replacement path.
func resourceDataflowJobTypeCustomizeDiff(d *schema.ResourceDiff, meta interface{}) error {
	if d.Get("type") != "JOB_TYPE_BATCH" {
		return nil
	}
	// All changes are ForceNew for batch jobs.
	for field, fieldSchema := range resourceDataflowJob().Schema {
		if fieldSchema.Type == schema.TypeMap {
			// Map fields only register changes on individual keys, so each
			// key must be checked for a change.
			resourceDataflowJobIterateMapForceNew(field, d)
			continue
		}
		if d.HasChange(field) {
			d.ForceNew(field)
		}
	}
	return nil
}

func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

Expand All @@ -176,31 +190,16 @@ func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
return err
}

zone, err := getZone(d, config)
if err != nil {
return err
}

region, err := getRegion(d, config)
if err != nil {
return err
}

params := expandStringMap(d, "parameters")
labels := expandStringMap(d, "labels")
additionalExperiments := convertStringSet(d.Get("additional_experiments").(*schema.Set))

env := dataflow.RuntimeEnvironment{
MaxWorkers: int64(d.Get("max_workers").(int)),
Network: d.Get("network").(string),
ServiceAccountEmail: d.Get("service_account_email").(string),
Subnetwork: d.Get("subnetwork").(string),
TempLocation: d.Get("temp_gcs_location").(string),
MachineType: d.Get("machine_type").(string),
IpConfiguration: d.Get("ip_configuration").(string),
AdditionalUserLabels: labels,
Zone: zone,
AdditionalExperiments: additionalExperiments,
env, err := resourceDataflowJobSetupEnv(d, config)
if err != nil {
return err
}

request := dataflow.CreateJobFromTemplateRequest{
Expand Down Expand Up @@ -246,6 +245,14 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
d.Set("project", project)
d.Set("labels", job.Labels)

sdkPipelineOptions, err := ConvertToMap(job.Environment.SdkPipelineOptions)
if err != nil {
return err
}
optionsMap := sdkPipelineOptions["options"].(map[string]interface{})
d.Set("template_gcs_path", optionsMap["templateLocation"])
d.Set("temp_gcs_location", optionsMap["tempLocation"])

if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
d.SetId("")
Expand All @@ -256,6 +263,47 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
return nil
}

// resourceDataflowJobUpdateByReplacement updates a streaming Dataflow job by
// launching a replacement job from the template with Update set, then points
// the resource at the replacement job's ID. Batch job changes should have
// been set to ForceNew via the resource's CustomizeDiff, so only streaming
// jobs reach this path.
func resourceDataflowJobUpdateByReplacement(d *schema.ResourceData, meta interface{}) error {
	config := meta.(*Config)

	project, err := getProject(d, config)
	if err != nil {
		return err
	}

	region, err := getRegion(d, config)
	if err != nil {
		return err
	}

	params := expandStringMap(d, "parameters")

	env, err := resourceDataflowJobSetupEnv(d, config)
	if err != nil {
		return err
	}

	request := dataflow.LaunchTemplateParameters{
		JobName:     d.Get("name").(string),
		Parameters:  params,
		Environment: &env,
		Update:      true,
	}

	var response *dataflow.LaunchTemplateResponse
	// The API can transiently reject the launch while the old job is not yet
	// in a replaceable state; retry those errors for up to 5 minutes.
	err = retryTimeDuration(func() (updateErr error) {
		response, updateErr = resourceDataflowJobLaunchTemplate(config, project, region, d.Get("template_gcs_path").(string), &request)
		return updateErr
	}, 5*time.Minute, isDataflowJobUpdateRetryableError)
	if err != nil {
		return err
	}
	// The replacement job has a new ID; track it from now on.
	d.SetId(response.Job.Id)

	return resourceDataflowJobRead(d, meta)
}

func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

Expand Down Expand Up @@ -353,9 +401,9 @@ func resourceDataflowJobCreateJob(config *Config, project string, region string,

// resourceDataflowJobGetJob fetches the job with full detail (JOB_VIEW_ALL,
// which includes the pipeline options needed for drift detection), using the
// regional endpoint when a region is configured.
func resourceDataflowJobGetJob(config *Config, project string, region string, id string) (*dataflow.Job, error) {
	if region != "" {
		return config.clientDataflow.Projects.Locations.Jobs.Get(project, region, id).View("JOB_VIEW_ALL").Do()
	}
	return config.clientDataflow.Projects.Jobs.Get(project, id).View("JOB_VIEW_ALL").Do()
}

func resourceDataflowJobUpdateJob(config *Config, project string, region string, id string, job *dataflow.Job) (*dataflow.Job, error) {
Expand All @@ -364,3 +412,47 @@ func resourceDataflowJobUpdateJob(config *Config, project string, region string,
}
return config.clientDataflow.Projects.Locations.Jobs.Update(project, region, id, job).Do()
}

// resourceDataflowJobLaunchTemplate launches a Dataflow job from the template
// stored at gcsPath, using the regional endpoint when a region is configured.
func resourceDataflowJobLaunchTemplate(config *Config, project string, region string, gcsPath string, request *dataflow.LaunchTemplateParameters) (*dataflow.LaunchTemplateResponse, error) {
	if region != "" {
		return config.clientDataflow.Projects.Locations.Templates.Launch(project, region, request).GcsPath(gcsPath).Do()
	}
	return config.clientDataflow.Projects.Templates.Launch(project, request).GcsPath(gcsPath).Do()
}

// resourceDataflowJobSetupEnv assembles the RuntimeEnvironment for a job
// launch from the resource's worker/network configuration. Shared by the
// create and update-by-replacement paths.
func resourceDataflowJobSetupEnv(d *schema.ResourceData, config *Config) (dataflow.RuntimeEnvironment, error) {
	zone, err := getZone(d, config)
	if err != nil {
		return dataflow.RuntimeEnvironment{}, err
	}

	return dataflow.RuntimeEnvironment{
		MaxWorkers:            int64(d.Get("max_workers").(int)),
		Network:               d.Get("network").(string),
		ServiceAccountEmail:   d.Get("service_account_email").(string),
		Subnetwork:            d.Get("subnetwork").(string),
		TempLocation:          d.Get("temp_gcs_location").(string),
		MachineType:           d.Get("machine_type").(string),
		IpConfiguration:       d.Get("ip_configuration").(string),
		AdditionalUserLabels:  expandStringMap(d, "labels"),
		Zone:                  zone,
		AdditionalExperiments: convertStringSet(d.Get("additional_experiments").(*schema.Set)),
	}, nil
}

// resourceDataflowJobIterateMapForceNew marks the map field mapKey as
// ForceNew when any of its entries has changed. ForceNew must be called on
// the parent map to trigger, so one changed key is sufficient.
func resourceDataflowJobIterateMapForceNew(mapKey string, d *schema.ResourceDiff) {
	for entryKey := range d.Get(mapKey).(map[string]interface{}) {
		if d.HasChange(mapKey + "." + entryKey) {
			d.ForceNew(mapKey)
			return
		}
	}
}
Loading