Allow delete and index actions with a document ID #12606
Since this is a community submitted pull request, a Jenkins build has not been kicked off automatically. Can an Elastic organization member please verify the contents of this patch and then kick off a build manually?
Branch updated from a7ca417 to 0cf34d4.
    if s, ok := tmp.(string); ok {
        val = s
    } else {
        logp.Err("Event[%s] '%v' is no string value", key, val)
I believe what you want here is `tmp` instead of an empty `val`, i.e. replace
    logp.Err("Event[%s] '%v' is no string value", key, val)
with
    logp.Err("Event[%s] '%v' is no string value", key, tmp)
This is also wrong in current code
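To illustrate why the suggestion matters, here is a minimal, self-contained sketch. It assumes a plain map in place of a `beat.Event`, and the helper name `metaString` is hypothetical (it is not libbeat code). When the type assertion fails, the destination string is still empty, so only the raw value is useful in the message.

```go
package main

import "fmt"

// metaString is a hypothetical stand-in for the reviewed branch. It looks up
// key in a plain metadata map and returns the value if it is a string.
func metaString(meta map[string]interface{}, key string) (string, error) {
	tmp, found := meta[key]
	if !found {
		// A missing key is not an error in this sketch.
		return "", nil
	}
	s, ok := tmp.(string)
	if !ok {
		// Report tmp (the raw value). s is always "" on this path, so
		// printing it would hide what the event actually contained.
		return "", fmt.Errorf("Event[%s] '%v' is no string value", key, tmp)
	}
	return s, nil
}

func main() {
	_, err := metaString(map[string]interface{}{"op_type": 42}, "op_type")
	fmt.Println(err)
}
```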
Pinging @elastic/integrations-services (Team:Services)
@@ -251,6 +251,20 @@ func (client *Client) publishEvents(
        return nil, nil
    }

    func eventMetaValue(event *beat.Event, key string) string {
Since there's nothing in this function that's specific to the Elasticsearch output, how about making the following changes to it?
- move it to https://github.com/ycombinator/beats/blob/master/libbeat/beat/event.go,
- instead of a function that takes `event *beat.Event` as the first parameter, make it a method with `e *beat.Event` as the receiver, and
- for the implementation simply return the result of `e.GetValue("@metadata."+key)`.
@@ -270,7 +284,13 @@ func bulkEncodePublishRequest(
            log.Errorf("Failed to encode event meta data: %+v", err)
            continue
        }
        bulkItems = append(bulkItems, meta, event)
        opType := eventMetaValue(event, "op_type")
Since we are using the bare string `"op_type"` in a couple of places, let's make it a `const` at the top of this file and add a godoc comment above it.
@@ -270,7 +284,13 @@ func bulkEncodePublishRequest(
            log.Errorf("Failed to encode event meta data: %+v", err)
            continue
        }
        bulkItems = append(bulkItems, meta, event)
        opType := eventMetaValue(event, "op_type")
        if opType == "delete" {
Similar to making `"op_type"` a `const`, let's also make a `const` for this `"delete"` bare string so we can give it a godoc comment. This will allow developers reading godocs for this package to discover that it's possible to delete events.
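A minimal sketch of what such documented constants could look like. The identifier names (`OpTypeKey`, `OpTypeCreate`, `OpTypeIndex`, `OpTypeDelete`) and the helper `isDelete` are assumptions made for illustration; the review only asks that the bare strings become consts with godoc comments.

```go
package main

import "fmt"

// Hypothetical names: the review does not prescribe identifiers.
const (
	// OpTypeKey is the event metadata key that selects the bulk action
	// used when the event is sent to Elasticsearch.
	OpTypeKey = "op_type"

	// OpTypeCreate creates the document and fails if the ID already exists.
	OpTypeCreate = "create"

	// OpTypeIndex creates the document or replaces an existing one.
	OpTypeIndex = "index"

	// OpTypeDelete deletes the document with the event's ID.
	OpTypeDelete = "delete"
)

// isDelete reports whether the metadata map requests a delete action.
// A missing or non-string value yields false.
func isDelete(meta map[string]interface{}) bool {
	v, _ := meta[OpTypeKey].(string)
	return v == OpTypeDelete
}

func main() {
	meta := map[string]interface{}{OpTypeKey: OpTypeDelete}
	fmt.Println(isDelete(meta))
}
```

Comparing against named constants instead of string literals also keeps the call sites and any error messages in sync if a value ever changes.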
libbeat/esleg/eslegclient/enc.go (outdated)
    if err := b.AddRaw(obj); err != nil {
        b.buf.Truncate(pos)
        return err
    if obj != nil {
Maybe add a comment here for when `obj` might be `nil` (in the delete case)?
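For context, a delete action in the Elasticsearch bulk API consists of the action line only, with no document source after it. The sketch below shows the idea with the standard library; `encodeBulk` is a hypothetical stand-in, not the `eslegclient` encoder, and skipping `nil` inside the encoder is just one way to handle it.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// encodeBulk is a hypothetical encoder that writes each item as one JSON
// line, which is the shape the bulk endpoint expects.
func encodeBulk(items ...interface{}) (string, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for _, item := range items {
		if item == nil {
			// obj is nil for delete actions: they have no source document.
			// Encoding it anyway would emit a stray "null" line.
			continue
		}
		// Encode appends a newline after every value.
		if err := enc.Encode(item); err != nil {
			return "", err
		}
	}
	return buf.String(), nil
}

func main() {
	meta := map[string]interface{}{
		"delete": map[string]interface{}{"_id": "1"},
	}
	body, err := encodeBulk(meta, nil)
	if err != nil {
		panic(err)
	}
	fmt.Print(body)
}
```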
Hi @bestpath-gb, thanks for this PR. I've requested a few changes to the code. Additionally, it would be really nice to have unit tests for the `bulkEncodePublishRequest` and `createEventBulkMeta` functions that test the new (and existing) code paths.
Thanks so much for the feedback, everyone. I'll make the changes and report back.
Branch updated from 1ab70f3 to b385a8f.
Hi @bestpath-gb, I see you have recently added some commits to this PR. Is it ready for review again or do you need some more time?
Hi @ycombinator. I've just got some tests to write to finish it up. I'm hoping to get them completed this weekend so I'll ask for another review once I've pushed those.
@bestpath-gb May I ask what your Beat does exactly? It seems interesting to me that you are literally shipping updates to ES, not events. I am just curious. :)
@kvch Of course! We're bringing data from Cisco ACI into Elasticsearch. Some of it is time series log/telemetry data and we are enriching that with management data (using Logstash). The management data is indexed using a Beat but has to be kept in sync with configuration in ACI, which is where this PR comes in. When objects in ACI are created, modified or deleted, our custom Beat gets notified and it can propagate that to Elasticsearch. I originally implemented this in Python but would have needed to implement much of what a Beat gets for free, like coping with back pressure (I used Celery to mitigate this), authentication, output to Logstash, configuring indices and pipelines, etc.
Wow, nice! Thanks for sharing.
💔 Build Failed
💚 Build Succeeded
    encoded, bulkItems := bulkEncodePublishRequest(logp.L(), *common.MustNewVersion(version.GetDefaultVersion()), index, pipeline, events)
    assert.Equal(t, len(events)-1, len(encoded), "all events should have been encoded")
    assert.Equal(t, 9, len(bulkItems), "incomplete bulk")
Missed a couple of `assert` -> `require` places. 🙂
D'oh! I've corrected those.
@@ -54,6 +54,19 @@ func (e *Event) SetID(id string) {
        e.Meta["_id"] = id
    }

    func (e *Event) GetMetaStringValue(key string) (string, error) {
Nit: The function can be simplified to this:
    tmp, err := e.GetValue("@metadata." + key)
    if err == nil {
        if s, ok := tmp.(string); ok {
            return s, nil
        }
    }
    return "", nil
How about (requires no alloc):
    tmp, err := e.Meta.GetValue(key)
    if err != nil {
        return "", err
    }
    if s, ok := tmp.(string); ok {
        return s, nil
    }
    return "", nil
All in all I'd prefer to keep the interface of `Event` as small as possible and provide helpers as functions instead of methods.
This is even better. \o/
Thanks for the feedback. I've used the version from @urso . Nice to see this level of attention to detail!
Minor nits, but nothing which blocks the merging of the PR.
LGTM.
Jenkins CI is green and Travis CI failures are unrelated to the changes in this PR. Merging. Thanks for your contribution, @bestpath-gb! <3
* Add `op_type` meta key for delete and index operations with a document ID.
* Add note on why `obj` can be nil.
* Refactor and extract consts.
* Don't include metadata prefix in key.
* Stop appending `nil` else it ends up in the body.
* Error when trying to delete with no _id.
* Remove incorrect detail in comment.
* Add unit test for new op_type meta key.
* No longer required.
* Return error to caller.
* Fail event if op_type is no string.
* Use consts in error.
* Replace assert with require.
* Fail instead of panic.
* Change missed assert calls to require.
* Simplify GetMetaStringValue.
* Ignore err as key may not exist.
Co-authored-by: George Bridgeman <49999150+bestpath-gb@users.noreply.github.com>
Sorry to add to this so late, but I've been unable to find any reference to this functionality anywhere else. What is the proper way to set the `op_type` metadata field? I want to use this with Filebeat to use `index` instead of `create`, and the only way I've gotten it to work is a roundabout three-step processor arrangement: use `add_fields` to create a normal field with the desired value, convert that field into `@metadata.op_type`, then use `drop_fields` to remove the field added in step 1.
Our use case requires our Beat to be able to re-index and delete documents. We always know the document ID, but libbeat currently only allows us to create a document when the ID is known.
I have implemented the ability to specify whether an event should index or delete documents with a specified ID by setting an `op_type` key in the event metadata. Possible values for the key are `create`, `index` or `delete`.
The behaviour currently in master hasn't changed, so if no `op_type` is specified, an event will result in a create if the ID is given, or an index otherwise.
I found issue #8534, where the author seems to be asking for update actions when the ID is known, which I could support as part of this PR by allowing an `update` value for the `op_type` key.
I'd love to see this functionality in libbeat and would welcome feedback on this PR. Does anyone have any thoughts about a better way to go about this?
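The selection rules described above can be sketched as a small standalone function. This is an illustration under stated assumptions, not the PR's implementation: `bulkAction` is a hypothetical name, the error on a delete without an ID follows the commit message ("Error when trying to delete with no _id"), and rejecting unknown values is an assumption the description does not cover.

```go
package main

import (
	"errors"
	"fmt"
)

// bulkAction is a hypothetical helper that maps the op_type metadata value
// and the document ID to the name of the bulk action to send.
func bulkAction(opType, id string) (string, error) {
	switch opType {
	case "":
		// Unchanged default: create when an ID is given, index otherwise.
		if id != "" {
			return "create", nil
		}
		return "index", nil
	case "create", "index":
		return opType, nil
	case "delete":
		// A delete cannot be addressed without a document ID.
		if id == "" {
			return "", errors.New("delete requires a document ID")
		}
		return "delete", nil
	default:
		// Assumption: unknown values are rejected. The PR text is silent here.
		return "", fmt.Errorf("unsupported op_type %q", opType)
	}
}

func main() {
	cases := []struct{ opType, id string }{
		{"", ""}, {"", "42"}, {"index", "42"}, {"delete", "42"}, {"delete", ""},
	}
	for _, c := range cases {
		action, err := bulkAction(c.opType, c.id)
		fmt.Printf("op_type=%q id=%q -> action=%q err=%v\n", c.opType, c.id, action, err)
	}
}
```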