diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6af30ecd9f6e..377457841149 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -360,6 +360,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add consumer_lag in Kafka consumergroup metricset {pull}14822[14822] - Make use of consumer_lag in Kafka dashboard {pull}14863[14863] - Refactor kubernetes autodiscover to enable different resource based discovery {pull}14738[14738] +- Add `add_id` processor. {pull}14524[14524] *Auditbeat* diff --git a/libbeat/docs/processors-list.asciidoc b/libbeat/docs/processors-list.asciidoc index b2e9daec7dfc..a9a4356377ae 100644 --- a/libbeat/docs/processors-list.asciidoc +++ b/libbeat/docs/processors-list.asciidoc @@ -14,6 +14,9 @@ endif::[] ifndef::no_add_host_metadata_processor[] * <> endif::[] +ifndef::no_add_id_processor[] +* <<add-id,`add_id`>> +endif::[] ifndef::no_add_kubernetes_metadata_processor[] * <> endif::[] @@ -107,6 +110,9 @@ endif::[] ifndef::no_add_host_metadata_processor[] include::{libbeat-processors-dir}/add_host_metadata/docs/add_host_metadata.asciidoc[] endif::[] +ifndef::no_add_id_processor[] +include::{libbeat-processors-dir}/add_id/docs/add_id.asciidoc[] +endif::[] ifndef::no_add_kubernetes_metadata_processor[] include::{libbeat-processors-dir}/add_kubernetes_metadata/docs/add_kubernetes_metadata.asciidoc[] endif::[] diff --git a/libbeat/processors/add_id/add_id.go b/libbeat/processors/add_id/add_id.go new file mode 100644 index 000000000000..5df081460c33 --- /dev/null +++ b/libbeat/processors/add_id/add_id.go @@ -0,0 +1,76 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package add_id + +import ( + "fmt" + + "github.com/elastic/beats/libbeat/processors/add_id/generator" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" + jsprocessor "github.com/elastic/beats/libbeat/processors/script/javascript/module/processor" +) + +func init() { + processors.RegisterPlugin("add_id", New) + jsprocessor.RegisterPlugin("AddID", New) +} + +const processorName = "add_id" + +type addID struct { + config config + gen generator.IDGenerator +} + +// New constructs a new Add ID processor. 
+func New(cfg *common.Config) (processors.Processor, error) { + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return nil, makeErrConfigUnpack(err) + } + + gen, err := generator.Factory(config.Type) + if err != nil { + return nil, makeErrComputeID(err) + } + + p := &addID{ + config, + gen, + } + + return p, nil +} + +// Run enriches the given event with an ID +func (p *addID) Run(event *beat.Event) (*beat.Event, error) { + id := p.gen.NextID() + + if _, err := event.PutValue(p.config.TargetField, id); err != nil { + return nil, makeErrComputeID(err) + } + + return event, nil +} + +func (p *addID) String() string { + return fmt.Sprintf("%v=[target_field=[%v]]", processorName, p.config.TargetField) +} diff --git a/libbeat/processors/add_id/add_id_test.go b/libbeat/processors/add_id/add_id_test.go new file mode 100644 index 000000000000..a7bd9baa8c5a --- /dev/null +++ b/libbeat/processors/add_id/add_id_test.go @@ -0,0 +1,65 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package add_id + +import ( + "testing" + + "github.com/elastic/beats/libbeat/common" + + "github.com/elastic/beats/libbeat/beat" + + "github.com/stretchr/testify/assert" +) + +func TestDefaultTargetField(t *testing.T) { + p, err := New(common.MustNewConfigFrom(nil)) + assert.NoError(t, err) + + testEvent := &beat.Event{} + + newEvent, err := p.Run(testEvent) + assert.NoError(t, err) + + v, err := newEvent.GetValue("@metadata.id") + assert.NoError(t, err) + assert.NotEmpty(t, v) +} + +func TestNonDefaultTargetField(t *testing.T) { + cfg := common.MustNewConfigFrom(common.MapStr{ + "target_field": "foo", + }) + p, err := New(cfg) + assert.NoError(t, err) + + testEvent := &beat.Event{ + Fields: common.MapStr{}, + } + + newEvent, err := p.Run(testEvent) + assert.NoError(t, err) + + v, err := newEvent.GetValue("foo") + assert.NoError(t, err) + assert.NotEmpty(t, v) + + v, err = newEvent.GetValue("@metadata.id") + assert.NoError(t, err) + assert.Empty(t, v) +} diff --git a/libbeat/processors/add_id/config.go b/libbeat/processors/add_id/config.go new file mode 100644 index 000000000000..40b4d305de64 --- /dev/null +++ b/libbeat/processors/add_id/config.go @@ -0,0 +1,44 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package add_id + +import ( + "github.com/elastic/beats/libbeat/processors/add_id/generator" +) + +// configuration for Add ID processor. +type config struct { + TargetField string `config:"target_field"` // Target field for the ID + Type string `config:"type"` // Type of ID +} + +func defaultConfig() config { + return config{ + TargetField: "@metadata.id", + Type: "elasticsearch", + } +} + +func (c *config) Validate() error { + // Validate type of ID generator + if !generator.Exists(c.Type) { + return makeErrUnknownType(c.Type) + } + + return nil +} diff --git a/libbeat/processors/add_id/docs/add_id.asciidoc b/libbeat/processors/add_id/docs/add_id.asciidoc new file mode 100644 index 000000000000..64d475669a0b --- /dev/null +++ b/libbeat/processors/add_id/docs/add_id.asciidoc @@ -0,0 +1,18 @@ +[[add-id]] +=== Generate an ID for an event + +The `add_id` processor generates a unique ID for an event. + +[source,yaml] +----------------------------------------------------- +processors: + - add_id: ~ +----------------------------------------------------- + +The following settings are supported: + +`target_field`:: (Optional) Field where the generated ID will be stored. Default is `@metadata.id`. + +`type`:: (Optional) Type of ID to generate. Currently only `elasticsearch` is supported and is the default. +The `elasticsearch` type generates IDs using the same algorithm that Elasticsearch uses for auto-generating +document IDs. diff --git a/libbeat/processors/add_id/errors.go b/libbeat/processors/add_id/errors.go new file mode 100644 index 000000000000..59fc45494bd6 --- /dev/null +++ b/libbeat/processors/add_id/errors.go @@ -0,0 +1,55 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. 
licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package add_id + +import ( + "fmt" +) + +type ( + errConfigUnpack struct{ cause error } + errComputeID struct{ cause error } + errUnknownType struct{ typ string } +) + +func makeErrConfigUnpack(cause error) errConfigUnpack { + return errConfigUnpack{cause} +} +func (e errConfigUnpack) Error() string { + return fmt.Sprintf("failed to unpack %v processor configuration: %v", processorName, e.cause) +} +func (e errConfigUnpack) Unwrap() error { + return e.cause +} + +func makeErrComputeID(cause error) errComputeID { + return errComputeID{cause} +} +func (e errComputeID) Error() string { + return fmt.Sprintf("failed to compute ID: %v", e.cause) +} +func (e errComputeID) Unwrap() error { + return e.cause +} + +func makeErrUnknownType(typ string) errUnknownType { + return errUnknownType{typ} +} +func (e errUnknownType) Error() string { + return fmt.Sprintf("invalid type [%s]", e.typ) +} diff --git a/libbeat/processors/add_id/generator/errors.go b/libbeat/processors/add_id/generator/errors.go new file mode 100644 index 000000000000..dda17c32e617 --- /dev/null +++ b/libbeat/processors/add_id/generator/errors.go @@ -0,0 +1,33 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. 
licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package generator + +import ( + "fmt" +) + +type ( + errUnknownType struct{ typ string } +) + +func makeErrUnknownType(typ string) errUnknownType { + return errUnknownType{typ} +} +func (e errUnknownType) Error() string { + return fmt.Sprintf("invalid type [%s]", e.typ) +} diff --git a/libbeat/processors/add_id/generator/es_generator.go b/libbeat/processors/add_id/generator/es_generator.go new file mode 100644 index 000000000000..249b4af412a9 --- /dev/null +++ b/libbeat/processors/add_id/generator/es_generator.go @@ -0,0 +1,135 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package generator + +import ( + "encoding/base64" + "math/rand" + "sync" + "time" +) + +type esTimeBasedUUIDGenerator struct{} + +// Singleton instance +var _esTimeBasedUUIDGenerator IDGenerator = (*esTimeBasedUUIDGenerator)(nil) // typed nil pointer; works because NextID never dereferences its receiver + +// ESTimeBasedUUIDGenerator returns the singleton instance for this generator +func ESTimeBasedUUIDGenerator() IDGenerator { + return _esTimeBasedUUIDGenerator +} + +var ( + sequenceNumber uint64 + lastTimestamp uint64 + once sync.Once + mac []byte // munged MAC address, assigned once by initOnce + mu sync.Mutex // guards sequenceNumber and lastTimestamp (see nextIDData) +) + +// NextID returns a base64-encoded, randomly-generated, but roughly ordered (over time), unique +// ID. The algorithm used to generate the ID is the same as used by Elasticsearch. +// See https://github.com/elastic/elasticsearch/blob/a666fb2266/server/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java +func (*esTimeBasedUUIDGenerator) NextID() string { + // Initialize sequence number and mac address. We do this here instead of doing it in a package-level + // init function to give the runtime time to generate enough entropy for randomization. + initOnce() + + ts, seq := nextIDData() + var uuidBytes [15]byte + + packID(uuidBytes[:], ts, seq) + return base64.RawURLEncoding.EncodeToString(uuidBytes[:]) +} + +func initOnce() { + once.Do(func() { + sequenceNumber = rand.Uint64() // NOTE(review): uses the global math/rand source; no seeding is visible here, so unless it is seeded elsewhere the starting value may repeat across restarts - confirm + m, err := getSecureMungedMACAddress() + if err != nil { + panic(err) // NextID has no error return, so a failure here surfaces as a panic on first use + } + mac = m + }) +} + +func nextIDData() (uint64, uint64) { + mu.Lock() + defer mu.Unlock() + + sequenceNumber++ + + // We only use bottom 3 bytes for the sequence number. + s := sequenceNumber & 0xffffff + + lastTimestamp = timestamp(nowMS(), lastTimestamp, s) + return lastTimestamp, s +} + +// timestamp returns a monotonically-increasing timestamp (in ms) to use, +// while accounting for system clock going backwards (e.g. after a manual or NTP clock adjustment). +func timestamp(clockTS, lastTS uint64, seq uint64) uint64 { + // Don't let timestamp go backwards, at least "on our watch" (while this process is running).
We are still vulnerable if we are + // shut down, clock goes backwards, and we restart... for this we randomize the sequenceNumber on init to decrease chance of + // collision. + newTS := lastTS + if clockTS > lastTS { + newTS = clockTS + } + + // Always force the clock to increment whenever sequence number is 0, in case we have a long time-slip backwards. + if seq == 0 { + newTS++ + } + + return newTS +} + +func nowMS() uint64 { + now := time.Now() + return uint64((now.Unix() * 1000) + (int64(now.Nanosecond()) / 1000000)) +} + +func packID(buf []byte, ts uint64, seq uint64) { + //// We have auto-generated ids, which are usually used for append-only workloads. + //// So we try to optimize the order of bytes for indexing speed (by having quite + //// unique bytes close to the beginning of the ids so that sorting is fast) and + //// compression (by making sure we share common prefixes between enough ids). + + // We use the sequence number rather than the timestamp because the distribution of + // the timestamp depends too much on the indexing rate, so it is less reliable. + buf[0] = byte(seq) // copy lowest-order byte from sequence number + buf[1] = byte(seq >> 16) // copy 3rd lowest-order byte from sequence number + + // Now we start focusing on compression and put bytes that should not change too often. + buf[2] = byte(ts >> 16) // 3rd lowest-order byte from timestamp; changes every ~65 secs + buf[3] = byte(ts >> 24) // 4th lowest-order byte from timestamp; changes every ~4.5h + buf[4] = byte(ts >> 32) // 5th lowest-order byte from timestamp; changes every ~50 days + buf[5] = byte(ts >> 40) // 6th lowest-order byte from timestamp; changes every 35 years + + // Copy mac address bytes (6 bytes) + copy(buf[6:6+addrLen], mac) + + // Finally we put the remaining bytes, which will likely not be compressed at all. 
+ buf[12] = byte(ts >> 8) // 2nd lowest-order byte from timestamp + buf[13] = byte(seq >> 8) // 2nd lowest-order byte from sequence number + buf[14] = byte(ts) + + // See also: more detailed explanation of byte choices at + // https://github.com/elastic/elasticsearch/blob/a666fb22664284d8e2114841ebb58ea4e1924691/server/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java#L80-L95 +} diff --git a/libbeat/processors/add_id/generator/es_generator_test.go b/libbeat/processors/add_id/generator/es_generator_test.go new file mode 100644 index 000000000000..d8801c8ea1db --- /dev/null +++ b/libbeat/processors/add_id/generator/es_generator_test.go @@ -0,0 +1,152 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package generator + +import ( + "encoding/base64" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIDLen(t *testing.T) { + g := ESTimeBasedUUIDGenerator() + id := g.NextID() + + // Check that decoded ID is 15 bytes long + decodedBytes, err := base64.RawURLEncoding.DecodeString(id) + assert.NoError(t, err) + assert.Len(t, decodedBytes, 15) +} + +func TestIDDBytes(t *testing.T) { + g := ESTimeBasedUUIDGenerator() + id := g.NextID() + + // Check that bytes 7-12 are secure munged mac address + decodedBytes, err := base64.RawURLEncoding.DecodeString(id) + assert.NoError(t, err) + assert.Equal(t, mac, decodedBytes[6:6+addrLen]) +} + +func TestIDConsecutiveOrdering(t *testing.T) { + g := ESTimeBasedUUIDGenerator() + + prevID := g.NextID() + for i := 0; i < 10000; i++ { + decodedPrevID, err := base64.RawURLEncoding.DecodeString(prevID) + assert.NoError(t, err) + + currID := g.NextID() + decodedCurrID, err := base64.RawURLEncoding.DecodeString(currID) + assert.NoError(t, err) + + // Check if current ID is greater than previous ID (accounting for + // wrap around of first byte). 
+ if decodedCurrID[0] == 0x00 { // first byte wrapped around + // Check that previous ID's first byte was max possible byte value (0xff) + assert.EqualValues(t, 0xff, decodedPrevID[0]) + + // Check that rest of current ID (after first byte) is greater than rest of + // previous ID (after first byte) + assert.True(t, isGreaterThan(decodedCurrID[1:], decodedPrevID[1:])) + } else { + // Check that current ID's first byte is exactly 1 more than previous ID's + // first byte + assert.Equal(t, decodedPrevID[0]+1, decodedCurrID[0]) + + // Check that entire current ID is greater than entire previous ID + assert.True(t, isGreaterThan(decodedCurrID, decodedPrevID)) + } + + prevID = currID + } +} + +func TestMonotonicTimestamp(t *testing.T) { + now := nowMS() + tests := map[string]struct { + clockTimestamp uint64 + lastTimestamp uint64 + sequenceNumber uint64 + }{ + "uninitialized_timestamp": { + clockTimestamp: now, + sequenceNumber: 17, + }, + "clock_normal": { + clockTimestamp: now, + lastTimestamp: now - 1, + sequenceNumber: 17, + }, + "clock_normal_seq_wraparound": { + clockTimestamp: now, + lastTimestamp: now - 1, + sequenceNumber: 0, + }, + "clock_went_backwards": { + clockTimestamp: now - 1, + lastTimestamp: now, + sequenceNumber: 17, + }, + "clock_went_backwards_seq_wraparound": { + clockTimestamp: now - 1, + lastTimestamp: now, + sequenceNumber: 0, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + assert.True(t, timestamp(test.clockTimestamp, test.lastTimestamp, test.sequenceNumber) >= test.lastTimestamp) + }) + } +} + +func BenchmarkID(b *testing.B) { + g := ESTimeBasedUUIDGenerator() + for n := 0; n < b.N; n++ { + g.NextID() + } +} + +func isGreaterThan(b1, b2 []byte) bool { + if len(b1) > len(b2) { + return true + } + + if len(b1) < len(b2) { // a shorter slice always orders first; this guard used to repeat the check above + return false + } + + if len(b1) == 0 { + return false + } + + // Lengths are equal and at least 1, compare values + + if b1[0] < b2[0] { + return false + } + + if b1[0] > b2[0] { + return
true + } + + return isGreaterThan(b1[1:], b2[1:]) +} diff --git a/libbeat/processors/add_id/generator/generator.go b/libbeat/processors/add_id/generator/generator.go new file mode 100644 index 000000000000..ca94ecd64815 --- /dev/null +++ b/libbeat/processors/add_id/generator/generator.go @@ -0,0 +1,48 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package generator + +import "strings" + +var generators = map[string]IDGenerator{ + "elasticsearch": ESTimeBasedUUIDGenerator(), +} + +// IDGenerator implementors know how to generate and return a new ID +type IDGenerator interface { + NextID() string +} + +// Factory takes as input the type of ID to generate and returns the +// generator of that ID type. +func Factory(val string) (IDGenerator, error) { + typ := strings.ToLower(val) + g, found := generators[typ] + if !found { + return nil, makeErrUnknownType(val) + } + + return g, nil +} + +// Exists returns whether the given type of ID generator exists. 
+func Exists(val string) bool { + typ := strings.ToLower(val) + _, found := generators[typ] + return found +} diff --git a/libbeat/processors/add_id/generator/generator_test.go b/libbeat/processors/add_id/generator/generator_test.go new file mode 100644 index 000000000000..f78c4ee6038d --- /dev/null +++ b/libbeat/processors/add_id/generator/generator_test.go @@ -0,0 +1,53 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package generator + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFactory(t *testing.T) { + tests := map[string]struct { + expectedGen IDGenerator + expectedErr error + }{ + "elasticsearch": { + ESTimeBasedUUIDGenerator(), + nil, + }, + "foobar": { + nil, + makeErrUnknownType("foobar"), + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + typ := name + gen, err := Factory(typ) + if test.expectedGen != nil { + assert.Equal(t, test.expectedGen, gen) + } + if test.expectedErr != nil { + assert.EqualError(t, err, test.expectedErr.Error()) + } + }) + } +} diff --git a/libbeat/processors/add_id/generator/mac.go b/libbeat/processors/add_id/generator/mac.go new file mode 100644 index 000000000000..54237d7fa480 --- /dev/null +++ b/libbeat/processors/add_id/generator/mac.go @@ -0,0 +1,101 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package generator + +import ( + "crypto/rand" + "net" +) + +// Golang port of https://github.com/elastic/elasticsearch/blob/a666fb2266/server/src/main/java/org/elasticsearch/common/MacAddressProvider.java + +type id []byte + +const addrLen = 6 + +func getSecureMungedMACAddress() ([]byte, error) { + addr, err := getMacAddress() + if err != nil { + return nil, err + } + + if !isValidAddress(addr) { + addr, err = constructDummyMulticastAddress() + if err != nil { + return nil, err + } + } + + munged := make([]byte, addrLen) + _, err = rand.Read(munged) + if err != nil { + return nil, err + } + + for i := 0; i < addrLen; i++ { + munged[i] ^= addr[i] + } + + return munged, nil +} + +func getMacAddress() ([]byte, error) { + interfaces, err := net.Interfaces() + if err != nil { + return nil, err + + } + for _, i := range interfaces { + if i.Flags&net.FlagLoopback == 0 { // Flags is a bit set, so test the loopback bit instead of comparing the whole value + // Pick the first valid non loopback address we find + addr := i.HardwareAddr + if isValidAddress(addr) { + return addr, nil + } + } + } + + // Could not find a mac address + return nil, nil +} + +func isValidAddress(addr []byte) bool { + if len(addr) != addrLen { // also rejects nil, whose length is 0 + return false + } + + for _, b := range addr { + if b != 0x00 { + return true // If any of the bytes are non zero assume a good address + } + } + + return false +} + +func constructDummyMulticastAddress() ([]byte, error) { + dummy := make([]byte, addrLen) + _, err := rand.Read(dummy) + if err != nil { + return nil, err + } + + // Set the multicast (group) bit to indicate this is not a _real_ mac address + dummy[0] |= byte(0x01) + return dummy, nil +} diff --git a/libbeat/processors/add_id/generator/mac_test.go b/libbeat/processors/add_id/generator/mac_test.go new file mode 100644 index 000000000000..ec24439bab6e --- /dev/null +++ b/libbeat/processors/add_id/generator/mac_test.go @@ -0,0 +1,100 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements.
See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package generator + +import ( + "net" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIsValidAddress(t *testing.T) { + tests := map[string]struct { + addr []byte + expected bool + }{ + "nil": { + nil, + false, + }, + "too_short": { + []byte{0xde, 0xad, 0xbe, 0xef}, + false, + }, + "too_long": { + []byte{0xbe, 0xa7, 0x5a, 0x43, 0xda, 0xbe, 0x57}, + false, + }, + "all_zeros": { + []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + false, + }, + "good": { + []byte{0xbe, 0xa7, 0x5a, 0x43, 0x90, 0x0d}, + true, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + v := isValidAddress(test.addr) + assert.Equal(t, test.expected, v) + }) + } +} + +func TestConstructDummyMulticastAddress(t *testing.T) { + addr, err := constructDummyMulticastAddress() + assert.NoError(t, err) + assert.Len(t, addr, addrLen) + + firstOctet := addr[0] + assert.EqualValues(t, 0x01, firstOctet&0x01) +} + +func TestSecureMungedMACAddress(t *testing.T) { + addr, err := getSecureMungedMACAddress() + assert.NoError(t, err) + assert.Len(t, addr, addrLen) +} + +func TestGetMacAddress(t *testing.T) { + addr, err := getMacAddress() + assert.NoError(t, err) + assert.Len(t, addr, addrLen) + + getLoopbackAddrs := func() [][]byte { + var loAddrs [][]byte + + 
interfaces, err := net.Interfaces() + assert.NoError(t, err) + + for _, i := range interfaces { + if i.Flags&net.FlagLoopback != 0 { // Flags is a bit set; a loopback interface normally has other bits (e.g. up) set too + loAddrs = append(loAddrs, i.HardwareAddr) + } + } + + return loAddrs + } + + for _, loAddr := range getLoopbackAddrs() { + assert.NotEqual(t, loAddr, addr) + } +}