Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect Docker Swarm metrics in docker input plugin #3141

Merged
merged 14 commits into from
Oct 3, 2017
13 changes: 13 additions & 0 deletions plugins/inputs/docker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm"
docker "github.com/docker/docker/client"
"github.com/docker/go-connections/sockets"
)
Expand All @@ -20,6 +21,9 @@ type Client interface {
ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error)
ContainerStats(ctx context.Context, containerID string, stream bool) (types.ContainerStats, error)
ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error)
ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error)
TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error)
NodeList(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error)
}

func NewEnvClient() (Client, error) {
Expand Down Expand Up @@ -65,3 +69,12 @@ func (c *SocketClient) ContainerStats(ctx context.Context, containerID string, s
// ContainerInspect forwards the request to c.client.ContainerInspect and
// hands back its result and error unchanged.
func (c *SocketClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) {
	details, err := c.client.ContainerInspect(ctx, containerID)
	return details, err
}
// ServiceList forwards the request to c.client.ServiceList and hands back
// its result and error unchanged.
func (c *SocketClient) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) {
	services, err := c.client.ServiceList(ctx, options)
	return services, err
}
// TaskList forwards the request to c.client.TaskList and hands back its
// result and error unchanged.
func (c *SocketClient) TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error) {
	tasks, err := c.client.TaskList(ctx, options)
	return tasks, err
}
// NodeList forwards the request to c.client.NodeList and hands back its
// result and error unchanged.
func (c *SocketClient) NodeList(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error) {
	nodes, err := c.client.NodeList(ctx, options)
	return nodes, err
}
80 changes: 80 additions & 0 deletions plugins/inputs/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
Expand All @@ -35,6 +36,8 @@ type Docker struct {
Endpoint string
ContainerNames []string

SwarmEnabled bool `toml:"swarm_enabled"`

Timeout internal.Duration
PerDevice bool `toml:"perdevice"`
Total bool `toml:"total"`
Expand Down Expand Up @@ -82,6 +85,9 @@ var sampleConfig = `
## To use environment variables (ie, docker-machine), set endpoint = "ENV"
endpoint = "unix:///var/run/docker.sock"

## Set to true to collect Swarm metrics(desired_replicas, running_replicas)
swarm_enabled = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be named gather_services = true or gather = ["services"] since the similar docker command is simply docker service.


## Only collect metrics for these containers, collect all if empty
container_names = []

Expand Down Expand Up @@ -160,6 +166,13 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error {
acc.AddError(err)
}

if d.SwarmEnabled {
err := d.gatherSwarmInfo(acc)
if err != nil {
acc.AddError(err)
}
}

// List containers
opts := types.ContainerListOptions{}
ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration)
Expand Down Expand Up @@ -187,6 +200,73 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error {
return nil
}

func (d *Docker) gatherSwarmInfo(acc telegraf.Accumulator) error {

ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration)
defer cancel()
services, err := d.client.ServiceList(ctx, types.ServiceListOptions{})
if err != nil {
return err
}

if len(services) > 0 {

tasks, err := d.client.TaskList(ctx, types.TaskListOptions{})
if err != nil {
return err
}

nodes, err := d.client.NodeList(ctx, types.NodeListOptions{})
if err != nil {
return err
}

running := map[string]int{}
tasksNoShutdown := map[string]int{}

activeNodes := make(map[string]struct{})
for _, n := range nodes {
if n.Status.State != swarm.NodeStateDown {
activeNodes[n.ID] = struct{}{}
}
}

for _, task := range tasks {
if task.DesiredState != swarm.TaskStateShutdown {
tasksNoShutdown[task.ServiceID]++
}

if _, nodeActive := activeNodes[task.NodeID]; nodeActive && task.Status.State == swarm.TaskStateRunning {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm new to Docker Swarm, but why do you check if the node is not in the shutdown state, instead of just recording the running status? It seems like almost always the task will not be running if the node is down.

running[task.ServiceID]++
}
}

for _, service := range services {
tags := map[string]string{}
fields := make(map[string]interface{})
now := time.Now()
tags["swarm_service_id"] = service.ID
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider not including this, since it looks to be a random identifier string, which can cause high cardinality depending on how quickly it changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each service has a unique ID. Since services do not change as frequently as containers do, I think we can still keep this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the measurement name is docker_swarm, I think we should call the tags and fields without the swarm prefix: service_name, service_mode.

tags["swarm_service_name"] = service.Spec.Name
if service.Spec.Mode.Replicated != nil && service.Spec.Mode.Replicated.Replicas != nil {
fields["swarm_service_mode"] = "replicated"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think swarm_service_mode should be a tag.

fields["swarm_tasks_running"] = running[service.ID]
fields["swarm_tasks_desired"] = *service.Spec.Mode.Replicated.Replicas
} else if service.Spec.Mode.Global != nil {
fields["swarm_service_mode"] = "global"
fields["swarm_tasks_running"] = running[service.ID]
fields["swarm_tasks_desired"] = tasksNoShutdown[service.ID]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why non-shutdown tasks are the desired number of tasks. Shouldn't this be equal to the number of Nodes since global services are on every node?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a chance that the task is not running on one or more of the nodes. When the mode is "global", swarm tries to deploy containers (tasks, in swarm service terms) on all nodes. However, a container might fail to start on any given node for reasons such as the registry not being accessible from that node or the /var/lib/docker/images directory being corrupted.

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case the Replicated.Replicas is nil or another Mode is added, we should have an else condition that continues and perhaps logs (depending on if Replicas being nil is an error or not).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handled this with "log"

// Add metrics
acc.AddFields("docker_swarm",
fields,
tags,
now)
}
}

return nil
}

func (d *Docker) gatherInfo(acc telegraf.Accumulator) error {
// Init vars
dataFields := make(map[string]interface{})
Expand Down
82 changes: 82 additions & 0 deletions plugins/inputs/docker/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/influxdata/telegraf/testutil"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm"
"github.com/stretchr/testify/require"
)

Expand All @@ -16,6 +17,9 @@ type MockClient struct {
ContainerListF func(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error)
ContainerStatsF func(ctx context.Context, containerID string, stream bool) (types.ContainerStats, error)
ContainerInspectF func(ctx context.Context, containerID string) (types.ContainerJSON, error)
ServiceListF func(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error)
TaskListF func(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error)
NodeListF func(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error)
}

func (c *MockClient) Info(ctx context.Context) (types.Info, error) {
Expand Down Expand Up @@ -44,6 +48,27 @@ func (c *MockClient) ContainerInspect(
return c.ContainerInspectF(ctx, containerID)
}

// ServiceList invokes the ServiceListF hook with the given arguments and
// returns whatever the hook returns.
func (c *MockClient) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) {
	hook := c.ServiceListF
	return hook(ctx, options)
}

// TaskList invokes the TaskListF hook with the given arguments and returns
// whatever the hook returns.
func (c *MockClient) TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error) {
	hook := c.TaskListF
	return hook(ctx, options)
}

// NodeList invokes the NodeListF hook with the given arguments and returns
// whatever the hook returns.
func (c *MockClient) NodeList(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error) {
	hook := c.NodeListF
	return hook(ctx, options)
}

func newClient(host string, tlsConfig *tls.Config) (Client, error) {
return &MockClient{
InfoF: func(context.Context) (types.Info, error) {
Expand All @@ -58,6 +83,15 @@ func newClient(host string, tlsConfig *tls.Config) (Client, error) {
ContainerInspectF: func(context.Context, string) (types.ContainerJSON, error) {
return containerInspect, nil
},
ServiceListF: func(context.Context, types.ServiceListOptions) ([]swarm.Service, error) {
return ServiceList, nil
},
TaskListF: func(context.Context, types.TaskListOptions) ([]swarm.Task, error) {
return TaskList, nil
},
NodeListF: func(context.Context, types.NodeListOptions) ([]swarm.Node, error) {
return NodeList, nil
},
}, nil
}

Expand Down Expand Up @@ -227,6 +261,15 @@ func TestDocker_WindowsMemoryContainerStats(t *testing.T) {
ContainerInspectF: func(ctx context.Context, containerID string) (types.ContainerJSON, error) {
return containerInspect, nil
},
ServiceListF: func(context.Context, types.ServiceListOptions) ([]swarm.Service, error) {
return ServiceList, nil
},
TaskListF: func(context.Context, types.TaskListOptions) ([]swarm.Task, error) {
return TaskList, nil
},
NodeListF: func(context.Context, types.NodeListOptions) ([]swarm.Node, error) {
return NodeList, nil
},
}, nil
},
}
Expand Down Expand Up @@ -436,3 +479,42 @@ func TestDockerGatherInfo(t *testing.T) {
},
)
}

// TestDockerGatherSwarmInfo checks that gatherSwarmInfo emits one
// "docker_swarm" point per fixture service: a replicated service ("test1")
// and a global service ("test2"), each with its mode, running task count and
// desired task count.
func TestDockerGatherSwarmInfo(t *testing.T) {
	var acc testutil.Accumulator
	d := Docker{
		newClient: newClient,
	}

	// Run a regular Gather first; presumably this is what initialises d.client
	// from newClient before gatherSwarmInfo is called — TODO confirm.
	err := acc.GatherError(d.Gather)
	require.NoError(t, err)

	// SwarmEnabled is left at its zero value, so Gather does not collect swarm
	// data itself; call gatherSwarmInfo directly and fail on any error instead
	// of silently discarding it.
	err = d.gatherSwarmInfo(&acc)
	require.NoError(t, err)

	// test docker_swarm measurement for the replicated service
	acc.AssertContainsTaggedFields(t,
		"docker_swarm",
		map[string]interface{}{
			"swarm_service_mode":  "replicated",
			"swarm_tasks_running": int(2),
			"swarm_tasks_desired": uint64(2),
		},
		map[string]string{
			"swarm_service_id":   "qolkls9g5iasdiuihcyz9rnx2",
			"swarm_service_name": "test1",
		},
	)

	// test docker_swarm measurement for the global service
	acc.AssertContainsTaggedFields(t,
		"docker_swarm",
		map[string]interface{}{
			"swarm_service_mode":  "global",
			"swarm_tasks_running": int(1),
			"swarm_tasks_desired": int(1),
		},
		map[string]string{
			"swarm_service_id":   "qolkls9g5iasdiuihcyz9rn3",
			"swarm_service_name": "test2",
		},
	)
}
74 changes: 74 additions & 0 deletions plugins/inputs/docker/docker_testdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/registry"
"github.com/docker/docker/api/types/swarm"
)

var info = types.Info{
Expand Down Expand Up @@ -133,6 +134,79 @@ var containerList = []types.Container{
},
}

// two is the replica count pointed to by the replicated fixture service.
var two uint64 = 2

// ServiceList is the swarm service fixture handed out by the mock client's
// ServiceListF: one replicated service ("test1", two replicas) and one
// global service ("test2").
var ServiceList = []swarm.Service{
	{
		ID: "qolkls9g5iasdiuihcyz9rnx2",
		Spec: swarm.ServiceSpec{
			Annotations: swarm.Annotations{Name: "test1"},
			Mode: swarm.ServiceMode{
				Replicated: &swarm.ReplicatedService{Replicas: &two},
			},
		},
	},
	{
		ID: "qolkls9g5iasdiuihcyz9rn3",
		Spec: swarm.ServiceSpec{
			Annotations: swarm.Annotations{Name: "test2"},
			Mode: swarm.ServiceMode{
				Global: &swarm.GlobalService{},
			},
		},
	},
}

// TaskList is the swarm task fixture handed out by the mock client's
// TaskListF: two tasks for service "qolkls9g5iasdiuihcyz9rnx2" and one for
// "qolkls9g5iasdiuihcyz9rn3", all on the same node and all with both actual
// and desired state "running".
var TaskList = []swarm.Task{
	{
		ID:           "kwh0lv7hwwbh",
		ServiceID:    "qolkls9g5iasdiuihcyz9rnx2",
		NodeID:       "0cl4jturcyd1ks3fwpd010kor",
		Status:       swarm.TaskStatus{State: "running"},
		DesiredState: "running",
	},
	{
		ID:           "u78m5ojbivc3",
		ServiceID:    "qolkls9g5iasdiuihcyz9rnx2",
		NodeID:       "0cl4jturcyd1ks3fwpd010kor",
		Status:       swarm.TaskStatus{State: "running"},
		DesiredState: "running",
	},
	{
		ID:           "1n1uilkhr98l",
		ServiceID:    "qolkls9g5iasdiuihcyz9rn3",
		NodeID:       "0cl4jturcyd1ks3fwpd010kor",
		Status:       swarm.TaskStatus{State: "running"},
		DesiredState: "running",
	},
}

// NodeList is the swarm node fixture handed out by the mock client's
// NodeListF: two entries, both in state "ready".
// NOTE(review): both entries carry the same ID — confirm whether a second,
// distinct node ID was intended.
var NodeList = []swarm.Node{
	{
		ID:     "0cl4jturcyd1ks3fwpd010kor",
		Status: swarm.NodeStatus{State: "ready"},
	},
	{
		ID:     "0cl4jturcyd1ks3fwpd010kor",
		Status: swarm.NodeStatus{State: "ready"},
	},
}

func containerStats() types.ContainerStats {
var stat types.ContainerStats
jsonStat := `
Expand Down