[pkg/stanza/operator/transformer/recombine] Add max_unmatched_batch_size to recombine operator (#32168)

**Description:** Add a new `max_unmatched_batch_size` config parameter to configure the maximum number of consecutive entries that will be combined into a single entry before a match occurs.
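
For orientation, a minimal sketch (not part of this commit) of setting the new parameter through the exported Go config API shown in `config.go` below; the import path mirrors the repository layout and the `is_first_entry` expression is illustrative only.

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/recombine"
)

func main() {
	// Defaults per NewConfigWithID below: MaxBatchSize 1000, MaxUnmatchedBatchSize 100.
	cfg := recombine.NewConfigWithID("recombine")
	cfg.IsFirstEntry = `body matches "^\\S"` // illustrative: a new log starts with a non-whitespace character
	cfg.MaxUnmatchedBatchSize = 50           // flush unmatched entries in batches of at most 50

	fmt.Println(cfg.MaxBatchSize, cfg.MaxUnmatchedBatchSize) // 1000 50
}
```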

**Link to tracking Issue:** #31653

**Testing:** unit tests, manual tests

**Documentation:** Add description of the new config option

Carries over the changes from #32144, with improved tests.
kasia-kujawa authored Apr 11, 2024
1 parent 8c7ccd6 commit efefc6f
Showing 7 changed files with 267 additions and 45 deletions.
27 changes: 27 additions & 0 deletions .chloggen/recombine_add_max_unmatched_batch_size.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza/operator/transformer/recombine

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add a new "max_unmatched_batch_size" config parameter to configure the maximum number of consecutive entries that will be combined into a single entry before a match occurs

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31653]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
31 changes: 16 additions & 15 deletions pkg/stanza/docs/operators/recombine.md
@@ -4,21 +4,22 @@ The `recombine` operator combines consecutive logs into single logs based on simple expression rules.

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `recombine` | A unique identifier for the operator. |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](../types/on_error.md). |
| `is_first_entry` | | An [expression](../types/expression.md) that returns true if the entry being processed is the first entry in a multiline series. |
| `is_last_entry` | | An [expression](../types/expression.md) that returns true if the entry being processed is the last entry in a multiline series. |
| `combine_field` | required | The [field](../types/field.md) from all the entries that will recombined. |
| `combine_with` | `"\n"` | The string that is put between the combined entries. This can be an empty string as well. When using special characters like `\n`, be sure to enclose the value in double quotes: `"\n"`. |
| `max_batch_size` | 1000 | The maximum number of consecutive entries that will be combined into a single entry. |
| `overwrite_with` | `newest` | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined. |
| `force_flush_period` | `5s` | Flush timeout after which entries will be flushed aborting the wait for their sub parts to be merged with. |
| `source_identifier` | `$attributes["file.path"]` | The [field](../types/field.md) to separate one source of logs from others when combining them. |
| `max_sources` | 1000 | The maximum number of unique sources allowed concurrently to be tracked for combining separately. |
| `max_log_size` | 0 | The maximum bytes size of the combined field. Once the size exceeds the limit, all received entries of the source will be combined and flushed. "0" of max_log_size means no limit. |
| Field | Default | Description |
| --- | --- | --- |
| `id` | `recombine` | A unique identifier for the operator. |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](../types/on_error.md). |
| `is_first_entry` | | An [expression](../types/expression.md) that returns true if the entry being processed is the first entry in a multiline series. |
| `is_last_entry` | | An [expression](../types/expression.md) that returns true if the entry being processed is the last entry in a multiline series. |
| `combine_field` | required | The [field](../types/field.md) from all the entries that will be recombined. |
| `combine_with` | `"\n"` | The string that is put between the combined entries. This can be an empty string as well. When using special characters like `\n`, be sure to enclose the value in double quotes: `"\n"`. |
| `max_batch_size` | 1000 | The maximum number of consecutive entries that will be combined into a single entry. |
| `max_unmatched_batch_size` | 100 | The maximum number of consecutive entries that will be combined into a single entry before a match occurs (via `is_first_entry` or `is_last_entry`). For example, `max_unmatched_batch_size=0` disables the limit (entries keep combining until a match or `max_batch_size`), `max_unmatched_batch_size=1` emits every entry individually until the first match occurs, and `max_unmatched_batch_size=100` combines unmatched entries into batches of 100 until the first match occurs (see the sketch below). |
| `overwrite_with` | `newest` | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined. |
| `force_flush_period` | `5s` | Flush timeout after which entries will be flushed, aborting the wait for their remaining parts to be merged. |
| `source_identifier` | `$attributes["file.path"]` | The [field](../types/field.md) used to separate one source of logs from others when combining them. |
| `max_sources` | 1000 | The maximum number of unique sources allowed concurrently to be tracked for combining separately. |
| `max_log_size` | 0 | The maximum byte size of the combined field. Once the size exceeds the limit, all received entries of the source will be combined and flushed. A `max_log_size` of `0` means no limit. |

Exactly one of `is_first_entry` and `is_last_entry` must be specified.

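To ground the three example values in the new table row, here is a hedged sketch (not from this commit) using the exported config API from `config.go` below; the expression string is illustrative only.

```go
package main

import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/recombine"

// newCfg builds a recombine config with an illustrative first-line
// expression and the given unmatched-batch limit.
func newCfg(maxUnmatched int) *recombine.Config {
	cfg := recombine.NewConfigWithID("recombine")
	cfg.IsFirstEntry = `body matches "^BEGIN"` // illustrative expression, not from this commit
	cfg.MaxUnmatchedBatchSize = maxUnmatched
	return cfg
}

func main() {
	_ = newCfg(0)   // 0 disables the unmatched limit: entries combine until a match or max_batch_size
	_ = newCfg(1)   // 1 flushes every entry individually until the first match occurs
	_ = newCfg(100) // 100 (the default) combines unmatched entries into batches of up to 100
}
```
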
31 changes: 17 additions & 14 deletions pkg/stanza/operator/transformer/recombine/config.go
@@ -34,13 +34,14 @@ func NewConfig() *Config {
// NewConfigWithID creates a new recombine config with default values
func NewConfigWithID(operatorID string) *Config {
return &Config{
TransformerConfig: helper.NewTransformerConfig(operatorID, operatorType),
MaxBatchSize: 1000,
MaxSources: 1000,
CombineWith: defaultCombineWith,
OverwriteWith: "oldest",
ForceFlushTimeout: 5 * time.Second,
SourceIdentifier: entry.NewAttributeField("file.path"),
TransformerConfig: helper.NewTransformerConfig(operatorID, operatorType),
MaxBatchSize: 1000,
MaxUnmatchedBatchSize: 100,
MaxSources: 1000,
CombineWith: defaultCombineWith,
OverwriteWith: "oldest",
ForceFlushTimeout: 5 * time.Second,
SourceIdentifier: entry.NewAttributeField("file.path"),
}
}

@@ -50,6 +51,7 @@ type Config struct {
IsFirstEntry string `mapstructure:"is_first_entry"`
IsLastEntry string `mapstructure:"is_last_entry"`
MaxBatchSize int `mapstructure:"max_batch_size"`
MaxUnmatchedBatchSize int `mapstructure:"max_unmatched_batch_size"`
CombineField entry.Field `mapstructure:"combine_field"`
CombineWith string `mapstructure:"combine_with"`
SourceIdentifier entry.Field `mapstructure:"source_identifier"`
@@ -105,13 +107,14 @@ func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}

return &Transformer{
TransformerOperator: transformer,
matchFirstLine: matchesFirst,
prog: prog,
maxBatchSize: c.MaxBatchSize,
maxSources: c.MaxSources,
overwriteWithNewest: overwriteWithNewest,
batchMap: make(map[string]*sourceBatch),
TransformerOperator: transformer,
matchFirstLine: matchesFirst,
prog: prog,
maxBatchSize: c.MaxBatchSize,
maxUnmatchedBatchSize: c.MaxUnmatchedBatchSize,
maxSources: c.MaxSources,
overwriteWithNewest: overwriteWithNewest,
batchMap: make(map[string]*sourceBatch),
batchPool: sync.Pool{
New: func() any {
return &sourceBatch{
9 changes: 9 additions & 0 deletions pkg/stanza/operator/transformer/recombine/config_test.go
@@ -84,6 +84,15 @@ func TestUnmarshal(t *testing.T) {
return cfg
}(),
},
{
Name: "custom_max_unmatched_batch_size",
ExpectErr: false,
Expect: func() *Config {
cfg := NewConfig()
cfg.MaxUnmatchedBatchSize = 50
return cfg
}(),
},
},
}.Run(t)
}
3 changes: 3 additions & 0 deletions pkg/stanza/operator/transformer/recombine/testdata/config.yaml
@@ -19,5 +19,8 @@ custom_id:
custom_max_log_size:
type: recombine
max_log_size: 256kb
custom_max_unmatched_batch_size:
type: recombine
max_unmatched_batch_size: 50
default:
type: recombine
42 changes: 26 additions & 16 deletions pkg/stanza/operator/transformer/recombine/transformer.go
@@ -22,17 +22,18 @@ const DefaultSourceIdentifier = "DefaultSourceIdentifier"
// Transformer is an operator that combines a field from consecutive log entries into a single
type Transformer struct {
helper.TransformerOperator
matchFirstLine bool
prog *vm.Program
maxBatchSize int
maxSources int
overwriteWithNewest bool
combineField entry.Field
combineWith string
ticker *time.Ticker
forceFlushTimeout time.Duration
chClose chan struct{}
sourceIdentifier entry.Field
matchFirstLine bool
prog *vm.Program
maxBatchSize int
maxUnmatchedBatchSize int
maxSources int
overwriteWithNewest bool
combineField entry.Field
combineWith string
ticker *time.Ticker
forceFlushTimeout time.Duration
chClose chan struct{}
sourceIdentifier entry.Field

sync.Mutex
batchPool sync.Pool
@@ -46,6 +47,7 @@ type sourceBatch struct {
numEntries int
recombined *bytes.Buffer
firstEntryObservedTime time.Time
matchDetected bool
}

func (t *Transformer) Start(_ operator.Persister) error {
@@ -129,22 +131,22 @@ func (t *Transformer) Process(ctx context.Context, e *entry.Entry) error {
}

// Add the current log to the new batch
t.addToBatch(ctx, e, s)
t.addToBatch(ctx, e, s, matches)
return nil
// This is the last entry in a complete batch
case matches && !t.matchFirstLine:
t.addToBatch(ctx, e, s)
t.addToBatch(ctx, e, s, matches)
return t.flushSource(ctx, s)
}

// This is neither the first entry of a new log,
// nor the last entry of a log, so just add it to the batch
t.addToBatch(ctx, e, s)
t.addToBatch(ctx, e, s, matches)
return nil
}

// addToBatch adds the current entry to the current batch of entries that will be combined
func (t *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source string) {
func (t *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source string, matches bool) {
batch, ok := t.batchMap[source]
if !ok {
if len(t.batchMap) >= t.maxSources {
@@ -159,6 +161,11 @@ func (t *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source string) {
}
}

// Mark that a match has occurred so that max_unmatched_batch_size applies only while no match has been seen
if matches && !batch.matchDetected {
batch.matchDetected = true
}

// Combine the combineField of each entry in the batch,
// separated by newlines
var s string
@@ -172,7 +179,9 @@ func (t *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source string) {
}
batch.recombined.WriteString(s)

if (t.maxLogSize > 0 && int64(batch.recombined.Len()) > t.maxLogSize) || batch.numEntries >= t.maxBatchSize {
if (t.maxLogSize > 0 && int64(batch.recombined.Len()) > t.maxLogSize) ||
batch.numEntries >= t.maxBatchSize ||
(!batch.matchDetected && t.maxUnmatchedBatchSize > 0 && batch.numEntries >= t.maxUnmatchedBatchSize) {
if err := t.flushSource(ctx, source); err != nil {
t.Errorf("there was error flushing combined logs %s", err)
}
Expand Down Expand Up @@ -224,6 +233,7 @@ func (t *Transformer) addNewBatch(source string, e *entry.Entry) *sourceBatch {
batch.numEntries = 1
batch.recombined.Reset()
batch.firstEntryObservedTime = e.ObservedTimestamp
batch.matchDetected = false
t.batchMap[source] = batch
return batch
}
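
To make the new flush condition in `addToBatch` concrete, here is a standalone restatement of the predicate (a hedged sketch, not code from this commit):

```go
package main

import "fmt"

// shouldFlush restates the flush predicate from addToBatch above in
// isolation: flush when the combined field exceeds max_log_size, when
// max_batch_size is reached, or (only while no match has been seen)
// when max_unmatched_batch_size is reached; 0 disables that last limit.
func shouldFlush(recombinedLen int, maxLogSize int64, numEntries, maxBatchSize, maxUnmatchedBatchSize int, matchDetected bool) bool {
	return (maxLogSize > 0 && int64(recombinedLen) > maxLogSize) ||
		numEntries >= maxBatchSize ||
		(!matchDetected && maxUnmatchedBatchSize > 0 && numEntries >= maxUnmatchedBatchSize)
}

func main() {
	// 100 unmatched entries with the default limits: flush.
	fmt.Println(shouldFlush(0, 0, 100, 1000, 100, false)) // true
	// Same batch size, but a match has been seen: only max_batch_size applies.
	fmt.Println(shouldFlush(0, 0, 100, 1000, 100, true)) // false
	// max_unmatched_batch_size=0 disables the unmatched limit entirely.
	fmt.Println(shouldFlush(0, 0, 500, 1000, 0, false)) // false
}
```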
