-
Notifications
You must be signed in to change notification settings - Fork 4
/
ecs.go
445 lines (403 loc) · 17.6 KB
/
ecs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
package awsecs
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/aws/aws-sdk-go/service/ecs"
"github.com/aws/aws-sdk-go/service/ecs/ecsiface"
"github.com/aws/aws-sdk-go/service/elbv2"
"github.com/aws/aws-sdk-go/service/elbv2/elbv2iface"
"github.com/cenkalti/backoff"
"log"
"reflect"
"strings"
)
var (
	// EnvKnockOutValue is the sentinel value that removes an environment
	// variable or secret from a container definition instead of setting it.
	EnvKnockOutValue = ""
	// ErrDeploymentChangedElsewhere is returned when ECS reports a PRIMARY
	// deployment other than the one started by this update.
	ErrDeploymentChangedElsewhere = errors.New("the deployment was changed elsewhere")
	// ErrOtherThanPrimaryDeploymentFound is returned while deployments other
	// than the one started by this update are still present (rollout not done).
	ErrOtherThanPrimaryDeploymentFound = errors.New("other than PRIMARY deployment found")
	// ErrNotRunningDesiredCount is returned when only the new deployment remains
	// but the service runs fewer tasks than its desired count.
	ErrNotRunningDesiredCount = errors.New("not running the desired count")
	// ErrServiceNotFound is returned when the service to update does not exist.
	ErrServiceNotFound = errors.New("the service does not exist")
	// ErrWaitingForDrainingState is returned while none of the service's
	// initial targets has transitioned to the draining state yet.
	ErrWaitingForDrainingState = errors.New("waiting for draining state")
	// ErrInvalidWaitUntil is returned when an unknown wait-until option is given.
	ErrInvalidWaitUntil = errors.New("invalid wait until received")
	// ErrServiceDeletedAfterUpdate is returned when the service was updated and
	// then deleted elsewhere. It is permanent, so backoff.Retry stops on it.
	ErrServiceDeletedAfterUpdate = backoff.Permanent(errors.New("the service was deleted after the update"))
	// ErrContainerInstanceNotFound is returned when the container instance was
	// removed from the cluster elsewhere. It is permanent.
	ErrContainerInstanceNotFound = backoff.Permanent(errors.New("container instance not found"))
	// ErrLoadBalancerNotConfigured is returned when waiting for draining on a
	// service that has no load balancer attached. It is permanent.
	ErrLoadBalancerNotConfigured = backoff.Permanent(errors.New("the service does not have a load balancer configured"))
)
// errNoPrimaryDeployment is returned by validateDeployment when the service
// it is given has no deployment with status PRIMARY. Being permanent, it
// stops backoff.Retry instead of being retried.
var errNoPrimaryDeployment = backoff.Permanent(errors.New("no PRIMARY deployment"))
// ifEmptyThenNil returns tags unchanged when it has at least one element and
// nil otherwise. This normalizes both nil and zero-length slices to nil.
func ifEmptyThenNil(tags []*ecs.Tag) []*ecs.Tag {
	if len(tags) > 0 {
		return tags
	}
	return nil
}
// copyTd builds the RegisterTaskDefinitionInput needed to register input
// again as a new revision.
//
// input is first cloned through panicMarshal/panicUnmarshal (presumably a
// serialization round trip, so the result shares no pointers with input;
// confirm). The registrable fields are then copied from the clone. Fields that
// are not listed (e.g. TaskDefinitionArn) are not carried over. Fields are
// named explicitly, so a field added to both SDK structs must be added here
// too or it is silently lost.
//
// tags are attached as given, with an empty list normalized to nil. They are
// not part of input because the caller takes them from the describe output.
func copyTd(input ecs.TaskDefinition, tags []*ecs.Tag) ecs.RegisterTaskDefinitionInput {
	var clone ecs.TaskDefinition
	panicUnmarshal(panicMarshal(input), &clone)
	return ecs.RegisterTaskDefinitionInput{
		ContainerDefinitions:    clone.ContainerDefinitions,
		Cpu:                     clone.Cpu,
		ExecutionRoleArn:        clone.ExecutionRoleArn,
		Family:                  clone.Family,
		InferenceAccelerators:   clone.InferenceAccelerators,
		IpcMode:                 clone.IpcMode,
		Memory:                  clone.Memory,
		NetworkMode:             clone.NetworkMode,
		PidMode:                 clone.PidMode,
		PlacementConstraints:    clone.PlacementConstraints,
		ProxyConfiguration:      clone.ProxyConfiguration,
		RequiresCompatibilities: clone.RequiresCompatibilities,
		TaskRoleArn:             clone.TaskRoleArn,
		Volumes:                 clone.Volumes,
		Tags:                    ifEmptyThenNil(tags),
	}
}
func alterImages(copy ecs.RegisterTaskDefinitionInput, imageMap map[string]string) ecs.RegisterTaskDefinitionInput {
obj := panicMarshal(copy)
copyClone := ecs.RegisterTaskDefinitionInput{}
panicUnmarshal(obj, ©Clone)
for name, image := range imageMap {
for _, containerDefinition := range copyClone.ContainerDefinitions {
if *containerDefinition.Name == name {
containerDefinition.Image = aws.String(image)
}
}
}
return copyClone
}
func alterEnvironments(copy ecs.RegisterTaskDefinitionInput, envMaps map[string]map[string]string) ecs.RegisterTaskDefinitionInput {
obj := panicMarshal(copy)
copyClone := ecs.RegisterTaskDefinitionInput{}
panicUnmarshal(obj, ©Clone)
for name, envMap := range envMaps {
for i, containerDefinition := range copyClone.ContainerDefinitions {
if *containerDefinition.Name == name {
altered := alterEnvironment(*containerDefinition, envMap)
copyClone.ContainerDefinitions[i] = &altered
}
}
}
return copyClone
}
func alterSecrets(copy ecs.RegisterTaskDefinitionInput, secretMaps map[string]map[string]string) ecs.RegisterTaskDefinitionInput {
obj := panicMarshal(copy)
copyClone := ecs.RegisterTaskDefinitionInput{}
panicUnmarshal(obj, ©Clone)
for name, secretMap := range secretMaps {
for i, containerDefinition := range copyClone.ContainerDefinitions {
if *containerDefinition.Name == name {
altered := alterSecret(*containerDefinition, secretMap)
copyClone.ContainerDefinitions[i] = &altered
}
}
}
return copyClone
}
// alterEnvironment applies envMap (variable name -> value) to the environment
// of container and returns the result.
//
// For each entry:
//   - value == EnvKnockOutValue: every variable with that name is removed;
//   - otherwise every variable with that name gets the new value, and if none
//     exists a new one is appended.
//
// container is received by value, but its Environment slice and the
// KeyValuePair pointers in it are shared with the caller. Removal shifts
// elements within that backing array and updates write through those
// pointers, so callers should pass an already-cloned definition.
func alterEnvironment(container ecs.ContainerDefinition, envMap map[string]string) ecs.ContainerDefinition {
	for name, value := range envMap {
		knockOut := value == EnvKnockOutValue
		matched := false
		for idx := 0; idx < len(container.Environment); {
			entry := container.Environment[idx]
			if *entry.Name != name {
				idx++
				continue
			}
			matched = true
			if knockOut {
				// The next element slides into idx, so idx is not advanced.
				container.Environment = append(container.Environment[:idx], container.Environment[idx+1:]...)
				continue
			}
			entry.Value = aws.String(value)
			idx++
		}
		if !matched && !knockOut {
			container.Environment = append(container.Environment, &ecs.KeyValuePair{Name: aws.String(name), Value: aws.String(value)})
		}
	}
	return container
}
// alterSecret applies secretMap (secret name -> valueFrom) to the secrets of
// container and returns the result.
//
// For each entry:
//   - valueFrom == EnvKnockOutValue: every secret with that name is removed;
//   - otherwise every secret with that name gets the new valueFrom, and if
//     none exists a new one is appended.
//
// container is received by value, but its Secrets slice and the Secret
// pointers in it are shared with the caller. Removal shifts elements within
// that backing array and updates write through those pointers, so callers
// should pass an already-cloned definition.
func alterSecret(container ecs.ContainerDefinition, secretMap map[string]string) ecs.ContainerDefinition {
	for name, valueFrom := range secretMap {
		knockOut := valueFrom == EnvKnockOutValue
		matched := false
		for idx := 0; idx < len(container.Secrets); {
			entry := container.Secrets[idx]
			if *entry.Name != name {
				idx++
				continue
			}
			matched = true
			if knockOut {
				// The next element slides into idx, so idx is not advanced.
				container.Secrets = append(container.Secrets[:idx], container.Secrets[idx+1:]...)
				continue
			}
			entry.ValueFrom = aws.String(valueFrom)
			idx++
		}
		if !matched && !knockOut {
			container.Secrets = append(container.Secrets, &ecs.Secret{Name: aws.String(name), ValueFrom: aws.String(valueFrom)})
		}
	}
	return container
}
// copyTaskDef describes taskdef, applies the requested image, environment,
// secret, log configuration and task role changes, and registers the result
// as a new revision. It returns the ARN of that new revision. If the changes
// leave the definition identical to the described one, nothing is registered
// and the existing ARN is returned.
func copyTaskDef(api ecsiface.ECSAPI, taskdef string, imageMap map[string]string, envMaps map[string]map[string]string, secretMaps map[string]map[string]string, logopts map[string]map[string]map[string]string, logsecrets map[string]map[string]map[string]string, taskRole string) (string, error) {
	described, err := api.DescribeTaskDefinition(&ecs.DescribeTaskDefinitionInput{TaskDefinition: aws.String(taskdef)})
	if err != nil {
		return "", fmt.Errorf("on copy task definition while describe existing task definition: %w", err)
	}

	base := copyTd(*described.TaskDefinition, described.Tags)
	candidate := alterImages(base, imageMap)
	candidate = alterEnvironments(candidate, envMaps)
	candidate = alterSecrets(candidate, secretMaps)
	candidate = alterLogConfigurations(candidate, logopts, logsecrets)
	candidate = alterTaskRole(candidate, taskRole)

	// This comparison is only meaningful if the alter* helpers work on copies
	// and leave base untouched.
	if reflect.DeepEqual(base, candidate) {
		return *described.TaskDefinition.TaskDefinitionArn, nil
	}

	registered, err := api.RegisterTaskDefinition(&candidate)
	if err != nil {
		return "", fmt.Errorf("on copy task definition while register new task definition: %w", err)
	}
	return *registered.TaskDefinition.TaskDefinitionArn, nil
}
// alterService describes service in cluster and updates it to a copy of its
// task definition (or of taskdef, when non-empty) with the requested changes
// applied. Every update forces a new deployment. It returns the service as
// described before the update and the service returned by UpdateService.
func alterService(api ecsiface.ECSAPI, cluster, service string, imageMap map[string]string, envMaps map[string]map[string]string, secretMaps map[string]map[string]string, logopts map[string]map[string]map[string]string, logsecrets map[string]map[string]map[string]string, taskRole string, desiredCount *int64, taskdef string) (ecs.Service, ecs.Service, error) {
	described, err := api.DescribeServices(&ecs.DescribeServicesInput{
		Cluster:  aws.String(cluster),
		Services: []*string{aws.String(service)},
	})
	if err != nil {
		return ecs.Service{}, ecs.Service{}, fmt.Errorf("on alter service while describe service: %w", err)
	}

	return findAndUpdateService(described, cluster, service, taskdef, desiredCount,
		func(source string) (string, error) {
			return copyTaskDef(api, source, imageMap, envMaps, secretMaps, logopts, logsecrets, taskRole)
		},
		func(taskDefinition *string, count *int64) (*ecs.UpdateServiceOutput, error) {
			return api.UpdateService(&ecs.UpdateServiceInput{
				Cluster:            aws.String(cluster),
				Service:            aws.String(service),
				TaskDefinition:     taskDefinition,
				DesiredCount:       count,
				ForceNewDeployment: aws.Bool(true),
			})
		},
	)
}
// findAndUpdateService takes the first service in output, parses its cluster
// ARN and hands both to updateService. It returns ErrServiceNotFound when
// output contains no services. An ARN parse error is returned unchanged.
func findAndUpdateService(output *ecs.DescribeServicesOutput, cluster, service, taskDefinition string, desiredCount *int64, copyTdAction func(string) (string, error), updateSvcAction func(*string, *int64) (*ecs.UpdateServiceOutput, error)) (ecs.Service, ecs.Service, error) {
	if len(output.Services) < 1 {
		return ecs.Service{}, ecs.Service{}, ErrServiceNotFound
	}
	first := output.Services[0]
	parsed, err := arn.Parse(*first.ClusterArn)
	if err != nil {
		return ecs.Service{}, ecs.Service{}, err
	}
	return updateService(parsed, first, cluster, service, taskDefinition, desiredCount, copyTdAction, updateSvcAction)
}
// updateService checks that svc is the requested service in the requested
// cluster. The cluster is compared by name, taken from the ARN resource
// without its "cluster/" prefix. Then it:
//  1. copies td, or svc's current task definition when td is empty, via copyTdAction;
//  2. updates the service to the copy via updateSvcAction, keeping svc's
//     desired count when desiredCount is nil.
//
// It returns (svc before the update, service after the update, error). Once
// svc has been matched, errors are returned together with *svc. It returns
// ErrServiceNotFound when the names do not match.
func updateService(parsedClusterArn arn.ARN, svc *ecs.Service, cluster, service, td string, desiredCount *int64, copyTdAction func(string) (string, error), updateSvcAction func(*string, *int64) (*ecs.UpdateServiceOutput, error)) (ecs.Service, ecs.Service, error) {
	clusterName := strings.TrimPrefix(parsedClusterArn.Resource, "cluster/")
	serviceName := *svc.ServiceName
	if clusterName != cluster || serviceName != service {
		return ecs.Service{}, ecs.Service{}, ErrServiceNotFound
	}

	source := td
	if source == "" {
		source = *svc.TaskDefinition
	}
	copied, err := copyTdAction(source)
	if err != nil {
		return *svc, ecs.Service{}, err
	}

	count := desiredCount
	if count == nil {
		count = svc.DesiredCount
	}
	result, err := updateSvcAction(aws.String(copied), count)
	if err != nil {
		return *svc, ecs.Service{}, err
	}
	return *svc, *result.Service, nil
}
// mapStringStringAsJson renders input as a single-line JSON object with keys
// in sorted order and HTML-sensitive characters escaped. A nil map renders as
// "null". It is used here for log output.
func mapStringStringAsJson(input map[string]string) string {
	var out bytes.Buffer
	// Encoding a map[string]string cannot fail, so the error is deliberately ignored.
	_ = json.NewEncoder(&out).Encode(input)
	// Encode terminates its output with a newline; strip it.
	return strings.TrimSpace(out.String())
}
// getTargetStates describes the health of every target registered in the
// target group targetGroupArn. It returns a map from target ID to health
// state. The DescribeTargetHealth error is returned unchanged.
func getTargetStates(targetGroupArn string, elbv2api elbv2iface.ELBV2API) (map[string]string, error) {
	health, err := elbv2api.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{
		TargetGroupArn: aws.String(targetGroupArn),
	})
	if err != nil {
		return nil, err
	}
	states := make(map[string]string, len(health.TargetHealthDescriptions))
	for _, description := range health.TargetHealthDescriptions {
		states[*description.Target.Id] = *description.TargetHealth.State
	}
	return states, nil
}
// validateDraining waits, retrying with bo, until draining has started for
// the service. Only the target group of the service's first load balancer is
// watched. A snapshot of target states is taken first, and the wait succeeds
// when either:
//   - some target from that snapshot is now "draining" and was not before; or
//   - none of the snapshot's targets is registered any more (this includes an
//     empty snapshot).
//
// While neither holds, the retried operation returns ErrWaitingForDrainingState.
// Failures to describe the service or take the initial snapshot are
// permanent. A service without a load balancer yields ErrLoadBalancerNotConfigured.
func validateDraining(ecsapi ecsiface.ECSAPI, elbv2api elbv2iface.ELBV2API, ecsService ecs.Service, bo backoff.BackOff) error {
	described, err := ecsapi.DescribeServices(&ecs.DescribeServicesInput{
		Cluster:  ecsService.ClusterArn,
		Services: []*string{ecsService.ServiceName},
	})
	if err != nil {
		return backoff.Permanent(fmt.Errorf("on validate draining while describe service: %w", err))
	}
	if len(described.Services) == 0 {
		return backoff.Permanent(ErrServiceNotFound)
	}
	current := described.Services[0]
	if len(current.LoadBalancers) == 0 {
		return ErrLoadBalancerNotConfigured
	}
	targetGroupArn := *current.LoadBalancers[0].TargetGroupArn

	before, err := getTargetStates(targetGroupArn, elbv2api)
	if err != nil {
		return backoff.Permanent(err)
	}
	log.Printf("Initial target states: '%s'", mapStringStringAsJson(before))

	return backoff.Retry(func() error {
		after, err := getTargetStates(targetGroupArn, elbv2api)
		if err != nil {
			log.Print(err)
			return err
		}
		log.Printf("Waiting for targets transitioning to draining state: '%s'", mapStringStringAsJson(after))

		for targetID, initialState := range before {
			if nowState := after[targetID]; nowState != initialState && nowState == elbv2.TargetHealthStateEnumDraining {
				log.Printf("The target '%s' transitioned to draining state", targetID)
				return nil
			}
		}

		// Keep waiting while any initial target is still registered.
		for targetID := range before {
			if _, stillRegistered := after[targetID]; stillRegistered {
				return ErrWaitingForDrainingState
			}
		}
		log.Printf("Either there are no initial targets or all targets are new or the service desired count was set to 0")
		return nil
	}, bo)
}
// validateDeployment checks whether the rollout started for ecsService has
// completed. ecsService is presumably the service as returned by UpdateService.
//
// It takes the first PRIMARY deployment in ecsService.Deployments and then
// re-describes the service. The describe step gets up to 5 retries with bo;
// presumably this rides out transiently stale reads — confirm. The result is:
//   - errNoPrimaryDeployment if ecsService has no PRIMARY deployment;
//   - a permanent ErrDeploymentChangedElsewhere if ECS reports another PRIMARY;
//   - ErrOtherThanPrimaryDeploymentFound while other deployments remain;
//   - ErrNotRunningDesiredCount while fewer tasks run than desired;
//   - ErrServiceDeletedAfterUpdate if no described service has deployments;
//   - nil once only the new deployment remains at full count.
//
// The ELBV2 client is unused. The parameter exists so the function fits
// validateDeploymentFunc.
func validateDeployment(api ecsiface.ECSAPI, _ elbv2iface.ELBV2API, ecsService ecs.Service, bo backoff.BackOff) error {
	var primary *ecs.Deployment
	for _, candidate := range ecsService.Deployments {
		if *candidate.Status == "PRIMARY" {
			primary = candidate
			break
		}
	}
	if primary == nil {
		return errNoPrimaryDeployment
	}

	var described *ecs.DescribeServicesOutput
	describe := func() error {
		var err error
		described, err = api.DescribeServices(&ecs.DescribeServicesInput{Cluster: ecsService.ClusterArn, Services: []*string{ecsService.ServiceName}})
		if err != nil {
			return fmt.Errorf("on validate deployment while describe service: %w", err)
		}
		for _, svc := range described.Services {
			for _, deployment := range svc.Deployments {
				if *deployment.Status == "PRIMARY" && *deployment.Id != *primary.Id {
					return ErrDeploymentChangedElsewhere
				}
			}
		}
		return nil
	}

	err := backoff.Retry(describe, backoff.WithMaxRetries(bo, 5))
	switch {
	case err == ErrDeploymentChangedElsewhere:
		return backoff.Permanent(err)
	case err != nil:
		return err
	}

	for _, svc := range described.Services {
		if len(svc.Deployments) == 0 {
			continue
		}
		for _, deployment := range svc.Deployments {
			if *deployment.Id != *primary.Id {
				return ErrOtherThanPrimaryDeploymentFound
			}
		}
		// Only our deployment is left; it is done once enough tasks run.
		if *svc.RunningCount < *svc.DesiredCount {
			return ErrNotRunningDesiredCount
		}
		return nil
	}
	return ErrServiceDeletedAfterUpdate
}
// alterServiceValidateDeployment applies the update via alterService. It then
// calls validate on the updated service repeatedly, retrying with bo, until
// validate succeeds, returns a permanent error, or bo gives up. Each distinct
// validation error is logged once. It always returns the service as it was
// before the update.
//
// NOTE(review): bo is also passed to validate, which uses it for its own inner
// retries; confirm that sharing one BackOff between the nested retries is intended.
func alterServiceValidateDeployment(ecsapi ecsiface.ECSAPI, elbv2api elbv2iface.ELBV2API, cluster, service string, imageMap map[string]string, envMaps map[string]map[string]string, secretMaps map[string]map[string]string, logopts map[string]map[string]map[string]string, logsecrets map[string]map[string]map[string]string, taskRole string, desiredCount *int64, taskdef string, bo backoff.BackOff, validate validateDeploymentFunc) (ecs.Service, error) {
	before, after, err := alterService(ecsapi, cluster, service, imageMap, envMaps, secretMaps, logopts, logsecrets, taskRole, desiredCount, taskdef)
	if err != nil {
		return before, err
	}

	var lastLogged error
	check := func() error {
		err := validate(ecsapi, elbv2api, after, bo)
		if err != nil && err != lastLogged {
			lastLogged = err
			log.Print(err)
		}
		return err
	}
	return before, backoff.Retry(check, bo)
}
// Accepted values for ECSServiceUpdate.WaitUntil.
const (
	// WaitUntilPrimaryRolled makes Apply wait until the new deployment is the
	// only one and runs the desired count. This is the behaviour when
	// WaitUntil is nil.
	WaitUntilPrimaryRolled = "primary-rolled"
	// WaitUntilDrainingStarted makes Apply wait until a target behind the
	// service's load balancer starts draining.
	WaitUntilDrainingStarted = "draining-started"
)

// WaitUntilOptionList lists every accepted ECSServiceUpdate.WaitUntil value.
var WaitUntilOptionList = []string{
	WaitUntilPrimaryRolled,
	WaitUntilDrainingStarted,
}
// ECSServiceUpdate encapsulates the attributes of an ECS service update.
// Call Apply to perform it.
type ECSServiceUpdate struct {
	// EcsApi is the ECS API client.
	EcsApi ecsiface.ECSAPI
	// ElbApi is the ELBV2 API client. It is needed to watch target health
	// when WaitUntil is WaitUntilDrainingStarted.
	ElbApi elbv2iface.ELBV2API
	// Cluster is the name of the cluster the service is deployed to. It is
	// compared with the cluster ARN's resource minus its "cluster/" prefix,
	// so it must be the cluster name rather than its ARN.
	Cluster string
	// Service is the name of the service.
	Service string
	// Image maps container name -> new image.
	Image map[string]string
	// Environment maps container name -> environment variable name -> value.
	// A value equal to EnvKnockOutValue removes the variable.
	Environment map[string]map[string]string
	// Secrets maps container name -> secret name -> valueFrom.
	// A valueFrom equal to EnvKnockOutValue removes the secret.
	Secrets map[string]map[string]string
	// LogDriverOptions maps container name -> log driver name -> log driver option -> value.
	LogDriverOptions map[string]map[string]map[string]string
	// LogDriverSecrets maps container name -> log driver name -> log driver secret -> valueFrom.
	LogDriverSecrets map[string]map[string]map[string]string
	// TaskRole is the task IAM role. If TaskRoleKnockoutValue is used, the role is cleared.
	TaskRole string
	// DesiredCount is the new desired count. If nil, the service's current
	// desired count is kept.
	DesiredCount *int64
	// BackOff is the strategy used when validating the update.
	BackOff backoff.BackOff
	// Taskdef, if non-empty, is used as the base task definition instead of
	// the service's current one.
	Taskdef string
	// WaitUntil decides whether to wait until draining has started
	// ("draining-started", WaitUntilDrainingStarted; only valid for services
	// with a load balancer attached) or until the deployment's primary has
	// rolled ("primary-rolled", WaitUntilPrimaryRolled). Nil means
	// "primary-rolled". Any other value makes Apply return ErrInvalidWaitUntil.
	WaitUntil *string
}
// Apply performs the ECS service update described by e. It then waits
// according to e.WaitUntil, which defaults to WaitUntilPrimaryRolled when nil.
// It returns ErrInvalidWaitUntil, without touching the service, if e.WaitUntil
// holds an unknown value.
func (e *ECSServiceUpdate) Apply() error {
	var validate validateDeploymentFunc = validateDeployment
	if e.WaitUntil != nil {
		validators := map[string]validateDeploymentFunc{
			WaitUntilPrimaryRolled:   validateDeployment,
			WaitUntilDrainingStarted: validateDraining,
		}
		selected, known := validators[*e.WaitUntil]
		if !known {
			return ErrInvalidWaitUntil
		}
		validate = selected
	}
	return alterServiceOrValidatedRollBack(e.EcsApi, e.ElbApi, e.Cluster, e.Service, e.Image, e.Environment, e.Secrets, e.LogDriverOptions, e.LogDriverSecrets, e.TaskRole, e.DesiredCount, e.Taskdef, e.BackOff, validate)
}