diff --git a/pkg/stanza/operator/helper/flusher.go b/pkg/stanza/operator/helper/flusher.go new file mode 100644 index 000000000000..cdcf4d3a2e76 --- /dev/null +++ b/pkg/stanza/operator/helper/flusher.go @@ -0,0 +1,111 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + +import ( + "bufio" + "time" +) + +// FlusherConfig is the configuration of the Flusher helper +type FlusherConfig struct { + Period time.Duration `mapstructure:"force_flush_period"` +} + +// NewFlusherConfig creates a default Flusher config +func NewFlusherConfig() FlusherConfig { + return FlusherConfig{ + // Empty or `0s` means that we will never force flush + Period: time.Millisecond * 500, + } +} + +// Build creates a Flusher from the configuration +func (c *FlusherConfig) Build() *Flusher { + return &Flusher{ + lastDataChange: time.Now(), + forcePeriod: c.Period, + previousDataLength: 0, + } +} + +// Flusher keeps information about flush state +type Flusher struct { + // forcePeriod defines the time which should pass since the last data change before ShouldFlush returns true. 
+ // Never forces if forcePeriod is set to 0 + forcePeriod time.Duration + + // lastDataChange tracks the time of the last data change (including new data and flushes) + lastDataChange time.Time + + // previousDataLength: + // if previousDataLength = 0 - no new data has been received after flush + // if previousDataLength > 0 - there is data which has not been flushed yet and it hasn't changed since lastDataChange + previousDataLength int +} + +func (f *Flusher) UpdateDataChangeTime(length int) { + // Skip if length is greater than 0 and hasn't changed + if length > 0 && length == f.previousDataLength { + return + } + + // Update internal properties with new values if data length changed (or is 0) + // because it means that data is flowing and being processed + f.previousDataLength = length + f.lastDataChange = time.Now() +} + +// Flushed resets the data length and the time of the last data change +func (f *Flusher) Flushed() { + f.UpdateDataChangeTime(0) +} + +// ShouldFlush returns true if data should be forcefully flushed +func (f *Flusher) ShouldFlush() bool { + // Returns true if more than f.forcePeriod has passed since f.lastDataChange and data length is greater than 0 + return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0 +} + +func (f *Flusher) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc { + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + advance, token, err = splitFunc(data, atEOF) + + // Return as it is in case of error + if err != nil { + return + } + + // Return token + if token != nil { + // Inform flusher that we just flushed + f.Flushed() + return + } + + // If there is no token, force flush eventually + if f.ShouldFlush() { + // Inform flusher that we just flushed + f.Flushed() + token = trimWhitespaces(data) + advance = len(data) + return + } + + // Inform flusher that we didn't flush + f.UpdateDataChangeTime(len(data)) + return + } +} diff --git 
a/pkg/stanza/operator/helper/multiline.go 
b/pkg/stanza/operator/helper/multiline.go index 9a4ff58b5095..4da253708a71 100644 --- a/pkg/stanza/operator/helper/multiline.go +++ b/pkg/stanza/operator/helper/multiline.go @@ -19,102 +19,10 @@ import ( "bytes" "fmt" "regexp" - "time" "golang.org/x/text/encoding" ) -// FlusherConfig is a configuration of Flusher helper -type FlusherConfig struct { - Period time.Duration `mapstructure:"force_flush_period"` -} - -// NewFlusherConfig creates a default Flusher config -func NewFlusherConfig() FlusherConfig { - return FlusherConfig{ - // Empty or `0s` means that we will never force flush - Period: time.Millisecond * 500, - } -} - -// Build creates Flusher from configuration -func (c *FlusherConfig) Build() *Flusher { - return &Flusher{ - lastDataChange: time.Now(), - forcePeriod: c.Period, - previousDataLength: 0, - } -} - -// Flusher keeps information about flush state -type Flusher struct { - // forcePeriod defines time from last flush which should pass before setting force to true. - // Never forces if forcePeriod is set to 0 - forcePeriod time.Duration - - // lastDataChange tracks date of last data change (including new data and flushes) - lastDataChange time.Time - - // previousDataLength: - // if previousDataLength = 0 - no new data have been received after flush - // if previousDataLength > 0 - there is data which has not been flushed yet and it doesn't changed since lastDataChange - previousDataLength int -} - -func (f *Flusher) UpdateDataChangeTime(length int) { - // Skip if length is greater than 0 and didn't changed - if length > 0 && length == f.previousDataLength { - return - } - - // update internal properties with new values if data length changed - // because it means that data is flowing and being processed - f.previousDataLength = length - f.lastDataChange = time.Now() -} - -// Flushed reset data length -func (f *Flusher) Flushed() { - f.UpdateDataChangeTime(0) -} - -// ShouldFlush returns true if data should be forcefully flushed -func (f *Flusher) 
ShouldFlush() bool { - // Returns true if there is f.forcePeriod after f.lastDataChange and data length is greater than 0 - return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0 -} - -func (f *Flusher) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc { - return func(data []byte, atEOF bool) (advance int, token []byte, err error) { - advance, token, err = splitFunc(data, atEOF) - - // Return as it is in case of error - if err != nil { - return - } - - // Return token - if token != nil { - // Inform flusher that we just flushed - f.Flushed() - return - } - - // If there is no token, force flush eventually - if f.ShouldFlush() { - // Inform flusher that we just flushed - f.Flushed() - token = trimWhitespaces(data) - advance = len(data) - return - } - - // Inform flusher that we didn't flushed - f.UpdateDataChangeTime(len(data)) - return - } -} - // Multiline consists of splitFunc and variables needed to perform force flush type Multiline struct { SplitFunc bufio.SplitFunc @@ -240,24 +148,6 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { } } -// SplitNone doesn't split any of the bytes, it reads in all of the bytes and returns it all at once. 
This is for when the encoding is nop -func SplitNone(maxLogSize int) bufio.SplitFunc { - return func(data []byte, atEOF bool) (advance int, token []byte, err error) { - if len(data) >= maxLogSize { - return maxLogSize, data[:maxLogSize], nil - } - - if !atEOF { - return 0, nil, nil - } - - if len(data) == 0 { - return 0, nil, nil - } - return len(data), data, nil - } -} - // NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into // tokens that end with a match to the regex pattern provided func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool) bufio.SplitFunc { @@ -345,47 +235,3 @@ func trimWhitespaces(data []byte) []byte { } return token } - -// SplitterConfig consolidates MultilineConfig and FlusherConfig -type SplitterConfig struct { - EncodingConfig EncodingConfig `mapstructure:",squash,omitempty"` - Multiline MultilineConfig `mapstructure:"multiline,omitempty"` - Flusher FlusherConfig `mapstructure:",squash,omitempty"` -} - -// NewSplitterConfig returns default SplitterConfig -func NewSplitterConfig() SplitterConfig { - return SplitterConfig{ - EncodingConfig: NewEncodingConfig(), - Multiline: NewMultilineConfig(), - Flusher: NewFlusherConfig(), - } -} - -// Build builds Splitter struct -func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (*Splitter, error) { - enc, err := c.EncodingConfig.Build() - if err != nil { - return nil, err - } - - flusher := c.Flusher.Build() - splitFunc, err := c.Multiline.Build(enc.Encoding, flushAtEOF, flusher, maxLogSize) - - if err != nil { - return nil, err - } - - return &Splitter{ - Encoding: enc, - Flusher: flusher, - SplitFunc: splitFunc, - }, nil -} - -// Splitter consolidates Flusher and dependent splitFunc -type Splitter struct { - Encoding Encoding - SplitFunc bufio.SplitFunc - Flusher *Flusher -} diff --git a/pkg/stanza/operator/helper/splitter.go b/pkg/stanza/operator/helper/splitter.go new file mode 100644 index 000000000000..4dc2b1fba4c5 --- /dev/null +++ 
b/pkg/stanza/operator/helper/splitter.go @@ -0,0 +1,79 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + +import "bufio" + +// SplitterConfig consolidates EncodingConfig, MultilineConfig and FlusherConfig +type SplitterConfig struct { + EncodingConfig EncodingConfig `mapstructure:",squash,omitempty"` + Flusher FlusherConfig `mapstructure:",squash,omitempty"` + Multiline MultilineConfig `mapstructure:"multiline,omitempty"` +} + +// NewSplitterConfig returns the default SplitterConfig +func NewSplitterConfig() SplitterConfig { + return SplitterConfig{ + EncodingConfig: NewEncodingConfig(), + Multiline: NewMultilineConfig(), + Flusher: NewFlusherConfig(), + } +} + +// Build builds a Splitter from the encoding, flusher and multiline configuration +func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (*Splitter, error) { + enc, err := c.EncodingConfig.Build() + if err != nil { + return nil, err + } + + flusher := c.Flusher.Build() + splitFunc, err := c.Multiline.Build(enc.Encoding, flushAtEOF, flusher, maxLogSize) + + if err != nil { + return nil, err + } + + return &Splitter{ + Encoding: enc, + Flusher: flusher, + SplitFunc: splitFunc, + }, nil +} + +// Splitter consolidates Encoding, Flusher and dependent splitFunc +type Splitter struct { + Encoding Encoding + SplitFunc bufio.SplitFunc + Flusher *Flusher +} + +// SplitNone doesn't split any of the bytes, it reads in all of 
the bytes and returns it all at once. This is for when the encoding is nop +func SplitNone(maxLogSize int) bufio.SplitFunc { + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + if len(data) >= maxLogSize { + return maxLogSize, data[:maxLogSize], nil + } + + if !atEOF { + return 0, nil, nil + } + + if len(data) == 0 { + return 0, nil, nil + } + return len(data), data, nil + } +}