diff --git a/.chloggen/recombine_add_max_unmatched_batch_size.yaml b/.chloggen/recombine_add_max_unmatched_batch_size.yaml
new file mode 100644
index 000000000000..ec6565746bd4
--- /dev/null
+++ b/.chloggen/recombine_add_max_unmatched_batch_size.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: pkg/stanza/operator/transformer/recombine
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: add a new "max_unmatched_batch_size" config parameter to configure the maximum number of consecutive entries that will be combined into a single entry before a match occurs
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [31653]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/pkg/stanza/docs/operators/recombine.md b/pkg/stanza/docs/operators/recombine.md
index c0632dcc67fe..ffb368bc11bf 100644
--- a/pkg/stanza/docs/operators/recombine.md
+++ b/pkg/stanza/docs/operators/recombine.md
@@ -4,21 +4,22 @@ The `recombine` operator combines consecutive logs into single logs based on sim
 
 ### Configuration Fields
 
-| Field | Default | Description |
-| --- | --- | --- |
-| `id` | `recombine` | A unique identifier for the operator. |
-| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
-| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](../types/on_error.md). |
-| `is_first_entry` | | An [expression](../types/expression.md) that returns true if the entry being processed is the first entry in a multiline series. |
-| `is_last_entry` | | An [expression](../types/expression.md) that returns true if the entry being processed is the last entry in a multiline series. |
-| `combine_field` | required | The [field](../types/field.md) from all the entries that will recombined. |
-| `combine_with` | `"\n"` | The string that is put between the combined entries. This can be an empty string as well. When using special characters like `\n`, be sure to enclose the value in double quotes: `"\n"`. |
-| `max_batch_size` | 1000 | The maximum number of consecutive entries that will be combined into a single entry. |
-| `overwrite_with` | `newest` | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined. |
-| `force_flush_period` | `5s` | Flush timeout after which entries will be flushed aborting the wait for their sub parts to be merged with. |
-| `source_identifier` | `$attributes["file.path"]` | The [field](../types/field.md) to separate one source of logs from others when combining them. |
-| `max_sources` | 1000 | The maximum number of unique sources allowed concurrently to be tracked for combining separately. |
-| `max_log_size` | 0 | The maximum bytes size of the combined field. Once the size exceeds the limit, all received entries of the source will be combined and flushed. "0" of max_log_size means no limit. |
+| Field                      | Default                    | Description |
+| ---                        | ---                        | --- |
+| `id`                       | `recombine`                | A unique identifier for the operator. |
+| `output`                   | Next in pipeline           | The connected operator(s) that will receive all outbound entries. |
+| `on_error`                 | `send`                     | The behavior of the operator if it encounters an error. See [on_error](../types/on_error.md). |
+| `is_first_entry`           |                            | An [expression](../types/expression.md) that returns true if the entry being processed is the first entry in a multiline series. |
+| `is_last_entry`            |                            | An [expression](../types/expression.md) that returns true if the entry being processed is the last entry in a multiline series. |
+| `combine_field`            | required                   | The [field](../types/field.md) from all the entries that will be recombined. |
+| `combine_with`             | `"\n"`                     | The string that is put between the combined entries. This can be an empty string as well. When using special characters like `\n`, be sure to enclose the value in double quotes: `"\n"`. |
+| `max_batch_size`           | 1000                       | The maximum number of consecutive entries that will be combined into a single entry. |
+| `max_unmatched_batch_size` | 100                        | The maximum number of consecutive entries that will be combined into a single entry before a match occurs (as determined by `is_first_entry` or `is_last_entry`). For example, with `max_unmatched_batch_size=0` all entries are combined, with `max_unmatched_batch_size=1` every unmatched entry is emitted individually until the first match occurs, and with `max_unmatched_batch_size=100` unmatched entries are combined into batches of 100 until the first match occurs. |
+| `overwrite_with`           | `newest`                   | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined. |
+| `force_flush_period`       | `5s`                       | Flush timeout after which entries will be flushed, aborting the wait for their remaining sub-parts to be merged. |
+| `source_identifier`        | `$attributes["file.path"]` | The [field](../types/field.md) to separate one source of logs from others when combining them. |
+| `max_sources`              | 1000                       | The maximum number of unique sources allowed concurrently to be tracked for combining separately. |
+| `max_log_size`             | 0                          | The maximum bytes size of the combined field. Once the size exceeds the limit, all received entries of the source will be combined and flushed. "0" of max_log_size means no limit. |
 
 Exactly one of `is_first_entry` and `is_last_entry` must be specified.
 
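To make the new option concrete, here is a minimal sketch of a `recombine` operator configuration using it. The `is_first_entry` expression and the batch size are illustrative assumptions, not values taken from this change:

```yaml
# Hypothetical stanza operator config: join multiline logs whose first line
# starts with a date, and flush any entries seen before the first match in
# groups of at most 50.
- type: recombine
  combine_field: body
  is_first_entry: body matches "^\\d{4}-\\d{2}-\\d{2}"
  max_unmatched_batch_size: 50
```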
diff --git a/pkg/stanza/operator/transformer/recombine/config.go b/pkg/stanza/operator/transformer/recombine/config.go
index e994668815f5..cacfad768168 100644
--- a/pkg/stanza/operator/transformer/recombine/config.go
+++ b/pkg/stanza/operator/transformer/recombine/config.go
@@ -34,13 +34,14 @@ func NewConfig() *Config {
 // NewConfigWithID creates a new recombine config with default values
 func NewConfigWithID(operatorID string) *Config {
 	return &Config{
-		TransformerConfig: helper.NewTransformerConfig(operatorID, operatorType),
-		MaxBatchSize:      1000,
-		MaxSources:        1000,
-		CombineWith:       defaultCombineWith,
-		OverwriteWith:     "oldest",
-		ForceFlushTimeout: 5 * time.Second,
-		SourceIdentifier:  entry.NewAttributeField("file.path"),
+		TransformerConfig:     helper.NewTransformerConfig(operatorID, operatorType),
+		MaxBatchSize:          1000,
+		MaxUnmatchedBatchSize: 100,
+		MaxSources:            1000,
+		CombineWith:           defaultCombineWith,
+		OverwriteWith:         "oldest",
+		ForceFlushTimeout:     5 * time.Second,
+		SourceIdentifier:      entry.NewAttributeField("file.path"),
 	}
 }
 
@@ -50,6 +51,7 @@ type Config struct {
 	IsFirstEntry      string      `mapstructure:"is_first_entry"`
 	IsLastEntry       string      `mapstructure:"is_last_entry"`
 	MaxBatchSize      int         `mapstructure:"max_batch_size"`
+	MaxUnmatchedBatchSize int     `mapstructure:"max_unmatched_batch_size"`
 	CombineField      entry.Field `mapstructure:"combine_field"`
 	CombineWith       string      `mapstructure:"combine_with"`
 	SourceIdentifier  entry.Field `mapstructure:"source_identifier"`
@@ -105,13 +107,14 @@ func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
 	}
 
 	return &Transformer{
-		TransformerOperator: transformer,
-		matchFirstLine:      matchesFirst,
-		prog:                prog,
-		maxBatchSize:        c.MaxBatchSize,
-		maxSources:          c.MaxSources,
-		overwriteWithNewest: overwriteWithNewest,
-		batchMap:            make(map[string]*sourceBatch),
+		TransformerOperator:   transformer,
+		matchFirstLine:        matchesFirst,
+		prog:                  prog,
+		maxBatchSize:          c.MaxBatchSize,
+		maxUnmatchedBatchSize: c.MaxUnmatchedBatchSize,
+		maxSources:            c.MaxSources,
+		overwriteWithNewest:   overwriteWithNewest,
+		batchMap:              make(map[string]*sourceBatch),
 		batchPool: sync.Pool{
 			New: func() any {
 				return &sourceBatch{
diff --git a/pkg/stanza/operator/transformer/recombine/config_test.go b/pkg/stanza/operator/transformer/recombine/config_test.go
index 3f81113676a6..0594b44d8321 100644
--- a/pkg/stanza/operator/transformer/recombine/config_test.go
+++ b/pkg/stanza/operator/transformer/recombine/config_test.go
@@ -84,6 +84,15 @@ func TestUnmarshal(t *testing.T) {
 					return cfg
 				}(),
 			},
+			{
+				Name:      "custom_max_unmatched_batch_size",
+				ExpectErr: false,
+				Expect: func() *Config {
+					cfg := NewConfig()
+					cfg.MaxUnmatchedBatchSize = 50
+					return cfg
+				}(),
+			},
 		},
 	}.Run(t)
 }
diff --git a/pkg/stanza/operator/transformer/recombine/testdata/config.yaml b/pkg/stanza/operator/transformer/recombine/testdata/config.yaml
index 11e337ad987f..9a030d6c707c 100644
--- a/pkg/stanza/operator/transformer/recombine/testdata/config.yaml
+++ b/pkg/stanza/operator/transformer/recombine/testdata/config.yaml
@@ -19,5 +19,8 @@ custom_id:
 custom_max_log_size:
   type: recombine
   max_log_size: 256kb
+custom_max_unmatched_batch_size:
+  type: recombine
+  max_unmatched_batch_size: 50
 default:
   type: recombine
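For reference, the default of 100 set in `NewConfigWithID` applies whenever the key is omitted, so a user config only needs the key to override it. A hedged sketch (the `is_last_entry` expression is illustrative):

```yaml
# Implicitly uses max_unmatched_batch_size: 100 (the new default):
- type: recombine
  combine_field: body
  is_last_entry: body == 'END'

# Explicit override, mirroring the custom_max_unmatched_batch_size fixture above:
- type: recombine
  combine_field: body
  is_last_entry: body == 'END'
  max_unmatched_batch_size: 50
```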
diff --git a/pkg/stanza/operator/transformer/recombine/transformer.go b/pkg/stanza/operator/transformer/recombine/transformer.go
index 2cc0e8f880c7..bb14328bc2cf 100644
--- a/pkg/stanza/operator/transformer/recombine/transformer.go
+++ b/pkg/stanza/operator/transformer/recombine/transformer.go
@@ -22,17 +22,18 @@ const DefaultSourceIdentifier = "DefaultSourceIdentifier"
 
 // Transformer is an operator that combines a field from consecutive log entries into a single entry
 type Transformer struct {
 	helper.TransformerOperator
-	matchFirstLine      bool
-	prog                *vm.Program
-	maxBatchSize        int
-	maxSources          int
-	overwriteWithNewest bool
-	combineField        entry.Field
-	combineWith         string
-	ticker              *time.Ticker
-	forceFlushTimeout   time.Duration
-	chClose             chan struct{}
-	sourceIdentifier    entry.Field
+	matchFirstLine        bool
+	prog                  *vm.Program
+	maxBatchSize          int
+	maxUnmatchedBatchSize int
+	maxSources            int
+	overwriteWithNewest   bool
+	combineField          entry.Field
+	combineWith           string
+	ticker                *time.Ticker
+	forceFlushTimeout     time.Duration
+	chClose               chan struct{}
+	sourceIdentifier      entry.Field
 
 	sync.Mutex
 	batchPool sync.Pool
@@ -46,6 +47,7 @@ type sourceBatch struct {
 	numEntries             int
 	recombined             *bytes.Buffer
 	firstEntryObservedTime time.Time
+	matchDetected          bool
 }
 
 func (t *Transformer) Start(_ operator.Persister) error {
@@ -129,22 +131,22 @@ func (t *Transformer) Process(ctx context.Context, e *entry.Entry) error {
 		}
 
 		// Add the current log to the new batch
-		t.addToBatch(ctx, e, s)
+		t.addToBatch(ctx, e, s, matches)
 		return nil
 	// This is the last entry in a complete batch
 	case matches && !t.matchFirstLine:
-		t.addToBatch(ctx, e, s)
+		t.addToBatch(ctx, e, s, matches)
 		return t.flushSource(ctx, s)
 	}
 
 	// This is neither the first entry of a new log,
 	// nor the last entry of a log, so just add it to the batch
-	t.addToBatch(ctx, e, s)
+	t.addToBatch(ctx, e, s, matches)
 	return nil
 }
 
 // addToBatch adds the current entry to the current batch of entries that will be combined
-func (t *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source string) {
+func (t *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source string, matches bool) {
 	batch, ok := t.batchMap[source]
 	if !ok {
 		if len(t.batchMap) >= t.maxSources {
@@ -159,6 +161,11 @@ func (t *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source str
 		}
 	}
 
+	// Mark that a match occurred so that max_unmatched_batch_size only applies while no match has been seen
+	if matches && !batch.matchDetected {
+		batch.matchDetected = true
+	}
+
 	// Combine the combineField of each entry in the batch,
 	// separated by newlines
 	var s string
@@ -172,7 +179,9 @@ func (t *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source str
 	}
 	batch.recombined.WriteString(s)
 
-	if (t.maxLogSize > 0 && int64(batch.recombined.Len()) > t.maxLogSize) || batch.numEntries >= t.maxBatchSize {
+	if (t.maxLogSize > 0 && int64(batch.recombined.Len()) > t.maxLogSize) ||
+		batch.numEntries >= t.maxBatchSize ||
+		(!batch.matchDetected && t.maxUnmatchedBatchSize > 0 && batch.numEntries >= t.maxUnmatchedBatchSize) {
 		if err := t.flushSource(ctx, source); err != nil {
 			t.Errorf("there was error flushing combined logs %s", err)
 		}
@@ -224,6 +233,7 @@ func (t *Transformer) addNewBatch(source string, e *entry.Entry) *sourceBatch {
 	batch.numEntries = 1
 	batch.recombined.Reset()
 	batch.firstEntryObservedTime = e.ObservedTimestamp
+	batch.matchDetected = false
 	t.batchMap[source] = batch
 	return batch
 }
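The new flush condition is easiest to follow with a trace. The annotated sketch below mirrors the `TestMaxUnmatchedBatchSizeForFirstEntry` case added in the tests that follow; the `Input`/`Output` comments are illustrative, not part of the config schema:

```yaml
- type: recombine
  combine_field: body
  is_first_entry: body == 'test1'
  max_unmatched_batch_size: 2
# Input:  test2 test3 test4 test5 test6 test1 test7 test8 test1 test9 test10
# Output: [test2,test3] [test4,test5] [test6]       <- unmatched, flushed in pairs
#         [test1,test7,test8] [test1,test9,test10]  <- matched, each batch grows until the next match
```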
"content5\ncontent6\ncontent7\ncontent8\ncontent9", map[string]string{"file.path": "file1"}), }, }, + { + "EntriesNonMatchingForFirstEntryWithMaxUnmatchedBatchSize=0", + func() *Config { + cfg := NewConfig() + cfg.CombineField = entry.NewBodyField() + cfg.IsFirstEntry = "body == 'test1'" + cfg.OutputIDs = []string{"fake"} + cfg.MaxUnmatchedBatchSize = 0 + cfg.ForceFlushTimeout = 10 * time.Millisecond + return cfg + }(), + []*entry.Entry{ + entryWithBody(t1, "test2"), + entryWithBody(t1, "test3"), + entryWithBody(t1, "test4"), + }, + []*entry.Entry{ + entryWithBody(t1, "test2\ntest3\ntest4"), + }, + }, + { + "EntriesNonMatchingForFirstEntryWithMaxUnmatchedBatchSize=1", + func() *Config { + cfg := NewConfig() + cfg.CombineField = entry.NewBodyField() + cfg.IsFirstEntry = "body == 'test1'" + cfg.OutputIDs = []string{"fake"} + cfg.MaxUnmatchedBatchSize = 1 + cfg.ForceFlushTimeout = 10 * time.Millisecond + return cfg + }(), + []*entry.Entry{ + entryWithBody(t1, "test2"), + entryWithBody(t1, "test3"), + entryWithBody(t1, "test4"), + }, + []*entry.Entry{ + entryWithBody(t1, "test2"), + entryWithBody(t1, "test3"), + entryWithBody(t1, "test4"), + }, + }, + { + "TestMaxUnmatchedBatchSizeForFirstEntry", + func() *Config { + cfg := NewConfig() + cfg.CombineField = entry.NewBodyField() + cfg.IsFirstEntry = "body == 'test1'" + cfg.OutputIDs = []string{"fake"} + cfg.MaxUnmatchedBatchSize = 2 + cfg.ForceFlushTimeout = 10 * time.Millisecond + return cfg + }(), + []*entry.Entry{ + entryWithBody(t1, "test2"), + entryWithBody(t1, "test3"), + entryWithBody(t1, "test4"), + entryWithBody(t1, "test5"), + entryWithBody(t1, "test6"), + entryWithBody(t1, "test1"), + entryWithBody(t1, "test7"), + entryWithBody(t1, "test8"), + entryWithBody(t1, "test1"), + entryWithBody(t1, "test9"), + entryWithBody(t1, "test10"), + }, + []*entry.Entry{ + entryWithBody(t1, "test2\ntest3"), + entryWithBody(t1, "test4\ntest5"), + entryWithBody(t1, "test6"), + entryWithBody(t1, "test1\ntest7\ntest8"), + entryWithBody(t1, "test1\ntest9\ntest10"), + }, + }, + { + "EntriesNonMatchingForLastEntryWithMaxUnmatchedBatchSize=0", + func() *Config { + cfg := NewConfig() + cfg.CombineField = entry.NewBodyField() + cfg.IsLastEntry = "body == 'test1'" + cfg.OutputIDs = []string{"fake"} + cfg.MaxUnmatchedBatchSize = 0 + cfg.ForceFlushTimeout = 10 * time.Millisecond + return cfg + }(), + []*entry.Entry{ + entryWithBody(t1, "test2"), + entryWithBody(t1, "test3"), + entryWithBody(t1, "test4"), + }, + []*entry.Entry{ + entryWithBody(t1, "test2\ntest3\ntest4"), + }, + }, + { + "EntriesNonMatchingForLastEntryWithMaxUnmatchedBatchSize=1", + func() *Config { + cfg := NewConfig() + cfg.CombineField = entry.NewBodyField() + cfg.IsLastEntry = "body == 'test1'" + cfg.OutputIDs = []string{"fake"} + cfg.MaxUnmatchedBatchSize = 1 + return cfg + }(), + []*entry.Entry{ + entryWithBody(t1, "test2"), + entryWithBody(t1, "test3"), + entryWithBody(t1, "test4"), + }, + []*entry.Entry{ + entryWithBody(t1, "test2"), + entryWithBody(t1, "test3"), + entryWithBody(t1, "test4"), + }, + }, + { + "EntriesMatchingForLastEntryMaxUnmatchedBatchSize=2", + func() *Config { + cfg := NewConfig() + cfg.CombineField = entry.NewBodyField() + cfg.IsLastEntry = "body == 'test1'" + cfg.OutputIDs = []string{"fake"} + cfg.MaxUnmatchedBatchSize = 2 + return cfg + }(), + []*entry.Entry{ + entryWithBody(t1, "test2"), + entryWithBody(t1, "test3"), + entryWithBody(t1, "test4"), + entryWithBody(t1, "test5"), + entryWithBody(t1, "test1"), + entryWithBody(t1, "test6"), + entryWithBody(t1, "test7"), + 
entryWithBody(t1, "test1"), + }, + []*entry.Entry{ + entryWithBody(t1, "test2\ntest3"), + entryWithBody(t1, "test4\ntest5"), + entryWithBody(t1, "test1"), + entryWithBody(t1, "test6\ntest7"), + entryWithBody(t1, "test1"), + }, + }, + { + "EntriesMatchingForLastEntryMaxUnmatchedBatchSize=3", + func() *Config { + cfg := NewConfig() + cfg.CombineField = entry.NewBodyField() + cfg.IsLastEntry = "body == 'test1'" + cfg.OutputIDs = []string{"fake"} + cfg.MaxUnmatchedBatchSize = 3 + return cfg + }(), + []*entry.Entry{ + entryWithBody(t1, "test2"), + entryWithBody(t1, "test3"), + entryWithBody(t1, "test4"), + entryWithBody(t1, "test5"), + entryWithBody(t1, "test1"), + entryWithBody(t1, "test6"), + entryWithBody(t1, "test7"), + entryWithBody(t1, "test1"), + }, + []*entry.Entry{ + entryWithBody(t1, "test2\ntest3\ntest4"), + entryWithBody(t1, "test5\ntest1"), + entryWithBody(t1, "test6\ntest7\ntest1"), + }, + }, } for _, tc := range cases {