Skip to content

Commit

Permalink
resource/aws_emr_cluster: Add step support
Browse files Browse the repository at this point in the history
  • Loading branch information
bflad committed Mar 8, 2018
1 parent d0de0e4 commit 6a5a20b
Show file tree
Hide file tree
Showing 3 changed files with 459 additions and 19 deletions.
214 changes: 201 additions & 13 deletions aws/resource_aws_emr_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,63 @@ func resourceAwsEMRCluster() *schema.Resource {
},
},
},
"step": {
Type: schema.TypeList,
Optional: true,
Computed: true,
ForceNew: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"action_on_failure": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validation.StringInSlice([]string{
emr.ActionOnFailureCancelAndWait,
emr.ActionOnFailureContinue,
emr.ActionOnFailureTerminateCluster,
emr.ActionOnFailureTerminateJobFlow,
}, false),
},
"hadoop_jar_step": {
Type: schema.TypeList,
MaxItems: 1,
Required: true,
ForceNew: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"args": {
Type: schema.TypeList,
Optional: true,
ForceNew: true,
Elem: &schema.Schema{Type: schema.TypeString},
},
"jar": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"main_class": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
"properties": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
},
},
},
},
"name": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
},
},
},
"tags": tagsSchema(),
"configurations": {
Type: schema.TypeString,
Expand Down Expand Up @@ -441,6 +498,10 @@ func resourceAwsEMRClusterCreate(d *schema.ResourceData, meta interface{}) error
bootstrapActions := v.(*schema.Set).List()
params.BootstrapActions = expandBootstrapActions(bootstrapActions)
}
if v, ok := d.GetOk("step"); ok {
steps := v.([]interface{})
params.Steps = expandEmrStepConfigs(steps)
}
if v, ok := d.GetOk("tags"); ok {
tagsIn := v.(map[string]interface{})
params.Tags = expandTags(tagsIn)
Expand Down Expand Up @@ -482,8 +543,14 @@ func resourceAwsEMRClusterCreate(d *schema.ResourceData, meta interface{}) error
log.Println("[INFO] Waiting for EMR Cluster to be available")

stateConf := &resource.StateChangeConf{
Pending: []string{"STARTING", "BOOTSTRAPPING"},
Target: []string{"WAITING", "RUNNING"},
Pending: []string{
emr.ClusterStateBootstrapping,
emr.ClusterStateStarting,
},
Target: []string{
emr.ClusterStateRunning,
emr.ClusterStateWaiting,
},
Refresh: resourceAwsEMRClusterStateRefreshFunc(d, meta),
Timeout: 75 * time.Minute,
MinTimeout: 10 * time.Second,
Expand Down Expand Up @@ -519,19 +586,15 @@ func resourceAwsEMRClusterRead(d *schema.ResourceData, meta interface{}) error {
cluster := resp.Cluster

if cluster.Status != nil {
if *cluster.Status.State == "TERMINATED" {
log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED already", d.Id())
d.SetId("")
return nil
}
state := aws.StringValue(cluster.Status.State)

if *cluster.Status.State == "TERMINATED_WITH_ERRORS" {
log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED_WITH_ERRORS already", d.Id())
if state == emr.ClusterStateTerminated || state == emr.ClusterStateTerminatedWithErrors {
log.Printf("[WARN] EMR Cluster (%s) was %s already, removing from state", d.Id(), state)
d.SetId("")
return nil
}

d.Set("cluster_state", cluster.Status.State)
d.Set("cluster_state", state)
}

instanceGroups, err := fetchAllEMRInstanceGroups(emrconn, d.Id())
Expand Down Expand Up @@ -591,6 +654,24 @@ func resourceAwsEMRClusterRead(d *schema.ResourceData, meta interface{}) error {
log.Printf("[WARN] Error setting Bootstrap Actions: %s", err)
}

var stepSummaries []*emr.StepSummary
listStepsInput := &emr.ListStepsInput{
ClusterId: aws.String(d.Id()),
}
err = emrconn.ListStepsPages(listStepsInput, func(page *emr.ListStepsOutput, lastPage bool) bool {
// ListSteps returns steps in reverse order (newest first)
for _, step := range page.Steps {
stepSummaries = append([]*emr.StepSummary{step}, stepSummaries...)
}
return !lastPage
})
if err != nil {
return fmt.Errorf("error listing steps: %s", err)
}
if err := d.Set("step", flattenEmrStepSummaries(stepSummaries)); err != nil {
return fmt.Errorf("error setting step: %s", err)
}

return nil
}

Expand Down Expand Up @@ -633,8 +714,14 @@ func resourceAwsEMRClusterUpdate(d *schema.ResourceData, meta interface{}) error
log.Println("[INFO] Waiting for EMR Cluster to be available")

stateConf := &resource.StateChangeConf{
Pending: []string{"STARTING", "BOOTSTRAPPING"},
Target: []string{"WAITING", "RUNNING"},
Pending: []string{
emr.ClusterStateBootstrapping,
emr.ClusterStateStarting,
},
Target: []string{
emr.ClusterStateRunning,
emr.ClusterStateWaiting,
},
Refresh: resourceAwsEMRClusterStateRefreshFunc(d, meta),
Timeout: 40 * time.Minute,
MinTimeout: 10 * time.Second,
Expand Down Expand Up @@ -718,7 +805,7 @@ func resourceAwsEMRClusterDelete(d *schema.ResourceData, meta interface{}) error
var terminated []string
for j, i := range resp.Instances {
if i.Status != nil {
if *i.Status.State == "TERMINATED" {
if aws.StringValue(i.Status.State) == emr.InstanceStateTerminated {
terminated = append(terminated, *i.Ec2InstanceId)
}
} else {
Expand Down Expand Up @@ -833,6 +920,49 @@ func flattenEmrKerberosAttributes(d *schema.ResourceData, kerberosAttributes *em
return l
}

func flattenEmrHadoopStepConfig(config *emr.HadoopStepConfig) map[string]interface{} {
if config == nil {
return nil
}

m := map[string]interface{}{
"args": aws.StringValueSlice(config.Args),
"jar": aws.StringValue(config.Jar),
"main_class": aws.StringValue(config.MainClass),
"properties": aws.StringValueMap(config.Properties),
}

return m
}

func flattenEmrStepSummaries(stepSummaries []*emr.StepSummary) []map[string]interface{} {
l := make([]map[string]interface{}, 0)

if len(stepSummaries) == 0 {
return l
}

for _, stepSummary := range stepSummaries {
l = append(l, flattenEmrStepSummary(stepSummary))
}

return l
}

func flattenEmrStepSummary(stepSummary *emr.StepSummary) map[string]interface{} {
if stepSummary == nil {
return nil
}

m := map[string]interface{}{
"action_on_failure": aws.StringValue(stepSummary.ActionOnFailure),
"hadoop_jar_step": []map[string]interface{}{flattenEmrHadoopStepConfig(stepSummary.Config)},
"name": aws.StringValue(stepSummary.Name),
}

return m
}

func flattenInstanceGroups(igs []*emr.InstanceGroup) []map[string]interface{} {
result := make([]map[string]interface{}, 0)

Expand Down Expand Up @@ -1014,6 +1144,40 @@ func expandBootstrapActions(bootstrapActions []interface{}) []*emr.BootstrapActi
return actionsOut
}

func expandEmrHadoopJarStepConfig(m map[string]interface{}) *emr.HadoopJarStepConfig {
hadoopJarStepConfig := &emr.HadoopJarStepConfig{
Jar: aws.String(m["jar"].(string)),
}

if v, ok := m["args"]; ok {
hadoopJarStepConfig.Args = expandStringList(v.([]interface{}))
}

if v, ok := m["main_class"]; ok {
hadoopJarStepConfig.MainClass = aws.String(v.(string))
}

if v, ok := m["properties"]; ok {
hadoopJarStepConfig.Properties = expandEmrKeyValues(v.(map[string]interface{}))
}

return hadoopJarStepConfig
}

func expandEmrKeyValues(m map[string]interface{}) []*emr.KeyValue {
keyValues := make([]*emr.KeyValue, 0)

for k, v := range m {
keyValue := &emr.KeyValue{
Key: aws.String(k),
Value: aws.String(v.(string)),
}
keyValues = append(keyValues, keyValue)
}

return keyValues
}

func expandEmrKerberosAttributes(m map[string]interface{}) *emr.KerberosAttributes {
kerberosAttributes := &emr.KerberosAttributes{
KdcAdminPassword: aws.String(m["kdc_admin_password"].(string)),
Expand All @@ -1031,6 +1195,30 @@ func expandEmrKerberosAttributes(m map[string]interface{}) *emr.KerberosAttribut
return kerberosAttributes
}

func expandEmrStepConfig(m map[string]interface{}) *emr.StepConfig {
hadoopJarStepList := m["hadoop_jar_step"].([]interface{})
hadoopJarStepMap := hadoopJarStepList[0].(map[string]interface{})

stepConfig := &emr.StepConfig{
ActionOnFailure: aws.String(m["action_on_failure"].(string)),
HadoopJarStep: expandEmrHadoopJarStepConfig(hadoopJarStepMap),
Name: aws.String(m["name"].(string)),
}

return stepConfig
}

func expandEmrStepConfigs(l []interface{}) []*emr.StepConfig {
stepConfigs := []*emr.StepConfig{}

for _, raw := range l {
m := raw.(map[string]interface{})
stepConfigs = append(stepConfigs, expandEmrStepConfig(m))
}

return stepConfigs
}

func expandInstanceGroupConfigs(instanceGroupConfigs []interface{}) []*emr.InstanceGroupConfig {
instanceGroupConfig := []*emr.InstanceGroupConfig{}

Expand Down
Loading

0 comments on commit 6a5a20b

Please sign in to comment.