Skip to content

Commit

Permalink
Review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed Sep 26, 2023
1 parent f352d1b commit 5a5d7af
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
15 changes: 9 additions & 6 deletions pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ const (
// check out Commit()/Propagate() functions in InputPluginController.
// plugin may receive event with EventKindTimeout if it takes to long to read next event from same stream.
ActionHold
ActionSpawned = 4
// ActionBreak abort the event processing and pass it to an output.
ActionBreak
)

type eventStatus string
Expand All @@ -34,6 +35,7 @@ const (
eventStatusDiscarded eventStatus = "discarded"
eventStatusCollapse eventStatus = "collapsed"
eventStatusHold eventStatus = "held"
eventStatusBroke eventStatus = "broke"
)

func allEventStatuses() []eventStatus {
Expand Down Expand Up @@ -191,14 +193,15 @@ func (p *processor) doActions(event *Event) (isPassed bool, lastAction int) {

result := action.Do(event)
switch result {
case ActionPass, ActionSpawned:
case ActionPass:
p.countEvent(event, index, eventStatusPassed)
p.tryResetBusy(index)
p.actionWatcher.setEventAfter(index, event, eventStatusPassed)

if result == ActionSpawned {
return true, index
}
case ActionBreak:
p.countEvent(event, index, eventStatusBroke)
p.tryResetBusy(index)
p.actionWatcher.setEventAfter(index, event, eventStatusBroke)
return true, index
case ActionDiscard:
p.countEvent(event, index, eventStatusDiscarded)
p.tryResetBusy(index)
Expand Down
4 changes: 2 additions & 2 deletions plugin/action/split/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
}

if !data.IsArray() {
p.logger.Warn("skip an event because is not an array", zap.String("type", data.TypeStr()))
p.logger.Warn("skip an event because field is not an array", zap.String("type", data.TypeStr()))
return pipeline.ActionPass
}

Expand All @@ -101,5 +101,5 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {

p.pluginController.Spawn(event, children)

return pipeline.ActionSpawned
return pipeline.ActionBreak
}

0 comments on commit 5a5d7af

Please sign in to comment.