[pkg/stanza] Always recombine if possible, even if incomplete (open-telemetry#30797)

Previously, certain circumstances could result in partial logs being
emitted without any recombination. This could occur when using
`is_first_entry`, if the first partial log from a source was emitted
before a matching "start of log" indicator was found. This could also
occur when the collector was shutting down.
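For context, the affected setup is a recombine operator driven by `is_first_entry`. Below is a minimal configuration sketch — the path, match expression, and flush period are illustrative assumptions, and the key names follow the recombine operator's config fields seen in the diff below:

```yaml
receivers:
  filelog:
    include: [/var/log/app/*.log]   # hypothetical source
    operators:
      - type: recombine
        combine_field: body
        # Lines that start a new log record; everything else is a continuation.
        is_first_entry: body matches "^\\S"
        # Pending partial batches are force-flushed after this period.
        force_flush_period: 5s
```

With this change, continuation lines that arrive before any matching first line, as well as batches still pending at shutdown, are recombined into a single entry instead of being emitted one by one.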
djaglowski authored and cparkins committed Feb 1, 2024
1 parent bd0d207 commit 7ad2dc2
Showing 4 changed files with 96 additions and 33 deletions.
31 changes: 31 additions & 0 deletions .chloggen/pkg-stanza-recombine-clean-state.yaml
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Recombine operator should always recombine partial logs

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30797]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Previously, certain circumstances could result in partial logs being emitted without any
recombination. This could occur when using `is_first_entry`, if the first partial log from
a source was emitted before a matching "start of log" indicator was found. This could also
occur when the collector was shutting down.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
32 changes: 21 additions & 11 deletions pkg/stanza/operator/transformer/recombine/recombine.go
@@ -178,7 +178,7 @@ func (r *Transformer) flushLoop() {
if timeSinceFirstEntry < r.forceFlushTimeout {
continue
}
if err := r.flushSource(source, true); err != nil {
if err := r.flushSource(context.Background(), source, true); err != nil {
r.Errorf("there was error flushing combined logs %s", err)
}
}
@@ -198,7 +198,7 @@ func (r *Transformer) Stop() error {

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r.flushUncombined(ctx)
r.flushAllSources(ctx)

close(r.chClose)

@@ -241,7 +241,7 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error {
// This is the first entry in the next batch
case matches && r.matchIndicatesFirst():
// Flush the existing batch
err := r.flushSource(s, true)
err := r.flushSource(ctx, s, true)
if err != nil {
return err
}
@@ -251,11 +251,8 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error {
return nil
// This is the last entry in a complete batch
case matches && r.matchIndicatesLast():
fallthrough
// When matching on first entry, never batch partial first. Just emit immediately
case !matches && r.matchIndicatesFirst() && r.batchMap[s] == nil:
r.addToBatch(ctx, e, s)
return r.flushSource(s, true)
return r.flushSource(ctx, s, true)
}

// This is neither the first entry of a new log,
@@ -273,7 +270,7 @@ func (r *Transformer) matchIndicatesLast() bool {
}

// addToBatch adds the current entry to the current batch of entries that will be combined
func (r *Transformer) addToBatch(_ context.Context, e *entry.Entry, source string) {
func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source string) {
batch, ok := r.batchMap[source]
if !ok {
batch = r.addNewBatch(source, e)
@@ -305,7 +302,7 @@ func (r *Transformer) addToBatch(_ context.Context, e *entry.Entry, source strin
batch.recombined.WriteString(s)

if (r.maxLogSize > 0 && int64(batch.recombined.Len()) > r.maxLogSize) || len(batch.entries) >= r.maxBatchSize {
if err := r.flushSource(source, false); err != nil {
if err := r.flushSource(ctx, source, false); err != nil {
r.Errorf("there was error flushing combined logs %s", err)
}
}
@@ -325,9 +322,22 @@ func (r *Transformer) flushUncombined(ctx context.Context) {
r.ticker.Reset(r.forceFlushTimeout)
}

// flushAllSources flushes all sources.
func (r *Transformer) flushAllSources(ctx context.Context) {
var errs []error
for source := range r.batchMap {
if err := r.flushSource(ctx, source, true); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
r.Errorf("there was error flushing combined logs %s", errs)
}
}

// flushSource combines the entries currently in the batch into a single entry,
// then forwards them to the next operator in the pipeline
func (r *Transformer) flushSource(source string, deleteSource bool) error {
func (r *Transformer) flushSource(ctx context.Context, source string, deleteSource bool) error {
batch := r.batchMap[source]
// Skip flushing a combined log if the batch is empty
if batch == nil {
@@ -355,7 +365,7 @@ func (r *Transformer) flushSource(source string, deleteSource bool) error {
return err
}

r.Write(context.Background(), base)
r.Write(ctx, base)
if deleteSource {
r.removeBatch(source)
} else {
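To make the behavior change concrete, here is a small self-contained sketch — not the operator itself: plain strings stand in for entries and a single source is assumed. A leading line that does not match `is_first_entry` is now batched rather than emitted immediately, and shutdown flushes whatever remains:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Stand-in for the is_first_entry match: "start" begins a new log.
	isFirst := func(line string) bool { return strings.HasPrefix(line, "start") }

	var batch []string // pending partial lines for one source
	flush := func() {
		if len(batch) > 0 {
			fmt.Printf("emit: %q\n", strings.Join(batch, "\n"))
			batch = nil
		}
	}

	for _, line := range []string{"more1a", "more1b", "start A", "cont A"} {
		if isFirst(line) {
			flush() // a new first line flushes the previous batch, complete or not
		}
		// Previously a leading non-matching line was emitted on its own;
		// now it is held in the batch like any other continuation.
		batch = append(batch, line)
	}
	flush() // Stop() now drains every source via flushAllSources
}
```

Running this emits `"more1a\nmore1b"` once `start A` arrives, then `"start A\ncont A"` at shutdown, matching the updated test expectations in recombine_test.go below.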
45 changes: 25 additions & 20 deletions pkg/stanza/operator/transformer/recombine/recombine_test.go
@@ -158,6 +158,7 @@ func TestTransformer(t *testing.T) {
cfg.IsFirstEntry = "$body == 'test1'"
cfg.OutputIDs = []string{"fake"}
cfg.OverwriteWith = "newest"
cfg.ForceFlushTimeout = 100 * time.Millisecond
return cfg
}(),
[]*entry.Entry{
@@ -166,35 +167,34 @@
entryWithBody(t2, "test4"),
},
[]*entry.Entry{
entryWithBody(t1, "test2"),
entryWithBody(t2, "test3"),
entryWithBody(t2, "test4"),
entryWithBody(t1, "test2\ntest3\ntest4"),
},
},
{
"EntriesMatchingForFirstEntryOneFileOnly",
func() *Config {
cfg := NewConfig()
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = "body == 'file1'"
cfg.IsFirstEntry = "body == 'start'"
cfg.OutputIDs = []string{"fake"}
cfg.OverwriteWith = "oldest"
cfg.ForceFlushTimeout = 100 * time.Millisecond
return cfg
}(),
[]*entry.Entry{
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file3", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file2", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file2", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "file3", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t1, "start", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "more1a", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "start", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "more1b", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "start", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "more2a", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "more2b", map[string]string{"file.path": "file2"}),
},
[]*entry.Entry{
entryWithBodyAttr(t1, "file1\nfile3", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file1\nfile2", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file2", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "file3", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t1, "start\nmore1a", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "start\nmore1b", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "start", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "more2a\nmore2b", map[string]string{"file.path": "file2"}),
},
},
{
@@ -507,9 +507,7 @@
require.NoError(t, recombine.Process(context.Background(), e))
}

for _, expected := range tc.expectedOutput {
fake.ExpectEntry(t, expected)
}
fake.ExpectEntries(t, tc.expectedOutput)

select {
case e := <-fake.Received:
@@ -747,14 +745,21 @@ func TestSourceBatchDelete(t *testing.T) {
next := entry.New()
next.Timestamp = time.Now()
next.Body = "next"
start.AddAttribute("file.path", "file1")
next.AddAttribute("file.path", "file1")

expect := entry.New()
expect.ObservedTimestamp = start.ObservedTimestamp
expect.Timestamp = start.Timestamp
expect.AddAttribute("file.path", "file1")
expect.Body = "start\nnext"

ctx := context.Background()

require.NoError(t, recombine.Process(ctx, start))
require.NoError(t, recombine.Process(ctx, next))
require.Equal(t, 1, len(recombine.batchMap))
require.NoError(t, recombine.flushSource("file1", true))
require.NoError(t, recombine.flushSource(ctx, "file1", true))
require.Equal(t, 0, len(recombine.batchMap))
fake.ExpectEntry(t, expect)
require.NoError(t, recombine.Stop())
}
21 changes: 19 additions & 2 deletions pkg/stanza/testutil/mocks.go
@@ -85,7 +85,7 @@ func (f *FakeOutput) ExpectBody(t testing.TB, body any) {
case e := <-f.Received:
require.Equal(t, body, e.Body)
case <-time.After(time.Second):
require.FailNow(t, "Timed out waiting for entry")
require.FailNowf(t, "Timed out waiting for entry", "%s", body)
}
}

@@ -96,10 +96,27 @@ func (f *FakeOutput) ExpectEntry(t testing.TB, expected *entry.Entry) {
case e := <-f.Received:
require.Equal(t, expected, e)
case <-time.After(time.Second):
require.FailNow(t, "Timed out waiting for entry")
require.FailNowf(t, "Timed out waiting for entry", "%v", expected)
}
}

// ExpectEntries expects that the given entries will be received in any order
func (f *FakeOutput) ExpectEntries(t testing.TB, expected []*entry.Entry) {
entries := make([]*entry.Entry, 0, len(expected))
for i := 0; i < len(expected); i++ {
select {
case e := <-f.Received:
entries = append(entries, e)
case <-time.After(time.Second):
require.Fail(t, "Timed out waiting for entry")
}
if t.Failed() {
break
}
}
require.ElementsMatch(t, expected, entries)
}

// ExpectNoEntry expects that no entry will be received within the specified time
func (f *FakeOutput) ExpectNoEntry(t testing.TB, timeout time.Duration) {
select {
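The new `ExpectEntries` helper drains one entry per expected item (each with a one-second timeout) and then compares order-insensitively with `require.ElementsMatch`, since batches from different sources may flush in any order. A stripped-down sketch of the same drain-then-compare pattern, with strings standing in for `*entry.Entry`:

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

func main() {
	received := make(chan string, 3)
	for _, e := range []string{"b", "c", "a"} { // arrival order differs from expectation
		received <- e
	}

	expected := []string{"a", "b", "c"}
	got := make([]string, 0, len(expected))
	for range expected {
		select {
		case e := <-received:
			got = append(got, e)
		case <-time.After(time.Second):
			fmt.Println("timed out waiting for entry")
			return
		}
	}
	// Order-insensitive comparison; the real helper uses require.ElementsMatch.
	sort.Strings(got)
	fmt.Println("match:", fmt.Sprint(got) == fmt.Sprint(expected)) // match: true
}
```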
