Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support task placement via ecs-params #586

Merged
merged 7 commits into from
Aug 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,13 @@ run_params:
subnets: array of strings // These should be in the same VPC and Availability Zone as your instance
security_groups: array of strings // These should be in the same VPC as your instance
assign_public_ip: string // supported values: ENABLED or DISABLED
task_placement:
strategy:
- type: string // Valid values: "spread"|"binpack"|"random"
field: string // Not valid if type is "random"
constraints:
- type: string // Valid values: "memberOf"|"distinctInstance"
expression: string // Not valid if type is "distinctInstance"
```

**Version**
Expand Down Expand Up @@ -478,6 +485,17 @@ Currently, the only parameter supported under `run_params` is `network_configura
* `subnets`: list of subnet ids used to launch tasks. ***NOTE*** These should be in the same VPC and availability zone as the instances on which you wish to launch your tasks.
* `security_groups`: list of securtiy-group ids used to launch tasks. ***NOTE*** These should be in the same VPC as the instances on which you wish to launch your tasks.
* `assign_public_ip`: supported values for this field are either "ENABLED" or "DISABLED". This field is *only* used for tasks launched with Fargate launch type. If this field is present in tasks with network configuration launched with EC2 launch type, the request will fail.
* `task_placement` is an optional field with `EC2` launch-type only (it is *not* valid for `FARGATE`). It has two subfields:
* `strategy`: A list of objects, with two keys. Valid keys are `type` and `field`.
* `type`: Valid values are `random`, `binpack`, or `spread`. If `random` is specified, the `field` key is not necessary.
* `field`: Valid values depend on the strategy type.
* For `spread`, valid values are `instanceId`, `host`, or attribute key/value pairs, e.g. `attribute:ecs.instance-type =~ t2.*`
* For "binpack", valid values are "cpu" or "memory".
* `constraint`: A list of objects, with two keys. Valid keys are `type` and `expression`.
* `type`: Valid values are `distinctInstance` and `memberOf`. If `distinctInstance` is specified, the `expression key is not necessary.
* `expression`: When `type` is `memberOf`, valid values are key/value pairs for attributes or task groups, e.g. `task:group == databases` or `attribute:color =~ green`.

For more information on task placement, see [Amazon ECS TaskPlacement] (https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-placement.html).

Example `ecs-params.yml` file:

Expand Down Expand Up @@ -537,6 +555,24 @@ run_params:
assign_public_ip: ENABLED
```

Example `ecs-params.yml` with task placement:

```
version: 1
run_params:
task_placement:
strategy:
- field: memory
type: binpack
- field: attribute:ecs.availability-zone
type: spread
- type: random
constraints:
- expression: attribute:ecs.instance-type =~ t2.*
type: memberOf
- type: distinctInstance`
```

You can then start a task by calling:
```
ecs-cli compose --ecs-params my-ecs-params.yml up
Expand Down
2 changes: 2 additions & 0 deletions ecs-cli/modules/cli/compose/entity/entity_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ func SetupTaskDefinitionCache() cache.Cache {
// GetOrCreateTaskDefinition gets the task definition from cache if present, else
// creates it in ECS and persists in a local cache. It also sets the latest
// taskDefinition to the current instance of task
// TODO: convert to method on entity, since it changes state of entity?
// Also, since this is called before other task/service API calls, might be good to add Fargate validation here
func GetOrCreateTaskDefinition(entity ProjectEntity) (*ecs.TaskDefinition, error) {
taskDefinition := entity.TaskDefinition()
log.WithFields(log.Fields{
Expand Down
20 changes: 19 additions & 1 deletion ecs-cli/modules/cli/compose/entity/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,22 @@ func (s *Service) EntityType() types.Type {
func (s *Service) buildCreateServiceInput(serviceName, taskDefName string) (*ecs.CreateServiceInput, error) {
launchType := s.Context().CommandConfig.LaunchType
cluster := s.Context().CommandConfig.Cluster
ecsParams := s.ecsContext.ECSParams

networkConfig, err := composeutils.ConvertToECSNetworkConfiguration(s.ecsContext.ECSParams)
networkConfig, err := composeutils.ConvertToECSNetworkConfiguration(ecsParams)
if err != nil {
return nil, err
}
placementConstraints, err := composeutils.ConvertToECSPlacementConstraints(ecsParams)
if err != nil {
return nil, err
}
placementStrategy, err := composeutils.ConvertToECSPlacementStrategy(ecsParams)
if err != nil {
return nil, err
}

// NOTE: this validation is not useful if called after GetOrCreateTaskDefinition()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious about this- my reading of ValidateFargateParams() doesn't indicate it matters if its called before or after GetOrCreateTaskDefinition()- what am I missing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there is nothing wrong with the ValidateFargateParams() method itself, but in the places it's being called from the top-level compose commands (in this case, Create(), we are calling entity.GetOrCreateTaskDefinitions(s) first (see the comment in entity_helper.go from the same commit). In other words, due to the order-dependent nature of actual calls to ECS and the fact that we mock these in our unit tests, this Validation is not useful for validating the actual request, in this case, to CreateService, because bad parameters on the TaskDefinition itself will cause the server to return a validation error. E.g.:

// with the following inputs:

~/ecs/ecs-cli/testing/projects/test_fields$ cat ecs_params/ecs-params-task-placement.yml 
version: 1
task_definition:
run_params:
  task_placement:
    strategy:
      - field: memory
        type: binpack
      - field: attribute:ecs.availability-zone
        type: spread
    constraints:
      - expression: attribute:color =~ blue
        type: memberOf


~/ecs/ecs-cli/testing/projects/test_fields$ cat docker-compose.yml
version: '2'
services:
  wordpress:
    image: wordpress
    ports:
      - "80:80"
  mysql:
    image: mysql
    environment:
      MYSQL_ROOT_PASSWORD: foobar

ECS returns a ClientException on RegisterTaskDefinition:

~/ecs/ecs-cli/testing/projects/test_fields$ ecs-cli compose --ecs-params ecs_params/ecs-params-task-placement.yml up --launch-type FARGATE
ERRO[0000] Error registering task definition             error="ClientException: Fargate only supports network mode ‘awsvpc’.\n\tstatus code: 400, request id: c6f9aede-a562-11e8-bbee-ebd059c53f92" family=test_fields
ERRO[0000] Create task definition failed                 error="ClientException: Fargate only supports network mode ‘awsvpc’.\n\tstatus code: 400, request id: c6f9aede-a562-11e8-bbee-ebd059c53f92"
FATA[0000] ClientException: Fargate only supports network mode ‘awsvpc’.
	status code: 400, request id: c6f9aede-a562-11e8-bbee-ebd059c53f92 

I discovered this case when trying to test Fargate validation (since placement is not supported with Fargate launch type).

if err = entity.ValidateFargateParams(s.Context().ECSParams, launchType); err != nil {
return nil, err
}
Expand All @@ -396,6 +406,14 @@ func (s *Service) buildCreateServiceInput(serviceName, taskDefName string) (*ecs
createServiceInput.NetworkConfiguration = networkConfig
}

if placementConstraints != nil {
createServiceInput.PlacementConstraints = placementConstraints
}

if placementStrategy != nil {
createServiceInput.PlacementStrategy = placementStrategy
}

if launchType != "" {
createServiceInput.LaunchType = aws.String(launchType)
}
Expand Down
63 changes: 63 additions & 0 deletions ecs-cli/modules/cli/compose/entity/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,69 @@ func TestCreateEC2Explicitly(t *testing.T) {
)
}

func TestCreateWithTaskPlacement(t *testing.T) {
flagSet := flag.NewFlagSet("ecs-cli-up", 0)

createServiceTest(
t,
flagSet,
&config.CommandConfig{},
ecsParamsWithTaskPlacement(),
func(input *ecs.CreateServiceInput) {
placementConstraints := input.PlacementConstraints
placementStrategy := input.PlacementStrategy
expectedConstraints := []*ecs.PlacementConstraint{
{
Type: aws.String("distinctInstance"),
}, {
Expression: aws.String("attribute:ecs.instance-type =~ t2.*"),
Type: aws.String("memberOf"),
},
}
expectedStrategy := []*ecs.PlacementStrategy{
{
Type: aws.String("random"),
}, {
Field: aws.String("instanceId"),
Type: aws.String("binpack"),
},
}

assert.Len(t, placementConstraints, 2)
assert.Equal(t, expectedConstraints, placementConstraints, "Expected Placement Constraints to match")
assert.Len(t, placementStrategy, 2)
assert.Equal(t, expectedStrategy, placementStrategy, "Expected Placement Strategy to match")
},
)
}

func ecsParamsWithTaskPlacement() *utils.ECSParams {
return &utils.ECSParams{
RunParams: utils.RunParams{
TaskPlacement: utils.TaskPlacement{
Constraints: []utils.Constraint{
utils.Constraint{
Type: ecs.PlacementConstraintTypeDistinctInstance,
},
utils.Constraint{
Expression: "attribute:ecs.instance-type =~ t2.*",
Type: ecs.PlacementConstraintTypeMemberOf,
},
},
Strategies: []utils.Strategy{
utils.Strategy{
Type: ecs.PlacementStrategyTypeRandom,
},
utils.Strategy{
Field: "instanceId",
Type: ecs.PlacementStrategyTypeBinpack,
},
},
},
},
}
}

// Specifies TargeGroupArn to test ALB
func TestCreateWithALB(t *testing.T) {
targetGroupArn := "targetGroupArn"
Expand Down
53 changes: 39 additions & 14 deletions ecs-cli/modules/cli/compose/entity/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/aws/amazon-ecs-cli/ecs-cli/modules/cli/compose/context"
"github.com/aws/amazon-ecs-cli/ecs-cli/modules/cli/compose/entity"
"github.com/aws/amazon-ecs-cli/ecs-cli/modules/cli/compose/entity/types"
"github.com/aws/amazon-ecs-cli/ecs-cli/modules/commands/flags"
"github.com/aws/amazon-ecs-cli/ecs-cli/modules/utils"
"github.com/aws/amazon-ecs-cli/ecs-cli/modules/utils/cache"
composeutils "github.com/aws/amazon-ecs-cli/ecs-cli/modules/utils/compose"
Expand Down Expand Up @@ -99,7 +100,8 @@ func (t *Task) Start() error {
// if count of running tasks = 0, starts 1
// if count != 0, and the task definitions differed, then its stops the old ones and starts the new ones
func (t *Task) Up() error {
return t.up(true)
updateTasks := t.Context().CLIContext.Bool(flags.ForceUpdateFlag)
return t.up(updateTasks)
}

// Info returns a formatted list of containers (running and stopped) in the current cluster
Expand All @@ -108,19 +110,20 @@ func (t *Task) Info(filterLocal bool) (project.InfoSet, error) {
return entity.Info(t, filterLocal)
}

// Scale finds out the current count of running tasks for this project and scales to the desired count
// Scale finds out the current count of running tasks for this project and scales to the desired count.
// Any run params specified will be taken into account.
// if desired = current, noop
// if desired > current, stops the extra ones
// if desired < current, start new ones (also if current was 0, create a new task definition)
func (t *Task) Scale(expectedCount int) error {
func (t *Task) Scale(desiredCount int) error {
ecsTasks, err := entity.CollectTasksWithStatus(t, ecs.DesiredStatusRunning, true)
if err != nil {
return err
}

observedCount := len(ecsTasks)

if expectedCount == observedCount {
if desiredCount == observedCount {
// NoOp
log.WithFields(log.Fields{
"countOfRunningTasks": observedCount,
Expand All @@ -129,18 +132,18 @@ func (t *Task) Scale(expectedCount int) error {
return nil
}

// running more than expected, stop the tasks
if expectedCount < observedCount {
diff := observedCount - expectedCount
// running more than desired, stop the extra tasks
if desiredCount < observedCount {
diff := observedCount - desiredCount
ecsTasksToStop := []*ecs.Task{}
for i := 0; i < diff; i++ {
ecsTasksToStop = append(ecsTasksToStop, ecsTasks[i])
}
return t.stopTasks(ecsTasksToStop)
}

// if expected > observed, then run the difference
diff := expectedCount - observedCount
// if desired > observed, then run the difference
diff := desiredCount - observedCount

var taskDef string
// if nothing was running, create new task definition
Expand Down Expand Up @@ -250,6 +253,7 @@ func (t *Task) stopTasks(ecsTasks []*ecs.Task) error {
}

// runTasks issues run task request to ECS Service in chunks of count=10
// it always takes into account the latest ECS params
func (t *Task) runTasks(taskDefinition string, totalCount int) ([]*ecs.Task, error) {
result := []*ecs.Task{}
chunkSize := 10 // can issue only up to 10 tasks in a RunTask Call
Expand Down Expand Up @@ -302,6 +306,7 @@ func convertToECSTaskOverride(overrides map[string][]string) (*ecs.TaskOverride,
return ecsOverrides, nil
}

// buildRunTaskInput will account for what is currently specified in ECS Params
func (t *Task) buildRunTaskInput(taskDefinition string, count int, overrides map[string][]string) (*ecs.RunTaskInput, error) {
cluster := t.Context().CommandConfig.Cluster
launchType := t.Context().CommandConfig.LaunchType
Expand All @@ -314,6 +319,17 @@ func (t *Task) buildRunTaskInput(taskDefinition string, count int, overrides map
return nil, err
}

placementConstraints, err := composeutils.ConvertToECSPlacementConstraints(ecsParams)
if err != nil {
return nil, err
}

placementStrategy, err := composeutils.ConvertToECSPlacementStrategy(ecsParams)
if err != nil {
return nil, err
}

// NOTE: this validation is not useful if called after RegisterTaskDefinition
if err := entity.ValidateFargateParams(ecsParams, launchType); err != nil {
return nil, err
}
Expand All @@ -338,6 +354,14 @@ func (t *Task) buildRunTaskInput(taskDefinition string, count int, overrides map
runTaskInput.Overrides = taskOverride
}

if placementConstraints != nil {
runTaskInput.PlacementConstraints = placementConstraints
}

if placementStrategy != nil {
runTaskInput.PlacementStrategy = placementStrategy
}

if launchType != "" {
runTaskInput.LaunchType = aws.String(launchType)
}
Expand All @@ -354,10 +378,11 @@ func (t *Task) createOne() error {
return t.waitForRunTasks(ecsTask)
}

// up gets a list of running tasks and if updateTasks is set to true, it updates it with the latest task definition
// if count of running tasks = 0, starts 1
// if count != 0, and the task definitions differed, then its stops the old ones and starts the new ones
func (t *Task) up(updateTasks bool) error {
// up gets a list of running tasks. If there are no running tasks, it starts 1 task.
// If there are no running tasks, and either the task definition has changed or
// forceUpdate is specified, then the running tasks are stopped and relaunched
// with the task definition and run parameters in the current call.
func (t *Task) up(forceUpdate bool) error {
ecsTasks, err := entity.CollectTasksWithStatus(t, ecs.DesiredStatusRunning, true)
if err != nil {
return err
Expand Down Expand Up @@ -388,7 +413,7 @@ func (t *Task) up(updateTasks bool) error {

ecsTaskArns := make(map[string]bool)

if oldTaskDef != newTaskDef {
if oldTaskDef != newTaskDef || forceUpdate {
log.WithFields(log.Fields{"taskDefinition": newTaskDef}).Info("Updating to new task definition")

chunkSize := 10
Expand Down
3 changes: 2 additions & 1 deletion ecs-cli/modules/clients/aws/ecs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,9 @@ func (c *ecsClient) RegisterTaskDefinition(request *ecs.RegisterTaskDefinitionIn
func (c *ecsClient) RegisterTaskDefinitionIfNeeded(
request *ecs.RegisterTaskDefinitionInput,
taskDefinitionCache cache.Cache) (*ecs.TaskDefinition, error) {

if request.Family == nil {
return nil, errors.New("invalid task definitions: family is required")
return nil, errors.New("invalid task definition: family is required")
}

taskDefResp, err := c.DescribeTaskDefinition(aws.StringValue(request.Family))
Expand Down
48 changes: 48 additions & 0 deletions ecs-cli/modules/clients/aws/ecs/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,54 @@ func TestRunTask_WithTaskNetworking(t *testing.T) {
assert.NoError(t, err, "Unexpected error when calling RunTask")
}

func TestRunTask_WithTaskPlacement(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just something to think about (not a blocker for merging): Now that you've refactored the ECS Client's RunTask to just take the RunTaskInput and then call the SDK's RunTask, do we really need tests like this? This test just passes in the 2 new things through the RunTaskInput object and then verifies that the client was called with the same inputs. One might say that it never hurts to have more tests... but I disagree because the more tests you have, the more code you have to change during future refactorings/code changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I definitely agree that tests should be treated as first-class citizens for purposes of code maintenance; in this case however, given our mocking strategy I think this is still a useful test to ensure that we are passing the right inputs through to our ECS Client (and that certain fields are not passed through in certain scenarios, to ensure we get the expected defaults that ECS provides.

mockEcs, _, client, ctrl := setupTestController(t, getDefaultCLIConfigParams(t))
defer ctrl.Finish()

td := "taskDef"
group := "taskGroup"
count := 5

placementConstraints := []*ecs.PlacementConstraint{
{
Type: aws.String("distinctInstance"),
}, {
Expression: aws.String("attribute:ecs.instance-type =~ t2.*"),
Type: aws.String("memberOf"),
},
}
placementStrategy := []*ecs.PlacementStrategy{
{
Type: aws.String("random"),
}, {
Field: aws.String("instanceId"),
Type: aws.String("binpack"),
},
}

mockEcs.EXPECT().RunTask(gomock.Any()).Do(func(input interface{}) {
req := input.(*ecs.RunTaskInput)
assert.Equal(t, clusterName, aws.StringValue(req.Cluster), "Expected clusterName to match")
assert.Equal(t, td, aws.StringValue(req.TaskDefinition), "Expected taskDefinition to match")
assert.Equal(t, group, aws.StringValue(req.Group), "Expected group to match")
assert.Equal(t, int64(count), aws.Int64Value(req.Count), "Expected count to match")
assert.Equal(t, placementConstraints, req.PlacementConstraints, "Expected placement constraints to match")
assert.Equal(t, placementStrategy, req.PlacementStrategy, "Expected placement strategy to match")
}).Return(&ecs.RunTaskOutput{}, nil)

runTaskInput := &ecs.RunTaskInput{
Cluster: aws.String(clusterName),
TaskDefinition: aws.String(td),
Group: aws.String(group),
Count: aws.Int64(int64(count)),
LaunchType: aws.String("EC2"),
PlacementConstraints: placementConstraints,
PlacementStrategy: placementStrategy,
}
_, err := client.RunTask(runTaskInput)
assert.NoError(t, err, "Unexpected error when calling RunTask")
}

func TestIsActiveCluster(t *testing.T) {
mockEcs, _, client, ctrl := setupTestController(t, getDefaultCLIConfigParams(t))
defer ctrl.Finish()
Expand Down
2 changes: 1 addition & 1 deletion ecs-cli/modules/commands/compose/compose_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func upCommand(factory composeFactory.ProjectFactory) cli.Command {
Name: "up",
Usage: "Creates an ECS task definition from your compose file (if it does not already exist) and runs one instance of that task on your cluster (a combination of create and start).",
Action: compose.WithProject(factory, compose.ProjectUp, false),
Flags: append(flags.OptionalConfigFlags(), flags.OptionalLaunchTypeFlag(), flags.OptionalCreateLogsFlag()),
Flags: append(flags.OptionalConfigFlags(), flags.OptionalLaunchTypeFlag(), flags.OptionalCreateLogsFlag(), flags.OptionalForceUpdateFlag()),
OnUsageError: flags.UsageErrorFactory("up"),
}
}
Expand Down
Loading