Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ID processor #14524

Merged
merged 50 commits into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
e0dc306
WIP: Flake ID processor
ycombinator Nov 13, 2019
4269333
Fleshing out implementation of generator
ycombinator Nov 14, 2019
bfa27b1
Rename package
ycombinator Nov 14, 2019
98a891d
Unexport const
ycombinator Nov 14, 2019
1133141
Use increment operator
ycombinator Nov 14, 2019
f35064b
Adding processor scaffolding
ycombinator Nov 15, 2019
f6ed19c
Fixing default field
ycombinator Nov 15, 2019
e2d85fe
Adding CHANGELOG entry
ycombinator Nov 15, 2019
477fe7f
Fixing compile errors
ycombinator Nov 15, 2019
998a85f
WIP: unit tests
ycombinator Nov 15, 2019
e4a2af6
Fixing byte copy
ycombinator Nov 15, 2019
229d57d
Fixing up tests
ycombinator Nov 15, 2019
c9a7bae
Adding test TODOs
ycombinator Nov 15, 2019
4cc8c56
Adding non-default target field unit test
ycombinator Nov 15, 2019
a6b3646
Adding one more test TODO
ycombinator Nov 15, 2019
950cd5b
Adding TODO for post-benchmarking
ycombinator Nov 15, 2019
02cdbf6
Introduce type
ycombinator Nov 15, 2019
6881da2
Adding unit test for factory
ycombinator Nov 15, 2019
b3af9e8
Adding unit test for mac
ycombinator Nov 15, 2019
ae02eb2
Adding unit test for mac
ycombinator Nov 15, 2019
b60e88c
Fleshing out remaining mac unit tests
ycombinator Nov 15, 2019
881c040
Adding tests for ES ID generator
ycombinator Nov 15, 2019
f405381
Remove TODO after experimenting with IIFE (perf was worse)
ycombinator Nov 15, 2019
7245204
Moving doc
ycombinator Nov 15, 2019
3f9652a
Adding UUID processor to list in docs
ycombinator Nov 15, 2019
63fe346
Apply suggestions from docs code review
ycombinator Nov 20, 2019
63a96fc
Adding godoc
ycombinator Nov 21, 2019
fc0e6bb
Rename generator function type
ycombinator Nov 21, 2019
539188d
Exporting and adding godoc
ycombinator Nov 21, 2019
ef1e22c
Adding godoc
ycombinator Nov 21, 2019
848c35e
Updating godoc
ycombinator Nov 21, 2019
6d604d9
Adding Unwrap error methods
ycombinator Nov 22, 2019
c89d3c4
Moving ES ID generator into generators package + singleton construction
ycombinator Dec 5, 2019
1e62ad7
Addressing Hound feedback
ycombinator Dec 5, 2019
fb6e89d
Renaming processor to `add_id`
ycombinator Dec 5, 2019
e00cb7b
Updating processor name in CHANGELOG entry
ycombinator Dec 5, 2019
439b56f
More refactoring updates
ycombinator Dec 5, 2019
92637a5
Fixing more vet errors
ycombinator Dec 5, 2019
b7fe6fb
Unexport config struct as it's only used within this package
ycombinator Dec 5, 2019
d45e591
Fixing doc anchor
ycombinator Dec 5, 2019
0a76462
Moving generator construction to processor constructor; simplifying f…
ycombinator Dec 5, 2019
1b44e75
Fixing compile error
ycombinator Dec 5, 2019
aa8739b
Validate ID generator type in config
ycombinator Dec 6, 2019
7c79403
Finer-grained locking to reduce mutex contention
ycombinator Dec 6, 2019
99c01b7
Initialize package global variables that depend on randomness, later
ycombinator Dec 9, 2019
e4f9fe9
Compute last timestamp while accounting for system time going backwards
ycombinator Dec 9, 2019
f2faf14
Simpler and testable timestamp() function
ycombinator Dec 10, 2019
51f8eed
Adding unit test for timestamp function
ycombinator Dec 10, 2019
72b97c5
Re-implementing ES timestamp algorithm
ycombinator Dec 10, 2019
5e8420c
Removing unused variable
ycombinator Dec 10, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add consumer_lag in Kafka consumergroup metricset {pull}14822[14822]
- Make use of consumer_lag in Kafka dashboard {pull}14863[14863]
- Refactor kubernetes autodiscover to enable different resource based discovery {pull}14738[14738]
- Add `add_id` processor. {pull}14524[14524]

*Auditbeat*

Expand Down
6 changes: 6 additions & 0 deletions libbeat/docs/processors-list.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ endif::[]
ifndef::no_add_host_metadata_processor[]
* <<add-host-metadata,`add_host_metadata`>>
endif::[]
ifndef::no_add_id_processor[]
* <<add-id,`add_id`>>
endif::[]
ifndef::no_add_kubernetes_metadata_processor[]
* <<add-kubernetes-metadata,`add_kubernetes_metadata`>>
endif::[]
Expand Down Expand Up @@ -107,6 +110,9 @@ endif::[]
ifndef::no_add_host_metadata_processor[]
include::{libbeat-processors-dir}/add_host_metadata/docs/add_host_metadata.asciidoc[]
endif::[]
ifndef::no_add_id[]
include::{libbeat-processors-dir}/add_id/docs/add_id.asciidoc[]
endif::[]
ifndef::no_add_kubernetes_metadata_processor[]
include::{libbeat-processors-dir}/add_kubernetes_metadata/docs/add_kubernetes_metadata.asciidoc[]
endif::[]
Expand Down
76 changes: 76 additions & 0 deletions libbeat/processors/add_id/add_id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package add_id
ycombinator marked this conversation as resolved.
Show resolved Hide resolved
ycombinator marked this conversation as resolved.
Show resolved Hide resolved

import (
"fmt"

"github.com/elastic/beats/libbeat/processors/add_id/generator"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
jsprocessor "github.com/elastic/beats/libbeat/processors/script/javascript/module/processor"
)

func init() {
processors.RegisterPlugin("add_id", New)
jsprocessor.RegisterPlugin("AddID", New)
}

const processorName = "add_id"

type addID struct {
config config
gen generator.IDGenerator
}

// New constructs a new Add ID processor.
func New(cfg *common.Config) (processors.Processor, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, makeErrConfigUnpack(err)
}

gen, err := generator.Factory(config.Type)
if err != nil {
return nil, makeErrComputeID(err)
}

p := &addID{
config,
gen,
}

return p, nil
}

// Run enriches the given event with an ID
func (p *addID) Run(event *beat.Event) (*beat.Event, error) {
id := p.gen.NextID()

if _, err := event.PutValue(p.config.TargetField, id); err != nil {
return nil, makeErrComputeID(err)
}

return event, nil
}

func (p *addID) String() string {
return fmt.Sprintf("%v=[target_field=[%v]]", processorName, p.config.TargetField)
}
65 changes: 65 additions & 0 deletions libbeat/processors/add_id/add_id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package add_id
ycombinator marked this conversation as resolved.
Show resolved Hide resolved

import (
"testing"

"github.com/elastic/beats/libbeat/common"

"github.com/elastic/beats/libbeat/beat"

"github.com/stretchr/testify/assert"
)

func TestDefaultTargetField(t *testing.T) {
p, err := New(common.MustNewConfigFrom(nil))
assert.NoError(t, err)

testEvent := &beat.Event{}

newEvent, err := p.Run(testEvent)
assert.NoError(t, err)

v, err := newEvent.GetValue("@metadata.id")
assert.NoError(t, err)
assert.NotEmpty(t, v)
}

func TestNonDefaultTargetField(t *testing.T) {
cfg := common.MustNewConfigFrom(common.MapStr{
"target_field": "foo",
})
p, err := New(cfg)
assert.NoError(t, err)

testEvent := &beat.Event{
Fields: common.MapStr{},
}

newEvent, err := p.Run(testEvent)
assert.NoError(t, err)

v, err := newEvent.GetValue("foo")
assert.NoError(t, err)
assert.NotEmpty(t, v)

v, err = newEvent.GetValue("@metadata.id")
assert.NoError(t, err)
assert.Empty(t, v)
}
44 changes: 44 additions & 0 deletions libbeat/processors/add_id/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package add_id
ycombinator marked this conversation as resolved.
Show resolved Hide resolved

import (
"github.com/elastic/beats/libbeat/processors/add_id/generator"
)

// configuration for Add ID processor.
type config struct {
TargetField string `config:"target_field"` // Target field for the ID
Type string `config:"type"` // Type of ID
}
ycombinator marked this conversation as resolved.
Show resolved Hide resolved

func defaultConfig() config {
return config{
TargetField: "@metadata.id",
Type: "elasticsearch",
}
}

func (c *config) Validate() error {
// Validate type of ID generator
if !generator.Exists(c.Type) {
return makeErrUnknownType(c.Type)
}

return nil
}
18 changes: 18 additions & 0 deletions libbeat/processors/add_id/docs/add_id.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[[add-id]]
=== Generate an ID for an event

The `add_id` processor generates a unique ID for an event.

[source,yaml]
-----------------------------------------------------
processors:
- add_id: ~
-----------------------------------------------------

The following settings are supported:

`target_field`:: (Optional) Field where the generated ID will be stored. Default is `@metadata.id`.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

somewhere we should document that the Elasticsearch output will use @metadata.id as document ID if present. Plus we should document on how users can use the field if sending via Logstash.

Copy link
Contributor Author

@ycombinator ycombinator Dec 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. I was initially thinking of doing this in a blog post (that would cover the introduction of this processor and the fingerprint processor, from a use case perspective). But I think it's useful to have this in our docs somewhere as well, since users will be looking at those more over time.

I wonder if this sort of documentation belongs in the ES output and LS output docs, since it's strictly agnostic to what generated the ID (e.g. this processor, but could be something else too). @dedemorton WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ycombinator I'm planning to work on a related issue this week: #13739. We should talk about your plans for the blog post and see what makes sense for the docs. I'd like to avoid burying the content too deeply under the config settings. I think we need a topic that describes what the document ID is (with links to the ES docs), how it's set, and why you want to set it (deduplication etc). Maybe we can chat later this week.


`type`:: (Optional) Type of ID to generate. Currently only `elasticsearch` is supported and is the default.
The `elasticsearch` type generates IDs using the same algorithm that Elasticsearch uses for auto-generating
document IDs.
55 changes: 55 additions & 0 deletions libbeat/processors/add_id/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package add_id
ycombinator marked this conversation as resolved.
Show resolved Hide resolved

import (
"fmt"
)

type (
errConfigUnpack struct{ cause error }
errComputeID struct{ cause error }
errUnknownType struct{ typ string }
)

func makeErrConfigUnpack(cause error) errConfigUnpack {
return errConfigUnpack{cause}
}
func (e errConfigUnpack) Error() string {
return fmt.Sprintf("failed to unpack %v processor configuration: %v", processorName, e.cause)
}
func (e errConfigUnpack) Unwrap() error {
return e.cause
}

func makeErrComputeID(cause error) errComputeID {
return errComputeID{cause}
}
func (e errComputeID) Error() string {
return fmt.Sprintf("failed to compute ID: %v", e.cause)
}
func (e errComputeID) Unwrap() error {
return e.cause
}

func makeErrUnknownType(typ string) errUnknownType {
return errUnknownType{typ}
}
func (e errUnknownType) Error() string {
return fmt.Sprintf("invalid type [%s]", e.typ)
}
33 changes: 33 additions & 0 deletions libbeat/processors/add_id/generator/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package generator

import (
"fmt"
)

type (
errUnknownType struct{ typ string }
)

func makeErrUnknownType(typ string) errUnknownType {
return errUnknownType{typ}
}
func (e errUnknownType) Error() string {
return fmt.Sprintf("invalid type [%s]", e.typ)
}
Loading