diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 93d9481eac89..5ec805147e43 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -86,221 +86,6 @@ See this issue for details: https://github.com/census-instrumentation/opencensus require.NoError(t, operator.Stop()) } -// AddFields tests that the `log.file.name` and `log.file.path` fields are included -// when IncludeFileName and IncludeFilePath are set to true -func TestAddFileFields(t *testing.T) { - t.Parallel() - - tempDir := t.TempDir() - cfg := NewConfig().includeDir(tempDir) - cfg.StartAt = "beginning" - cfg.IncludeFileName = true - cfg.IncludeFilePath = true - cfg.IncludeFileNameResolved = false - cfg.IncludeFilePathResolved = false - operator, sink := testManager(t, cfg) - - // Create a file, then start - temp := filetest.OpenTemp(t, tempDir) - filetest.WriteString(t, temp, "testlog\n") - - require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) - defer func() { - require.NoError(t, operator.Stop()) - }() - - _, attributes := sink.NextCall(t) - require.Equal(t, filepath.Base(temp.Name()), attributes[attrs.LogFileName]) - require.Equal(t, temp.Name(), attributes[attrs.LogFilePath]) - require.Nil(t, attributes[attrs.LogFileNameResolved]) - require.Nil(t, attributes[attrs.LogFilePathResolved]) -} - -// AddFileResolvedFields tests that the `log.file.name_resolved` and `log.file.path_resolved` fields are included -// when IncludeFileNameResolved and IncludeFilePathResolved are set to true -func TestAddFileResolvedFields(t *testing.T) { - if runtime.GOOS == windowsOS { - t.Skip("Windows symlinks usage disabled for now. 
See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21088") - } - t.Parallel() - - tempDir := t.TempDir() - cfg := NewConfig().includeDir(tempDir) - cfg.StartAt = "beginning" - cfg.IncludeFileName = true - cfg.IncludeFilePath = true - cfg.IncludeFileNameResolved = true - cfg.IncludeFilePathResolved = true - operator, sink := testManager(t, cfg) - - // Create temp dir with log file - dir := t.TempDir() - - file, err := os.CreateTemp(dir, "") - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, file.Close()) - }) - - // Create symbolic link in monitored directory - symLinkPath := filepath.Join(tempDir, "symlink") - err = os.Symlink(file.Name(), symLinkPath) - require.NoError(t, err) - - // Populate data - filetest.WriteString(t, file, "testlog\n") - - // Resolve path - realPath, err := filepath.EvalSymlinks(file.Name()) - require.NoError(t, err) - resolved, err := filepath.Abs(realPath) - require.NoError(t, err) - - require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) - defer func() { - require.NoError(t, operator.Stop()) - }() - - _, attributes := sink.NextCall(t) - require.Equal(t, filepath.Base(symLinkPath), attributes[attrs.LogFileName]) - require.Equal(t, symLinkPath, attributes[attrs.LogFilePath]) - require.Equal(t, filepath.Base(resolved), attributes[attrs.LogFileNameResolved]) - require.Equal(t, resolved, attributes[attrs.LogFilePathResolved]) -} - -// AddFileResolvedFields tests that the `log.file.name_resolved` and `log.file.path_resolved` fields are included -// when IncludeFileNameResolved and IncludeFilePathResolved are set to true and underlaying symlink change -// Scenario: -// monitored file (symlink) -> middleSymlink -> file_1 -// monitored file (symlink) -> middleSymlink -> file_2 -func TestAddFileResolvedFieldsWithChangeOfSymlinkTarget(t *testing.T) { - if runtime.GOOS == windowsOS { - t.Skip("Windows symlinks usage disabled for now. 
See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21088") - } - t.Parallel() - - tempDir := t.TempDir() - cfg := NewConfig().includeDir(tempDir) - cfg.StartAt = "beginning" - cfg.IncludeFileName = true - cfg.IncludeFilePath = true - cfg.IncludeFileNameResolved = true - cfg.IncludeFilePathResolved = true - operator, sink := testManager(t, cfg) - - // Create temp dir with log file - dir := t.TempDir() - - file1, err := os.CreateTemp(dir, "") - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, file1.Close()) - }) - - file2, err := os.CreateTemp(dir, "") - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, file2.Close()) - }) - - // Resolve paths - real1, err := filepath.EvalSymlinks(file1.Name()) - require.NoError(t, err) - resolved1, err := filepath.Abs(real1) - require.NoError(t, err) - - real2, err := filepath.EvalSymlinks(file2.Name()) - require.NoError(t, err) - resolved2, err := filepath.Abs(real2) - require.NoError(t, err) - - // Create symbolic link in monitored directory - // symLinkPath(target of file input) -> middleSymLinkPath -> file1 - middleSymLinkPath := filepath.Join(dir, "symlink") - symLinkPath := filepath.Join(tempDir, "symlink") - err = os.Symlink(file1.Name(), middleSymLinkPath) - require.NoError(t, err) - err = os.Symlink(middleSymLinkPath, symLinkPath) - require.NoError(t, err) - - // Populate data - filetest.WriteString(t, file1, "testlog\n") - - require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) - defer func() { - require.NoError(t, operator.Stop()) - }() - - _, attributes := sink.NextCall(t) - require.Equal(t, filepath.Base(symLinkPath), attributes[attrs.LogFileName]) - require.Equal(t, symLinkPath, attributes[attrs.LogFilePath]) - require.Equal(t, filepath.Base(resolved1), attributes[attrs.LogFileNameResolved]) - require.Equal(t, resolved1, attributes[attrs.LogFilePathResolved]) - - // Change middleSymLink to point to file2 - err = os.Remove(middleSymLinkPath) - 
require.NoError(t, err) - err = os.Symlink(file2.Name(), middleSymLinkPath) - require.NoError(t, err) - - // Populate data (different content due to fingerprint) - filetest.WriteString(t, file2, "testlog2\n") - - _, attributes = sink.NextCall(t) - require.Equal(t, filepath.Base(symLinkPath), attributes[attrs.LogFileName]) - require.Equal(t, symLinkPath, attributes[attrs.LogFilePath]) - require.Equal(t, filepath.Base(resolved2), attributes[attrs.LogFileNameResolved]) - require.Equal(t, resolved2, attributes[attrs.LogFilePathResolved]) -} - -func TestFileFieldsUpdatedAfterRestart(t *testing.T) { - t.Parallel() - - tempDir := t.TempDir() - cfg := NewConfig().includeDir(tempDir) - cfg.StartAt = "beginning" - cfg.IncludeFileName = true - cfg.IncludeFilePath = true - op1, sink1 := testManager(t, cfg) - - // Create a file, then start - temp, err := os.CreateTemp(tempDir, "") - require.NoError(t, err) - filetest.WriteString(t, temp, "testlog1\n") - - persister := testutil.NewUnscopedMockPersister() - require.NoError(t, op1.Start(persister)) - - token, attributes := sink1.NextCall(t) - assert.Equal(t, []byte("testlog1"), token) - assert.Equal(t, filepath.Base(temp.Name()), attributes[attrs.LogFileName]) - assert.Equal(t, temp.Name(), attributes[attrs.LogFilePath]) - assert.Nil(t, attributes[attrs.LogFileNameResolved]) - assert.Nil(t, attributes[attrs.LogFilePathResolved]) - - require.NoError(t, op1.Stop()) - temp.Close() // On windows, we must close the file before renaming it - - newPath := temp.Name() + "_renamed" - require.NoError(t, os.Rename(temp.Name(), newPath)) - - temp = filetest.OpenFile(t, newPath) - filetest.WriteString(t, temp, "testlog2\n") - - op2, sink2 := testManager(t, cfg) - - require.NoError(t, op2.Start(persister)) - - token, attributes = sink2.NextCall(t) - assert.Equal(t, []byte("testlog2"), token) - assert.Equal(t, filepath.Base(newPath), attributes[attrs.LogFileName]) - assert.Equal(t, newPath, attributes[attrs.LogFilePath]) - assert.Nil(t, 
attributes[attrs.LogFileNameResolved]) - assert.Nil(t, attributes[attrs.LogFilePathResolved]) - - require.NoError(t, op2.Stop()) -} - // ReadExistingLogs tests that, when starting from beginning, we // read all the lines that are already there func TestReadExistingLogs(t *testing.T) { diff --git a/pkg/stanza/fileconsumer/internal/reader/attributes_test.go b/pkg/stanza/fileconsumer/internal/reader/attributes_test.go new file mode 100644 index 000000000000..1b8bec2569d9 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/reader/attributes_test.go @@ -0,0 +1,133 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package reader + +import ( + "context" + "os" + "path/filepath" + "runtime" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" +) + +// TestAttributes tests that the `log.file.name` and `log.file.path` fields are included +// when IncludeFileName and IncludeFilePath are set to true +func TestAttributes(t *testing.T) { + t.Parallel() + + // Create a file containing a single log line + tempDir := t.TempDir() + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, "testlog\n") + + f, sink := testFactory(t, includeFileName(), includeFilePath()) + fp, err := f.NewFingerprint(temp) + require.NoError(t, err) + + reader, err := f.NewReader(temp, fp) + require.NoError(t, err) + defer reader.Close() + + reader.ReadToEnd(context.Background()) + + token, attributes := sink.NextCall(t) + require.Equal(t, []byte("testlog"), token) + require.Equal(t, filepath.Base(temp.Name()), attributes[attrs.LogFileName]) + require.Equal(t, temp.Name(), attributes[attrs.LogFilePath]) + require.Nil(t, attributes[attrs.LogFileNameResolved]) + require.Nil(t, attributes[attrs.LogFilePathResolved]) +} + +// TestAttributesResolved tests that the `log.file.name_resolved` and
`log.file.path_resolved` fields are included +// when IncludeFileNameResolved and IncludeFilePathResolved are set to true +func TestAttributesResolved(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Windows symlinks usage disabled for now. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21088") + } + t.Parallel() + + // Set up actual file + actualDir := t.TempDir() + actualFile, err := os.CreateTemp(actualDir, "") + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, actualFile.Close()) }) + + // Resolve path + realPath, err := filepath.EvalSymlinks(actualFile.Name()) + require.NoError(t, err) + resolved, err := filepath.Abs(realPath) + require.NoError(t, err) + + // Create another directory with a symbolic link to the file + symlinkDir := t.TempDir() + symlinkPath := filepath.Join(symlinkDir, "symlink") + require.NoError(t, os.Symlink(actualFile.Name(), symlinkPath)) + symlinkFile := filetest.OpenFile(t, symlinkPath) + + // Populate data + filetest.WriteString(t, actualFile, "testlog\n") + + // Read the data + f, sink := testFactory(t, includeFileName(), includeFilePath(), includeFileNameResolved(), includeFilePathResolved()) + fp, err := f.NewFingerprint(symlinkFile) + require.NoError(t, err) + reader, err := f.NewReader(symlinkFile, fp) + require.NoError(t, err) + defer reader.Close() + reader.ReadToEnd(context.Background()) + + // Validate expectations + token, attributes := sink.NextCall(t) + require.Equal(t, []byte("testlog"), token) + require.Equal(t, filepath.Base(symlinkPath), attributes[attrs.LogFileName]) + require.Equal(t, symlinkPath, attributes[attrs.LogFilePath]) + require.Equal(t, filepath.Base(resolved), attributes[attrs.LogFileNameResolved]) + require.Equal(t, resolved, attributes[attrs.LogFilePathResolved]) + + // Move symlinked file + newActualPath := actualFile.Name() + "_renamed" + require.NoError(t, os.Remove(symlinkPath)) + require.NoError(t, os.Rename(actualFile.Name(), newActualPath)) + 
require.NoError(t, os.Symlink(newActualPath, symlinkPath)) + + // Append additional data + filetest.WriteString(t, actualFile, "testlog2\n") + + // Recreate the reader + symlinkFile = filetest.OpenFile(t, symlinkPath) + reader, err = f.NewReaderFromMetadata(symlinkFile, reader.Close()) + require.NoError(t, err) + reader.ReadToEnd(context.Background()) + + token, attributes = sink.NextCall(t) + require.Equal(t, []byte("testlog2"), token) + require.Equal(t, filepath.Base(symlinkPath), attributes[attrs.LogFileName]) + require.Equal(t, symlinkPath, attributes[attrs.LogFilePath]) + require.Equal(t, filepath.Base(resolved)+"_renamed", attributes[attrs.LogFileNameResolved]) + require.Equal(t, resolved+"_renamed", attributes[attrs.LogFilePathResolved]) + + // Append additional data + filetest.WriteString(t, actualFile, "testlog3\n") + + // Recreate the factory with the attributes disabled + f, sink = testFactory(t) + + // Recreate the reader and read new data + symlinkFile = filetest.OpenFile(t, symlinkPath) + reader, err = f.NewReaderFromMetadata(symlinkFile, reader.Close()) + require.NoError(t, err) + reader.ReadToEnd(context.Background()) + + token, attributes = sink.NextCall(t) + require.Equal(t, []byte("testlog3"), token) + require.Nil(t, attributes[attrs.LogFileName]) + require.Nil(t, attributes[attrs.LogFilePath]) + require.Nil(t, attributes[attrs.LogFileNameResolved]) + require.Nil(t, attributes[attrs.LogFilePathResolved]) +} diff --git a/pkg/stanza/fileconsumer/internal/reader/factory_test.go b/pkg/stanza/fileconsumer/internal/reader/factory_test.go index 3e9057c2c64c..4b87c4bb7c44 100644 --- a/pkg/stanza/fileconsumer/internal/reader/factory_test.go +++ b/pkg/stanza/fileconsumer/internal/reader/factory_test.go @@ -42,29 +42,37 @@ func testFactory(t *testing.T, opts ...testFactoryOpt) (*Factory, *emittest.Sink sink := emittest.NewSink(emittest.WithCallBuffer(cfg.sinkCallBufferSize)) return &Factory{ - SugaredLogger: testutil.Logger(t), - FromBeginning: 
cfg.fromBeginning, - FingerprintSize: cfg.fingerprintSize, - MaxLogSize: cfg.maxLogSize, - Encoding: cfg.encoding, - SplitFunc: splitFunc, - TrimFunc: cfg.trimFunc, - FlushTimeout: cfg.flushPeriod, - EmitFunc: sink.Callback, + SugaredLogger: testutil.Logger(t), + FromBeginning: cfg.fromBeginning, + FingerprintSize: cfg.fingerprintSize, + MaxLogSize: cfg.maxLogSize, + Encoding: cfg.encoding, + SplitFunc: splitFunc, + TrimFunc: cfg.trimFunc, + FlushTimeout: cfg.flushPeriod, + EmitFunc: sink.Callback, + IncludeFileName: cfg.includeFileName, + IncludeFilePath: cfg.includeFilePath, + IncludeFileNameResolved: cfg.includeFileNameResolved, + IncludeFilePathResolved: cfg.includeFilePathResolved, }, sink } type testFactoryOpt func(*testFactoryCfg) type testFactoryCfg struct { - fromBeginning bool - fingerprintSize int - maxLogSize int - encoding encoding.Encoding - splitCfg split.Config - trimFunc trim.Func - flushPeriod time.Duration - sinkCallBufferSize int + fromBeginning bool + fingerprintSize int + maxLogSize int + encoding encoding.Encoding + splitCfg split.Config + trimFunc trim.Func + flushPeriod time.Duration + sinkCallBufferSize int + includeFileName bool + includeFilePath bool + includeFileNameResolved bool + includeFilePathResolved bool } func withFingerprintSize(size int) testFactoryOpt { @@ -96,3 +104,27 @@ func withSinkBufferSize(n int) testFactoryOpt { c.sinkCallBufferSize = n } } + +func includeFileName() testFactoryOpt { + return func(c *testFactoryCfg) { + c.includeFileName = true + } +} + +func includeFilePath() testFactoryOpt { + return func(c *testFactoryCfg) { + c.includeFilePath = true + } +} + +func includeFileNameResolved() testFactoryOpt { + return func(c *testFactoryCfg) { + c.includeFileNameResolved = true + } +} + +func includeFilePathResolved() testFactoryOpt { + return func(c *testFactoryCfg) { + c.includeFilePathResolved = true + } +}