Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][pkg/stanza] Add file disambiguation tests #31171

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1318,3 +1318,95 @@ func TestWindowsFilesClosedImmediately(t *testing.T) {
// On Windows, poll should close the file after reading it. We can test this by trying to move it.
require.NoError(t, os.Rename(temp.Name(), temp.Name()+"_renamed"))
}

func TestDelayedDisambiguation(t *testing.T) {
t.Parallel()
tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.FingerprintSize = 18
cfg.StartAt = "beginning"
operator, sink := testManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

// Two identical files, smaller than fingerprint size
file1 := filetest.OpenTempWithPattern(t, tempDir, "*.log1")
file2 := filetest.OpenTempWithPattern(t, tempDir, "*.log2")

sameContent := "aaaaaaaaaaa"
filetest.WriteString(t, file1, sameContent+"\n")
filetest.WriteString(t, file2, sameContent+"\n")
operator.poll(context.Background())

token, attributes := sink.NextCall(t)
require.Equal(t, []byte(sameContent), token)
sink.ExpectNoCallsUntil(t, 100*time.Millisecond)
operator.wg.Wait()

// Append different data
newContent1 := "more content in file 1 only"
newContent2 := "different content in file 2"
filetest.WriteString(t, file1, newContent1+"\n")
filetest.WriteString(t, file2, newContent2+"\n")
operator.poll(context.Background())

var sameTokenOtherFile emittest.Call
if attributes[attrs.LogFileName].(string) == filepath.Base(file1.Name()) {
sameTokenOtherFile = emittest.Call{Token: []byte(sameContent), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file2.Name())}}
} else {
sameTokenOtherFile = emittest.Call{Token: []byte(sameContent), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file1.Name())}}
}
newFromFile1 := emittest.Call{Token: []byte(newContent1), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file1.Name())}}
newFromFile2 := emittest.Call{Token: []byte(newContent2), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file2.Name())}}
sink.ExpectCalls(t, &sameTokenOtherFile, &newFromFile1, &newFromFile2)
}

func TestNoLostPartial(t *testing.T) {
t.Parallel()
tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.FingerprintSize = 18
cfg.StartAt = "beginning"
operator, sink := testManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

// Two same fingerprint file , and smaller than config size
file1 := filetest.OpenTempWithPattern(t, tempDir, "*.log1")
file2 := filetest.OpenTempWithPattern(t, tempDir, "*.log2")

sameContent := "aaaaaaaaaaa"
filetest.WriteString(t, file1, sameContent+"\n")
filetest.WriteString(t, file2, sameContent+"\n")
operator.poll(context.Background())

token, attributes := sink.NextCall(t)
require.Equal(t, []byte(sameContent), token)
sink.ExpectNoCallsUntil(t, 100*time.Millisecond)
operator.wg.Wait()

newContent1 := "additional content in file 1 only"
filetest.WriteString(t, file1, newContent1+"\n")

var otherFileName string
if attributes[attrs.LogFileName].(string) == filepath.Base(file1.Name()) {
otherFileName = filepath.Base(file2.Name())
} else {
otherFileName = filepath.Base(file1.Name())
}

var foundSameFromOtherFile, foundNewFromFileOne bool
require.Eventually(t, func() bool {
operator.poll(context.Background())
defer operator.wg.Wait()

token, attributes = sink.NextCall(t)
switch {
case string(token) == sameContent && attributes[attrs.LogFileName].(string) == otherFileName:
foundSameFromOtherFile = true
case string(token) == newContent1 && attributes[attrs.LogFileName].(string) == filepath.Base(file1.Name()):
foundNewFromFileOne = true
default:
t.Errorf("unexpected token from file %q: %s", filepath.Base(attributes[attrs.LogFileName].(string)), token)
}
return foundSameFromOtherFile && foundNewFromFileOne
}, time.Second, 100*time.Millisecond)
}
40 changes: 27 additions & 13 deletions pkg/stanza/fileconsumer/internal/emittest/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ type sinkCfg struct {

type SinkOpt func(*sinkCfg)

type call struct {
token []byte
attrs map[string]any
type Call struct {
Token []byte
Attrs map[string]any
}

type Sink struct {
emitChan chan *call
emitChan chan *Call
timeout time.Duration
emit.Callback
}
Expand All @@ -53,14 +53,14 @@ func NewSink(opts ...SinkOpt) *Sink {
for _, opt := range opts {
opt(cfg)
}
emitChan := make(chan *call, cfg.emitChanLen)
emitChan := make(chan *Call, cfg.emitChanLen)
return &Sink{
emitChan: emitChan,
timeout: cfg.timeout,
Callback: func(_ context.Context, token []byte, attrs map[string]any) error {
copied := make([]byte, len(token))
copy(copied, token)
emitChan <- &call{copied, attrs}
emitChan <- &Call{copied, attrs}
return nil
},
}
Expand All @@ -76,7 +76,7 @@ func (s *Sink) NextTokens(t *testing.T, n int) [][]byte {
for i := 0; i < n; i++ {
select {
case call := <-s.emitChan:
emitChan = append(emitChan, call.token)
emitChan = append(emitChan, call.Token)
case <-time.After(s.timeout):
assert.Fail(t, "Timed out waiting for message")
return nil
Expand All @@ -88,7 +88,7 @@ func (s *Sink) NextTokens(t *testing.T, n int) [][]byte {
func (s *Sink) NextCall(t *testing.T) ([]byte, map[string]any) {
select {
case c := <-s.emitChan:
return c.token, c.attrs
return c.Token, c.Attrs
case <-time.After(s.timeout):
assert.Fail(t, "Timed out waiting for message")
return nil, nil
Expand All @@ -98,7 +98,7 @@ func (s *Sink) NextCall(t *testing.T) ([]byte, map[string]any) {
func (s *Sink) ExpectToken(t *testing.T, expected []byte) {
select {
case call := <-s.emitChan:
assert.Equal(t, expected, call.token)
assert.Equal(t, expected, call.Token)
case <-time.After(s.timeout):
assert.Fail(t, fmt.Sprintf("Timed out waiting for token: %s", expected))
}
Expand All @@ -109,7 +109,7 @@ func (s *Sink) ExpectTokens(t *testing.T, expected ...[]byte) {
for i := 0; i < len(expected); i++ {
select {
case call := <-s.emitChan:
actual = append(actual, call.token)
actual = append(actual, call.Token)
case <-time.After(s.timeout):
assert.Fail(t, "Timed out waiting for message")
return
Expand All @@ -121,21 +121,35 @@ func (s *Sink) ExpectTokens(t *testing.T, expected ...[]byte) {
func (s *Sink) ExpectCall(t *testing.T, expected []byte, attrs map[string]any) {
select {
case c := <-s.emitChan:
assert.Equal(t, expected, c.token)
assert.Equal(t, attrs, c.attrs)
assert.Equal(t, expected, c.Token)
assert.Equal(t, attrs, c.Attrs)
case <-time.After(s.timeout):
assert.Fail(t, fmt.Sprintf("Timed out waiting for token: %s", expected))
}
}

func (s *Sink) ExpectCalls(t *testing.T, expected ...*Call) {
actual := make([]*Call, 0, len(expected))
for i := 0; i < len(expected); i++ {
select {
case call := <-s.emitChan:
actual = append(actual, call)
case <-time.After(s.timeout):
assert.Fail(t, "Timed out waiting for message")
return
}
}
require.ElementsMatch(t, expected, actual)
}

func (s *Sink) ExpectNoCalls(t *testing.T) {
s.ExpectNoCallsUntil(t, 200*time.Millisecond)
}

func (s *Sink) ExpectNoCallsUntil(t *testing.T, d time.Duration) {
select {
case c := <-s.emitChan:
assert.Fail(t, "Received unexpected message", "Message: %s", c.token)
assert.Fail(t, "Received unexpected message", "Message: %s", c.Token)
case <-time.After(d):
}
}
75 changes: 52 additions & 23 deletions pkg/stanza/fileconsumer/internal/emittest/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ func TestNextToken(t *testing.T) {
s, testCalls := sinkTest(t)
for _, c := range testCalls {
token := s.NextToken(t)
assert.Equal(t, c.token, token)
assert.Equal(t, c.Token, token)
}
}

func TestNextTokenTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for _, c := range testCalls {
token := s.NextToken(t)
assert.Equal(t, c.token, token)
assert.Equal(t, c.Token, token)
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -38,17 +38,17 @@ func TestNextTokens(t *testing.T) {
s, testCalls := sinkTest(t)
for i := 0; i < 5; i++ {
tokens := s.NextTokens(t, 2)
assert.Equal(t, testCalls[2*i].token, tokens[0])
assert.Equal(t, testCalls[2*i+1].token, tokens[1])
assert.Equal(t, testCalls[2*i].Token, tokens[0])
assert.Equal(t, testCalls[2*i+1].Token, tokens[1])
}
}

func TestNextTokensTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for i := 0; i < 5; i++ {
tokens := s.NextTokens(t, 2)
assert.Equal(t, testCalls[2*i].token, tokens[0])
assert.Equal(t, testCalls[2*i+1].token, tokens[1])
assert.Equal(t, testCalls[2*i].Token, tokens[0])
assert.Equal(t, testCalls[2*i+1].Token, tokens[1])
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -61,17 +61,17 @@ func TestNextCall(t *testing.T) {
s, testCalls := sinkTest(t)
for _, c := range testCalls {
token, attributes := s.NextCall(t)
require.Equal(t, c.token, token)
require.Equal(t, c.attrs, attributes)
require.Equal(t, c.Token, token)
require.Equal(t, c.Attrs, attributes)
}
}

func TestNextCallTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for _, c := range testCalls {
token, attributes := s.NextCall(t)
require.Equal(t, c.token, token)
require.Equal(t, c.attrs, attributes)
require.Equal(t, c.Token, token)
require.Equal(t, c.Attrs, attributes)
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -83,14 +83,14 @@ func TestNextCallTimeout(t *testing.T) {
func TestExpectToken(t *testing.T) {
s, testCalls := sinkTest(t)
for _, c := range testCalls {
s.ExpectToken(t, c.token)
s.ExpectToken(t, c.Token)
}
}

func TestExpectTokenTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for _, c := range testCalls {
s.ExpectToken(t, c.token)
s.ExpectToken(t, c.Token)
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -102,14 +102,14 @@ func TestExpectTokenTimeout(t *testing.T) {
func TestExpectTokens(t *testing.T) {
s, testCalls := sinkTest(t)
for i := 0; i < 5; i++ {
s.ExpectTokens(t, testCalls[2*i].token, testCalls[2*i+1].token)
s.ExpectTokens(t, testCalls[2*i].Token, testCalls[2*i+1].Token)
}
}

func TestExpectTokensTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for i := 0; i < 5; i++ {
s.ExpectTokens(t, testCalls[2*i].token, testCalls[2*i+1].token)
s.ExpectTokens(t, testCalls[2*i].Token, testCalls[2*i+1].Token)
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -121,14 +121,14 @@ func TestExpectTokensTimeout(t *testing.T) {
func TestExpectCall(t *testing.T) {
s, testCalls := sinkTest(t)
for _, c := range testCalls {
s.ExpectCall(t, c.token, c.attrs)
s.ExpectCall(t, c.Token, c.Attrs)
}
}

func TestExpectCallTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for _, c := range testCalls {
s.ExpectCall(t, c.token, c.attrs)
s.ExpectCall(t, c.Token, c.Attrs)
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -137,6 +137,35 @@ func TestExpectCallTimeout(t *testing.T) {
assert.True(t, tt.Failed())
}

func TestExpectCalls(t *testing.T) {
s, testCalls := sinkTest(t)
testCallsOutOfOrder := make([]*Call, 0, 10)
for i := 0; i < len(testCalls); i += 2 {
testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i])
}
for i := 1; i < len(testCalls); i += 2 {
testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i])
}
s.ExpectCalls(t, testCallsOutOfOrder...)
}

func TestExpectCallsTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
testCallsOutOfOrder := make([]*Call, 0, 10)
for i := 0; i < len(testCalls); i += 2 {
testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i])
}
for i := 1; i < len(testCalls); i += 2 {
testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i])
}
s.ExpectCalls(t, testCallsOutOfOrder...)

// Create a new T so we can expect it to fail without failing the overall test.
tt := new(testing.T)
s.ExpectCalls(tt, new(Call))
assert.True(t, tt.Failed())
}

func TestExpectNoCalls(t *testing.T) {
s, _ := sinkTest(t)
s.NextTokens(t, 10) // drain the channel
Expand All @@ -156,24 +185,24 @@ func TestExpectNoCallsFailure(t *testing.T) {
func TestWithCallBuffer(t *testing.T) {
s, testCalls := sinkTest(t, WithCallBuffer(5))
for i := 0; i < 10; i++ {
s.ExpectCall(t, testCalls[i].token, testCalls[i].attrs)
s.ExpectCall(t, testCalls[i].Token, testCalls[i].Attrs)
}
}

func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*call) {
func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*Call) {
s := NewSink(opts...)
testCalls := make([]*call, 0, 10)
testCalls := make([]*Call, 0, 10)
for i := 0; i < 10; i++ {
testCalls = append(testCalls, &call{
token: []byte(fmt.Sprintf("token-%d", i)),
attrs: map[string]any{
testCalls = append(testCalls, &Call{
Token: []byte(fmt.Sprintf("token-%d", i)),
Attrs: map[string]any{
"key": fmt.Sprintf("value-%d", i),
},
})
}
go func() {
for _, c := range testCalls {
require.NoError(t, s.Callback(context.Background(), c.token, c.attrs))
require.NoError(t, s.Callback(context.Background(), c.Token, c.Attrs))
}
}()
return s, testCalls
Expand Down
Loading