diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index b8f114400253..c37d54229d78 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -81,6 +81,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Add support to grow or shrink an existing spool file between restarts. {pull}7859[7859] - Add debug check to logp.Logger {pull}7965[7965] - Make kubernetes autodiscover ignore events with empty container IDs {pull}7971[7971] +- Add DNS processor with support for performing reverse lookups on IP addresses. {issue}7770[7770] *Auditbeat* diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 24d13281ed79..05f30940dcf3 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -73,6 +73,7 @@ import ( _ "github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata" _ "github.com/elastic/beats/libbeat/processors/add_locale" _ "github.com/elastic/beats/libbeat/processors/dissect" + _ "github.com/elastic/beats/libbeat/processors/dns" // Register autodiscover providers _ "github.com/elastic/beats/libbeat/autodiscover/providers/docker" diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index d54f0dc6a35e..ecb09550319e 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -170,6 +170,7 @@ The supported processors are: * <> * <> * <> + * <> [[conditions]] ==== Conditions @@ -930,3 +931,106 @@ NOTE: A key can contain any characters except reserved suffix or prefix modifier and `?`. See <> for a list of supported conditions. + +[[processor-dns]] +=== DNS Reverse Lookup + +The DNS processor performs reverse DNS lookups of IP addresses. It caches the +responses that it receives in accordance to the time-to-live (TTL) value +contained in the response. It also caches failures that occur during lookups. +Each instance of this processor maintains its own independent cache. + +The processor uses its own DNS resolver to send requests to nameservers and does +not use the operating system's resolver. It does not read any values contained +in `/etc/hosts`. + +This processor can significantly slow down your pipeline's throughput if you +have a high latency network or slow upstream nameserver. The cache will help +with performance, but if the addresses being resolved have a high cardinality +then the cache benefits will be diminished due to the high miss ratio. + +By way of example, if each DNS lookup takes 2 milliseconds, the maximum +throughput you can achieve is 500 events per second (1000 milliseconds / 2 +milliseconds). If you have a high cache hit ratio then your throughput can be +higher. + +This is a minimal configuration example that resolves the IP addresses contained +in two fields. + +[source,yaml] +---- +processors: +- dns: + type: reverse + fields: + source.ip: source.hostname + destination.ip: destination.hostname +---- + +Next is a configuration example showing all options. + +[source,yaml] +---- +processors: +- dns: + type: reverse + action: append + fields: + server.ip: server.hostname + client.ip: client.hostname + success_cache: + capacity.initial: 1000 + capacity.max: 10000 + failure_cache: + capacity.initial: 1000 + capacity.max: 10000 + ttl: 1m + nameservers: ['192.0.2.1', '203.0.113.1'] + timeout: 500ms + tag_on_failure: [_dns_reverse_lookup_failed] +---- + +The `dns` processor has the following configuration settings: + +`type`:: The type of DNS lookup to perform. The only supported type is +`reverse` which queries for a PTR record. + +`action`:: This defines the behavior of the processor when the target field +already exists in the event. The options are `append` (default) and `replace`. + +`fields`:: This is a mapping of source field names to target field names. The +value of the source field will be used in the DNS query and result will be +written to the target field. + +`success_cache.capacity.initial`:: The initial number of items that the success +cache will be allocated to hold. When initialized the processor will allocate +the memory for this number of items. Default value is `1000`. + +`success_cache.capacity.max`:: The maximum number of items that the success +cache can hold. When the maximum capacity is reached a random item is evicted. +Default value is `10000`. + +`failure_cache.capacity.initial`:: The initial number of items that the failure +cache will be allocated to hold. When initialized the processor will allocate +the memory for this number of items. Default value is `1000`. + +`failure_cache.capacity.max`:: The maximum number of items that the failure +cache can hold. When the maximum capacity is reached a random item is evicted. +Default value is `10000`. + +`failure_cache.ttl`:: The duration for which failures are cached. Valid time +units are "ns", "us" (or "µs"), "ms", "s", "m", "h". Default value is `1m`. + +`nameservers`:: A list of nameservers to query. If there are multiple servers, +the resolver queries them in the order listed. If none are specified then it +will read the nameservers listed in `/etc/resolv.conf` once at initialization. +On Windows you must always supply at least one nameserver. + +`timeout`:: The duration after which a DNS query will timeout. This is timeout +for each DNS request so if you have 2 nameservers then the total timeout will be +2 times this value. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", +"h". Default value is `500ms`. + +`tag_on_failure`:: A list of tags to add to the event when any lookup fails. The +tags are only added once even if multiple lookups fail. By default no tags are +added upon failure. diff --git a/libbeat/monitoring/adapter/go-metrics-wrapper.go b/libbeat/monitoring/adapter/go-metrics-wrapper.go index 0f974efd0ee0..815bc1284870 100644 --- a/libbeat/monitoring/adapter/go-metrics-wrapper.go +++ b/libbeat/monitoring/adapter/go-metrics-wrapper.go @@ -99,7 +99,31 @@ func (w goMetricsFuncGaugeFloat) Visit(_ monitoring.Mode, vs monitoring.Visitor) func (w goMetricsHistogram) wrapped() interface{} { return w.h } func (w goMetricsHistogram) Get() int64 { return w.h.Sum() } func (w goMetricsHistogram) Visit(_ monitoring.Mode, vs monitoring.Visitor) { - vs.OnInt(w.Get()) + vs.OnRegistryStart() + defer vs.OnRegistryFinished() + + h := w.h.Snapshot() + ps := h.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999}) + vs.OnKey("count") + vs.OnInt(w.h.Count()) + vs.OnKey("min") + vs.OnInt(w.h.Min()) + vs.OnKey("max") + vs.OnInt(w.h.Max()) + vs.OnKey("mean") + vs.OnFloat(w.h.Mean()) + vs.OnKey("stddev") + vs.OnFloat(w.h.StdDev()) + vs.OnKey("median") + vs.OnFloat(ps[0]) + vs.OnKey("p75") + vs.OnFloat(ps[1]) + vs.OnKey("p95") + vs.OnFloat(ps[2]) + vs.OnKey("p99") + vs.OnFloat(ps[3]) + vs.OnKey("p999") + vs.OnFloat(ps[4]) } func (w goMetricsMeter) wrapped() interface{} { return w.m } diff --git a/libbeat/processors/dns/cache.go b/libbeat/processors/dns/cache.go new file mode 100644 index 000000000000..57fa99a17b69 --- /dev/null +++ b/libbeat/processors/dns/cache.go @@ -0,0 +1,210 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dns + +import ( + "sync" + "time" + + "github.com/elastic/beats/libbeat/monitoring" +) + +type ptrRecord struct { + host string + expires time.Time +} + +func (r ptrRecord) IsExpired(now time.Time) bool { + return now.After(r.expires) +} + +type ptrCache struct { + sync.RWMutex + data map[string]ptrRecord + maxSize int +} + +func (c *ptrCache) set(now time.Time, key string, ptr *PTR) { + c.Lock() + defer c.Unlock() + + if len(c.data) >= c.maxSize { + c.evict() + } + + c.data[key] = ptrRecord{ + host: ptr.Host, + expires: now.Add(time.Duration(ptr.TTL) * time.Second), + } +} + +// evict removes a single random key from the cache. +func (c *ptrCache) evict() { + var key string + for k := range c.data { + key = k + break + } + delete(c.data, key) +} + +func (c *ptrCache) get(now time.Time, key string) *PTR { + c.RLock() + defer c.RUnlock() + + r, found := c.data[key] + if found && !r.IsExpired(now) { + return &PTR{r.host, uint32(r.expires.Sub(now) / time.Second)} + } + return nil +} + +type failureRecord struct { + error + expires time.Time +} + +func (r failureRecord) IsExpired(now time.Time) bool { + return now.After(r.expires) +} + +type failureCache struct { + sync.RWMutex + data map[string]failureRecord + maxSize int + failureTTL time.Duration +} + +func (c *failureCache) set(now time.Time, key string, err error) { + c.Lock() + defer c.Unlock() + if len(c.data) >= c.maxSize { + c.evict() + } + + c.data[key] = failureRecord{ + error: err, + expires: now.Add(c.failureTTL), + } +} + +// evict removes a single random key from the cache. +func (c *failureCache) evict() { + var key string + for k := range c.data { + key = k + break + } + delete(c.data, key) +} + +func (c *failureCache) get(now time.Time, key string) error { + c.RLock() + defer c.RUnlock() + + r, found := c.data[key] + if found && !r.IsExpired(now) { + return r.error + } + return nil +} + +type cachedError struct { + err error +} + +func (ce *cachedError) Error() string { return ce.err.Error() + " (from failure cache)" } +func (ce *cachedError) Cause() error { return ce.err } + +// PTRLookupCache is a cache for storing and retrieving the results of +// reverse DNS queries. It caches the results of queries regardless of their +// outcome (success or failure). +type PTRLookupCache struct { + success *ptrCache + failure *failureCache + failureTTL time.Duration + resolver PTRResolver + stats cacheStats +} + +type cacheStats struct { + Hit *monitoring.Int + Miss *monitoring.Int +} + +// NewPTRLookupCache returns a new cache. +func NewPTRLookupCache(reg *monitoring.Registry, conf CacheConfig, resolver PTRResolver) (*PTRLookupCache, error) { + if err := conf.Validate(); err != nil { + return nil, err + } + + c := &PTRLookupCache{ + success: &ptrCache{ + data: make(map[string]ptrRecord, conf.SuccessCache.InitialCapacity), + maxSize: conf.SuccessCache.MaxCapacity, + }, + failure: &failureCache{ + data: make(map[string]failureRecord, conf.FailureCache.InitialCapacity), + maxSize: conf.FailureCache.MaxCapacity, + failureTTL: conf.FailureCache.TTL, + }, + resolver: resolver, + stats: cacheStats{ + Hit: monitoring.NewInt(reg, "hits"), + Miss: monitoring.NewInt(reg, "misses"), + }, + } + + return c, nil +} + +// LookupPTR performs a reverse lookup on the given IP address. A cached result +// will be returned if it is contained in the cache, otherwise a lookup is +// performed. +func (c PTRLookupCache) LookupPTR(ip string) (*PTR, error) { + now := time.Now() + + ptr := c.success.get(now, ip) + if ptr != nil { + c.stats.Hit.Inc() + return ptr, nil + } + + err := c.failure.get(now, ip) + if err != nil { + c.stats.Hit.Inc() + return nil, err + } + c.stats.Miss.Inc() + + ptr, err = c.resolver.LookupPTR(ip) + if err != nil { + c.failure.set(now, ip, &cachedError{err}) + return nil, err + } + + c.success.set(now, ip, ptr) + return ptr, nil +} + +func max(a, b int) int { + if a >= b { + return a + } + return b +} diff --git a/libbeat/processors/dns/cache_test.go b/libbeat/processors/dns/cache_test.go new file mode 100644 index 000000000000..b2ad5af75a54 --- /dev/null +++ b/libbeat/processors/dns/cache_test.go @@ -0,0 +1,101 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dns + +import ( + "io" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/monitoring" +) + +type stubResolver struct{} + +func (r *stubResolver) LookupPTR(ip string) (*PTR, error) { + if ip == gatewayIP { + return &PTR{Host: gatewayName, TTL: gatewayTTL}, nil + } else if strings.HasSuffix(ip, "11") { + return nil, io.ErrUnexpectedEOF + } + + return nil, &dnsError{"fake lookup returned NXDOMAIN"} +} + +func TestCache(t *testing.T) { + c, err := NewPTRLookupCache( + monitoring.NewRegistry(), + defaultConfig.CacheConfig, + &stubResolver{}) + if err != nil { + t.Fatal(err) + } + + // Initial success query. + ptr, err := c.LookupPTR(gatewayIP) + if assert.NoError(t, err) { + assert.EqualValues(t, gatewayName, ptr.Host) + assert.EqualValues(t, gatewayTTL, ptr.TTL) + assert.EqualValues(t, 0, c.stats.Hit.Get()) + assert.EqualValues(t, 1, c.stats.Miss.Get()) + } + + // Cached success query. + ptr, err = c.LookupPTR(gatewayIP) + if assert.NoError(t, err) { + assert.EqualValues(t, gatewayName, ptr.Host) + // TTL counts down while in cache. + assert.InDelta(t, gatewayTTL, ptr.TTL, 1) + assert.EqualValues(t, 1, c.stats.Hit.Get()) + assert.EqualValues(t, 1, c.stats.Miss.Get()) + } + + // Initial failure query (like a dns error response code). + ptr, err = c.LookupPTR(gatewayIP + "0") + if assert.Error(t, err) { + assert.Nil(t, ptr) + assert.EqualValues(t, 1, c.stats.Hit.Get()) + assert.EqualValues(t, 2, c.stats.Miss.Get()) + } + + // Cached failure query. + ptr, err = c.LookupPTR(gatewayIP + "0") + if assert.Error(t, err) { + assert.Nil(t, ptr) + assert.EqualValues(t, 2, c.stats.Hit.Get()) + assert.EqualValues(t, 2, c.stats.Miss.Get()) + } + + // Initial network failure (like I/O timeout). + ptr, err = c.LookupPTR(gatewayIP + "1") + if assert.Error(t, err) { + assert.Nil(t, ptr) + assert.EqualValues(t, 2, c.stats.Hit.Get()) + assert.EqualValues(t, 3, c.stats.Miss.Get()) + } + + // Check for a cache hit for the network failure. + ptr, err = c.LookupPTR(gatewayIP + "1") + if assert.Error(t, err) { + assert.Nil(t, ptr) + assert.EqualValues(t, 3, c.stats.Hit.Get()) + assert.EqualValues(t, 3, c.stats.Miss.Get()) // Cache miss. + } +} diff --git a/libbeat/processors/dns/config.go b/libbeat/processors/dns/config.go new file mode 100644 index 000000000000..4345d73db0a5 --- /dev/null +++ b/libbeat/processors/dns/config.go @@ -0,0 +1,159 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dns + +import ( + "strconv" + "strings" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" +) + +// Config defines the configuration options for the DNS processor. +type Config struct { + CacheConfig + Nameservers []string `config:"nameservers"` // Required on Windows. /etc/resolv.conf is used if none are given. + Timeout time.Duration `conifg:"timeout"` // Per request timeout (with 2 nameservers the total timeout would be 2x). + Type string `config:"type" validate:"required"` // Reverse is the only supported type currently. + Action FieldAction `config:"action"` // Append or replace (defaults to append) when target exists. + TagOnFailure []string `config:"tag_on_failure"` // Tags to append when a failure occurs. + Fields common.MapStr `config:"fields"` // Mapping of source fields to target fields. + reverseFlat map[string]string +} + +// FieldAction defines the behavior when the target field exists. +type FieldAction uint8 + +// List of FieldAction types. +const ( + ActionAppend FieldAction = iota + ActionReplace +) + +var fieldActionNames = map[FieldAction]string{ + ActionAppend: "append", + ActionReplace: "replace", +} + +// String returns a field action name. +func (fa FieldAction) String() string { + name, found := fieldActionNames[fa] + if found { + return name + } + return "unknown (" + strconv.Itoa(int(fa)) + ")" +} + +// Unpack unpacks a string to a FieldAction. +func (fa *FieldAction) Unpack(v string) error { + switch strings.ToLower(v) { + case "", "append": + *fa = ActionAppend + case "replace": + *fa = ActionReplace + default: + return errors.Errorf("invalid dns field action value '%v'", v) + } + return nil +} + +// CacheConfig defines the success and failure caching parameters. +type CacheConfig struct { + SuccessCache CacheSettings `config:"success_cache"` + FailureCache CacheSettings `config:"failure_cache"` +} + +// CacheSettings define the caching behavior for an individual cache. +type CacheSettings struct { + // TTL value for items in cache. Not used for success because we use TTL + // from the DNS record. + TTL time.Duration `config:"ttl"` + + // Initial capacity. How much space is allocated at initialization. + InitialCapacity int `config:"capacity.initial" validate:"min=0"` + + // Max capacity of the cache. When capacity is reached a random item is + // evicted from the cache. + MaxCapacity int `config:"capacity.max" validate:"min=1"` +} + +// Validate validates the data contained in the config. +func (c *Config) Validate() error { + // Validate lookup type. + c.Type = strings.ToLower(c.Type) + switch c.Type { + case "reverse": + default: + return errors.Errorf("invalid dns lookup type '%v' specified in "+ + "config (valid values are: reverse)", c.Type) + } + + // Flatten the mapping of source fields to target fields. + c.reverseFlat = map[string]string{} + for k, v := range c.Fields.Flatten() { + target, ok := v.(string) + if !ok { + return errors.Errorf("target field for dns lookup of %v "+ + "must be a string but got %T", k, v) + } + c.reverseFlat[k] = target + } + + return nil +} + +// Validate validates the data contained in the CacheConfig. +func (c *CacheConfig) Validate() error { + if c.FailureCache.TTL <= 0 { + return errors.Errorf("failure_cache.ttl must be > 0") + } + + if c.SuccessCache.MaxCapacity <= 0 { + return errors.Errorf("success_cache.capacity.max must be > 0") + } + if c.FailureCache.MaxCapacity <= 0 { + return errors.Errorf("failure_cache.capacity.max must be > 0") + } + + if c.SuccessCache.MaxCapacity < c.SuccessCache.InitialCapacity { + return errors.Errorf("success_cache.capacity.max must be >= success_cache.capacity.initial") + } + if c.FailureCache.MaxCapacity < c.FailureCache.InitialCapacity { + return errors.Errorf("failure_cache.capacity.max must be >= failure_cache.capacity.initial") + } + + return nil +} + +var defaultConfig = Config{ + CacheConfig: CacheConfig{ + SuccessCache: CacheSettings{ + InitialCapacity: 1000, + MaxCapacity: 10000, + }, + FailureCache: CacheSettings{ + TTL: time.Minute, + InitialCapacity: 1000, + MaxCapacity: 10000, + }, + }, + Timeout: 500 * time.Millisecond, +} diff --git a/libbeat/processors/dns/constants_test.go b/libbeat/processors/dns/constants_test.go new file mode 100644 index 000000000000..2c2fb8ab67ed --- /dev/null +++ b/libbeat/processors/dns/constants_test.go @@ -0,0 +1,25 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dns + +// Test constants used across tests. +const ( + gatewayIP = "192.168.0.1" + gatewayName = "default.gateway.test" + gatewayTTL = 60 // Seconds +) diff --git a/libbeat/processors/dns/dns.go b/libbeat/processors/dns/dns.go new file mode 100644 index 000000000000..fb6daad56b09 --- /dev/null +++ b/libbeat/processors/dns/dns.go @@ -0,0 +1,136 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dns + +import ( + "fmt" + "strconv" + "strings" + "sync" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/atomic" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/monitoring" + "github.com/elastic/beats/libbeat/processors" +) + +const logName = "processor.dns" + +// instanceID is used to assign each instance a unique monitoring namespace. +var instanceID = atomic.MakeUint32(0) + +func init() { + processors.RegisterPlugin("dns", newDNSProcessor) +} + +type processor struct { + Config + resolver PTRResolver + log *logp.Logger +} + +func newDNSProcessor(cfg *common.Config) (processors.Processor, error) { + c := defaultConfig + if err := cfg.Unpack(&c); err != nil { + return nil, errors.Wrap(err, "fail to unpack the dns configuration") + } + + // Logging and metrics (each processor instance has a unique ID). + var ( + id = int(instanceID.Inc()) + log = logp.NewLogger(logName).With("instance_id", id) + metrics = monitoring.Default.NewRegistry(logName+"."+strconv.Itoa(id), monitoring.DoNotReport) + ) + + log.Debugf("DNS processor config: %+v", c) + resolver, err := NewMiekgResolver(metrics, c.Timeout, c.Nameservers...) + if err != nil { + return nil, err + } + + cache, err := NewPTRLookupCache(metrics.NewRegistry("cache"), c.CacheConfig, resolver) + if err != nil { + return nil, err + } + + return &processor{Config: c, resolver: cache, log: log}, nil +} + +func (p *processor) Run(event *beat.Event) (*beat.Event, error) { + var tagOnce sync.Once + for field, target := range p.reverseFlat { + if err := p.processField(field, target, p.Action, event); err != nil { + p.log.Debugf("DNS processor failed: %v", err) + tagOnce.Do(func() { common.AddTags(event.Fields, p.TagOnFailure) }) + } + } + return event, nil +} + +func (p *processor) processField(source, target string, action FieldAction, event *beat.Event) error { + v, err := event.GetValue(source) + if err != nil { + return nil + } + + maybeIP, ok := v.(string) + if !ok { + return nil + } + + ptrRecord, err := p.resolver.LookupPTR(maybeIP) + if err != nil { + return fmt.Errorf("reverse lookup of %v value '%v' failed: %v", source, maybeIP, err) + } + + return setFieldValue(action, event, target, ptrRecord.Host) +} + +func setFieldValue(action FieldAction, event *beat.Event, key string, value string) error { + switch action { + case ActionReplace: + _, err := event.PutValue(key, value) + return err + case ActionAppend: + old, err := event.PutValue(key, value) + if err != nil { + return err + } + + if old != nil { + switch v := old.(type) { + case string: + _, err = event.PutValue(key, []string{v, value}) + case []string: + _, err = event.PutValue(key, append(v, value)) + } + } + return err + default: + panic(errors.Errorf("Unexpected dns field action value encountered: %v", action)) + } +} + +func (p processor) String() string { + return fmt.Sprintf("dns=[timeout=%v, nameservers=[%v], action=%v, type=%v, fields=[%+v]", + p.Timeout, strings.Join(p.Nameservers, ","), p.Action, p.Type, p.reverseFlat) +} diff --git a/libbeat/processors/dns/dns_test.go b/libbeat/processors/dns/dns_test.go new file mode 100644 index 000000000000..d4fe14e28dfa --- /dev/null +++ b/libbeat/processors/dns/dns_test.go @@ -0,0 +1,163 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dns + +import ( + "strconv" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/monitoring" +) + +func TestDNSProcessorRun(t *testing.T) { + p := &processor{ + Config: defaultConfig, + resolver: &stubResolver{}, + log: logp.NewLogger(logName), + } + p.Config.reverseFlat = map[string]string{ + "source.ip": "source.domain", + } + t.Log(p.String()) + + t.Run("default", func(t *testing.T) { + event, err := p.Run(&beat.Event{ + Fields: common.MapStr{ + "source.ip": gatewayIP, + }, + }) + if err != nil { + t.Fatal(err) + } + + v, _ := event.GetValue("source.domain") + assert.Equal(t, gatewayName, v) + }) + + const forwardDomain = "www." + gatewayName + t.Run("append", func(t *testing.T) { + p.Config.Action = ActionAppend + + event, err := p.Run(&beat.Event{ + Fields: common.MapStr{ + "source.ip": gatewayIP, + "source.domain": forwardDomain, + }, + }) + if err != nil { + t.Fatal(err) + } + + v, _ := event.GetValue("source.domain") + assert.ElementsMatch(t, + []string{gatewayName, forwardDomain}, + v) + }) + + t.Run("replace", func(t *testing.T) { + p.Config.Action = ActionReplace + + event, err := p.Run(&beat.Event{ + Fields: common.MapStr{ + "source.ip": gatewayIP, + "source.domain": forwardDomain, + }, + }) + if err != nil { + t.Fatal(err) + } + + v, _ := event.GetValue("source.domain") + assert.Equal(t, gatewayName, v) + }) +} + +func TestDNSProcessorTagOnFailure(t *testing.T) { + p := &processor{ + Config: defaultConfig, + resolver: &stubResolver{}, + log: logp.NewLogger(logName), + } + p.Config.TagOnFailure = []string{"_lookup_failed"} + p.Config.reverseFlat = map[string]string{ + "source.ip": "source.domain", + "destination.ip": "destination.domain", + } + t.Log(p.String()) + + event, err := p.Run(&beat.Event{ + Fields: common.MapStr{ + "source.ip": "192.0.2.1", + "destination.ip": "192.0.2.2", + }, + }) + if err != nil { + t.Fatal(err) + } + + v, _ := event.GetValue("tags") + if assert.Len(t, v, 1) { + assert.ElementsMatch(t, v, p.Config.TagOnFailure) + } +} + +func TestDNSProcessorRunInParallel(t *testing.T) { + // This is a simple smoke test to make sure that there are no concurrency + // issues. It is most effective when run with the race detector. + + conf := defaultConfig + reg := monitoring.NewRegistry() + cache, err := NewPTRLookupCache(reg, conf.CacheConfig, &stubResolver{}) + if err != nil { + t.Fatal(err) + } + p := &processor{Config: conf, resolver: cache, log: logp.NewLogger(logName)} + p.Config.reverseFlat = map[string]string{"source.ip": "source.domain"} + + const numGoroutines = 10 + const numEvents = 500 + var wg sync.WaitGroup + + // Start several goroutines. + wg.Add(numGoroutines) + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + + // Execute processor. + for i := 0; i < numEvents; i++ { + _, err := p.Run(&beat.Event{ + Fields: common.MapStr{ + "source.ip": "192.168.0." + strconv.Itoa(i%256), + }, + }) + if err != nil { + t.Fatal(err) + } + } + }() + } + + wg.Wait() +} diff --git a/libbeat/processors/dns/doc.go b/libbeat/processors/dns/doc.go new file mode 100644 index 000000000000..8c895b268000 --- /dev/null +++ b/libbeat/processors/dns/doc.go @@ -0,0 +1,36 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package dns implements a processor that can perform DNS lookups by sending +// a DNS request over UDP to a recursive nameserver. Each instance of the +// processor is independent (no shared cache) so it's best to only define one +// instance of the processor. +// +// It caches DNS results in memory and honors the record's TTL. It also caches +// failures for the configured failure TTL. The caches are simple, and they +// evict a random item when the configured maximum size is reached. +// +// This processor can significantly slow down your pipeline's throughput if you +// have a high latency network or slow upstream nameserver. The cache will help +// with performance, but if the addresses being resolved have a high cardinality +// then the cache benefits will be diminished due to the high miss ratio. +// +// By way of example, if each DNS lookup takes 2 milliseconds, the maximum +// throughput you can achieve is 500 events per second (1000 milliseconds / 2 +// milliseconds). If you have a high cache hit ratio then your throughput can be +// higher. +package dns diff --git a/libbeat/processors/dns/resolver.go b/libbeat/processors/dns/resolver.go new file mode 100644 index 000000000000..c4f54c73c87d --- /dev/null +++ b/libbeat/processors/dns/resolver.go @@ -0,0 +1,209 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dns + +import ( + "net" + "strconv" + "strings" + "sync" + "time" + + "github.com/miekg/dns" + "github.com/pkg/errors" + "github.com/rcrowley/go-metrics" + + "github.com/elastic/beats/libbeat/monitoring" + "github.com/elastic/beats/libbeat/monitoring/adapter" +) + +const etcResolvConf = "/etc/resolv.conf" + +// PTR represents a DNS pointer record (IP to hostname). +type PTR struct { + Host string // Hostname. + TTL uint32 // Time to live in seconds. +} + +// PTRResolver performs PTR record lookups. +type PTRResolver interface { + LookupPTR(ip string) (*PTR, error) +} + +// MiekgResolver is a PTRResolver that is implemented using github.com/miekg/dns +// to send requests to DNS servers. It does not use the Go resolver. +type MiekgResolver struct { + client *dns.Client + servers []string + + registry *monitoring.Registry + nsStatsMutex sync.RWMutex + nsStats map[string]*nameserverStats +} + +type nameserverStats struct { + success *monitoring.Int // Number of responses from server. + failure *monitoring.Int // Number of failures (e.g. I/O timeout) (not NXDOMAIN). + ptrResponse metrics.Sample // Histogram of response times. +} + +// NewMiekgResolver returns a new MiekgResolver. It returns an error if no +// nameserver are given and none can be read from /etc/resolv.conf. +func NewMiekgResolver(reg *monitoring.Registry, timeout time.Duration, servers ...string) (*MiekgResolver, error) { + // Use /etc/resolv.conf if no nameservers are given. (Won't work for Windows). + if len(servers) == 0 { + config, err := dns.ClientConfigFromFile(etcResolvConf) + if err != nil || len(config.Servers) == 0 { + return nil, errors.New("no dns servers configured") + } + servers = config.Servers + } + + // Add port if one was not specified. + for i, s := range servers { + if _, _, err := net.SplitHostPort(s); err != nil { + withPort := s + ":53" + if _, _, retryErr := net.SplitHostPort(withPort); retryErr == nil { + servers[i] = withPort + continue + } + return nil, err + } + } + + if timeout == 0 { + timeout = defaultConfig.Timeout + } + + return &MiekgResolver{ + client: &dns.Client{ + Net: "udp", + Timeout: timeout, + }, + servers: servers, + registry: reg, + nsStats: map[string]*nameserverStats{}, + }, nil +} + +// dnsError represents a failure response from the DNS server (like NXDOMAIN), +// but not a communication failure to the server. The response is cacheable. +type dnsError struct { + err string +} + +func (e *dnsError) Error() string { + if e == nil { + return "dns: " + } + return "dns: " + e.err +} + +// LookupPTR performs a reverse lookup on the given IP address. +func (res *MiekgResolver) LookupPTR(ip string) (*PTR, error) { + if len(res.servers) == 0 { + return nil, errors.New("no dns servers configured") + } + + // Create PTR (reverse) DNS request. + m := new(dns.Msg) + arpa, err := dns.ReverseAddr(ip) + if err != nil { + return nil, err + } + m.SetQuestion(arpa, dns.TypePTR) + m.RecursionDesired = true + + // Try the nameservers until we get a response. + var rtnErr error + for _, server := range res.servers { + stats := res.getOrCreateNameserverStats(server) + + r, rtt, err := res.client.Exchange(m, server) + if err != nil { + // Try next server if any. Otherwise return retErr. + rtnErr = err + stats.failure.Inc() + continue + } + + // We got a response. + stats.success.Inc() + stats.ptrResponse.Update(int64(rtt)) + if r.Rcode != dns.RcodeSuccess { + name, found := dns.RcodeToString[r.Rcode] + if !found { + name = "response code " + strconv.Itoa(r.Rcode) + } + return nil, &dnsError{"nameserver " + server + " returned " + name} + } + + for _, a := range r.Answer { + if ptr, ok := a.(*dns.PTR); ok { + return &PTR{ + Host: strings.TrimSuffix(ptr.Ptr, "."), + TTL: ptr.Hdr.Ttl, + }, nil + } + } + + return nil, &dnsError{"no PTR record was found in the response"} + } + + if rtnErr != nil { + return nil, rtnErr + } + + // This should never get here. + panic("LookupPTR should have returned a response.") +} + +func (res *MiekgResolver) getOrCreateNameserverStats(ns string) *nameserverStats { + // Trim port. + ns = ns[:strings.LastIndex(ns, ":")] + + // Check if stats already exist. + res.nsStatsMutex.RLock() + stats, found := res.nsStats[ns] + if found { + res.nsStatsMutex.RUnlock() + return stats + } + res.nsStatsMutex.RUnlock() + + // Upgrade to a write lock and double-check. + res.nsStatsMutex.Lock() + defer res.nsStatsMutex.Unlock() + stats, found = res.nsStats[ns] + if found { + return stats + } + + // Create stats for the nameserver. + reg := res.registry.NewRegistry(strings.Replace(ns, ".", "_", -1)) + stats = &nameserverStats{ + success: monitoring.NewInt(reg, "success"), + failure: monitoring.NewInt(reg, "failure"), + ptrResponse: metrics.NewUniformSample(1028), + } + adapter.NewGoMetrics(reg, "response", adapter.Accept). + Register("ptr", metrics.NewHistogram(stats.ptrResponse)) + res.nsStats[ns] = stats + + return stats +} diff --git a/libbeat/processors/dns/resolver_test.go b/libbeat/processors/dns/resolver_test.go new file mode 100644 index 000000000000..a8e6e9ad13da --- /dev/null +++ b/libbeat/processors/dns/resolver_test.go @@ -0,0 +1,100 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dns + +import ( + "net" + "strings" + "testing" + + "github.com/miekg/dns" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/monitoring" +) + +var _ PTRResolver = (*MiekgResolver)(nil) + +func TestMiekgResolverLookupPTR(t *testing.T) { + stop, addr, err := ServeDNS(FakeDNSHandler) + if err != nil { + t.Fatal(err) + } + defer stop() + + reg := monitoring.NewRegistry() + res, err := NewMiekgResolver(reg.NewRegistry(logName), 0, addr) + if err != nil { + t.Fatal(err) + } + + // Success + ptr, err := res.LookupPTR("8.8.8.8") + if err != nil { + t.Fatal(err) + } + assert.EqualValues(t, "google-public-dns-a.google.com", ptr.Host) + assert.EqualValues(t, 19273, ptr.TTL) + + // NXDOMAIN + _, err = res.LookupPTR("1.1.1.1") + if assert.Error(t, err) { + assert.Contains(t, err.Error(), "NXDOMAIN") + } + + // Validate that our metrics exist. + var metricCount int + reg.Do(monitoring.Full, func(name string, v interface{}) { + if strings.Contains(name, "processor.dns") { + metricCount++ + } + t.Logf("%v: %+v", name, v) + }) + assert.Equal(t, 12, metricCount) +} + +func ServeDNS(h dns.HandlerFunc) (cancel func() error, addr string, err error) { + // Setup listener on ephemeral port. + a, err := net.ResolveUDPAddr("udp4", "localhost:0") + if err != nil { + return nil, "", err + } + l, err := net.ListenUDP("udp4", a) + if err != nil { + return nil, "", err + } + + var s dns.Server + s.PacketConn = l + s.Handler = h + go s.ActivateAndServe() + return s.Shutdown, s.PacketConn.LocalAddr().String(), err +} + +func FakeDNSHandler(w dns.ResponseWriter, msg *dns.Msg) { + m := new(dns.Msg) + m.SetReply(msg) + switch { + case strings.HasPrefix(msg.Question[0].Name, "8.8.8.8"): + m.Answer = make([]dns.RR, 1) + m.Answer[0], _ = dns.NewRR("8.8.8.8.in-addr.arpa. 19273 IN PTR google-public-dns-a.google.com.") + default: + m.SetRcode(msg, dns.RcodeNameError) + } + w.WriteMsg(m) +} diff --git a/vendor/golang.org/x/net/internal/socket/zsys_netbsd_arm.go b/vendor/golang.org/x/net/internal/socket/zsys_netbsd_arm.go index 206ea2d11505..db60491fe37b 100644 --- a/vendor/golang.org/x/net/internal/socket/zsys_netbsd_arm.go +++ b/vendor/golang.org/x/net/internal/socket/zsys_netbsd_arm.go @@ -26,6 +26,11 @@ type msghdr struct { Flags int32 } +type mmsghdr struct { + Hdr msghdr + Len uint32 +} + type cmsghdr struct { Len uint32 Level int32 @@ -52,6 +57,7 @@ type sockaddrInet6 struct { const ( sizeofIovec = 0x8 sizeofMsghdr = 0x1c + sizeofMmsghdr = 0x20 sizeofCmsghdr = 0xc sizeofSockaddrInet = 0x10 diff --git a/vendor/vendor.json b/vendor/vendor.json index daf3ef5cc70c..992b4b1644a3 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -1658,8 +1658,8 @@ { "checksumSHA1": "RcrB7tgYS/GMW4QrwVdMOTNqIU8=", "path": "golang.org/x/net/idna", - "revision": "f5dfe339be1d06f81b22525fe34671ee7d2c8904", - "revisionTime": "2018-02-04T03:50:36Z" + "revision": "aaf60122140d3fcf75376d319f0554393160eb50", + "revisionTime": "2018-07-13T17:18:40Z" }, { "checksumSHA1": "5JWn/wMC+EWNDKI/AYE4JifQF54=", @@ -1670,10 +1670,10 @@ "versionExact": "release-branch.go1.9" }, { - "checksumSHA1": "WnI4058Oj6W4YSvyXAnK3qCKqvo=", + "checksumSHA1": "YsXlbexuTtUXHyhSv927ILOkf6A=", "path": "golang.org/x/net/internal/socket", - "revision": "f5dfe339be1d06f81b22525fe34671ee7d2c8904", - "revisionTime": "2018-02-04T03:50:36Z" + "revision": "aaf60122140d3fcf75376d319f0554393160eb50", + "revisionTime": "2018-07-13T17:18:40Z" }, { "checksumSHA1": "zPTKyZ1C55w1fk1W+/qGE15jaek=",