diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index 186ca01933cce..785da2a44014b 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -2,7 +2,9 @@ package cratedb import ( "context" + "crypto/sha512" "database/sql" + "encoding/binary" "fmt" "sort" "strings" @@ -74,16 +76,9 @@ func (c *CrateDB) Write(metrics []telegraf.Metric) error { func insertSQL(table string, metrics []telegraf.Metric) (string, error) { rows := make([]string, len(metrics)) for i, m := range metrics { - // Note: We have to convert HashID from uint64 to int64 below because - // CrateDB only supports a signed 64 bit LONG type which would give us - // problems, e.g.: - // - // CREATE TABLE my_long (val LONG); - // INSERT INTO my_long(val) VALUES (14305102049502225714); - // -> ERROR: SQLParseException: For input string: "14305102049502225714" cols := []interface{}{ - int64(m.HashID()), + hashID(m), m.Time().UTC(), m.Name(), m.Tags(), @@ -181,6 +176,37 @@ func escapeString(s string, quote string) string { return quote + strings.Replace(s, quote, quote+quote, -1) + quote } +// hashID returns a cryptographic int64 hash that includes the metric name +// and tags. It's used instead of m.HashID() because the latter is not considered stable [1] +// and because a cryptographic hash makes more sense for the use case of +// deduplication. 
+// [1] https://github.com/influxdata/telegraf/pull/3210#discussion_r148411201 +func hashID(m telegraf.Metric) int64 { + h := sha512.New() + h.Write([]byte(m.Name())) + tags := m.Tags() + tmp := make([]string, len(tags)) + i := 0 + for k, v := range tags { + tmp[i] = k + v + i++ + } + sort.Strings(tmp) + + for _, s := range tmp { + h.Write([]byte(s)) + } + sum := h.Sum(nil) + + // Note: We have to convert from uint64 to int64 below because CrateDB only + // supports a signed 64 bit LONG type: + // + // CREATE TABLE my_long (val LONG); + // INSERT INTO my_long(val) VALUES (14305102049502225714); + // -> ERROR: SQLParseException: For input string: "14305102049502225714" + return int64(binary.LittleEndian.Uint64(sum)) +} + func (c *CrateDB) SampleConfig() string { return sampleConfig } diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go index 5ca918e5fab55..5900988349b27 100644 --- a/plugins/outputs/cratedb/cratedb_test.go +++ b/plugins/outputs/cratedb/cratedb_test.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -45,7 +46,7 @@ func TestConnectAndWrite(t *testing.T) { // the rows using their primary keys in order to take advantage of // read-after-write consistency in CrateDB. 
for _, m := range metrics { - hashID, err := escapeValue(int64(m.HashID())) + hashIDVal, err := escapeValue(hashID(m)) require.NoError(t, err) timestamp, err := escapeValue(m.Time()) require.NoError(t, err) @@ -53,13 +54,13 @@ func TestConnectAndWrite(t *testing.T) { var id int64 row := db.QueryRow( "SELECT hash_id FROM " + escapeString(table, `"`) + " " + - "WHERE hash_id = " + hashID + " " + + "WHERE hash_id = " + hashIDVal + " " + "AND timestamp = " + timestamp, ) require.NoError(t, row.Scan(&id)) // We could check the whole row, but this is meant to be more of a smoke // test, so just checking the HashID seems fine. - require.Equal(t, id, int64(m.HashID())) + require.Equal(t, id, hashID(m)) } require.NoError(t, c.Close()) @@ -75,7 +76,7 @@ func Test_insertSQL(t *testing.T) { Want: strings.TrimSpace(` INSERT INTO my_table ("hash_id", "timestamp", "name", "tags", "fields") VALUES -(1845393540509842047, '2009-11-10T23:00:00+0000', 'test1', {"tag1" = 'value1'}, {"value" = 1}); +(-4023501406646044814, '2009-11-10T23:00:00+0000', 'test1', {"tag1" = 'value1'}, {"value" = 1}); `), }, } @@ -142,6 +143,69 @@ func Test_escapeValue(t *testing.T) { } } +func Test_hashID(t *testing.T) { + tests := []struct { + Name string + Tags map[string]string + Fields map[string]interface{} + Want int64 + }{ + { + Name: "metric1", + Tags: map[string]string{"tag1": "val1", "tag2": "val2"}, + Fields: map[string]interface{}{"field1": "val1", "field2": "val2"}, + Want: 8973971082006474188, + }, + + // This metric has a different tag order (in a perhaps non-ideal attempt to + // trigger different pseudo-random map iteration) and different fields + // compared to the previous metric, but should still get the same hash. 
+ { + Name: "metric1", + Tags: map[string]string{"tag2": "val2", "tag1": "val1"}, + Fields: map[string]interface{}{"field3": "val3"}, + Want: 8973971082006474188, + }, + + // Different metric name -> different hash + { + Name: "metric2", + Tags: map[string]string{"tag1": "val1", "tag2": "val2"}, + Fields: map[string]interface{}{"field1": "val1", "field2": "val2"}, + Want: 306487682448261783, + }, + + // Different tag val -> different hash + { + Name: "metric1", + Tags: map[string]string{"tag1": "new-val", "tag2": "val2"}, + Fields: map[string]interface{}{"field1": "val1", "field2": "val2"}, + Want: 1938713695181062970, + }, + + // Different tag key -> different hash + { + Name: "metric1", + Tags: map[string]string{"new-key": "val1", "tag2": "val2"}, + Fields: map[string]interface{}{"field1": "val1", "field2": "val2"}, + Want: 7678889081527706328, + }, + } + + for i, test := range tests { + m, err := metric.New( + test.Name, + test.Tags, + test.Fields, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + require.NoError(t, err) + if got := hashID(m); got != test.Want { + t.Errorf("test #%d: got=%d want=%d", i, got, test.Want) + } + } +} + func testURL() string { url := os.Getenv("CRATE_URL") if url == "" {