Skip to content

Commit

Permalink
Implement new hashID func
Browse files Browse the repository at this point in the history
  • Loading branch information
felixge committed Nov 2, 2017
1 parent 8b85f0b commit 626f653
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 12 deletions.
42 changes: 34 additions & 8 deletions plugins/outputs/cratedb/cratedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package cratedb

import (
"context"
"crypto/sha512"
"database/sql"
"encoding/binary"
"fmt"
"sort"
"strings"
Expand Down Expand Up @@ -74,16 +76,9 @@ func (c *CrateDB) Write(metrics []telegraf.Metric) error {
func insertSQL(table string, metrics []telegraf.Metric) (string, error) {
rows := make([]string, len(metrics))
for i, m := range metrics {
// Note: We have to convert HashID from uint64 to int64 below because
// CrateDB only supports a signed 64 bit LONG type which would give us
// problems, e.g.:
//
// CREATE TABLE my_long (val LONG);
// INSERT INTO my_long(val) VALUES (14305102049502225714);
// -> ERROR: SQLParseException: For input string: "14305102049502225714"

cols := []interface{}{
int64(m.HashID()),
hashID(m),
m.Time().UTC(),
m.Name(),
m.Tags(),
Expand Down Expand Up @@ -181,6 +176,37 @@ func escapeString(s string, quote string) string {
return quote + strings.Replace(s, quote, quote+quote, -1) + quote
}

// hashID derives a signed 64 bit identifier for m from a SHA-512 digest of
// the metric name followed by its tags. Fields and the timestamp are not part
// of the hashed input, so metrics that share a name and tag set get the same
// ID.
//
// It is used instead of m.HashID() because m.HashID() is not considered
// stable [1], and because a cryptographic hash makes more sense for the use
// case of deduplication.
//
// NOTE(review): the name and each tag key/value are fed to the hash without
// separators, so differently split inputs (e.g. tag "a"="bc" vs. "ab"="c")
// produce the same byte stream. Changing this would alter existing IDs.
//
// [1] https://github.com/influxdata/telegraf/pull/3210#discussion_r148411201
func hashID(m telegraf.Metric) int64 {
	hasher := sha512.New()
	hasher.Write([]byte(m.Name()))

	// Map iteration order is random, so collect the "key+value" strings and
	// sort them to make the hashed input deterministic.
	tagSet := m.Tags()
	pairs := make([]string, 0, len(tagSet))
	for key, val := range tagSet {
		pairs = append(pairs, key+val)
	}
	sort.Strings(pairs)

	for _, pair := range pairs {
		hasher.Write([]byte(pair))
	}
	digest := hasher.Sum(nil)

	// Only the first 8 bytes of the digest are used. The uint64 is
	// reinterpreted as int64 because CrateDB only supports a signed 64 bit
	// LONG type:
	//
	// CREATE TABLE my_long (val LONG);
	// INSERT INTO my_long(val) VALUES (14305102049502225714);
	// -> ERROR: SQLParseException: For input string: "14305102049502225714"
	return int64(binary.LittleEndian.Uint64(digest))
}

// SampleConfig returns the sampleConfig string, which is defined elsewhere in
// this package.
func (c *CrateDB) SampleConfig() string {
return sampleConfig
}
Expand Down
72 changes: 68 additions & 4 deletions plugins/outputs/cratedb/cratedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -45,21 +46,21 @@ func TestConnectAndWrite(t *testing.T) {
// the rows using their primary keys in order to take advantage of
// read-after-write consistency in CrateDB.
for _, m := range metrics {
hashID, err := escapeValue(int64(m.HashID()))
hashIDVal, err := escapeValue(hashID(m))
require.NoError(t, err)
timestamp, err := escapeValue(m.Time())
require.NoError(t, err)

var id int64
row := db.QueryRow(
"SELECT hash_id FROM " + escapeString(table, `"`) + " " +
"WHERE hash_id = " + hashID + " " +
"WHERE hash_id = " + hashIDVal + " " +
"AND timestamp = " + timestamp,
)
require.NoError(t, row.Scan(&id))
// We could check the whole row, but this is meant to be more of a smoke
// test, so just checking the HashID seems fine.
require.Equal(t, id, int64(m.HashID()))
require.Equal(t, id, hashID(m))
}

require.NoError(t, c.Close())
Expand All @@ -75,7 +76,7 @@ func Test_insertSQL(t *testing.T) {
Want: strings.TrimSpace(`
INSERT INTO my_table ("hash_id", "timestamp", "name", "tags", "fields")
VALUES
(1845393540509842047, '2009-11-10T23:00:00+0000', 'test1', {"tag1" = 'value1'}, {"value" = 1});
(-4023501406646044814, '2009-11-10T23:00:00+0000', 'test1', {"tag1" = 'value1'}, {"value" = 1});
`),
},
}
Expand Down Expand Up @@ -142,6 +143,69 @@ func Test_escapeValue(t *testing.T) {
}
}

// Test_hashID pins the exact int64 values produced by hashID for a handful of
// metrics, and checks that the result depends on the metric name and tags but
// not on the fields or on the order in which tags were declared.
func Test_hashID(t *testing.T) {
	// Every case uses the same timestamp; hashID is expected to ignore it.
	ts := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)

	cases := []struct {
		Name   string
		Tags   map[string]string
		Fields map[string]interface{}
		Want   int64
	}{
		// Baseline metric.
		{
			Name:   "metric1",
			Tags:   map[string]string{"tag1": "val1", "tag2": "val2"},
			Fields: map[string]interface{}{"field1": "val1", "field2": "val2"},
			Want:   8973971082006474188,
		},

		// Same name and tags as the baseline, but with the tags declared in a
		// different order (a perhaps non-ideal attempt to trigger a different
		// pseudo-random map iteration) and with different fields. It should
		// still get the same hash.
		{
			Name:   "metric1",
			Tags:   map[string]string{"tag2": "val2", "tag1": "val1"},
			Fields: map[string]interface{}{"field3": "val3"},
			Want:   8973971082006474188,
		},

		// Different metric name -> different hash
		{
			Name:   "metric2",
			Tags:   map[string]string{"tag1": "val1", "tag2": "val2"},
			Fields: map[string]interface{}{"field1": "val1", "field2": "val2"},
			Want:   306487682448261783,
		},

		// Different tag val -> different hash
		{
			Name:   "metric1",
			Tags:   map[string]string{"tag1": "new-val", "tag2": "val2"},
			Fields: map[string]interface{}{"field1": "val1", "field2": "val2"},
			Want:   1938713695181062970,
		},

		// Different tag key -> different hash
		{
			Name:   "metric1",
			Tags:   map[string]string{"new-key": "val1", "tag2": "val2"},
			Fields: map[string]interface{}{"field1": "val1", "field2": "val2"},
			Want:   7678889081527706328,
		},
	}

	for idx, tc := range cases {
		m, err := metric.New(tc.Name, tc.Tags, tc.Fields, ts)
		require.NoError(t, err)

		got := hashID(m)
		if got == tc.Want {
			continue
		}
		t.Errorf("test #%d: got=%d want=%d", idx, got, tc.Want)
	}
}

func testURL() string {
url := os.Getenv("CRATE_URL")
if url == "" {
Expand Down

0 comments on commit 626f653

Please sign in to comment.