Move processors into new pipeline #4554
789983d
to
fe75d24
Compare
  // return event=nil to delete the entire event
  return nil, nil
  }

- func (f dropEvent) String() string { return "drop_event" }
+ func (*dropEvent) String() string { return "drop_event" }
Didn't know this works; thought `_` is required.
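For reference, Go lets a method omit the receiver name when the body never uses the receiver, so `_` is optional. The sketch below shows both spellings; `dropEvent` is a stand-in for the type in the diff and `keepEvent` is a hypothetical type added only for comparison.

```go
package main

import "fmt"

// dropEvent is a stand-in for the processor type shown in the diff.
type dropEvent struct{}

// The receiver is unnamed, which is legal because the body never uses it.
func (*dropEvent) String() string { return "drop_event" }

// keepEvent is hypothetical and exists only to show the blank-identifier form.
type keepEvent struct{}

// Equivalent spelling with the blank identifier as the receiver name.
func (_ *keepEvent) String() string { return "keep_event" }

func main() {
	var a fmt.Stringer = &dropEvent{}
	var b fmt.Stringer = &keepEvent{}
	fmt.Println(a, b)
}
```

Both methods satisfy `fmt.Stringer` in the same way; the unnamed form is simply shorter.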
fe75d24
to
bbe6888
Compare
5a8b240
to
526cf2e
Compare
LGTM. Few questions:
- There are quite a few changes on the JSON part. I wasn't sure how this is related to the other changes?
- If I understand it right, config-wise nothing changes for fields / tags; it's just the internal handling as a processor. Also, a user cannot configure these as processors, because they are not registered processors.
- actual, err := p.Run(input)

  return actual
+ actual, _ := p.Run(&beat.Event{Fields: input})
No error check?
The error has never been checked here. In this PR I'm trying to adjust interfaces, not fix/improve/change behavior in tests or other pieces.
  if err != nil {
+ // XXX: We don't drop the event, but continue filtering here iff the most
Could we add an "error" field to the event when a processor fails, so that the event itself shows that something went wrong?
Same here. The old/current logic ignores errors and at most prints a debug message. I don't intend to change any processing logic.
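To make the behavior under discussion concrete, here is a minimal sketch of a processor loop that logs errors at debug level and keeps going, and that treats a nil event as a drop. `Event`, `Processor`, `funcProcessor`, `runAll` and the `new...` helpers are simplified, hypothetical stand-ins, not the libbeat types; continuing with the previous event after an error is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Event and Processor are simplified stand-ins, not the libbeat types.
type Event struct {
	Fields map[string]interface{}
}

type Processor interface {
	Run(event *Event) (*Event, error)
	String() string
}

// funcProcessor adapts a plain function to the Processor interface.
type funcProcessor struct {
	name string
	fn   func(*Event) (*Event, error)
}

func (p *funcProcessor) Run(e *Event) (*Event, error) { return p.fn(e) }
func (p *funcProcessor) String() string               { return p.name }

// Hypothetical sample processors used by main.
func newFailing() Processor {
	return &funcProcessor{name: "failing", fn: func(*Event) (*Event, error) {
		return nil, errors.New("boom")
	}}
}

func newTagger() Processor {
	return &funcProcessor{name: "tagger", fn: func(e *Event) (*Event, error) {
		e.Fields["tagged"] = true
		return e, nil
	}}
}

func newDrop() Processor {
	return &funcProcessor{name: "drop_event", fn: func(*Event) (*Event, error) {
		return nil, nil // event=nil without an error deletes the event
	}}
}

// runAll applies the processors in order. An error is only reported through
// debugf and the previous event is kept (assumption of this sketch); a nil
// event without an error stops processing and drops the event.
func runAll(procs []Processor, event *Event, debugf func(string, ...interface{})) *Event {
	for _, p := range procs {
		next, err := p.Run(event)
		if err != nil {
			debugf("processor %v failed: %v", p, err)
			continue
		}
		if next == nil {
			return nil
		}
		event = next
	}
	return event
}

func main() {
	debugf := func(format string, args ...interface{}) {
		fmt.Printf(format+"\n", args...)
	}
	out := runAll([]Processor{newFailing(), newTagger()},
		&Event{Fields: map[string]interface{}{}}, debugf)
	fmt.Println("survived:", out != nil)

	out = runAll([]Processor{newDrop(), newTagger()},
		&Event{Fields: map[string]interface{}{}}, debugf)
	fmt.Println("dropped:", out == nil)
}
```

Adding an "error" field, as suggested in the review, would be a change inside the `err != nil` branch; the sketch deliberately leaves that out because the PR does not change processing logic.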
  //
  // Pipeline (C=client, P=pipeline)
  //
+ // 1. (P) add EventMetadataKey fields + tags (to be removed in favor of 4)
We should probably have that somewhere in the docs.
Yeah. Adding tag.
526cf2e
to
96eca70
Compare
There's some shared json parsing logic treating
Yep. The pipeline/processor.go defines some internal processors, not globally registered/configurable.
96eca70
to
aab3cb1
Compare
aab3cb1
to
0a0bb10
Compare
- Update all processors to operate on `*beat.Event`.
- Update processors factory to use `*common.Config`, not `common.Config`. `common.Config` works, but go-ucfg interfaces/API assume to operate on pointers (passing by value might break functionality in future?).
- Update most processor interfaces to pass the processor's type/context by pointer instead of by value (due to the use of interfaces, the processors have been allocated on the heap anyway).
- Install list of processors in new publisher pipeline. This allows the publisher pipeline to account for events being dropped by filters when reporting ACKs. From the beats' point of view, there should be no difference between dropped events (by filters) and ACKed events (by outputs). The interface mandates that all events are properly ACKed by the publisher pipeline.
- Tags, Fields and beat metadata settings are now implicitly converted to Processors as well.
  -> This change ensures that all metadata (local and global) is applied before the actual processors (installed with the pipeline) are executed. It ensures all processors will see the very same events.
- Introduce `(*processors.Processors).RunBC(common.MapStr) common.MapStr`. Will be removed once the beats themselves configure the publisher pipeline to run processors (filebeat/metricbeat are executing processors on their own).
- Add `Private` field to `beat.Event` for event-based ACK callbacks. Use the `Private` field to store additional event metadata required for post-processing in the ACK callback (as the processor pipeline can change the event at will).
- Prepare moving beats to the new pipeline by introducing `ConnectX` to the BC API, for beats to connect to the shared global pipeline (Note: will be removed when the BC layer is removed).
- Processors are still executed by the goroutine calling `(beat.Client).Publish`.
- New Processor Execution Order (C=client based processor, P=pipeline based processor):
  1. (P) extract EventMetadataKey fields + tags (to be removed in favor of 4)
  2. (P) generalize/normalize event
  3. (P) add beats metadata (name, hostname, version)
  4. (P) add pipeline fields + tags
  5. (C) add client fields + tags
  6. (P/C) apply EventMetadataKey fields + tags (to be removed in favor of 4)
  7. (C) client processors list
  8. (P) pipeline processors list
  9. (P) (if publish/debug enabled) log event
  10. (P) (if output disabled) dropEvent

  Client processors have higher priority.
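The ACK accounting and the `Private` field described above can be illustrated with a small sketch. Everything here (`Event`, `client`, `ackOutputs`, `newDemoClient`) is a hypothetical stand-in rather than the libbeat implementation; it only shows the idea that a dropped event is acknowledged just like a delivered one, and that the `Private` value is captured before processors can alter the event.

```go
package main

import "fmt"

// Event is a simplified stand-in for beat.Event, including a Private field.
type Event struct {
	Fields  map[string]interface{}
	Private interface{} // opaque to processors; handed back in the ACK callback
}

// client is a hypothetical publisher client, not the libbeat implementation.
type client struct {
	process func(*Event) *Event       // returns nil to drop the event
	onACK   func(private interface{}) // called once per published event
	queue   []interface{}             // Private values awaiting an output ACK
}

// Publish runs the processors in the caller's goroutine. A dropped event is
// ACKed immediately, so the producer cannot tell it apart from a delivered one.
func (c *client) Publish(e Event) {
	private := e.Private // capture first: processors may rewrite the event
	if out := c.process(&e); out == nil {
		c.onACK(private)
		return
	}
	c.queue = append(c.queue, private)
}

// ackOutputs simulates the output acknowledging everything queued so far.
func (c *client) ackOutputs() {
	for _, p := range c.queue {
		c.onACK(p)
	}
	c.queue = nil
}

// newDemoClient drops events whose "drop" field is true and records every
// ACKed Private value in acked.
func newDemoClient(acked *[]interface{}) *client {
	return &client{
		process: func(e *Event) *Event {
			if e.Fields["drop"] == true {
				return nil
			}
			return e
		},
		onACK: func(p interface{}) { *acked = append(*acked, p) },
	}
}

func main() {
	var acked []interface{}
	c := newDemoClient(&acked)
	c.Publish(Event{Fields: map[string]interface{}{"drop": true}, Private: "offset-1"})
	c.Publish(Event{Fields: map[string]interface{}{}, Private: "offset-2"})
	c.ackOutputs()
	fmt.Println(len(acked), "events ACKed")
}
```

In this sketch the dropped event is ACKed before the delivered one; whether the real pipeline preserves publish order for ACKs is not something the PR text states.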
0a0bb10
to
6f47a11
Compare
I'm removing the needs_docs label because any outstanding doc work required here is tracked by #4598.
For docs: event processing execution order would be nice to have.
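As a starting point for such documentation, the following sketch assembles a subset of the execution order from the PR description (steps 4, 5, 7, 8 and 10) as an ordered list of stages. The types and names (`Event`, `stage`, `settings`, `buildStages`, `run`) are hypothetical, and the fact that later field additions overwrite earlier ones on a key collision is an assumption of the sketch, not something the PR states.

```go
package main

import "fmt"

// Event is a simplified stand-in for beat.Event.
type Event struct {
	Fields map[string]interface{}
}

// stage is one step of the processing order; run returns nil to drop the event.
type stage struct {
	name string
	run  func(*Event) *Event
}

// settings holds the inputs that decide which stages are installed.
type settings struct {
	pipelineFields map[string]interface{}
	clientFields   map[string]interface{}
	clientProcs    []stage
	pipelineProcs  []stage
	outputDisabled bool
}

// addFields returns a stage that copies fields into the event
// (assumption: existing keys are overwritten).
func addFields(name string, fields map[string]interface{}) stage {
	return stage{name: name, run: func(e *Event) *Event {
		for k, v := range fields {
			e.Fields[k] = v
		}
		return e
	}}
}

// buildStages assembles steps 4, 5, 7, 8 and 10 of the listed order.
func buildStages(s settings) []stage {
	stages := []stage{
		addFields("pipeline fields", s.pipelineFields), // step 4 (P)
		addFields("client fields", s.clientFields),     // step 5 (C)
	}
	stages = append(stages, s.clientProcs...)   // step 7 (C)
	stages = append(stages, s.pipelineProcs...) // step 8 (P)
	if s.outputDisabled {
		// step 10 (P): drop everything when no output is configured
		stages = append(stages, stage{name: "drop_event", run: func(*Event) *Event { return nil }})
	}
	return stages
}

// run executes the stages in order and returns the names of the stages that ran.
func run(stages []stage, e *Event) (*Event, []string) {
	var trace []string
	for _, st := range stages {
		trace = append(trace, st.name)
		if e = st.run(e); e == nil {
			return nil, trace
		}
	}
	return e, trace
}

func main() {
	s := settings{
		pipelineFields: map[string]interface{}{"env": "global"},
		clientFields:   map[string]interface{}{"env": "client"},
		outputDisabled: true,
	}
	_, trace := run(buildStages(s), &Event{Fields: map[string]interface{}{}})
	fmt.Println(trace)
}
```

Because client stages are appended before pipeline stages, a client processor sees the event before any pipeline processor does, which matches the C-before-P ordering of steps 7 and 8.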