Skip to content

Commit

Permalink
add_kubernetes_metadata processor: prevent runtime error; add tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
SvenWoltmann committed Aug 25, 2017
1 parent c811edd commit 3ddf3f9
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 30 deletions.
17 changes: 11 additions & 6 deletions filebeat/processor/add_kubernetes_metadata/indexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,26 @@ func newLogsPathMatcher(cfg common.Config) (add_kubernetes_metadata.Matcher, err
return &LogPathMatcher{LogsPath: logPath}, nil
}

// Docker container ID is a 64-character-long hexadecimal string
const containerIdLen = 64

func (f *LogPathMatcher) MetadataIndex(event common.MapStr) string {
if value, ok := event["source"]; ok {
source := value.(string)
logp.Debug("kubernetes", "Incoming source value: %s", source)
cid := ""
if strings.Contains(source, f.LogsPath) {
if f.LogsPath == "/var/log/containers/" && strings.HasSuffix(source, ".log") {
sourceLen := len(source)
logsPathLen := len(f.LogsPath)

if f.LogsPath == "/var/log/containers/" && strings.HasSuffix(source, ".log") && sourceLen >= containerIdLen + 4 {
// In case of the Kubernetes log path "/var/log/containers/",
// the container ID will be located right before the ".log" ending.
sourceLen := len(source)
cid = source[sourceLen-68 : sourceLen-4]
} else {
containerIdEnd := sourceLen - 4
cid = source[containerIdEnd - containerIdLen : containerIdEnd]
} else if sourceLen >= logsPathLen + containerIdLen {
// In any other case, we assume the container ID will follow right after the log path.
//Docker container is 64 chars in length
cid = source[len(f.LogsPath) : len(f.LogsPath)+64]
cid = source[logsPathLen : logsPathLen + containerIdLen]
}
logp.Debug("kubernetes", "Using container id: %s", cid)
} else {
Expand Down
68 changes: 44 additions & 24 deletions filebeat/processor/add_kubernetes_metadata/indexing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,63 @@ import (
"github.com/elastic/beats/libbeat/common"
)

func TestLogsPathMatcher(t *testing.T) {
var testConfig = common.NewConfig()

logMatcher, err := newLogsPathMatcher(*testConfig)
assert.Nil(t, err)
// A random container ID that we use for our tests
const cid = "0069869de9adf97f574c62029aeba65d1ecd85a2a112e87fbc28afe4dec2b843"

func TestLogsPathMatcher_InvalidSource1(t *testing.T) {
cfgLogsPath := "" // use the default matcher configuration
source := "/var/log/messages"
expectedResult := ""
executeTest(t, cfgLogsPath, source, expectedResult);
}

cid := "0069869de9adf97f574c62029aeba65d1ecd85a2a112e87fbc28afe4dec2b843"
logPath := fmt.Sprintf("/var/lib/docker/containers/%s/%s-json.log", cid, cid)
func TestLogsPathMatcher_InvalidSource2(t *testing.T) {
cfgLogsPath := "" // use the default matcher configuration
source := "/var/lib/docker/containers/01234567/89abcdef-json.log"
expectedResult := ""
executeTest(t, cfgLogsPath, source, expectedResult);
}

input := common.MapStr{
"source": "/var/log/messages",
}
func TestLogsPathMatcher_InvalidSource3(t *testing.T) {
cfgLogsPath := "/var/log/containers/"
source := "/var/log/containers/pod_ns_container_01234567.log"
expectedResult := ""
executeTest(t, cfgLogsPath, source, expectedResult);
}

output := logMatcher.MetadataIndex(input)
assert.Equal(t, output, "")
func TestLogsPathMatcher_VarLibDockerContainers(t *testing.T) {
cfgLogsPath := "" // use the default matcher configuration
source := fmt.Sprintf("/var/lib/docker/containers/%s/%s-json.log", cid, cid)
expectedResult := cid;
executeTest(t, cfgLogsPath, source, expectedResult);
}

input["source"] = logPath
output = logMatcher.MetadataIndex(input)
func TestLogsPathMatcher_VarLogContainers(t *testing.T) {
cfgLogsPath := "/var/log/containers/"
source := fmt.Sprintf("/var/log/containers/kube-proxy-4d7nt_kube-system_kube-proxy-%s.log", cid)
expectedResult := cid;
executeTest(t, cfgLogsPath, source, expectedResult);
}

assert.Equal(t, output, cid)
func TestLogsPathMatcher_AnotherLogDir(t *testing.T) {
cfgLogsPath := "/var/log/other/"
source := fmt.Sprintf("/var/log/other/%s.log", cid)
expectedResult := cid;
executeTest(t, cfgLogsPath, source, expectedResult);
}

func TestLogsPathMatcherVarLogContainers(t *testing.T) {
func executeTest(t *testing.T, cfgLogsPath string, source string, expectedResult string) {
var testConfig = common.NewConfig()
testConfig.SetString("logs_path", -1, "/var/log/containers/")
if cfgLogsPath != "" {
testConfig.SetString("logs_path", -1, cfgLogsPath)
}

logMatcher, err := newLogsPathMatcher(*testConfig)
assert.Nil(t, err)

cid := "0069869de9adf97f574c62029aeba65d1ecd85a2a112e87fbc28afe4dec2b843"
logPath := fmt.Sprintf("/var/log/containers/kube-proxy-4d7nt_kube-system_kube-proxy-%s.log", cid)

input := common.MapStr{
"source": logPath,
"source": source,
}

output := logMatcher.MetadataIndex(input)

assert.Equal(t, output, cid)
assert.Equal(t, output, expectedResult)
}

0 comments on commit 3ddf3f9

Please sign in to comment.