Skip to content

Commit

Permalink
Add database_tag option to influxdb_listener to add database from que…
Browse files Browse the repository at this point in the history
…ry string (influxdata#6257)
  • Loading branch information
morfien101 authored and bitcharmer committed Oct 18, 2019
1 parent a776744 commit 8e71c6c
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 27 deletions.
8 changes: 8 additions & 0 deletions plugins/inputs/influxdb_listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ submits data to InfluxDB determines the destination database.
tls_cert = "/etc/telegraf/cert.pem"
tls_key = "/etc/telegraf/key.pem"

## Optional tag name used to store the database name.
## If the write has a database in the query string then it will be kept in this tag name.
## This tag can be used in downstream outputs.
## The default value of nothing means it will be off and the database will not be recorded.
## If you have a tag that is the same as the one specified below, and supply a database,
## the tag will be overwritten with the database supplied.
# database_tag = ""

## Optional username and password to accept for HTTP basic authentication.
## You probably want to make sure you have TLS configured above for this.
# basic_username = "foobar"
Expand Down
40 changes: 28 additions & 12 deletions plugins/inputs/influxdb_listener/http_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,18 @@ const (
type TimeFunc func() time.Time

type HTTPListener struct {
ServiceAddress string
ReadTimeout internal.Duration
WriteTimeout internal.Duration
MaxBodySize internal.Size
MaxLineSize internal.Size
Port int

ServiceAddress string `toml:"service_address"`
// Port gets pulled out of ServiceAddress
Port int
tlsint.ServerConfig

BasicUsername string
BasicPassword string
ReadTimeout internal.Duration `toml:"read_timeout"`
WriteTimeout internal.Duration `toml:"write_timeout"`
MaxBodySize internal.Size `toml:"max_body_size"`
MaxLineSize internal.Size `toml:"max_line_size"`
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
DatabaseTag string `toml:"database_tag"`

TimeFunc

Expand Down Expand Up @@ -93,6 +94,13 @@ const sampleConfig = `
## Maximum line size allowed to be sent in bytes.
## 0 means to use the default of 65536 bytes (64 kibibytes)
max_line_size = "64KiB"
## Optional tag name used to store the database.
## If the write has a database in the query string then it will be kept in this tag name.
## This tag can be used in downstream outputs.
## The default value of nothing means it will be off and the database will not be recorded.
# database_tag = ""
## Set one or more allowed client CA certificate file names to
## enable mutually authenticated TLS connections
Expand Down Expand Up @@ -258,6 +266,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
now := h.TimeFunc()

precision := req.URL.Query().Get("precision")
db := req.URL.Query().Get("db")

// Handle gzip request bodies
body := req.Body
Expand Down Expand Up @@ -315,7 +324,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {

if err == io.ErrUnexpectedEOF {
// finished reading the request body
err = h.parse(buf[:n+bufStart], now, precision)
err = h.parse(buf[:n+bufStart], now, precision, db)
if err != nil {
log.Println("D! "+err.Error(), bufStart+n)
return400 = true
Expand Down Expand Up @@ -346,7 +355,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
bufStart = 0
continue
}
if err := h.parse(buf[:i+1], now, precision); err != nil {
if err := h.parse(buf[:i+1], now, precision, db); err != nil {
log.Println("D! " + err.Error())
return400 = true
}
Expand All @@ -359,7 +368,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
}
}

func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
func (h *HTTPListener) parse(b []byte, t time.Time, precision, db string) error {
h.mu.Lock()
defer h.mu.Unlock()

Expand All @@ -371,6 +380,13 @@ func (h *HTTPListener) parse(b []byte, t time.Time, precision string) error {
}

for _, m := range metrics {
// Do we need to keep the database name in the query string.
// If a tag has been supplied to put the db in and we actually got a db query,
// then we write it in. This overwrites the database tag if one was sent.
// This makes it behave like the influx endpoint.
if h.DatabaseTag != "" && db != "" {
m.AddTag(h.DatabaseTag, db)
}
h.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}

Expand Down
33 changes: 18 additions & 15 deletions plugins/inputs/influxdb_listener/http_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,11 @@ func TestWriteHTTPBasicAuth(t *testing.T) {
require.EqualValues(t, http.StatusNoContent, resp.StatusCode)
}

func TestWriteHTTP(t *testing.T) {
func TestWriteHTTPKeepDatabase(t *testing.T) {
testMsgWithDB := "cpu_load_short,host=server01,database=wrongdb value=12.0 1422568543702900257\n"

listener := newTestHTTPListener()
listener.DatabaseTag = "database"

acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
Expand All @@ -162,7 +165,19 @@ func TestWriteHTTP(t *testing.T) {
acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"},
map[string]string{"host": "server01", "database": "mydb"},
)

// post single message to listener with a database tag in it already. It should be clobbered.
resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgWithDB)))
require.NoError(t, err)
resp.Body.Close()
require.EqualValues(t, 204, resp.StatusCode)

acc.Wait(1)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01", "database": "mydb"},
)

// post multiple message to listener
Expand All @@ -177,21 +192,9 @@ func TestWriteHTTP(t *testing.T) {
for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": hostTag},
map[string]string{"host": hostTag, "database": "mydb"},
)
}

// Post a gigantic metric to the listener and verify that an error is returned:
resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(hugeMetric)))
require.NoError(t, err)
resp.Body.Close()
require.EqualValues(t, 400, resp.StatusCode)

acc.Wait(3)
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"},
)
}

// http listener should add a newline at the end of the buffer if it's not there
Expand Down

0 comments on commit 8e71c6c

Please sign in to comment.