Merge pull request #2857 from jlhawn/update_constraint_enforcer
Use Service Placement Constraints in Enforcer
dperny authored May 20, 2019
2 parents 88dcc0f + b39ccd7 commit 1e20cbf
Showing 2 changed files with 160 additions and 4 deletions.
56 changes: 52 additions & 4 deletions manager/orchestrator/constraintenforcer/constraint_enforcer.go
@@ -76,8 +76,22 @@ func (ce *ConstraintEnforcer) rejectNoncompliantTasks(node *api.Node) {
err error
)

services := map[string]*api.Service{}
ce.store.View(func(tx store.ReadTx) {
tasks, err = store.FindTasks(tx, store.ByNodeID(node.ID))
if err != nil {
return
}

// Deduplicate service IDs using the services map. It's okay for the
// values to be nil for now, we will look them up from the store next.
for _, task := range tasks {
services[task.ServiceID] = nil
}

for serviceID := range services {
services[serviceID] = store.GetService(tx, serviceID)
}
})

if err != nil {
@@ -105,10 +119,44 @@ loop:
continue
}

// Ensure that the task still meets scheduling
// constraints.
if t.Spec.Placement != nil && len(t.Spec.Placement.Constraints) != 0 {
constraints, _ := constraint.Parse(t.Spec.Placement.Constraints)
// Ensure that the node still satisfies placement constraints.
// NOTE: If the task is associated with a service then we must use the
// constraints from the current service spec rather than the
// constraints from the task spec because they may be outdated. This
// will happen if the service was previously updated in a way which
// only changes the placement constraints and the node matched the
// placement constraints both before and after that update. In the case
// of such updates, the tasks are not considered "dirty" and are not
// restarted but it will mean that the task spec's placement
// constraints are outdated. Consider this example:
// - A service is created with no constraints and a task is scheduled
// to a node.
// - The node is updated to add a label; this doesn't affect the task
// on that node because it has no constraints.
// - The service is updated to add a node label constraint which
// matches the label which was just added to the node. The updater
// does not shut down the task because only the constraints have
// changed and the node still matches the updated constraints.
// - The node is updated to remove the node label. The node no longer
// satisfies the placement constraints of the service, so the task
// should be shut down. However, the task's spec still has the
// original and outdated constraints (that are still satisfied by
// the node). If we used those original constraints then the task
// would incorrectly not be removed. This is why the constraints
// from the service spec should be used instead.
var placement *api.Placement
if service := services[t.ServiceID]; service != nil {
// This task is associated with a service, so we use the service's
// current placement constraints.
placement = service.Spec.Task.Placement
} else {
// This task is not associated with a service (or the service no
// longer exists), so we use the placement constraints from the
// original task spec.
placement = t.Spec.Placement
}
if placement != nil && len(placement.Constraints) > 0 {
constraints, _ := constraint.Parse(placement.Constraints)
if !constraint.NodeMatches(constraints, node) {
removeTasks[t.ID] = t
continue
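The selection logic above can be read in isolation. The following is a minimal sketch, not the committed code: the package name and the helpers placementFor and violatesPlacement are invented for illustration, and the import paths are assumed from the repository layout. It mirrors how the enforcer now resolves which constraints to check for a task.

package enforcerexample // hypothetical package, for illustration only

import (
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/constraint"
)

// placementFor mirrors the selection made in the diff above: prefer the
// service's current task spec when the service is known, and fall back to
// the task's own (possibly outdated) placement otherwise.
func placementFor(t *api.Task, services map[string]*api.Service) *api.Placement {
	if service := services[t.ServiceID]; service != nil {
		return service.Spec.Task.Placement
	}
	return t.Spec.Placement
}

// violatesPlacement reports whether node fails the resolved constraints.
// As in the enforcer, the Parse error is ignored, and an empty or nil
// placement never counts as a violation.
func violatesPlacement(t *api.Task, node *api.Node, services map[string]*api.Service) bool {
	placement := placementFor(t, services)
	if placement == nil || len(placement.Constraints) == 0 {
		return false
	}
	constraints, _ := constraint.Parse(placement.Constraints)
	return !constraint.NodeMatches(constraints, node)
}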
108 changes: 108 additions & 0 deletions manager/orchestrator/constraintenforcer/constraint_enforcer_test.go
@@ -8,6 +8,7 @@ import (
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestConstraintEnforcer(t *testing.T) {
@@ -168,3 +169,110 @@ func TestConstraintEnforcer(t *testing.T) {
assert.Equal(t, "id4", shutdown3.ID)
assert.Equal(t, api.TaskStateRejected, shutdown3.Status.State)
}

// TestOutdatedTaskPlacementConstraints tests the following scenario: If a task is
// associated with a service then we must use the constraints from the current
// service spec rather than the constraints from the task spec because they may
// be outdated. This will happen if the service was previously updated in a way
// which only changes the placement constraints and the node matched the
// placement constraints both before and after that update. In the case of such
// updates, the tasks are not considered "dirty" and are not restarted but it
// will mean that the task spec's placement constraints are outdated. Consider
// this example:
// - A service is created with no constraints and a task is scheduled
// to a node.
// - The node is updated to add a label; this doesn't affect the task
// on that node because it has no constraints.
// - The service is updated to add a node label constraint which
// matches the label which was just added to the node. The updater
// does not shut down the task because only the constraints have
// changed and the node still matches the updated constraints.
// This test initializes a new in-memory store with the expected state from
// above, starts a new constraint enforcer, and then updates the node to remove
// the node label. Since the node no longer satisfies the placement constraints
// of the service spec, the task should be shut down despite the fact that the
// task's own spec still has the original placement constraints.
func TestOutdatedTaskPlacementConstraints(t *testing.T) {
node := &api.Node{
ID: "id0",
Spec: api.NodeSpec{
Annotations: api.Annotations{
Name: "node1",
Labels: map[string]string{
"foo": "bar",
},
},
Availability: api.NodeAvailabilityActive,
},
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
Role: api.NodeRoleWorker,
}

service := &api.Service{
ID: "id1",
Spec: api.ServiceSpec{
Annotations: api.Annotations{
Name: "service1",
},
Task: api.TaskSpec{
Placement: &api.Placement{
Constraints: []string{
"node.labels.foo == bar",
},
},
},
},
}

task := &api.Task{
ID: "id2",
Spec: api.TaskSpec{
Placement: nil, // Note: No placement constraints.
},
ServiceID: service.ID,
NodeID: node.ID,
Status: api.TaskStatus{
State: api.TaskStateRunning,
},
DesiredState: api.TaskStateRunning,
}

s := store.NewMemoryStore(nil)
require.NotNil(t, s)
defer s.Close()

require.NoError(t, s.Update(func(tx store.Tx) error {
// Prepopulate node, service, and task.
for _, err := range []error{
store.CreateNode(tx, node),
store.CreateService(tx, service),
store.CreateTask(tx, task),
} {
if err != nil {
return err
}
}
return nil
}))

watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
defer cancel()

constraintEnforcer := New(s)
defer constraintEnforcer.Stop()

go constraintEnforcer.Run()

// Update the node to remove the node label.
require.NoError(t, s.Update(func(tx store.Tx) error {
node = store.GetNode(tx, node.ID)
delete(node.Spec.Annotations.Labels, "foo")
return store.UpdateNode(tx, node)
}))

// The task should be rejected immediately.
task = testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, api.TaskStateRejected, task.Status.State)
}
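For intuition, here is a minimal, self-contained sketch (not part of the commit) of the scenario this test encodes, expressed directly against the constraint package the enforcer calls; the import paths are assumed from the repository layout.

package main

import (
	"fmt"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/constraint"
)

func main() {
	// A node that currently carries the label the service will require.
	node := &api.Node{
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Labels: map[string]string{"foo": "bar"},
			},
		},
	}

	// The service's current constraints; the task's own spec has none,
	// so checking the task spec alone would never report a violation.
	serviceConstraints, _ := constraint.Parse([]string{"node.labels.foo == bar"})
	fmt.Println(constraint.NodeMatches(serviceConstraints, node)) // true: node still satisfies the service

	// Remove the label, as the test does via store.UpdateNode.
	delete(node.Spec.Annotations.Labels, "foo")
	fmt.Println(constraint.NodeMatches(serviceConstraints, node)) // false: the enforcer should reject the task
}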
