From ef25ecf72616b041c4c459ad12f51ea9564c4f78 Mon Sep 17 00:00:00 2001 From: Boaz Shuster <2453279+ripcurld0@users.noreply.github.com> Date: Mon, 19 Feb 2018 17:27:24 +0200 Subject: [PATCH] Add container short ID matching to add_docker_metadata (#6172) Container short IDs are a common way to represent containers. A short ID is 12 characters long while the long one is 64, which makes the short form more human readable. Prior to this patch, if the folder name was set to the container's short ID the match would fail. For example, when the container logs are put in ./container/${HOSTNAME}, the docker metadata is not added to the log lines, because ${HOSTNAME} inside the container is set to its short ID by default. The user has to explicitly enable short ID matching by setting "match_short_id" to true in the configuration file. If "match_short_id" is not given in the configuration or is set to false, this feature is disabled. Signed-off-by: Boaz Shuster --- CHANGELOG.asciidoc | 1 + auditbeat/auditbeat.reference.yml | 1 + filebeat/filebeat.reference.yml | 1 + heartbeat/heartbeat.reference.yml | 1 + libbeat/_meta/config.reference.yml | 1 + .../autodiscover/providers/docker/docker.go | 2 +- libbeat/common/docker/watcher.go | 38 +++- libbeat/common/docker/watcher_test.go | 203 +++++++++++++++++- libbeat/docs/processors-using.asciidoc | 6 + .../add_docker_metadata.go | 2 +- .../add_docker_metadata_test.go | 2 +- .../processors/add_docker_metadata/config.go | 15 +- metricbeat/metricbeat.reference.yml | 1 + packetbeat/packetbeat.reference.yml | 1 + winlogbeat/winlogbeat.reference.yml | 1 + 15 files changed, 257 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index b717f148d6d..feacdea4cdc 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -281,6 +281,7 @@ The list below covers the changes between 6.0.0-rc2 and 6.0.0 GA only. *Filebeat* - Add Kubernetes manifests to deploy Filebeat. 
{pull}5349[5349] +- Add container short ID matching to add_docker_metadata. {pull}6172[6172] *Metricbeat* diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index 002e40686eb..b396ceb3e08 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -205,6 +205,7 @@ auditbeat.modules: # match_pids: ["process.pid", "process.ppid"] # match_source: true # match_source_index: 4 +# match_short_id: false # cleanup_timeout: 60 # # To connect to Docker over TLS you must specify a client and CA certificate. # #ssl: diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 9f326855ee4..40fa723bf3d 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -679,6 +679,7 @@ filebeat.inputs: # match_pids: ["process.pid", "process.ppid"] # match_source: true # match_source_index: 4 +# match_short_id: false # cleanup_timeout: 60 # # To connect to Docker over TLS you must specify a client and CA certificate. # #ssl: diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index 20660d143aa..6480b240b82 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -314,6 +314,7 @@ heartbeat.scheduler: # match_pids: ["process.pid", "process.ppid"] # match_source: true # match_source_index: 4 +# match_short_id: false # cleanup_timeout: 60 # # To connect to Docker over TLS you must specify a client and CA certificate. # #ssl: diff --git a/libbeat/_meta/config.reference.yml b/libbeat/_meta/config.reference.yml index 24314cbc379..1d70f8f6413 100644 --- a/libbeat/_meta/config.reference.yml +++ b/libbeat/_meta/config.reference.yml @@ -100,6 +100,7 @@ # match_pids: ["process.pid", "process.ppid"] # match_source: true # match_source_index: 4 +# match_short_id: false # cleanup_timeout: 60 # # To connect to Docker over TLS you must specify a client and CA certificate. 
# #ssl: diff --git a/libbeat/autodiscover/providers/docker/docker.go b/libbeat/autodiscover/providers/docker/docker.go index e06718f7c47..3d90a69ca99 100644 --- a/libbeat/autodiscover/providers/docker/docker.go +++ b/libbeat/autodiscover/providers/docker/docker.go @@ -37,7 +37,7 @@ func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider, return nil, err } - watcher, err := docker.NewWatcher(config.Host, config.TLS) + watcher, err := docker.NewWatcher(config.Host, config.TLS, false) if err != nil { return nil, err } diff --git a/libbeat/common/docker/watcher.go b/libbeat/common/docker/watcher.go index 799632731d6..02c4a0cb11c 100644 --- a/libbeat/common/docker/watcher.go +++ b/libbeat/common/docker/watcher.go @@ -18,7 +18,10 @@ import ( ) // Select Docker API version -const dockerAPIVersion = "1.22" +const ( + dockerAPIVersion = "1.22" + shortIDLen = 12 +) // Watcher reads docker events and keeps a list of known containers type Watcher interface { @@ -59,6 +62,7 @@ type watcher struct { lastValidTimestamp int64 stopped sync.WaitGroup bus bus.Bus + shortID bool // whether to store short ID in "containers" too } // Container info retrieved by the watcher @@ -77,10 +81,11 @@ type Client interface { Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) } -type WatcherConstructor func(host string, tls *TLSConfig) (Watcher, error) +// WatcherConstructor represents a function that creates a new Watcher from the given parameters +type WatcherConstructor func(host string, tls *TLSConfig, storeShortID bool) (Watcher, error) // NewWatcher returns a watcher running for the given settings -func NewWatcher(host string, tls *TLSConfig) (Watcher, error) { +func NewWatcher(host string, tls *TLSConfig, storeShortID bool) (Watcher, error) { var httpClient *http.Client if tls != nil { options := tlsconfig.Options{ @@ -106,10 +111,11 @@ func NewWatcher(host string, tls *TLSConfig) (Watcher, error) { return nil, err } - return 
NewWatcherWithClient(client, 60*time.Second) + return NewWatcherWithClient(client, 60*time.Second, storeShortID) } -func NewWatcherWithClient(client Client, cleanupTimeout time.Duration) (*watcher, error) { +// NewWatcherWithClient creates a new Watcher from a given Docker client +func NewWatcherWithClient(client Client, cleanupTimeout time.Duration, storeShortID bool) (Watcher, error) { ctx, cancel := context.WithCancel(context.Background()) return &watcher{ client: client, @@ -119,6 +125,7 @@ func NewWatcherWithClient(client Client, cleanupTimeout time.Duration) (*watcher deleted: make(map[string]time.Time), cleanupTimeout: cleanupTimeout, bus: bus.New("docker"), + shortID: storeShortID, }, nil } @@ -126,13 +133,17 @@ func NewWatcherWithClient(client Client, cleanupTimeout time.Duration) (*watcher func (w *watcher) Container(ID string) *Container { w.RLock() container := w.containers[ID] - _, ok := w.deleted[ID] + if container == nil { + w.RUnlock() + return nil + } + _, ok := w.deleted[container.ID] w.RUnlock() // Update last access time if it's deleted if ok { w.Lock() - w.deleted[ID] = time.Now() + w.deleted[container.ID] = time.Now() w.Unlock() } @@ -145,7 +156,9 @@ func (w *watcher) Containers() map[string]*Container { defer w.RUnlock() res := make(map[string]*Container) for k, v := range w.containers { - res[k] = v + if !w.shortID || len(k) != shortIDLen { + res[k] = v + } } return res } @@ -165,6 +178,9 @@ func (w *watcher) Start() error { for _, c := range containers { w.containers[c.ID] = c + if w.shortID { + w.containers[c.ID[:shortIDLen]] = c + } } // Emit all start events (avoid blocking if the bus get's blocked) @@ -223,6 +239,9 @@ func (w *watcher) watch() { w.Lock() w.containers[event.Actor.ID] = container + if w.shortID { + w.containers[event.Actor.ID[:shortIDLen]] = container + } // un-delete if it's flagged (in case of update or recreation) delete(w.deleted, event.Actor.ID) w.Unlock() @@ -326,6 +345,9 @@ func (w *watcher) cleanupWorker() { for 
_, key := range toDelete { delete(w.deleted, key) delete(w.containers, key) + if w.shortID { + delete(w.containers, key[:shortIDLen]) + } } w.Unlock() } diff --git a/libbeat/common/docker/watcher_test.go b/libbeat/common/docker/watcher_test.go index 3ab16ffdf6e..a0a541ab36c 100644 --- a/libbeat/common/docker/watcher_test.go +++ b/libbeat/common/docker/watcher_test.go @@ -84,6 +84,51 @@ func TestWatcherInitialization(t *testing.T) { }, watcher.Containers()) } +func TestWatcherInitializationShortID(t *testing.T) { + watcher := runWatcherShortID(t, true, + [][]types.Container{ + []types.Container{ + types.Container{ + ID: "1234567890123", + Names: []string{"/containername", "othername"}, + Image: "busybox", + Labels: map[string]string{"foo": "bar"}, + NetworkSettings: &types.SummaryNetworkSettings{}, + }, + types.Container{ + ID: "2345678901234", + Names: []string{"/other"}, + Image: "nginx", + Labels: map[string]string{}, + NetworkSettings: &types.SummaryNetworkSettings{}, + }, + }, + }, + nil, true) + + assert.Equal(t, map[string]*Container{ + "1234567890123": &Container{ + ID: "1234567890123", + Name: "containername", + Image: "busybox", + Labels: map[string]string{"foo": "bar"}, + }, + "2345678901234": &Container{ + ID: "2345678901234", + Name: "other", + Image: "nginx", + Labels: map[string]string{}, + }, + }, watcher.Containers()) + + assert.Equal(t, &Container{ + ID: "1234567890123", + Name: "containername", + Image: "busybox", + Labels: map[string]string{"foo": "bar"}, + }, watcher.Container("123456789012")) +} + func TestWatcherAddEvents(t *testing.T) { watcher := runWatcher(t, true, [][]types.Container{ @@ -137,6 +182,60 @@ func TestWatcherAddEvents(t *testing.T) { }, watcher.Containers()) } +func TestWatcherAddEventsShortID(t *testing.T) { + watcher := runWatcherShortID(t, true, + [][]types.Container{ + []types.Container{ + types.Container{ + ID: "1234567890123", + Names: []string{"/containername", "othername"}, + Image: "busybox", + Labels: 
map[string]string{"foo": "bar"}, + NetworkSettings: &types.SummaryNetworkSettings{}, + }, + }, + []types.Container{ + types.Container{ + ID: "2345678901234", + Names: []string{"/other"}, + Image: "nginx", + Labels: map[string]string{"label": "value"}, + NetworkSettings: &types.SummaryNetworkSettings{}, + }, + }, + }, + []interface{}{ + events.Message{ + Action: "start", + Actor: events.Actor{ + ID: "2345678901234", + Attributes: map[string]string{ + "name": "other", + "image": "nginx", + "label": "value", + }, + }, + }, + }, + true, + ) + + assert.Equal(t, map[string]*Container{ + "1234567890123": &Container{ + ID: "1234567890123", + Name: "containername", + Image: "busybox", + Labels: map[string]string{"foo": "bar"}, + }, + "2345678901234": &Container{ + ID: "2345678901234", + Name: "other", + Image: "nginx", + Labels: map[string]string{"label": "value"}, + }, + }, watcher.Containers()) +} + func TestWatcherUpdateEvent(t *testing.T) { watcher := runWatcher(t, true, [][]types.Container{ @@ -185,6 +284,55 @@ func TestWatcherUpdateEvent(t *testing.T) { assert.Equal(t, 0, len(watcher.deleted)) } +func TestWatcherUpdateEventShortID(t *testing.T) { + watcher := runWatcherShortID(t, true, + [][]types.Container{ + []types.Container{ + types.Container{ + ID: "1234567890123", + Names: []string{"/containername", "othername"}, + Image: "busybox", + Labels: map[string]string{"label": "foo"}, + NetworkSettings: &types.SummaryNetworkSettings{}, + }, + }, + []types.Container{ + types.Container{ + ID: "1234567890123", + Names: []string{"/containername", "othername"}, + Image: "busybox", + Labels: map[string]string{"label": "bar"}, + NetworkSettings: &types.SummaryNetworkSettings{}, + }, + }, + }, + []interface{}{ + events.Message{ + Action: "update", + Actor: events.Actor{ + ID: "1234567890123", + Attributes: map[string]string{ + "name": "containername", + "image": "busybox", + "label": "bar", + }, + }, + }, + }, + true, + ) + + assert.Equal(t, map[string]*Container{ + 
"1234567890123": &Container{ + ID: "1234567890123", + Name: "containername", + Image: "busybox", + Labels: map[string]string{"label": "bar"}, + }, + }, watcher.Containers()) + assert.Equal(t, 0, len(watcher.deleted)) +} + func TestWatcherDie(t *testing.T) { watcher := runWatcher(t, false, [][]types.Container{ @@ -229,7 +377,56 @@ func TestWatcherDie(t *testing.T) { assert.Equal(t, 0, len(watcher.Containers())) } +func TestWatcherDieShortID(t *testing.T) { + watcher := runWatcherShortID(t, false, + [][]types.Container{ + []types.Container{ + types.Container{ + ID: "0332dbd79e20aaa", + Names: []string{"/containername", "othername"}, + Image: "busybox", + Labels: map[string]string{"label": "foo"}, + NetworkSettings: &types.SummaryNetworkSettings{}, + }, + }, + }, + []interface{}{ + events.Message{ + Action: "die", + Actor: events.Actor{ + ID: "0332dbd79e20aaa", + }, + }, + }, + true, + ) + defer watcher.Stop() + + // Check it doesn't get removed while we request meta for the container + for i := 0; i < 18; i++ { + watcher.Container("0332dbd79e20") + assert.Equal(t, 1, len(watcher.Containers())) + time.Sleep(50 * time.Millisecond) + } + + // Checks a max of 10s for the watcher containers to be updated + for i := 0; i < 100; i++ { + // Now it should get removed + time.Sleep(100 * time.Millisecond) + + if len(watcher.Containers()) == 0 { + break + } + } + + assert.Equal(t, 0, len(watcher.Containers())) +} + func runWatcher(t *testing.T, kill bool, containers [][]types.Container, events []interface{}) *watcher { + return runWatcherShortID(t, kill, containers, events, false) +} + +func runWatcherShortID(t *testing.T, kill bool, containers [][]types.Container, events []interface{}, enable bool) *watcher { logp.TestingSetup() client := &MockClient{ @@ -238,10 +435,14 @@ func runWatcher(t *testing.T, kill bool, containers [][]types.Container, events done: make(chan interface{}), } - watcher, err := NewWatcherWithClient(client, 200*time.Millisecond) + w, err := 
NewWatcherWithClient(client, 200*time.Millisecond, enable) if err != nil { t.Fatal(err) } + watcher, ok := w.(*watcher) + if !ok { + t.Fatal("'watcher' was supposed to be pointer to the watcher structure") + } err = watcher.Start() if err != nil { diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index c8b6405905e..3afc7501ca7 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -619,6 +619,7 @@ processors: #match_pids: ["process.pid", "process.ppid"] #match_source: true #match_source_index: 4 + #match_short_id: true #cleanup_timeout: 60 # To connect to Docker over TLS you must specify a client and CA certificate. #ssl: @@ -645,6 +646,11 @@ is `["process.pid", "process.ppid"]`. `match_source`:: (Optional) Match container ID from a log path present in the `source` field. Enabled by default. +`match_short_id`:: (Optional) Match container short ID from a log path present +in the `source` field. Disabled by default. +This allows matching directory names that consist of the first 12 characters +of the container ID. For example, `/var/log/containers/b7e3460e2b21/*.log`. + `match_source_index`:: (Optional) Index in the source path split by `/` to look for container ID. It defaults to 4 to match `/var/lib/docker/containers//*.log` diff --git a/libbeat/processors/add_docker_metadata/add_docker_metadata.go b/libbeat/processors/add_docker_metadata/add_docker_metadata.go index 8ae941ce03a..d240d52887e 100644 --- a/libbeat/processors/add_docker_metadata/add_docker_metadata.go +++ b/libbeat/processors/add_docker_metadata/add_docker_metadata.go @@ -54,7 +54,7 @@ func buildDockerMetadataProcessor(cfg *common.Config, watcherConstructor docker. 
return nil, errors.Wrapf(err, "fail to unpack the %v configuration", processorName) } - watcher, err := watcherConstructor(config.Host, config.TLS) + watcher, err := watcherConstructor(config.Host, config.TLS, config.MatchShortID) if err != nil { return nil, err } diff --git a/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go b/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go index 2769fdb476a..3e0e95a9fca 100644 --- a/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go +++ b/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go @@ -298,7 +298,7 @@ func MockWatcherFactory(containers map[string]*docker.Container) docker.WatcherC if containers == nil { containers = make(map[string]*docker.Container) } - return func(host string, tls *docker.TLSConfig) (docker.Watcher, error) { + return func(host string, tls *docker.TLSConfig, shortID bool) (docker.Watcher, error) { return &mockWatcher{containers: containers}, nil } } diff --git a/libbeat/processors/add_docker_metadata/config.go b/libbeat/processors/add_docker_metadata/config.go index 529175cace7..5755bc07e2b 100644 --- a/libbeat/processors/add_docker_metadata/config.go +++ b/libbeat/processors/add_docker_metadata/config.go @@ -8,13 +8,14 @@ import ( // Config for docker processor. type Config struct { - Host string `config:"host"` // Docker socket (UNIX or TCP socket). - TLS *docker.TLSConfig `config:"ssl"` // TLS settings for connecting to Docker. - Fields []string `config:"match_fields"` // A list of fields to match a container ID. - MatchSource bool `config:"match_source"` // Match container ID from a log path present in source field. - SourceIndex int `config:"match_source_index"` // Index in the source path split by / to look for container ID. - MatchPIDs []string `config:"match_pids"` // A list of fields containing process IDs (PIDs). 
- HostFS string `config:"system.hostfs"` // Specifies the mount point of the host’s filesystem for use in monitoring a host from within a container. + Host string `config:"host"` // Docker socket (UNIX or TCP socket). + TLS *docker.TLSConfig `config:"ssl"` // TLS settings for connecting to Docker. + Fields []string `config:"match_fields"` // A list of fields to match a container ID. + MatchSource bool `config:"match_source"` // Match container ID from a log path present in source field. + MatchShortID bool `config:"match_short_id"` // Match to container short ID from a log path present in source field. + SourceIndex int `config:"match_source_index"` // Index in the source path split by / to look for container ID. + MatchPIDs []string `config:"match_pids"` // A list of fields containing process IDs (PIDs). + HostFS string `config:"system.hostfs"` // Specifies the mount point of the host’s filesystem for use in monitoring a host from within a container. // Annotations are kept after container is killed, until they haven't been // accessed for a full `cleanup_timeout`: diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 3f7811d74ad..356b855edf0 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -593,6 +593,7 @@ metricbeat.modules: # match_pids: ["process.pid", "process.ppid"] # match_source: true # match_source_index: 4 +# match_short_id: false # cleanup_timeout: 60 # # To connect to Docker over TLS you must specify a client and CA certificate. 
# #ssl: diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index a2078c05b99..c0c2a958a39 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -568,6 +568,7 @@ packetbeat.protocols: # match_pids: ["process.pid", "process.ppid"] # match_source: true # match_source_index: 4 +# match_short_id: false # cleanup_timeout: 60 # # To connect to Docker over TLS you must specify a client and CA certificate. # #ssl: diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index 866d9bf391f..11bd380759f 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -129,6 +129,7 @@ winlogbeat.event_logs: # match_pids: ["process.pid", "process.ppid"] # match_source: true # match_source_index: 4 +# match_short_id: false # cleanup_timeout: 60 # # To connect to Docker over TLS you must specify a client and CA certificate. # #ssl: