Skip to content

Commit

Permalink
Add container short ID matching to add_docker_metadata (#6172)
Browse files Browse the repository at this point in the history
container short ID are a common way to represent containers.
The short ID length is 12 characters while the long one is 64
which makes it more human readable.

Previous to this patch, if the folder name was set to
the containers' short IDs the match would have failed.
For example, putting the container logs in ./container/${HOSTNAME}
since ${HOSTNAME} inside the container is set to its short ID by
default then the docker metadata won't be added to the log lines.

The user has to explicetly specify short ID match by adding
setting "match_short_id" to true in the configuration file.

If "match_short_id" is not given in the configuration or is set
to false, this feature is disabled.

Signed-off-by: Boaz Shuster <ripcurld.github@gmail.com>
  • Loading branch information
boaz0 authored and exekias committed Feb 19, 2018
1 parent b98a987 commit ef25ecf
Show file tree
Hide file tree
Showing 15 changed files with 257 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ The list below covers the changes between 6.0.0-rc2 and 6.0.0 GA only.
*Filebeat*
- Add Kubernetes manifests to deploy Filebeat. {pull}5349[5349]
- Add container short ID matching to add_docker_metadata. {pull}6172[6172]
*Metricbeat*
Expand Down
1 change: 1 addition & 0 deletions auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ auditbeat.modules:
# match_pids: ["process.pid", "process.ppid"]
# match_source: true
# match_source_index: 4
# match_short_id: false
# cleanup_timeout: 60
# # To connect to Docker over TLS you must specify a client and CA certificate.
# #ssl:
Expand Down
1 change: 1 addition & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@ filebeat.inputs:
# match_pids: ["process.pid", "process.ppid"]
# match_source: true
# match_source_index: 4
# match_short_id: false
# cleanup_timeout: 60
# # To connect to Docker over TLS you must specify a client and CA certificate.
# #ssl:
Expand Down
1 change: 1 addition & 0 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ heartbeat.scheduler:
# match_pids: ["process.pid", "process.ppid"]
# match_source: true
# match_source_index: 4
# match_short_id: false
# cleanup_timeout: 60
# # To connect to Docker over TLS you must specify a client and CA certificate.
# #ssl:
Expand Down
1 change: 1 addition & 0 deletions libbeat/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
# match_pids: ["process.pid", "process.ppid"]
# match_source: true
# match_source_index: 4
# match_short_id: false
# cleanup_timeout: 60
# # To connect to Docker over TLS you must specify a client and CA certificate.
# #ssl:
Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider,
return nil, err
}

watcher, err := docker.NewWatcher(config.Host, config.TLS)
watcher, err := docker.NewWatcher(config.Host, config.TLS, false)
if err != nil {
return nil, err
}
Expand Down
38 changes: 30 additions & 8 deletions libbeat/common/docker/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
)

// Select Docker API version
const dockerAPIVersion = "1.22"
const (
dockerAPIVersion = "1.22"
shortIDLen = 12
)

// Watcher reads docker events and keeps a list of known containers
type Watcher interface {
Expand Down Expand Up @@ -59,6 +62,7 @@ type watcher struct {
lastValidTimestamp int64
stopped sync.WaitGroup
bus bus.Bus
shortID bool // whether to store short ID in "containers" too
}

// Container info retrieved by the watcher
Expand All @@ -77,10 +81,11 @@ type Client interface {
Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error)
}

type WatcherConstructor func(host string, tls *TLSConfig) (Watcher, error)
// WatcherConstructor represent a function that creates a new Watcher from giving parameters
type WatcherConstructor func(host string, tls *TLSConfig, storeShortID bool) (Watcher, error)

// NewWatcher returns a watcher running for the given settings
func NewWatcher(host string, tls *TLSConfig) (Watcher, error) {
func NewWatcher(host string, tls *TLSConfig, storeShortID bool) (Watcher, error) {
var httpClient *http.Client
if tls != nil {
options := tlsconfig.Options{
Expand All @@ -106,10 +111,11 @@ func NewWatcher(host string, tls *TLSConfig) (Watcher, error) {
return nil, err
}

return NewWatcherWithClient(client, 60*time.Second)
return NewWatcherWithClient(client, 60*time.Second, storeShortID)
}

func NewWatcherWithClient(client Client, cleanupTimeout time.Duration) (*watcher, error) {
// NewWatcherWithClient creates a new Watcher from a given Docker client
func NewWatcherWithClient(client Client, cleanupTimeout time.Duration, storeShortID bool) (Watcher, error) {
ctx, cancel := context.WithCancel(context.Background())
return &watcher{
client: client,
Expand All @@ -119,20 +125,25 @@ func NewWatcherWithClient(client Client, cleanupTimeout time.Duration) (*watcher
deleted: make(map[string]time.Time),
cleanupTimeout: cleanupTimeout,
bus: bus.New("docker"),
shortID: storeShortID,
}, nil
}

// Container returns the running container with the given ID or nil if unknown
func (w *watcher) Container(ID string) *Container {
w.RLock()
container := w.containers[ID]
_, ok := w.deleted[ID]
if container == nil {
w.RUnlock()
return nil
}
_, ok := w.deleted[container.ID]
w.RUnlock()

// Update last access time if it's deleted
if ok {
w.Lock()
w.deleted[ID] = time.Now()
w.deleted[container.ID] = time.Now()
w.Unlock()
}

Expand All @@ -145,7 +156,9 @@ func (w *watcher) Containers() map[string]*Container {
defer w.RUnlock()
res := make(map[string]*Container)
for k, v := range w.containers {
res[k] = v
if !w.shortID || len(k) != shortIDLen {
res[k] = v
}
}
return res
}
Expand All @@ -165,6 +178,9 @@ func (w *watcher) Start() error {

for _, c := range containers {
w.containers[c.ID] = c
if w.shortID {
w.containers[c.ID[:shortIDLen]] = c
}
}

// Emit all start events (avoid blocking if the bus get's blocked)
Expand Down Expand Up @@ -223,6 +239,9 @@ func (w *watcher) watch() {

w.Lock()
w.containers[event.Actor.ID] = container
if w.shortID {
w.containers[event.Actor.ID[:shortIDLen]] = container
}
// un-delete if it's flagged (in case of update or recreation)
delete(w.deleted, event.Actor.ID)
w.Unlock()
Expand Down Expand Up @@ -326,6 +345,9 @@ func (w *watcher) cleanupWorker() {
for _, key := range toDelete {
delete(w.deleted, key)
delete(w.containers, key)
if w.shortID {
delete(w.containers, key[:shortIDLen])
}
}
w.Unlock()
}
Expand Down
Loading

0 comments on commit ef25ecf

Please sign in to comment.