Add a reverse dns processor #7927

Merged: 5 commits, Aug 21, 2018

Changes from 3 commits
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -81,6 +81,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Add support to grow or shrink an existing spool file between restarts. {pull}7859[7859]
- Add debug check to logp.Logger {pull}7965[7965]
- Make kubernetes autodiscover ignore events with empty container IDs {pull}7971[7971]
- Add DNS processor with support for performing reverse lookups on IP addresses. {issue}7770[7770]
Contributor

Thoughts on forward lookups? Wouldn't have to be in this PR, but those are really useful for log lines where a hostname is specified, but an IP is not.

Member Author

I can see this being useful for certain use cases where the forward lookup (A or AAAA) has a deterministic response. I could see this being used to resolve hostnames from syslog events against your internal DNS. I don't plan to add this now but the config takes this into account with the lookup type field.

Someone might also want to look up TXT records, where the query is derived from a field value. For example, a lookup like %{[source][ip]}.origin.asn.cymru.com (for IP-to-ASN lookup), but this is probably best suited to a centralized enrichment process.
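As a rough illustration of the forward-lookup idea discussed here (not part of this PR), a hostname-to-address lookup using only the Go standard library resolver might look like the sketch below; the function name and timeout are hypothetical, and the actual processor uses its own resolver rather than `net.DefaultResolver`.

[source,go]
----
// Rough sketch only: the processor in this PR ships its own resolver, but a
// forward (A/AAAA) lookup of a hostname field could look roughly like this
// using just the standard library. The function name and timeout are
// illustrative.
package main

import (
	"context"
	"fmt"
	"net"
	"time"
)

func lookupForward(host string) ([]string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	// LookupHost returns the IPv4/IPv6 addresses (A/AAAA records) for host.
	return net.DefaultResolver.LookupHost(ctx, host)
}

func main() {
	addrs, err := lookupForward("localhost")
	if err != nil {
		fmt.Println("lookup failed:", err)
		return
	}
	fmt.Println(addrs)
}
----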


*Auditbeat*

1 change: 1 addition & 0 deletions libbeat/cmd/instance/beat.go
@@ -73,6 +73,7 @@ import (
_ "github.com/elastic/beats/libbeat/processors/add_kubernetes_metadata"
_ "github.com/elastic/beats/libbeat/processors/add_locale"
_ "github.com/elastic/beats/libbeat/processors/dissect"
_ "github.com/elastic/beats/libbeat/processors/dns"

// Register autodiscover providers
_ "github.com/elastic/beats/libbeat/autodiscover/providers/docker"
104 changes: 104 additions & 0 deletions libbeat/docs/processors-using.asciidoc
@@ -170,6 +170,7 @@ The supported processors are:
* <<add-docker-metadata,`add_docker_metadata`>>
* <<add-host-metadata,`add_host_metadata`>>
* <<dissect, `dissect`>>
* <<processor-dns, `dns`>>

[[conditions]]
==== Conditions
@@ -930,3 +931,106 @@ NOTE: A key can contain any characters except reserved suffix or prefix modifier
and `?`.

See <<conditions>> for a list of supported conditions.

[[processor-dns]]
=== DNS Reverse Lookup

The DNS processor performs reverse DNS lookups of IP addresses. It caches the
responses that it receives in accordance with the time-to-live (TTL) value
contained in the response. It also caches failures that occur during lookups.
Each instance of this processor maintains its own independent cache.

The processor uses its own DNS resolver to send requests to nameservers and does
not use the operating system's resolver. It does not read any values contained
in `/etc/hosts`.

This processor can significantly slow down your pipeline's throughput if you
have a high-latency network or a slow upstream nameserver. The cache will help
with performance, but if the addresses being resolved have a high cardinality
then the cache benefits will be diminished due to the high miss ratio.

By way of example, if each DNS lookup takes 2 milliseconds, the maximum
throughput you can achieve is 500 events per second (1000 milliseconds / 2
milliseconds). If you have a high cache hit ratio then your throughput can be
higher.

This is a minimal configuration example that resolves the IP addresses contained
in two fields.

[source,yaml]
----
processors:
- dns:
    type: reverse
    fields:
      source.ip: source.hostname
      destination.ip: destination.hostname
----

Next is a configuration example showing all options.

[source,yaml]
----
processors:
- dns:
    type: reverse
    action: append
    fields:
      server.ip: server.hostname
      client.ip: client.hostname
    success_cache:
      capacity.initial: 1000
      capacity.max: 10000
    failure_cache:
      capacity.initial: 1000
      capacity.max: 10000
      ttl: 1m
    nameservers: ['192.0.2.1', '203.0.113.1']
    timeout: 500ms
    tag_on_failure: [_dns_reverse_lookup_failed]
----

The `dns` processor has the following configuration settings:

`type`:: The type of DNS lookup to perform. The only supported type is
`reverse` which queries for a PTR record.

`action`:: This defines the behavior of the processor when the target field
already exists in the event. The options are `append` (default) and `replace`.

`fields`:: This is a mapping of source field names to target field names. The
value of the source field will be used in the DNS query, and the result will be
written to the target field.

`success_cache.capacity.initial`:: The initial number of items that the success
cache will be allocated to hold. When initialized the processor will allocate
the memory for this number of items. Default value is `1000`.

`success_cache.capacity.max`:: The maximum number of items that the success
cache can hold. When the maximum capacity is reached a random item is evicted.
Default value is `10000`.

`failure_cache.capacity.initial`:: The initial number of items that the failure
cache will be allocated to hold. When initialized the processor will allocate
the memory for this number of items. Default value is `1000`.

`failure_cache.capacity.max`:: The maximum number of items that the failure
cache can hold. When the maximum capacity is reached a random item is evicted.
Default value is `10000`.

`failure_cache.ttl`:: The duration for which failures are cached. Valid time
units are "ns", "us" (or "µs"), "ms", "s", "m", "h". Default value is `1m`.

`nameservers`:: A list of nameservers to query. If there are multiple servers,
the resolver queries them in the order listed. If none are specified then it
will read the nameservers listed in `/etc/resolv.conf` once at initialization.
On Windows you must always supply at least one nameserver.

`timeout`:: The duration after which a DNS query will time out. This is the
timeout for each DNS request, so if you have 2 nameservers the total timeout
will be 2 times this value. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m",
"h". Default value is `500ms`.

`tag_on_failure`:: A list of tags to add to the event when any lookup fails. The
tags are only added once even if multiple lookups fail. By default no tags are
added upon failure.
26 changes: 25 additions & 1 deletion libbeat/monitoring/adapter/go-metrics-wrapper.go
@@ -99,7 +99,31 @@ func (w goMetricsFuncGaugeFloat) Visit(_ monitoring.Mode, vs monitoring.Visitor)
func (w goMetricsHistogram) wrapped() interface{} { return w.h }
func (w goMetricsHistogram) Get() int64 { return w.h.Sum() }
func (w goMetricsHistogram) Visit(_ monitoring.Mode, vs monitoring.Visitor) {
vs.OnInt(w.Get())
vs.OnRegistryStart()
defer vs.OnRegistryFinished()

h := w.h.Snapshot()
ps := h.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
vs.OnKey("count")
vs.OnInt(w.h.Count())
vs.OnKey("min")
vs.OnInt(w.h.Min())
vs.OnKey("max")
vs.OnInt(w.h.Max())
vs.OnKey("mean")
vs.OnFloat(w.h.Mean())
vs.OnKey("stddev")
vs.OnFloat(w.h.StdDev())
vs.OnKey("median")
vs.OnFloat(ps[0])
vs.OnKey("p75")
Contributor

Why are we returning these constant values? I assume this was intended to be the derived values at those percentiles from the actual histogram data?

Member Author

The percentiles are derived from samples stored in the histogram. The `p75` is the key name used to hold the 75th percentile, which is obtained above on line 106 and stored in `ps`, the percentiles slice.

Contributor

Ah, I misread that code. Thanks for the clarification.

vs.OnFloat(ps[1])
vs.OnKey("p95")
vs.OnFloat(ps[2])
vs.OnKey("p99")
vs.OnFloat(ps[3])
vs.OnKey("p999")
vs.OnFloat(ps[4])
}

func (w goMetricsMeter) wrapped() interface{} { return w.m }
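For context on the percentile discussion above, here is a minimal standalone sketch, assuming the `github.com/rcrowley/go-metrics` package that this wrapper adapts, showing that the reported percentiles are derived from the histogram's recorded samples rather than being constants.

[source,go]
----
// Minimal sketch, assuming github.com/rcrowley/go-metrics (the library this
// wrapper adapts). The percentiles come from the recorded samples, not from
// hard-coded constants.
package main

import (
	"fmt"

	metrics "github.com/rcrowley/go-metrics"
)

func main() {
	// Histogram backed by a uniform sample holding up to 1028 observations.
	h := metrics.NewHistogram(metrics.NewUniformSample(1028))

	// Record some observations, e.g. DNS lookup latencies in milliseconds.
	for i := int64(1); i <= 100; i++ {
		h.Update(i)
	}

	// Snapshot and derive the same percentiles the wrapper reports.
	s := h.Snapshot()
	ps := s.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
	fmt.Println("count:", s.Count(), "min:", s.Min(), "max:", s.Max())
	fmt.Println("median:", ps[0], "p75:", ps[1], "p95:", ps[2])
}
----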
210 changes: 210 additions & 0 deletions libbeat/processors/dns/cache.go
@@ -0,0 +1,210 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package dns

import (
"sync"
"time"

"github.com/elastic/beats/libbeat/monitoring"
)

type ptrRecord struct {
host string
expires time.Time
}

func (r ptrRecord) IsExpired(now time.Time) bool {
return now.After(r.expires)
}

type ptrCache struct {
sync.RWMutex
data map[string]ptrRecord
maxSize int
}

func (c *ptrCache) set(now time.Time, key string, ptr *PTR) {
c.Lock()
defer c.Unlock()

if len(c.data) >= c.maxSize {
c.evict()
}

c.data[key] = ptrRecord{
host: ptr.Host,
expires: now.Add(time.Duration(ptr.TTL) * time.Second),
}
}

// evict removes a single random key from the cache.
func (c *ptrCache) evict() {
var key string
for k := range c.data {
Contributor

This could use a comment. Maybe it's because I'm a Go noob, but I'd appreciate a comment on this function:

// Evict removes a random key from the cache

Contributor

Also, not something that belongs in an initial impl, but it might be nice to add a real LRU at some point.

Member Author

I agree that a better cache implementation would be nice. I tested a few different cache libs and none of them seemed all that great or efficient, so I opted to keep it simple (this was also the approach I found in the coredns cache).

In addition to improving on the algorithm, optimizing for the Go garbage collector would be nice too. When you get above 100k objects in a map the GC times start to grow. I looked at the bigcache which addresses this issue, and it worked very well, but it doesn't implement LRU.

But let's see how this gets used before optimizing. I plan to mark this as beta in the docs for the first release.

key = k
break
}
delete(c.data, key)
}

func (c *ptrCache) get(now time.Time, key string) *PTR {
c.RLock()
defer c.RUnlock()

r, found := c.data[key]
if found && !r.IsExpired(now) {
return &PTR{r.host, uint32(r.expires.Sub(now) / time.Second)}
}
return nil
}

type failureRecord struct {
error
expires time.Time
}

func (r failureRecord) IsExpired(now time.Time) bool {
return now.After(r.expires)
}

type failureCache struct {
sync.RWMutex
data map[string]failureRecord
maxSize int
failureTTL time.Duration
}

func (c *failureCache) set(now time.Time, key string, err error) {
c.Lock()
defer c.Unlock()
if len(c.data) >= c.maxSize {
c.evict()
}

c.data[key] = failureRecord{
error: err,
expires: now.Add(c.failureTTL),
}
}

// evict removes a single random key from the cache.
func (c *failureCache) evict() {
var key string
for k := range c.data {
key = k
break
}
delete(c.data, key)
}

func (c *failureCache) get(now time.Time, key string) error {
Contributor

My read here is that the tradeoff is between:

  1. Dual cache impl, one per type
  2. 'Generic' cache impl, with entries of (key, success|failure), probably using two pointers, only one of which is non-nil

where the first option is more memory efficient at the cost of duplicated code, while the second is a little less memory efficient but a more DRY approach (though there is some extra complexity in methods since they have to determine the type they're working with).

I'm neutral on the choice. It's a tradeoff, and the difference either way isn't very large since (1) the code is simple and (2) finding the hash code of a short string is fast, but I thought I'd bring this up because I'm curious whether there's general guidance on this issue.

In previous PLs I've used professionally the answer is generics. Not having them forces this choice, so I'd like to hear from more experienced Go programmers how this is handled.

Member Author

I chose what I thought was a more memory efficient implementation (1) at the cost of some duplication. This was mainly because there could be many of these items in memory, and secondly because the code was simple.

I don't have any general advice 🤷‍♂️ other than to weigh the pros and cons for the choice like you have done here.

Contributor

Makes sense, thanks for the feedback.
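As a rough, hypothetical sketch of option 2 from this thread (a single cache whose entries hold either a successful PTR result or a cached error), the shape could be something like the following; the `PTR` fields mirror the ones used in this file, and everything else is illustrative rather than part of the PR.

[source,go]
----
// Hypothetical sketch of the single-cache alternative (option 2). Each entry
// holds either a result or an error; only one of the two is non-nil. Not part
// of this PR.
package dnscache

import (
	"sync"
	"time"
)

// PTR mirrors the fields the processor's result type uses.
type PTR struct {
	Host string
	TTL  uint32
}

type entry struct {
	ptr     *PTR  // non-nil on success
	err     error // non-nil on failure
	expires time.Time
}

type unifiedCache struct {
	sync.RWMutex
	data    map[string]entry
	maxSize int
}

func (c *unifiedCache) set(now time.Time, key string, ptr *PTR, err error, ttl time.Duration) {
	c.Lock()
	defer c.Unlock()
	if len(c.data) >= c.maxSize {
		// Evict a single random key, as the dual-cache implementation does.
		for k := range c.data {
			delete(c.data, k)
			break
		}
	}
	c.data[key] = entry{ptr: ptr, err: err, expires: now.Add(ttl)}
}

func (c *unifiedCache) get(now time.Time, key string) (entry, bool) {
	c.RLock()
	defer c.RUnlock()
	e, found := c.data[key]
	if !found || now.After(e.expires) {
		return entry{}, false
	}
	return e, true
}
----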

c.RLock()
defer c.RUnlock()

r, found := c.data[key]
if found && !r.IsExpired(now) {
return r.error
}
return nil
}

type cachedError struct {
err error
}

func (ce *cachedError) Error() string { return ce.err.Error() + " (from failure cache)" }
func (ce *cachedError) Cause() error { return ce.err }

// PTRLookupCache is a cache for storing and retrieving the results of
// reverse DNS queries. It caches the results of queries regardless of their
// outcome (success or failure).
type PTRLookupCache struct {
success *ptrCache
failure *failureCache
failureTTL time.Duration
resolver PTRResolver
stats cacheStats
}

type cacheStats struct {
Hit *monitoring.Int
Miss *monitoring.Int
}

// NewPTRLookupCache returns a new cache.
func NewPTRLookupCache(reg *monitoring.Registry, conf CacheConfig, resolver PTRResolver) (*PTRLookupCache, error) {
if err := conf.Validate(); err != nil {
return nil, err
}

c := &PTRLookupCache{
success: &ptrCache{
data: make(map[string]ptrRecord, conf.SuccessCache.InitialCapacity),
maxSize: conf.SuccessCache.MaxCapacity,
},
failure: &failureCache{
data: make(map[string]failureRecord, conf.FailureCache.InitialCapacity),
maxSize: conf.FailureCache.MaxCapacity,
failureTTL: conf.FailureCache.TTL,
},
resolver: resolver,
stats: cacheStats{
Hit: monitoring.NewInt(reg, "hits"),
Miss: monitoring.NewInt(reg, "misses"),
},
}

return c, nil
}

// LookupPTR performs a reverse lookup on the given IP address. A cached result
// will be returned if it is contained in the cache, otherwise a lookup is
// performed.
func (c PTRLookupCache) LookupPTR(ip string) (*PTR, error) {
now := time.Now()

ptr := c.success.get(now, ip)
if ptr != nil {
c.stats.Hit.Inc()
return ptr, nil
}

err := c.failure.get(now, ip)
if err != nil {
c.stats.Hit.Inc()
return nil, err
}
c.stats.Miss.Inc()

ptr, err = c.resolver.LookupPTR(ip)
Contributor

Might be nice to have a stat for actual DNS queries sent to help users determine if their cache size is appropriate. They could check if increasing the cache size diminishes the number of queries sent.

Member Author

Wouldn't the number of cache misses be the metric to look at when doing this testing? You'd see if the miss number decreased, right?

Additionally, there are metrics reported by the resolver that could indicate the total number of lookups if you summed successes and failures across each DNS server.

Contributor

Yes, cache misses is an even better statistic.

I think I missed that we report those stats already, but it does make sense that cache misses would be a good stat.

if err != nil {
c.failure.set(now, ip, &cachedError{err})
return nil, err
}

c.success.set(now, ip, ptr)
return ptr, nil
}

func max(a, b int) int {
if a >= b {
return a
}
return b
}
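Following up on the review suggestion about eventually replacing random eviction with a real LRU, a minimal sketch built on the standard library's `container/list` could look like this; names are illustrative, locking is omitted for brevity, and this is not part of the PR.

[source,go]
----
// Minimal LRU sketch using container/list, illustrating the "real LRU" idea
// from the review discussion. Locking omitted for brevity; not part of this PR.
package dnscache

import (
	"container/list"
	"time"
)

type lruEntry struct {
	key     string
	host    string
	expires time.Time
}

type lruPTRCache struct {
	maxSize int
	ll      *list.List               // front = most recently used
	items   map[string]*list.Element // key -> element in ll
}

func newLRUPTRCache(maxSize int) *lruPTRCache {
	return &lruPTRCache{
		maxSize: maxSize,
		ll:      list.New(),
		items:   make(map[string]*list.Element, maxSize),
	}
}

func (c *lruPTRCache) set(now time.Time, key, host string, ttl time.Duration) {
	if el, ok := c.items[key]; ok {
		c.ll.MoveToFront(el)
		e := el.Value.(*lruEntry)
		e.host, e.expires = host, now.Add(ttl)
		return
	}
	if c.ll.Len() >= c.maxSize {
		// Evict the least recently used entry from the back of the list.
		oldest := c.ll.Back()
		c.ll.Remove(oldest)
		delete(c.items, oldest.Value.(*lruEntry).key)
	}
	c.items[key] = c.ll.PushFront(&lruEntry{key: key, host: host, expires: now.Add(ttl)})
}

func (c *lruPTRCache) get(now time.Time, key string) (string, bool) {
	el, ok := c.items[key]
	if !ok || now.After(el.Value.(*lruEntry).expires) {
		return "", false
	}
	c.ll.MoveToFront(el) // mark as recently used
	return el.Value.(*lruEntry).host, true
}
----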