Skip to content

Commit

Permalink
[chore][pkg/stanza] Localize reader attribute tests (open-telemetry#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored and jayasai470 committed Dec 8, 2023
1 parent 962e116 commit 2521a10
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 232 deletions.
215 changes: 0 additions & 215 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,221 +86,6 @@ See this issue for details: https://github.com/census-instrumentation/opencensus
require.NoError(t, operator.Stop())
}

// AddFields tests that the `log.file.name` and `log.file.path` fields are included
// when IncludeFileName and IncludeFilePath are set to true
func TestAddFileFields(t *testing.T) {
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.IncludeFileName = true
cfg.IncludeFilePath = true
cfg.IncludeFileNameResolved = false
cfg.IncludeFilePathResolved = false
operator, sink := testManager(t, cfg)

// Create a file, then start
temp := filetest.OpenTemp(t, tempDir)
filetest.WriteString(t, temp, "testlog\n")

require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
defer func() {
require.NoError(t, operator.Stop())
}()

_, attributes := sink.NextCall(t)
require.Equal(t, filepath.Base(temp.Name()), attributes[attrs.LogFileName])
require.Equal(t, temp.Name(), attributes[attrs.LogFilePath])
require.Nil(t, attributes[attrs.LogFileNameResolved])
require.Nil(t, attributes[attrs.LogFilePathResolved])
}

// AddFileResolvedFields tests that the `log.file.name_resolved` and `log.file.path_resolved` fields are included
// when IncludeFileNameResolved and IncludeFilePathResolved are set to true
func TestAddFileResolvedFields(t *testing.T) {
if runtime.GOOS == windowsOS {
t.Skip("Windows symlinks usage disabled for now. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21088")
}
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.IncludeFileName = true
cfg.IncludeFilePath = true
cfg.IncludeFileNameResolved = true
cfg.IncludeFilePathResolved = true
operator, sink := testManager(t, cfg)

// Create temp dir with log file
dir := t.TempDir()

file, err := os.CreateTemp(dir, "")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, file.Close())
})

// Create symbolic link in monitored directory
symLinkPath := filepath.Join(tempDir, "symlink")
err = os.Symlink(file.Name(), symLinkPath)
require.NoError(t, err)

// Populate data
filetest.WriteString(t, file, "testlog\n")

// Resolve path
realPath, err := filepath.EvalSymlinks(file.Name())
require.NoError(t, err)
resolved, err := filepath.Abs(realPath)
require.NoError(t, err)

require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
defer func() {
require.NoError(t, operator.Stop())
}()

_, attributes := sink.NextCall(t)
require.Equal(t, filepath.Base(symLinkPath), attributes[attrs.LogFileName])
require.Equal(t, symLinkPath, attributes[attrs.LogFilePath])
require.Equal(t, filepath.Base(resolved), attributes[attrs.LogFileNameResolved])
require.Equal(t, resolved, attributes[attrs.LogFilePathResolved])
}

// AddFileResolvedFields tests that the `log.file.name_resolved` and `log.file.path_resolved` fields are included
// when IncludeFileNameResolved and IncludeFilePathResolved are set to true and underlaying symlink change
// Scenario:
// monitored file (symlink) -> middleSymlink -> file_1
// monitored file (symlink) -> middleSymlink -> file_2
func TestAddFileResolvedFieldsWithChangeOfSymlinkTarget(t *testing.T) {
if runtime.GOOS == windowsOS {
t.Skip("Windows symlinks usage disabled for now. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21088")
}
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.IncludeFileName = true
cfg.IncludeFilePath = true
cfg.IncludeFileNameResolved = true
cfg.IncludeFilePathResolved = true
operator, sink := testManager(t, cfg)

// Create temp dir with log file
dir := t.TempDir()

file1, err := os.CreateTemp(dir, "")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, file1.Close())
})

file2, err := os.CreateTemp(dir, "")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, file2.Close())
})

// Resolve paths
real1, err := filepath.EvalSymlinks(file1.Name())
require.NoError(t, err)
resolved1, err := filepath.Abs(real1)
require.NoError(t, err)

real2, err := filepath.EvalSymlinks(file2.Name())
require.NoError(t, err)
resolved2, err := filepath.Abs(real2)
require.NoError(t, err)

// Create symbolic link in monitored directory
// symLinkPath(target of file input) -> middleSymLinkPath -> file1
middleSymLinkPath := filepath.Join(dir, "symlink")
symLinkPath := filepath.Join(tempDir, "symlink")
err = os.Symlink(file1.Name(), middleSymLinkPath)
require.NoError(t, err)
err = os.Symlink(middleSymLinkPath, symLinkPath)
require.NoError(t, err)

// Populate data
filetest.WriteString(t, file1, "testlog\n")

require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
defer func() {
require.NoError(t, operator.Stop())
}()

_, attributes := sink.NextCall(t)
require.Equal(t, filepath.Base(symLinkPath), attributes[attrs.LogFileName])
require.Equal(t, symLinkPath, attributes[attrs.LogFilePath])
require.Equal(t, filepath.Base(resolved1), attributes[attrs.LogFileNameResolved])
require.Equal(t, resolved1, attributes[attrs.LogFilePathResolved])

// Change middleSymLink to point to file2
err = os.Remove(middleSymLinkPath)
require.NoError(t, err)
err = os.Symlink(file2.Name(), middleSymLinkPath)
require.NoError(t, err)

// Populate data (different content due to fingerprint)
filetest.WriteString(t, file2, "testlog2\n")

_, attributes = sink.NextCall(t)
require.Equal(t, filepath.Base(symLinkPath), attributes[attrs.LogFileName])
require.Equal(t, symLinkPath, attributes[attrs.LogFilePath])
require.Equal(t, filepath.Base(resolved2), attributes[attrs.LogFileNameResolved])
require.Equal(t, resolved2, attributes[attrs.LogFilePathResolved])
}

func TestFileFieldsUpdatedAfterRestart(t *testing.T) {
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.IncludeFileName = true
cfg.IncludeFilePath = true
op1, sink1 := testManager(t, cfg)

// Create a file, then start
temp, err := os.CreateTemp(tempDir, "")
require.NoError(t, err)
filetest.WriteString(t, temp, "testlog1\n")

persister := testutil.NewUnscopedMockPersister()
require.NoError(t, op1.Start(persister))

token, attributes := sink1.NextCall(t)
assert.Equal(t, []byte("testlog1"), token)
assert.Equal(t, filepath.Base(temp.Name()), attributes[attrs.LogFileName])
assert.Equal(t, temp.Name(), attributes[attrs.LogFilePath])
assert.Nil(t, attributes[attrs.LogFileNameResolved])
assert.Nil(t, attributes[attrs.LogFilePathResolved])

require.NoError(t, op1.Stop())
temp.Close() // On windows, we must close the file before renaming it

newPath := temp.Name() + "_renamed"
require.NoError(t, os.Rename(temp.Name(), newPath))

temp = filetest.OpenFile(t, newPath)
filetest.WriteString(t, temp, "testlog2\n")

op2, sink2 := testManager(t, cfg)

require.NoError(t, op2.Start(persister))

token, attributes = sink2.NextCall(t)
assert.Equal(t, []byte("testlog2"), token)
assert.Equal(t, filepath.Base(newPath), attributes[attrs.LogFileName])
assert.Equal(t, newPath, attributes[attrs.LogFilePath])
assert.Nil(t, attributes[attrs.LogFileNameResolved])
assert.Nil(t, attributes[attrs.LogFilePathResolved])

require.NoError(t, op2.Stop())
}

// ReadExistingLogs tests that, when starting from beginning, we
// read all the lines that are already there
func TestReadExistingLogs(t *testing.T) {
Expand Down
133 changes: 133 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/attributes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package reader

import (
"context"
"os"
"path/filepath"
"runtime"
"testing"

"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest"
)

// AddFields tests that the `log.file.name` and `log.file.path` fields are included
// when IncludeFileName and IncludeFilePath are set to true
func TestAttributes(t *testing.T) {
t.Parallel()

// Create a file, then start
tempDir := t.TempDir()
temp := filetest.OpenTemp(t, tempDir)
filetest.WriteString(t, temp, "testlog\n")

f, sink := testFactory(t, includeFileName(), includeFilePath())
fp, err := f.NewFingerprint(temp)
require.NoError(t, err)

reader, err := f.NewReader(temp, fp)
require.NoError(t, err)
defer reader.Close()

reader.ReadToEnd(context.Background())

token, attributes := sink.NextCall(t)
require.Equal(t, []byte("testlog"), token)
require.Equal(t, filepath.Base(temp.Name()), attributes[attrs.LogFileName])
require.Equal(t, temp.Name(), attributes[attrs.LogFilePath])
require.Nil(t, attributes[attrs.LogFileNameResolved])
require.Nil(t, attributes[attrs.LogFilePathResolved])
}

// TestAttributesResolved tests that the `log.file.name_resolved` and `log.file.path_resolved` fields are included
// when IncludeFileNameResolved and IncludeFilePathResolved are set to true
func TestAttributesResolved(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Windows symlinks usage disabled for now. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21088")
}
t.Parallel()

// Set up actual file
actualDir := t.TempDir()
actualFile, err := os.CreateTemp(actualDir, "")
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, actualFile.Close()) })

// Resolve path
realPath, err := filepath.EvalSymlinks(actualFile.Name())
require.NoError(t, err)
resolved, err := filepath.Abs(realPath)
require.NoError(t, err)

// Create another directory with a symbolic link to the file
symlinkDir := t.TempDir()
symlinkPath := filepath.Join(symlinkDir, "symlink")
require.NoError(t, os.Symlink(actualFile.Name(), symlinkPath))
symlinkFile := filetest.OpenFile(t, symlinkPath)

// Populate data
filetest.WriteString(t, actualFile, "testlog\n")

// Read the data
f, sink := testFactory(t, includeFileName(), includeFilePath(), includeFileNameResolved(), includeFilePathResolved())
fp, err := f.NewFingerprint(symlinkFile)
require.NoError(t, err)
reader, err := f.NewReader(symlinkFile, fp)
require.NoError(t, err)
defer reader.Close()
reader.ReadToEnd(context.Background())

// Validate expectations
token, attributes := sink.NextCall(t)
require.Equal(t, []byte("testlog"), token)
require.Equal(t, filepath.Base(symlinkPath), attributes[attrs.LogFileName])
require.Equal(t, symlinkPath, attributes[attrs.LogFilePath])
require.Equal(t, filepath.Base(resolved), attributes[attrs.LogFileNameResolved])
require.Equal(t, resolved, attributes[attrs.LogFilePathResolved])

// Move symlinked file
newActualPath := actualFile.Name() + "_renamed"
require.NoError(t, os.Remove(symlinkPath))
require.NoError(t, os.Rename(actualFile.Name(), newActualPath))
require.NoError(t, os.Symlink(newActualPath, symlinkPath))

// Append additional data
filetest.WriteString(t, actualFile, "testlog2\n")

// Recreate the reader
symlinkFile = filetest.OpenFile(t, symlinkPath)
reader, err = f.NewReaderFromMetadata(symlinkFile, reader.Close())
require.NoError(t, err)
reader.ReadToEnd(context.Background())

token, attributes = sink.NextCall(t)
require.Equal(t, []byte("testlog2"), token)
require.Equal(t, filepath.Base(symlinkPath), attributes[attrs.LogFileName])
require.Equal(t, symlinkPath, attributes[attrs.LogFilePath])
require.Equal(t, filepath.Base(resolved)+"_renamed", attributes[attrs.LogFileNameResolved])
require.Equal(t, resolved+"_renamed", attributes[attrs.LogFilePathResolved])

// Append additional data
filetest.WriteString(t, actualFile, "testlog3\n")

// Recreate the factory with the attributes disabled
f, sink = testFactory(t)

// Recreate the reader and read new data
symlinkFile = filetest.OpenFile(t, symlinkPath)
reader, err = f.NewReaderFromMetadata(symlinkFile, reader.Close())
require.NoError(t, err)
reader.ReadToEnd(context.Background())

token, attributes = sink.NextCall(t)
require.Equal(t, []byte("testlog3"), token)
require.Nil(t, attributes[attrs.LogFileName])
require.Nil(t, attributes[attrs.LogFilePath])
require.Nil(t, attributes[attrs.LogFileNameResolved])
require.Nil(t, attributes[attrs.LogFilePathResolved])
}
Loading

0 comments on commit 2521a10

Please sign in to comment.