Merge branch 'master' into hash-plugin
Yaroslav Kirillov committed Dec 20, 2024
2 parents 51797d1 + 28be3e4 commit 278c331
Showing 8 changed files with 159 additions and 131 deletions.
42 changes: 19 additions & 23 deletions fd/util.go
@@ -23,7 +23,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
 	avgInputEventSize := pipeline.DefaultAvgInputEventSize
 	maxInputEventSize := pipeline.DefaultMaxInputEventSize
 	cutOffEventByLimit := pipeline.DefaultCutOffEventByLimit
-	cutOffEventByLimitMsg := pipeline.DefaultCutOffEventByLimitMsg
+	cutOffEventByLimitField := pipeline.DefaultCutOffEventByLimitField
 	streamField := pipeline.DefaultStreamField
 	maintenanceInterval := pipeline.DefaultMaintenanceInterval
 	decoder := pipeline.DefaultDecoder
@@ -56,11 +56,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
 	}
 
 	cutOffEventByLimit = settings.Get("cut_off_event_by_limit").MustBool()
-
-	cutOffEventByLimitMsg = settings.Get("cut_off_event_by_limit_message").MustString()
-	if maxInputEventSize > 0 && len(cutOffEventByLimitMsg) >= maxInputEventSize {
-		logger.Fatal("length of cut_off_event_by_limit_message must be less than max_event_size")
-	}
+	cutOffEventByLimitField = settings.Get("cut_off_event_by_limit_field").MustString()
 
 	str := settings.Get("decoder").MustString()
 	if str != "" {
@@ -124,23 +120,23 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
 	}
 
 	return &pipeline.Settings{
-		Decoder:               decoder,
-		DecoderParams:         decoderParams,
-		Capacity:              capacity,
-		MetaCacheSize:         metaCacheSize,
-		AvgEventSize:          avgInputEventSize,
-		MaxEventSize:          maxInputEventSize,
-		CutOffEventByLimit:    cutOffEventByLimit,
-		CutOffEventByLimitMsg: cutOffEventByLimitMsg,
-		AntispamThreshold:     antispamThreshold,
-		AntispamExceptions:    antispamExceptions,
-		SourceNameMetaField:   sourceNameMetaField,
-		MaintenanceInterval:   maintenanceInterval,
-		EventTimeout:          eventTimeout,
-		StreamField:           streamField,
-		IsStrict:              isStrict,
-		MetricHoldDuration:    metricHoldDuration,
-		Pool:                  pipeline.PoolType(pool),
+		Decoder:                 decoder,
+		DecoderParams:           decoderParams,
+		Capacity:                capacity,
+		MetaCacheSize:           metaCacheSize,
+		AvgEventSize:            avgInputEventSize,
+		MaxEventSize:            maxInputEventSize,
+		CutOffEventByLimit:      cutOffEventByLimit,
+		CutOffEventByLimitField: cutOffEventByLimitField,
+		AntispamThreshold:       antispamThreshold,
+		AntispamExceptions:      antispamExceptions,
+		SourceNameMetaField:     sourceNameMetaField,
+		MaintenanceInterval:     maintenanceInterval,
+		EventTimeout:            eventTimeout,
+		StreamField:             streamField,
+		IsStrict:                isStrict,
+		MetricHoldDuration:      metricHoldDuration,
+		Pool:                    pipeline.PoolType(pool),
 	}
 }
 
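In short: the `cut_off_event_by_limit_message` setting, which appended a marker string to truncated events and therefore had to be length-checked against `max_event_size`, is replaced by `cut_off_event_by_limit_field`, which only names a boolean field to set on truncated events. A minimal usage sketch; the field names come from this diff, while the concrete values and the `_cut_off` name are hypothetical:

	// Hypothetical wiring, not code from this commit.
	settings := &pipeline.Settings{
		MaxEventSize:            32 * 1024,  // truncate events larger than 32 KiB
		CutOffEventByLimit:      true,       // truncate instead of dropping
		CutOffEventByLimitField: "_cut_off", // truncated events get {"_cut_off": true}
	}

Since nothing is appended to the event body anymore, the old `logger.Fatal` guard against a marker longer than `max_event_size` becomes unnecessary, which is why it is deleted rather than renamed.
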
89 changes: 48 additions & 41 deletions pipeline/pipeline.go
@@ -24,23 +24,23 @@ import (
 )
 
 const (
-	DefaultAntispamThreshold     = 0
-	DefaultSourceNameMetaField   = ""
-	DefaultDecoder               = "auto"
-	DefaultIsStrict              = false
-	DefaultStreamField           = "stream"
-	DefaultCapacity              = 1024
-	DefaultAvgInputEventSize     = 4 * 1024
-	DefaultMaxInputEventSize     = 0
-	DefaultCutOffEventByLimit    = false
-	DefaultCutOffEventByLimitMsg = ""
-	DefaultJSONNodePoolSize      = 1024
-	DefaultMaintenanceInterval   = time.Second * 5
-	DefaultEventTimeout          = time.Second * 30
-	DefaultFieldValue            = "not_set"
-	DefaultStreamName            = StreamName("not_set")
-	DefaultMetricHoldDuration    = time.Minute * 30
-	DefaultMetaCacheSize         = 1024
+	DefaultAntispamThreshold       = 0
+	DefaultSourceNameMetaField     = ""
+	DefaultDecoder                 = "auto"
+	DefaultIsStrict                = false
+	DefaultStreamField             = "stream"
+	DefaultCapacity                = 1024
+	DefaultAvgInputEventSize       = 4 * 1024
+	DefaultMaxInputEventSize       = 0
+	DefaultCutOffEventByLimit      = false
+	DefaultCutOffEventByLimitField = ""
+	DefaultJSONNodePoolSize        = 1024
+	DefaultMaintenanceInterval     = time.Second * 5
+	DefaultEventTimeout            = time.Second * 30
+	DefaultFieldValue              = "not_set"
+	DefaultStreamName              = StreamName("not_set")
+	DefaultMetricHoldDuration      = time.Minute * 30
+	DefaultMetaCacheSize           = 1024
 
 	EventSeqIDError = uint64(0)
 
@@ -146,23 +146,23 @@ type Pipeline struct {
 }
 
 type Settings struct {
-	Decoder               string
-	DecoderParams         map[string]any
-	Capacity              int
-	MetaCacheSize         int
-	MaintenanceInterval   time.Duration
-	EventTimeout          time.Duration
-	AntispamThreshold     int
-	AntispamExceptions    antispam.Exceptions
-	SourceNameMetaField   string
-	AvgEventSize          int
-	MaxEventSize          int
-	CutOffEventByLimit    bool
-	CutOffEventByLimitMsg string
-	StreamField           string
-	IsStrict              bool
-	MetricHoldDuration    time.Duration
-	Pool                  PoolType
+	Decoder                 string
+	DecoderParams           map[string]any
+	Capacity                int
+	MetaCacheSize           int
+	MaintenanceInterval     time.Duration
+	EventTimeout            time.Duration
+	AntispamThreshold       int
+	AntispamExceptions      antispam.Exceptions
+	SourceNameMetaField     string
+	AvgEventSize            int
+	MaxEventSize            int
+	CutOffEventByLimit      bool
+	CutOffEventByLimitField string
+	StreamField             string
+	IsStrict                bool
+	MetricHoldDuration      time.Duration
+	Pool                    PoolType
 }
 
 type PoolType string
@@ -419,9 +419,12 @@ type Offsets interface {
 
 // In decodes message and passes it to event stream.
 func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64) {
+	var (
+		ok     bool
+		cutoff bool
+	)
 	// don't process mud.
-	var ok bool
-	bytes, ok = p.checkInputBytes(bytes, sourceName, meta)
+	bytes, cutoff, ok = p.checkInputBytes(bytes, sourceName, meta)
 	if !ok {
 		return EventSeqIDError
 	}
@@ -563,6 +566,9 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte
 			}
 		}
 	}
+	if cutoff && p.settings.CutOffEventByLimitField != "" {
+		event.Root.AddFieldNoAlloc(event.Root, p.settings.CutOffEventByLimitField).MutateToBool(true)
+	}
 
 	event.Offset = offset.Current()
 	event.SourceID = sourceID
@@ -572,11 +578,11 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte
 	return p.streamEvent(event)
 }
 
-func (p *Pipeline) checkInputBytes(bytes []byte, sourceName string, meta metadata.MetaData) ([]byte, bool) {
+func (p *Pipeline) checkInputBytes(bytes []byte, sourceName string, meta metadata.MetaData) ([]byte, bool, bool) {
 	length := len(bytes)
 
 	if length == 0 || (bytes[0] == '\n' && length == 1) {
-		return bytes, false
+		return bytes, false, false
 	}
 
 	if p.settings.MaxEventSize != 0 && length > p.settings.MaxEventSize {
@@ -587,17 +593,18 @@ func (p *Pipeline) checkInputBytes(bytes []byte, sourceName string, meta metadat
 		p.IncMaxEventSizeExceeded(source)
 
 		if !p.settings.CutOffEventByLimit {
-			return bytes, false
+			return bytes, false, false
 		}
 
 		wasNewLine := bytes[len(bytes)-1] == '\n'
-		bytes = append(bytes[:p.settings.MaxEventSize], p.settings.CutOffEventByLimitMsg...)
+		bytes = bytes[:p.settings.MaxEventSize]
 		if wasNewLine {
 			bytes = append(bytes, '\n')
 		}
+		return bytes, true, true
 	}
 
-	return bytes, true
+	return bytes, false, true
 }
 
 func (p *Pipeline) streamEvent(event *Event) uint64 {
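The pipeline.go changes split the old boolean result of `checkInputBytes` in two: `ok` still says whether the event should be processed at all, while the new `cutoff` flag reports that the event was truncated to `MaxEventSize`. A condensed sketch of the caller-side contract, assembled from the hunks above:

	// bytes:  possibly truncated to MaxEventSize (a trailing '\n' is preserved).
	// cutoff: true only when truncation actually happened.
	// ok:     false for empty events, or oversized ones with CutOffEventByLimit disabled.
	bytes, cutoff, ok := p.checkInputBytes(bytes, sourceName, meta)
	if !ok {
		return EventSeqIDError
	}
	// ... bytes are decoded into event ...
	if cutoff && p.settings.CutOffEventByLimitField != "" {
		// e.g. adds {"cut_off": true} when CutOffEventByLimitField is "cut_off"
		event.Root.AddFieldNoAlloc(event.Root, p.settings.CutOffEventByLimitField).MutateToBool(true)
	}

Note the behavioral change inside `checkInputBytes` itself: `append(bytes[:MaxEventSize], CutOffEventByLimitMsg...)` becomes a plain `bytes[:p.settings.MaxEventSize]`, so truncation can never grow the buffer past the limit.
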
32 changes: 11 additions & 21 deletions pipeline/pipeline_whitebox_test.go
@@ -56,6 +56,7 @@ func TestCheckInputBytes(t *testing.T) {
 		pipelineSettings *Settings
 		input            []byte
 		want             []byte
+		wantCutoff       bool
 		wantOk           bool
 	}{
 		{
@@ -111,9 +112,10 @@
 				MaxEventSize:       10,
 				CutOffEventByLimit: true,
 			},
-			input:  []byte("some loooooooog"),
-			want:   []byte("some loooo"),
-			wantOk: true,
+			input:      []byte("some loooooooog"),
+			want:       []byte("some loooo"),
+			wantCutoff: true,
+			wantOk:     true,
 		},
 		{
 			name: "cutoff_newline",
@@ -124,32 +126,20 @@
 				MaxEventSize:       10,
 				CutOffEventByLimit: true,
 			},
-			input:  []byte("some loooooooog\n"),
-			want:   []byte("some loooo\n"),
-			wantOk: true,
-		},
-		{
-			name: "cutoff_with_msg",
-			pipelineSettings: &Settings{
-				Capacity:              5,
-				Decoder:               "raw",
-				MetricHoldDuration:    DefaultMetricHoldDuration,
-				MaxEventSize:          10,
-				CutOffEventByLimit:    true,
-				CutOffEventByLimitMsg: "<cutoff>",
-			},
-			input:  []byte("some loooooooog\n"),
-			want:   []byte("some loooo<cutoff>\n"),
-			wantOk: true,
+			input:      []byte("some loooooooog\n"),
+			want:       []byte("some loooo\n"),
+			wantCutoff: true,
+			wantOk:     true,
 		},
 	}
 
 	for _, tCase := range cases {
 		t.Run(tCase.name, func(t *testing.T) {
 			pipe := New("test_pipeline", tCase.pipelineSettings, prometheus.NewRegistry())
 
-			data, ok := pipe.checkInputBytes(tCase.input, "test", nil)
+			data, cutoff, ok := pipe.checkInputBytes(tCase.input, "test", nil)
 
+			assert.Equal(t, tCase.wantCutoff, cutoff)
 			assert.Equal(t, tCase.wantOk, ok)
 			if !tCase.wantOk {
 				return
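The `cutoff_with_msg` case is deleted along with the message-append behavior, and both surviving cutoff cases now assert `wantCutoff: true`. A hypothetical extra case in the same table style (not part of this commit) would pin down the under-limit path, where `cutoff` must stay false:

	{
		name: "no_cutoff_under_limit", // hypothetical case, not in this commit
		pipelineSettings: &Settings{
			Capacity:           5,
			Decoder:            "raw",
			MetricHoldDuration: DefaultMetricHoldDuration,
			MaxEventSize:       10,
			CutOffEventByLimit: true,
		},
		input:      []byte("short"), // 5 bytes, under MaxEventSize
		want:       []byte("short"),
		wantCutoff: false,
		wantOk:     true,
	},
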
2 changes: 0 additions & 2 deletions plugin/input/file/README.md
@@ -166,6 +166,4 @@ Dirs that don't meet this pattern will be ignored.
 **`symlink`**
 
 **`inode`**
-
-**`offset`**
 <br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
42 changes: 18 additions & 24 deletions plugin/input/file/worker.go
@@ -69,6 +69,23 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
 	readTotal := int64(0)
 	scanned := int64(0)
 
+	var metadataInfo metadata.MetaData
+	if w.metaTemplater != nil {
+		metaData, err := newMetaInformation(
+			job.filename,
+			job.symlink,
+			job.inode,
+			w.needK8sMeta,
+		)
+		if err != nil {
+			logger.Error("cannot parse meta info", zap.Error(err))
+		}
+		metadataInfo, err = w.metaTemplater.Render(metaData)
+		if err != nil {
+			logger.Error("cannot render meta info", zap.Error(err))
+		}
+	}
+
 	// append the data of the old work, this happens when the event was not completely written to the file
 	// for example: {"level": "info", "message": "some...
 	// the end of the message can be added later and will be read in this iteration
@@ -123,24 +140,6 @@
 			inBuf = accumBuf
 		}
 
-		var metadataInfo metadata.MetaData
-		if w.metaTemplater != nil {
-			metaData, err := newMetaInformation(
-				job.filename,
-				job.symlink,
-				job.inode,
-				lastOffset+scanned,
-				w.needK8sMeta,
-			)
-			if err != nil {
-				logger.Error("cannot parse meta info", zap.Error(err))
-			}
-			metadataInfo, err = w.metaTemplater.Render(metaData)
-			if err != nil {
-				logger.Error("cannot render meta info", zap.Error(err))
-			}
-		}
-
 		job.lastEventSeq = controller.In(sourceID, sourceName, Offset{lastOffset + scanned, offsets}, inBuf, isVirgin, metadataInfo)
 	}
 	// restore the line buffer
@@ -197,12 +196,11 @@ type metaInformation struct {
 	filename string
 	symlink  string
 	inode    uint64
-	offset   int64
 
 	k8sMetadata *k8s_meta.K8sMetaInformation
 }
 
-func newMetaInformation(filename, symlink string, inode inodeID, offset int64, parseK8sMeta bool) (metaInformation, error) {
+func newMetaInformation(filename, symlink string, inode inodeID, parseK8sMeta bool) (metaInformation, error) {
 	var metaData k8s_meta.K8sMetaInformation
 	var err error
 	if parseK8sMeta {
@@ -219,7 +217,6 @@ func newMetaInformation(filename, symlink string, inode inodeID, offset int64, p
 		filename: filename,
 		symlink:  symlink,
 		inode:    uint64(inode),
-		offset:   offset,
 		k8sMetadata: &metaData,
 	}, nil
 }
@@ -229,7 +226,6 @@ func (m metaInformation) GetData() map[string]any {
 		"filename": m.filename,
 		"symlink":  m.symlink,
 		"inode":    m.inode,
-		"offset":   m.offset,
 	}
 
 	if m.k8sMetadata != nil {
@@ -255,8 +251,6 @@ func (m metaInformation) GetData() map[string]any {
 **`symlink`**
 **`inode`**
-**`offset`**
 }*/
 
 type Offset struct {
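The worker.go change hoists metadata rendering out of the per-event read loop: meta is now built once per job, before any events are read. That is exactly why the per-event `offset` parameter had to disappear from `newMetaInformation`, from `metaInformation.GetData`, and from the documented meta params (including the README entry above): a value that changes on every event cannot live in per-job metadata. A heavily abbreviated sketch of the new shape, assuming the loop structure implied by the hunks:

	// Sketch only; the real function reads in chunks and handles partial lines.
	var metadataInfo metadata.MetaData
	if w.metaTemplater != nil {
		// Rendered once per job: filename, symlink and inode are stable for the file.
		metaData, err := newMetaInformation(job.filename, job.symlink, job.inode, w.needK8sMeta)
		if err != nil {
			logger.Error("cannot parse meta info", zap.Error(err))
		}
		metadataInfo, err = w.metaTemplater.Render(metaData)
		if err != nil {
			logger.Error("cannot render meta info", zap.Error(err))
		}
	}
	for { // per-event read loop (abbreviated)
		// The event offset still reaches the pipeline via the Offset argument;
		// it just no longer travels inside the rendered metadata.
		job.lastEventSeq = controller.In(sourceID, sourceName,
			Offset{lastOffset + scanned, offsets}, inBuf, isVirgin, metadataInfo)
	}
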