Skip to content

Commit

Permalink
Refactor out the docker process mapper for use in tracer.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Wilkie committed Jun 3, 2015
1 parent bba5222 commit 7a5995e
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 37 deletions.
2 changes: 1 addition & 1 deletion probe/pidtree.go → probe/docker/pidtree.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package docker

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion probe/pidtree_test.go → probe/docker/pidtree_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package docker

import (
"fmt"
Expand Down
70 changes: 47 additions & 23 deletions probe/docker_process_mapper.go → probe/docker/watcher.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package docker

import (
"fmt"
Expand All @@ -15,7 +15,8 @@ const (
start = "start"
)

type dockerMapper struct {
// Watcher keep track of docker containers, images and the pidtree.
type Watcher struct {
sync.RWMutex
quit chan struct{}
interval time.Duration
Expand All @@ -28,13 +29,14 @@ type dockerMapper struct {
pidTree *pidTree
}

func newDockerMapper(procRoot string, interval time.Duration) (*dockerMapper, error) {
// NewWatcher creates a new Watcher.
func NewWatcher(procRoot string, interval time.Duration) (*Watcher, error) {
pidTree, err := newPIDTreeStub(procRoot)
if err != nil {
return nil, err
}

m := dockerMapper{
m := Watcher{
containers: map[string]*docker.Container{},
containersByPID: map[int]*docker.Container{},
images: map[string]*docker.APIImages{},
Expand All @@ -50,11 +52,12 @@ func newDockerMapper(procRoot string, interval time.Duration) (*dockerMapper, er
return &m, nil
}

func (m *dockerMapper) Stop() {
// Stop the Docker watcher.
func (m *Watcher) Stop() {
close(m.quit)
}

func (m *dockerMapper) loop() {
func (m *Watcher) loop() {
if !m.update() {
return
}
Expand All @@ -73,6 +76,19 @@ func (m *dockerMapper) loop() {
}
}

// Containers returns the list of Containers this watcher knows about.
func (m *Watcher) Containers() []*docker.Container {
containers := []*docker.Container{}

m.RLock()
for _, container := range m.containers {
containers = append(containers, container)
}
m.RUnlock()

return containers
}

// for mocking
type dockerClient interface {
ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error)
Expand All @@ -92,7 +108,7 @@ var (
)

// returns false when stopping.
func (m *dockerMapper) update() bool {
func (m *Watcher) update() bool {
endpoint := "unix:///var/run/docker.sock"
client, err := newDockerClient(endpoint)
if err != nil {
Expand Down Expand Up @@ -144,7 +160,7 @@ func (m *dockerMapper) update() bool {
}
}

func (m *dockerMapper) updateContainers(client dockerClient) error {
func (m *Watcher) updateContainers(client dockerClient) error {
apiContainers, err := client.ListContainers(docker.ListContainersOptions{All: true})
if err != nil {
return err
Expand Down Expand Up @@ -175,7 +191,7 @@ func (m *dockerMapper) updateContainers(client dockerClient) error {
return nil
}

func (m *dockerMapper) updateImages(client dockerClient) error {
func (m *Watcher) updateImages(client dockerClient) error {
images, err := client.ListImages(docker.ListImagesOptions{})
if err != nil {
return err
Expand All @@ -191,7 +207,7 @@ func (m *dockerMapper) updateImages(client dockerClient) error {
return nil
}

func (m *dockerMapper) handleEvent(event *docker.APIEvents, client dockerClient) {
func (m *Watcher) handleEvent(event *docker.APIEvents, client dockerClient) {
switch event.Status {
case stop:
containerID := event.ID
Expand Down Expand Up @@ -224,7 +240,7 @@ func (m *dockerMapper) handleEvent(event *docker.APIEvents, client dockerClient)
}
}

func (m *dockerMapper) updatePIDTree() error {
func (m *Watcher) updatePIDTree() error {
pidTree, err := newPIDTreeStub(m.procRoot)
if err != nil {
return err
Expand All @@ -236,14 +252,18 @@ func (m *dockerMapper) updatePIDTree() error {
return nil
}

type dockerProcessMapper struct {
*dockerMapper
// ProcessMapper can map pids to arbitrary strings.
type ProcessMapper struct {
*Watcher
key string
f func(*docker.Container) string
}

func (m *dockerProcessMapper) Key() string { return m.key }
func (m *dockerProcessMapper) Map(pid uint) (string, error) {
// Key returns the key name for this process mapper.
func (m *ProcessMapper) Key() string { return m.key }

// Map returns the mapper value for a given pid.
func (m *ProcessMapper) Map(pid uint) (string, error) {
var (
container *docker.Container
ok bool
Expand Down Expand Up @@ -271,26 +291,30 @@ func (m *dockerProcessMapper) Map(pid uint) (string, error) {
return m.f(container), nil
}

func (m *dockerMapper) idMapper() processMapper {
return &dockerProcessMapper{m, "docker_id", func(c *docker.Container) string {
// IDMapper returns a process mapper for docker container IDs
func (m *Watcher) IDMapper() *ProcessMapper {
return &ProcessMapper{m, "docker_id", func(c *docker.Container) string {
return c.ID
}}
}

func (m *dockerMapper) nameMapper() processMapper {
return &dockerProcessMapper{m, "docker_name", func(c *docker.Container) string {
// NameMapper returns a process mapper for docker container Names.
func (m *Watcher) NameMapper() *ProcessMapper {
return &ProcessMapper{m, "docker_name", func(c *docker.Container) string {
return strings.TrimPrefix(c.Name, "/")
}}
}

func (m *dockerMapper) imageIDMapper() processMapper {
return &dockerProcessMapper{m, "docker_image_id", func(c *docker.Container) string {
// ImageIDMapper returns a process mapper for docker container image ids.
func (m *Watcher) ImageIDMapper() *ProcessMapper {
return &ProcessMapper{m, "docker_image_id", func(c *docker.Container) string {
return c.Image
}}
}

func (m *dockerMapper) imageNameMapper() processMapper {
return &dockerProcessMapper{m, "docker_image_name", func(c *docker.Container) string {
// ImageNameMapper returns a process mapper for docker container image names.
func (m *Watcher) ImageNameMapper() *ProcessMapper {
return &ProcessMapper{m, "docker_image_name", func(c *docker.Container) string {
m.RLock()
image, ok := m.images[c.Image]
m.RUnlock()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package docker

import (
"runtime"
Expand Down Expand Up @@ -68,11 +68,11 @@ func TestDockerProcessMapper(t *testing.T) {
}, nil
}

dockerMapper, _ := newDockerMapper("/proc", 10*time.Second)
dockerIDMapper := dockerMapper.idMapper()
dockerNameMapper := dockerMapper.nameMapper()
dockerImageIDMapper := dockerMapper.imageIDMapper()
dockerImageNameMapper := dockerMapper.imageNameMapper()
dockerMapper, _ := NewWatcher("/proc", 10*time.Second)
dockerIDMapper := dockerMapper.IDMapper()
dockerNameMapper := dockerMapper.NameMapper()
dockerImageIDMapper := dockerMapper.ImageIDMapper()
dockerImageNameMapper := dockerMapper.ImageNameMapper()

runtime.Gosched()

Expand Down
13 changes: 7 additions & 6 deletions probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/weaveworks/procspy"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)
Expand Down Expand Up @@ -80,17 +81,17 @@ func main() {
}

if *dockerMapper {
docker, err := newDockerMapper(*procRoot, *dockerInterval)
dockerWatcher, err := docker.NewWatcher(*procRoot, *dockerInterval)
if err != nil {
log.Fatal(err)
}
defer docker.Stop()
defer dockerWatcher.Stop()

pms = append(pms,
docker.idMapper(),
docker.nameMapper(),
docker.imageIDMapper(),
docker.imageNameMapper(),
dockerWatcher.IDMapper(),
dockerWatcher.NameMapper(),
dockerWatcher.ImageIDMapper(),
dockerWatcher.ImageNameMapper(),
)
}

Expand Down

0 comments on commit 7a5995e

Please sign in to comment.