From a6c1e2b96def6e6faa292dfc9642f8bc72ad25f4 Mon Sep 17 00:00:00 2001 From: Max U Date: Tue, 17 Jul 2018 16:08:02 -0700 Subject: [PATCH 01/23] unfinished csv parser --- plugins/parsers/csv/parser.go | 33 ++++++++++++++++++++++++++++++ plugins/parsers/csv/parser_test.go | 1 + 2 files changed, 34 insertions(+) create mode 100644 plugins/parsers/csv/parser.go create mode 100644 plugins/parsers/csv/parser_test.go diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go new file mode 100644 index 0000000000000..5b9a9e6430f84 --- /dev/null +++ b/plugins/parsers/csv/parser.go @@ -0,0 +1,33 @@ +package csv + +import ( + "bytes" + "encoding/csv" + + "github.com/influxdata/telegraf" +) + +type CSVParser struct { + MetricName string + Delimiter string + DataColumns []string + TagColumns []string + FieldColumns []string + NameColumn string + TimestampColumn string + DefaultTags map[string]string + csvReader *csv.Reader +} + +func (p *CSVParser) Parse(buf []byte) ([]telegraf.Metric, error) { + r := bytes.NewReader(buf) + p.csvReader = csv.NewReader(r) +} + +func (p *CSVParser) ParseLine(line string) (telegraf.Metric, error) { + +} + +func (p *CSVParser) SetDefaultTags(tags map[string]string) { + p.DefaultTags = tags +} diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go new file mode 100644 index 0000000000000..ed407dd0a8585 --- /dev/null +++ b/plugins/parsers/csv/parser_test.go @@ -0,0 +1 @@ +package csv From c839ce33a31d88ec785dc4c0fd2ee9137d46e3b7 Mon Sep 17 00:00:00 2001 From: Max U Date: Wed, 18 Jul 2018 15:57:00 -0700 Subject: [PATCH 02/23] functionality for csv parser, still needs unit tests --- docs/DATA_FORMATS_INPUT.md | 63 ++++++++++++++- internal/config/config.go | 89 +++++++++++++++++++++ plugins/parsers/csv/parser.go | 120 ++++++++++++++++++++++++++++- plugins/parsers/csv/parser_test.go | 6 ++ plugins/parsers/registry.go | 48 ++++++++++++ 5 files changed, 323 insertions(+), 3 deletions(-) diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index 24335a4531ad0..39e662b13614d 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -10,6 +10,7 @@ Telegraf is able to parse the following input data formats into metrics: 1. [Collectd](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#collectd) 1. [Dropwizard](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#dropwizard) 1. [Grok](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#grok) +1. [CSV](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#csv) Telegraf metrics, like InfluxDB [points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/), @@ -761,4 +762,64 @@ HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG} ## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones ## 3. UTC -- or blank/unspecified, will return timestamp in UTC grok_timezone = "Canada/Eastern" -``` \ No newline at end of file +``` + +#### CSV +Parse out metrics from a CSV formatted table. By default, the parser assumes there is a header and +will check the first row to extract column names from the header and will begin parsing data on the +second row. To prevent the parser from skipping the first line, set the `csv_header` config to false. +To assign custom column names, the `csv_data_columns` config is available. 
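As a quick end-to-end illustration (the input rows, column names, and values below are made up, not defaults), header-driven parsing looks like this:

```
fruit,count
apple,4
pear,2
```

```toml
data_format = "csv"
csv_header = true
csv_tag_columns = ["fruit"]
csv_field_columns = ["count"]
```

Each data row becomes one metric, with `fruit` as a tag and `count` as a field.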
If the `csv_data_columns` +config is used, all columns must be named or an error will be thrown. If `csv_header` is set to false, +`csv_data_columns` must be specified. + +The `csv_tag_columns` and `csv_field_columns` configs are available to add the column data to the metric. +The name used to specify the column is the name in the header, or if specified, the corresponding +name assigned in `csv_data_columns`. If neither config is specified, no data will be added to the metric. + +Additional configs are available to dynamically name metrics and set custom timestamps. If the +`csv_name_column` config is specified, the parser will assign the metric name to the value found +in that column. If the `csv_timestamp_column` is specified, the parser will extract the timestamp from +that column. If `csv_timestamp_column` is specified, the `csv_timestamp_format` must also be specified +or an error will be thrown. + +#### CSV Configuration +```toml + data_format = "csv" + + ## Whether or not to treat the first row of data as a header + ## By default, the parser will not parse the first row and + ## will treat the header as a list of column names + # csv_header = true + + ## The seperator between csv fields + ## By default, the parser assumes a comma (",") + # csv_delimiter = "," + + ## For assigning custom names to columns + ## If this is specified, all columns must have a name + ## ie there should be the same number of names listed + ## as there are columns of data + ## If `csv_header` is set to false, this config must be used + # csv_data_columns = [] + + ## Columns listed here will be added as tags + csv_tag_columns = [] + + ## Columns listed here will be added as fields + ## the field type is infered from the value of the field + csv_field_columns = [] + + ## The column to extract the name of the metric from + ## By default, this is the name of the plugin + ## the `name_override` config overrides this + # csv_name_column = "" + + ## The column to extract time information for the metric + ## `csv_timestamp_format` must be specified if this is used + # csv_timestamp_column = "" + + ## The format of time data extracted from `csv_timestamp_column` + ## this must be specified if `csv_timestamp_column` is specified + # csv_timestamp_format = "" + ``` + diff --git a/internal/config/config.go b/internal/config/config.go index 21c71d94695c2..da354839edeee 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1399,6 +1399,87 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { } } + //for csv parser + if node, ok := tbl.Fields["csv_data_columns"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.CSVDataColumns = append(c.CSVDataColumns, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["csv_tag_columns"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.CSVTagColumns = append(c.CSVTagColumns, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["csv_field_columns"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.CSVFieldColumns = append(c.CSVFieldColumns, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["csv_delimiter"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if 
str, ok := kv.Value.(*ast.String); ok { + c.CSVDelimiter = str.Value + } + } + } + + if node, ok := tbl.Fields["csv_name_column"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.CSVNameColumn = str.Value + } + } + } + + if node, ok := tbl.Fields["csv_timestamp_column"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.CSVTimestampColumn = str.Value + } + } + } + + if node, ok := tbl.Fields["csv_timestamp_format"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.CSVTimestampFormat = str.Value + } + } + } + + if node, ok := tbl.Fields["csv_header"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + val, err := strconv.ParseBool(str.Value) + if err != nil { + log.Printf("E! could not parse: %v as bool", str) + } + c.CSVHeader = val + } + } + } + c.MetricName = name delete(tbl.Fields, "data_format") @@ -1420,6 +1501,14 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { delete(tbl.Fields, "grok_custom_patterns") delete(tbl.Fields, "grok_custom_pattern_files") delete(tbl.Fields, "grok_timezone") + delete(tbl.Fields, "csv_data_columns") + delete(tbl.Fields, "csv_tag_columns") + delete(tbl.Fields, "csv_field_columns") + delete(tbl.Fields, "csv_name_column") + delete(tbl.Fields, "csv_timestamp_column") + delete(tbl.Fields, "csv_timestamp_format") + delete(tbl.Fields, "csv_delimiter") + delete(tbl.Fields, "csv_header") return parsers.NewParser(c) } diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 5b9a9e6430f84..577e86bb1f0d6 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -3,29 +3,145 @@ package csv import ( "bytes" "encoding/csv" + "fmt" + "log" + "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" ) type CSVParser struct { MetricName string + Header bool Delimiter string DataColumns []string TagColumns []string FieldColumns []string NameColumn string TimestampColumn string + TimestampFormat string DefaultTags map[string]string - csvReader *csv.Reader } func (p *CSVParser) Parse(buf []byte) ([]telegraf.Metric, error) { r := bytes.NewReader(buf) - p.csvReader = csv.NewReader(r) + csvReader := csv.NewReader(r) + csvReader.FieldsPerRecord = len(p.DataColumns) + + //if there is a header and nothing in DataColumns + //set DataColumns to names extracted from the header + if p.Header && len(p.DataColumns) == 0 { + header, err := csvReader.Read() + if err != nil { + return nil, err + } + p.DataColumns = header + + } else if p.Header { + //if there is a header and DataColumns is specified, just skip header + csvReader.Read() + + } else if !p.Header && len(p.DataColumns) == 0 { + //if there is no header and no DataColumns, that's an error + return nil, fmt.Errorf("there must be a header if `csv_data_columns` is not specified") + } + + table, err := csvReader.ReadAll() + if err != nil { + return nil, err + } + + metrics := make([]telegraf.Metric, 0) + for _, record := range table { + m, err := p.parseRecord(record) + if err != nil { + return metrics, err + } + metrics = append(metrics, m) + } + return metrics, nil } +//does not use any information in header and assumes DataColumns is set func (p *CSVParser) ParseLine(line string) (telegraf.Metric, error) { + r := bytes.NewReader([]byte(line)) + csvReader := csv.NewReader(r) + csvReader.FieldsPerRecord = len(p.DataColumns) + record, err := csvReader.Read() + 
if err != nil { + return nil, err + } + m, err := p.parseRecord(record) + if err != nil { + return nil, err + } + return m, nil +} + +func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { + recordFields := make(map[string]interface{}) + tags := make(map[string]string) + fields := make(map[string]interface{}) + for i, fieldName := range p.DataColumns { + recordFields[fieldName] = record[i] + } + + //add default tags + for k, v := range p.DefaultTags { + tags[k] = v + } + + for _, tagName := range p.TagColumns { + if recordFields[tagName] == "" { + return nil, fmt.Errorf("could not find field: %v", tagName) + } + tags[tagName] = recordFields[tagName].(string) + } + + for _, fieldName := range p.FieldColumns { + if recordFields[fieldName] == "" { + return nil, fmt.Errorf("could not find field: %v", fieldName) + } + switch value := recordFields[fieldName].(type) { + case int: + fields[fieldName] = value + case float64: + fields[fieldName] = value + case bool: + fields[fieldName] = value + case string: + fields[fieldName] = value + default: + log.Printf("E! [parsers.csv] Unrecognized type %T", value) + } + } + + //will default to plugin name + measurementName := p.MetricName + if recordFields[p.NameColumn] != "" { + measurementName = recordFields[p.NameColumn].(string) + } + + metricTime := time.Now() + if p.TimestampColumn != "" { + tStr := recordFields[p.TimestampColumn] + if p.TimestampFormat == "" { + return nil, fmt.Errorf("timestamp format must be specified") + } + + var err error + metricTime, err = time.Parse(p.TimestampFormat, tStr.(string)) + if err != nil { + return nil, err + } + } + m, err := metric.New(measurementName, tags, fields, metricTime) + if err != nil { + return nil, err + } + return m, nil } func (p *CSVParser) SetDefaultTags(tags map[string]string) { diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index ed407dd0a8585..182291d08f914 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -1 +1,7 @@ package csv + +import "testing" + +func TestBasicCSV(t *testing.T) { + +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 24e73d4b63ca6..aa0c5495c4775 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers/collectd" + "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/plugins/parsers/dropwizard" "github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/plugins/parsers/grok" @@ -98,6 +99,16 @@ type Config struct { GrokCustomPatterns string GrokCustomPatternFiles []string GrokTimeZone string + + //csv configuration + CSVHeader bool + CSVDelimiter string + CSVDataColumns []string + CSVTagColumns []string + CSVFieldColumns []string + CSVNameColumn string + CSVTimestampColumn string + CSVTimestampFormat string } // NewParser returns a Parser interface based on the given config. 
@@ -139,12 +150,49 @@ func NewParser(config *Config) (Parser, error) { config.GrokCustomPatterns, config.GrokCustomPatternFiles, config.GrokTimeZone) + case "csv": + parser, err = newCSVParser(config.MetricName, + config.CSVHeader, + config.CSVDelimiter, + config.CSVDataColumns, + config.CSVTagColumns, + config.CSVFieldColumns, + config.CSVNameColumn, + config.CSVTimestampColumn, + config.CSVTimestampFormat, + config.DefaultTags) default: err = fmt.Errorf("Invalid data format: %s", config.DataFormat) } return parser, err } +func newCSVParser(metricName string, + header bool, + delimiter string, + dataColumns []string, + tagColumns []string, + fieldColumns []string, + nameColumn string, + timestampColumn string, + timestampFormat string, + defaultTags map[string]string) (Parser, error) { + parser := &csv.CSVParser{ + MetricName: metricName, + Header: header, + Delimiter: delimiter, + DataColumns: dataColumns, + TagColumns: tagColumns, + FieldColumns: fieldColumns, + NameColumn: nameColumn, + TimestampColumn: timestampColumn, + TimestampFormat: timestampFormat, + DefaultTags: defaultTags, + } + + return parser, nil +} + func newGrokParser(metricName string, patterns []string, nPatterns []string, From 3c8cb17726e049bd1135493042c25e4976472523 Mon Sep 17 00:00:00 2001 From: Max U Date: Thu, 19 Jul 2018 11:19:29 -0700 Subject: [PATCH 03/23] add unit tests for csv parser --- docs/DATA_FORMATS_INPUT.md | 21 +++--- plugins/parsers/csv/parser.go | 10 ++- plugins/parsers/csv/parser_test.go | 110 ++++++++++++++++++++++++++++- 3 files changed, 131 insertions(+), 10 deletions(-) diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index 39e662b13614d..9cbe075c1a333 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -765,12 +765,14 @@ HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG} ``` #### CSV -Parse out metrics from a CSV formatted table. By default, the parser assumes there is a header and -will check the first row to extract column names from the header and will begin parsing data on the -second row. To prevent the parser from skipping the first line, set the `csv_header` config to false. +Parse out metrics from a CSV formatted table. By default, the parser assumes there is no header and +will read data from the first line. If `csv_header` is true, the parser will extract column names from +the first row and will begin parsing data on the second row. + To assign custom column names, the `csv_data_columns` config is available. If the `csv_data_columns` config is used, all columns must be named or an error will be thrown. If `csv_header` is set to false, -`csv_data_columns` must be specified. +`csv_data_columns` must be specified. Names listed in `csv_data_columns` will override names extracted +from the header. The `csv_tag_columns` and `csv_field_columns` configs are available to add the column data to the metric. The name used to specify the column is the name in the header, or if specified, the corresponding @@ -787,9 +789,12 @@ or an error will be thrown. data_format = "csv" ## Whether or not to treat the first row of data as a header - ## By default, the parser will not parse the first row and - ## will treat the header as a list of column names - # csv_header = true + ## By default, the parser assumes there is no header and will parse the + ## first row as data. If set to true the parser will treat the first row + ## as a header, extract the list of column names, and begin parsing data + ## on the second line. 
If `csv_data_columns` is specified, the column + ## names in header will be overridden. + # csv_header = false ## The seperator between csv fields ## By default, the parser assumes a comma (",") @@ -800,7 +805,7 @@ or an error will be thrown. ## ie there should be the same number of names listed ## as there are columns of data ## If `csv_header` is set to false, this config must be used - # csv_data_columns = [] + csv_data_columns = [] ## Columns listed here will be added as tags csv_tag_columns = [] diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 577e86bb1f0d6..9754bbaf4b749 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -28,6 +28,14 @@ func (p *CSVParser) Parse(buf []byte) ([]telegraf.Metric, error) { r := bytes.NewReader(buf) csvReader := csv.NewReader(r) csvReader.FieldsPerRecord = len(p.DataColumns) + if p.Delimiter != "" { + runeStr := []rune(p.Delimiter) + if len(runeStr) > 1 { + log.Printf("rune more than one char: %v", runeStr) + return nil, fmt.Errorf("delimiter must be a single character") + } + csvReader.Comma = runeStr[0] + } //if there is a header and nothing in DataColumns //set DataColumns to names extracted from the header @@ -119,7 +127,7 @@ func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { //will default to plugin name measurementName := p.MetricName - if recordFields[p.NameColumn] != "" { + if recordFields[p.NameColumn] != nil { measurementName = recordFields[p.NameColumn].(string) } diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index 182291d08f914..0c4fb0990d08b 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -1,7 +1,115 @@ package csv -import "testing" +import ( + "fmt" + "log" + "testing" + + "github.com/stretchr/testify/require" +) func TestBasicCSV(t *testing.T) { + p := CSVParser{ + DataColumns: []string{"first", "second", "third"}, + FieldColumns: []string{"first", "second"}, + TagColumns: []string{"third"}, + } + + m, err := p.ParseLine("1.4,true,hi") + require.NoError(t, err) + log.Printf("m: %v", m) + t.Error() +} + +func TestHeaderCSV(t *testing.T) { + p := CSVParser{ + Header: true, + FieldColumns: []string{"first", "second"}, + NameColumn: "third", + } + testCSV := `first,second,third +3.4,70,test_name` + + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, "test_name", metrics[0].Name()) +} + +func TestHeaderOverride(t *testing.T) { + p := CSVParser{ + Header: true, + DataColumns: []string{"first", "second", "third"}, + FieldColumns: []string{"first", "second"}, + NameColumn: "third", + } + testCSV := `line1,line2,line3 +3.4,70,test_name` + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, "test_name", metrics[0].Name()) +} + +func TestTimestamp(t *testing.T) { + p := CSVParser{ + Header: true, + DataColumns: []string{"first", "second", "third"}, + FieldColumns: []string{"second"}, + NameColumn: "third", + TimestampColumn: "first", + TimestampFormat: "02/01/06 03:04:05 PM", + } + testCSV := `line1,line2,line3 +23/05/09 04:05:06 PM,70,test_name +07/11/09 04:05:06 PM,80,test_name2` + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + log.Printf("metrics: %v", metrics) + require.NotEqual(t, metrics[1].Time(), metrics[0].Time()) + t.Error() +} + +func TestTimestampError(t *testing.T) { + p := CSVParser{ + Header: true, + DataColumns: []string{"first", "second", "third"}, + FieldColumns: 
[]string{"second"}, + NameColumn: "third", + TimestampColumn: "first", + } + testCSV := `line1,line2,line3 +23/05/09 04:05:06 PM,70,test_name +07/11/09 04:05:06 PM,80,test_name2` + _, err := p.Parse([]byte(testCSV)) + require.Equal(t, fmt.Errorf("timestamp format must be specified"), err) +} + +func TestQuotedCharacter(t *testing.T) { + p := CSVParser{ + Header: true, + DataColumns: []string{"first", "second", "third"}, + FieldColumns: []string{"second", "first"}, + NameColumn: "third", + } + + testCSV := `line1,line2,line3 +"3,4",70,test_name` + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, "3,4", metrics[0].Fields()["first"]) +} + +func TestDelimiter(t *testing.T) { + p := CSVParser{ + Header: true, + Delimiter: "%", + DataColumns: []string{"first", "second", "third"}, + FieldColumns: []string{"second", "first"}, + NameColumn: "third", + } + testCSV := `line1%line2%line3 +3,4%70%test_name` + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, "3,4", metrics[0].Fields()["first"]) } From 4a0773482dfe221eb9f84da7e2418105fc660a3b Mon Sep 17 00:00:00 2001 From: Max U Date: Thu, 19 Jul 2018 14:02:50 -0700 Subject: [PATCH 04/23] mess with config options --- internal/config/config.go | 15 +++++++++++---- plugins/parsers/csv/parser.go | 7 +++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index da354839edeee..eb2d21bde3ae9 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1470,12 +1470,19 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { if node, ok := tbl.Fields["csv_header"]; ok { if kv, ok := node.(*ast.KeyValue); ok { - if str, ok := kv.Value.(*ast.String); ok { - val, err := strconv.ParseBool(str.Value) + if str, ok := kv.Value.(*ast.Boolean); ok { + //for config with no quotes + val, _ := strconv.ParseBool(str.Value) + c.CSVHeader = val + } else { + //for config with quotes + strVal := kv.Value.(*ast.String) + val, err := strconv.ParseBool(strVal.Value) if err != nil { - log.Printf("E! could not parse: %v as bool", str) + log.Printf("E! 
parsing to bool: %v", err) + } else { + c.CSVHeader = val } - c.CSVHeader = val } } } diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 9754bbaf4b749..45f1bb77242ea 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -31,8 +31,7 @@ func (p *CSVParser) Parse(buf []byte) ([]telegraf.Metric, error) { if p.Delimiter != "" { runeStr := []rune(p.Delimiter) if len(runeStr) > 1 { - log.Printf("rune more than one char: %v", runeStr) - return nil, fmt.Errorf("delimiter must be a single character") + return nil, fmt.Errorf("delimiter must be a single character, got: %v", p.Delimiter) } csvReader.Comma = runeStr[0] } @@ -101,14 +100,14 @@ func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { } for _, tagName := range p.TagColumns { - if recordFields[tagName] == "" { + if recordFields[tagName] == nil { return nil, fmt.Errorf("could not find field: %v", tagName) } tags[tagName] = recordFields[tagName].(string) } for _, fieldName := range p.FieldColumns { - if recordFields[fieldName] == "" { + if recordFields[fieldName] == nil { return nil, fmt.Errorf("could not find field: %v", fieldName) } switch value := recordFields[fieldName].(type) { From 48210f50f77f80e94b820f104d7f844ae03af1d7 Mon Sep 17 00:00:00 2001 From: Max U Date: Thu, 19 Jul 2018 14:12:03 -0700 Subject: [PATCH 05/23] fix unit tests --- plugins/parsers/csv/parser_test.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index 0c4fb0990d08b..5b2ec6d11f19c 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -2,7 +2,6 @@ package csv import ( "fmt" - "log" "testing" "github.com/stretchr/testify/require" @@ -15,10 +14,8 @@ func TestBasicCSV(t *testing.T) { TagColumns: []string{"third"}, } - m, err := p.ParseLine("1.4,true,hi") + _, err := p.ParseLine("1.4,true,hi") require.NoError(t, err) - log.Printf("m: %v", m) - t.Error() } func TestHeaderCSV(t *testing.T) { @@ -63,9 +60,7 @@ func TestTimestamp(t *testing.T) { 07/11/09 04:05:06 PM,80,test_name2` metrics, err := p.Parse([]byte(testCSV)) require.NoError(t, err) - log.Printf("metrics: %v", metrics) require.NotEqual(t, metrics[1].Time(), metrics[0].Time()) - t.Error() } func TestTimestampError(t *testing.T) { From 67f4929bcd31620ad121a95bcfdae4461c3ff53f Mon Sep 17 00:00:00 2001 From: Max U Date: Thu, 19 Jul 2018 14:19:31 -0700 Subject: [PATCH 06/23] change README --- docs/DATA_FORMATS_INPUT.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index 9cbe075c1a333..2341dcf719f0e 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -661,7 +661,7 @@ For more information about the dropwizard json format see # tag2 = "tags.tag2" ``` -#### Grok +# Grok Parse logstash-style "grok" patterns. Patterns can be added to patterns, or custom patterns read from custom_pattern_files. # View logstash grok pattern docs here: @@ -764,7 +764,7 @@ HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG} grok_timezone = "Canada/Eastern" ``` -#### CSV +# CSV Parse out metrics from a CSV formatted table. By default, the parser assumes there is no header and will read data from the first line. If `csv_header` is true, the parser will extract column names from the first row and will begin parsing data on the second row. 
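As of the commit above, the parser can be exercised directly from Go. A minimal sketch, assuming the package state at this point in the series (the metric name and input data are illustrative):

```go
package main

import (
	"fmt"

	"github.com/influxdata/telegraf/plugins/parsers/csv"
)

func main() {
	p := csv.CSVParser{
		MetricName:   "example",
		Header:       true, // first row supplies the column names
		TagColumns:   []string{"host"},
		FieldColumns: []string{"value"},
	}

	// one header row followed by one data row
	metrics, err := p.Parse([]byte("host,value\nserver01,42\n"))
	if err != nil {
		panic(err)
	}
	fmt.Println(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
}
```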
From d24e687a02875cf78cf259ec65a89845dc989b63 Mon Sep 17 00:00:00 2001 From: Max U Date: Wed, 25 Jul 2018 10:07:39 -0700 Subject: [PATCH 07/23] unfinished test case for csv --- plugins/parsers/csv/parser_test.go | 31 ++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index 5b2ec6d11f19c..a0de8d8de2c8d 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -3,7 +3,9 @@ package csv import ( "fmt" "testing" + "time" + "github.com/influxdata/telegraf/metric" "github.com/stretchr/testify/require" ) @@ -108,3 +110,32 @@ func TestDelimiter(t *testing.T) { require.NoError(t, err) require.Equal(t, "3,4", metrics[0].Fields()["first"]) } + +func TestValueConversion(t *testing.T) { + p := CSVParser{ + Header: false, + Delimiter: ",", + DataColumns: []string{"first", "second", "third", "fourth"}, + FieldColumns: []string{"second", "first", "third", "fourth"}, + MetricName: "test_value", + } + testCSV := `3.3,4.0,true,hello` + + expectedFields := map[string]interface{}{ + "first": 3.3, + "second": 4.0, + "third": true, + "fourth": "hello", + } + + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + + expectedTags := make(map[string]string) + goodMetric, err1 := metric.New("test_value", expectedTags, expectedFields, time.Unix(0, 0)) + returnedMetric, err2 := metric.New(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields(), time.Unix(0, 0)) + require.NoError(t, err1) + require.NoError(t, err2) + + require.Equal(t, goodMetric, returnedMetric) +} From e07ed589e1b8b8ae5ad6ca3edfa1b22bba40d6c2 Mon Sep 17 00:00:00 2001 From: Max U Date: Wed, 25 Jul 2018 10:29:09 -0700 Subject: [PATCH 08/23] fix type conversion and add unit test --- plugins/parsers/csv/parser.go | 36 +++++++++++++++--------------- plugins/parsers/csv/parser_test.go | 13 ++++++++--- 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 45f1bb77242ea..c81adad53a558 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -4,7 +4,7 @@ import ( "bytes" "encoding/csv" "fmt" - "log" + "strconv" "time" "github.com/influxdata/telegraf" @@ -87,7 +87,7 @@ func (p *CSVParser) ParseLine(line string) (telegraf.Metric, error) { } func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { - recordFields := make(map[string]interface{}) + recordFields := make(map[string]string) tags := make(map[string]string) fields := make(map[string]interface{}) for i, fieldName := range p.DataColumns { @@ -100,34 +100,34 @@ func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { } for _, tagName := range p.TagColumns { - if recordFields[tagName] == nil { + if recordFields[tagName] == "" { return nil, fmt.Errorf("could not find field: %v", tagName) } - tags[tagName] = recordFields[tagName].(string) + tags[tagName] = recordFields[tagName] } for _, fieldName := range p.FieldColumns { - if recordFields[fieldName] == nil { + if recordFields[fieldName] == "" { return nil, fmt.Errorf("could not find field: %v", fieldName) } - switch value := recordFields[fieldName].(type) { - case int: - fields[fieldName] = value - case float64: - fields[fieldName] = value - case bool: - fields[fieldName] = value - case string: + + //attempt type conversions + value := recordFields[fieldName] + if iValue, err := strconv.Atoi(value); err == nil { + fields[fieldName] = iValue + } else if fValue, err := 
strconv.ParseFloat(value, 64); err == nil { + fields[fieldName] = fValue + } else if bValue, err := strconv.ParseBool(value); err == nil { + fields[fieldName] = bValue + } else { fields[fieldName] = value - default: - log.Printf("E! [parsers.csv] Unrecognized type %T", value) } } //will default to plugin name measurementName := p.MetricName - if recordFields[p.NameColumn] != nil { - measurementName = recordFields[p.NameColumn].(string) + if recordFields[p.NameColumn] != "" { + measurementName = recordFields[p.NameColumn] } metricTime := time.Now() @@ -138,7 +138,7 @@ func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { } var err error - metricTime, err = time.Parse(p.TimestampFormat, tStr.(string)) + metricTime, err = time.Parse(p.TimestampFormat, tStr) if err != nil { return nil, err } diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index a0de8d8de2c8d..0762537c5e108 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -2,6 +2,8 @@ package csv import ( "fmt" + "log" + "reflect" "testing" "time" @@ -119,11 +121,11 @@ func TestValueConversion(t *testing.T) { FieldColumns: []string{"second", "first", "third", "fourth"}, MetricName: "test_value", } - testCSV := `3.3,4.0,true,hello` + testCSV := `3.3,4,true,hello` expectedFields := map[string]interface{}{ "first": 3.3, - "second": 4.0, + "second": 4, "third": true, "fourth": "hello", } @@ -137,5 +139,10 @@ func TestValueConversion(t *testing.T) { require.NoError(t, err1) require.NoError(t, err2) - require.Equal(t, goodMetric, returnedMetric) + //deep equal fields + for k := range goodMetric.Fields() { + log.Printf("expected field: %v, %T", goodMetric.Fields()[k], goodMetric.Fields()[k]) + log.Printf("returned field: %v, %T", returnedMetric.Fields()[k], returnedMetric.Fields()[k]) + } + require.True(t, reflect.DeepEqual(goodMetric.Fields(), returnedMetric.Fields())) } From edd8afc86d75f3a2e16dd514751731c614ab0d0a Mon Sep 17 00:00:00 2001 From: Max U Date: Fri, 27 Jul 2018 11:26:00 -0700 Subject: [PATCH 09/23] addresses greg and chris's comments --- plugins/parsers/csv/parser.go | 45 +++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index c81adad53a558..6d1dd10798a92 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -24,20 +24,27 @@ type CSVParser struct { DefaultTags map[string]string } -func (p *CSVParser) Parse(buf []byte) ([]telegraf.Metric, error) { - r := bytes.NewReader(buf) +func (p *CSVParser) compile(r *bytes.Reader) (*csv.Reader, error) { csvReader := csv.NewReader(r) csvReader.FieldsPerRecord = len(p.DataColumns) if p.Delimiter != "" { runeStr := []rune(p.Delimiter) if len(runeStr) > 1 { - return nil, fmt.Errorf("delimiter must be a single character, got: %v", p.Delimiter) + return csvReader, fmt.Errorf("delimiter must be a single character, got: %s", p.Delimiter) } csvReader.Comma = runeStr[0] } + return csvReader, nil +} - //if there is a header and nothing in DataColumns - //set DataColumns to names extracted from the header +func (p *CSVParser) Parse(buf []byte) ([]telegraf.Metric, error) { + r := bytes.NewReader(buf) + csvReader, err := p.compile(r) + if err != nil { + return nil, err + } + // if there is a header and nothing in DataColumns + // set DataColumns to names extracted from the header if p.Header && len(p.DataColumns) == 0 { header, err := csvReader.Read() if err != nil { @@ 
-46,11 +53,11 @@ func (p *CSVParser) Parse(buf []byte) ([]telegraf.Metric, error) { p.DataColumns = header } else if p.Header { - //if there is a header and DataColumns is specified, just skip header + // if there is a header and DataColumns is specified, just skip header csvReader.Read() } else if !p.Header && len(p.DataColumns) == 0 { - //if there is no header and no DataColumns, that's an error + // if there is no header and no DataColumns, that's an error return nil, fmt.Errorf("there must be a header if `csv_data_columns` is not specified") } @@ -70,11 +77,19 @@ func (p *CSVParser) Parse(buf []byte) ([]telegraf.Metric, error) { return metrics, nil } -//does not use any information in header and assumes DataColumns is set +// ParseLine does not use any information in header and assumes DataColumns is set func (p *CSVParser) ParseLine(line string) (telegraf.Metric, error) { r := bytes.NewReader([]byte(line)) - csvReader := csv.NewReader(r) - csvReader.FieldsPerRecord = len(p.DataColumns) + csvReader, err := p.compile(r) + if err != nil { + return nil, err + } + + // if there is nothing in DataColumns, ParseLine will fail + if len(p.DataColumns) == 0 { + return nil, fmt.Errorf("[parsers.csv] data columns must be specified") + } + record, err := csvReader.Read() if err != nil { return nil, err @@ -94,7 +109,7 @@ func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { recordFields[fieldName] = record[i] } - //add default tags + // add default tags for k, v := range p.DefaultTags { tags[k] = v } @@ -107,12 +122,12 @@ func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { } for _, fieldName := range p.FieldColumns { - if recordFields[fieldName] == "" { + value, ok := recordFields[fieldName] + if !ok { return nil, fmt.Errorf("could not find field: %v", fieldName) } - //attempt type conversions - value := recordFields[fieldName] + // attempt type conversions if iValue, err := strconv.Atoi(value); err == nil { fields[fieldName] = iValue } else if fValue, err := strconv.ParseFloat(value, 64); err == nil { @@ -124,7 +139,7 @@ func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { } } - //will default to plugin name + // will default to plugin name measurementName := p.MetricName if recordFields[p.NameColumn] != "" { measurementName = recordFields[p.NameColumn] From b5ff78f57dd61c3ebff73b16feeb50926b7eab32 Mon Sep 17 00:00:00 2001 From: Max U Date: Fri, 27 Jul 2018 13:37:39 -0700 Subject: [PATCH 10/23] address some of greg+chris's comments. includes config for trimspace and comment --- docs/DATA_FORMATS_INPUT.md | 8 +++++ internal/config/config.go | 27 +++++++++++++++ plugins/parsers/csv/parser.go | 10 ++++++ plugins/parsers/csv/parser_test.go | 53 +++++++++++++++++++++++++++--- plugins/parsers/registry.go | 8 +++++ 5 files changed, 102 insertions(+), 4 deletions(-) diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index 2341dcf719f0e..7741877f76242 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -800,6 +800,14 @@ or an error will be thrown. 
## By default, the parser assumes a comma (",") # csv_delimiter = "," + ## The character reserved for marking a row as a comment row + ## Commented rows are skipped and not parsed + # csv_comment = "" + + ## If set to true, the parser will remove leading whitespace from fields + ## By default, this is false + # csv_trim_space = false + ## For assigning custom names to columns ## If this is specified, all columns must have a name ## ie there should be the same number of names listed diff --git a/internal/config/config.go b/internal/config/config.go index eb2d21bde3ae9..f1238b5cb9252 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1444,6 +1444,14 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { } } + if node, ok := tbl.Fields["csv_comment"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.CSVComment = str.Value + } + } + } + if node, ok := tbl.Fields["csv_name_column"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { @@ -1487,6 +1495,25 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { } } + if node, ok := tbl.Fields["csv_trim_space"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.Boolean); ok { + //for config with no quotes + val, _ := strconv.ParseBool(str.Value) + c.CSVTrimSpace = val + } else { + //for config with quotes + strVal := kv.Value.(*ast.String) + val, err := strconv.ParseBool(strVal.Value) + if err != nil { + log.Printf("E! parsing to bool: %v", err) + } else { + c.CSVTrimSpace = val + } + } + } + } + c.MetricName = name delete(tbl.Fields, "data_format") diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 6d1dd10798a92..56c8eea8c3b3a 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -15,6 +15,8 @@ type CSVParser struct { MetricName string Header bool Delimiter string + Comment string + TrimSpace bool DataColumns []string TagColumns []string FieldColumns []string @@ -34,6 +36,14 @@ func (p *CSVParser) compile(r *bytes.Reader) (*csv.Reader, error) { } csvReader.Comma = runeStr[0] } + if p.Comment != "" { + runeStr := []rune(p.Comment) + if len(runeStr) > 1 { + return csvReader, fmt.Errorf("comment must be a single character, got: %s", p.Comment) + } + csvReader.Comment = runeStr[0] + } + csvReader.TrimLeadingSpace = p.TrimSpace return csvReader, nil } diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index 0762537c5e108..1de96be8d8d3b 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -140,9 +140,54 @@ func TestValueConversion(t *testing.T) { require.NoError(t, err2) //deep equal fields - for k := range goodMetric.Fields() { - log.Printf("expected field: %v, %T", goodMetric.Fields()[k], goodMetric.Fields()[k]) - log.Printf("returned field: %v, %T", returnedMetric.Fields()[k], returnedMetric.Fields()[k]) - } require.True(t, reflect.DeepEqual(goodMetric.Fields(), returnedMetric.Fields())) } + +func TestSkipComment(t *testing.T) { + p := CSVParser{ + Header: false, + Comment: "#", + DataColumns: []string{"first", "second", "third", "fourth"}, + FieldColumns: []string{"second", "first", "third", "fourth"}, + MetricName: "test_value", + } + testCSV := `#3.3,4,true,hello +4,9.9,true,name_this` + + expectedFields := map[string]interface{}{ + "first": int64(4), + "second": 9.9, + "third": true, + "fourth": "name_this", + } + + metrics, err := 
p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, true, reflect.DeepEqual(expectedFields, metrics[0].Fields())) +} + +func TestTrimSpace(t *testing.T) { + p := CSVParser{ + Header: false, + TrimSpace: true, + DataColumns: []string{"first", "second", "third", "fourth"}, + FieldColumns: []string{"second", "first", "third", "fourth"}, + MetricName: "test_value", + } + testCSV := ` 3.3, 4, true,hello` + + expectedFields := map[string]interface{}{ + "first": 3.3, + "second": int64(4), + "third": true, + "fourth": "hello", + } + + metrics, err := p.Parse([]byte(testCSV)) + for k := range metrics[0].Fields() { + log.Printf("want: %v, %T", expectedFields[k], expectedFields[k]) + log.Printf("got: %v, %T", metrics[0].Fields()[k], metrics[0].Fields()[k]) + } + require.NoError(t, err) + require.Equal(t, true, reflect.DeepEqual(expectedFields, metrics[0].Fields())) +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index aa0c5495c4775..454990d531f2c 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -103,6 +103,8 @@ type Config struct { //csv configuration CSVHeader bool CSVDelimiter string + CSVComment string + CSVTrimSpace bool CSVDataColumns []string CSVTagColumns []string CSVFieldColumns []string @@ -154,6 +156,8 @@ func NewParser(config *Config) (Parser, error) { parser, err = newCSVParser(config.MetricName, config.CSVHeader, config.CSVDelimiter, + config.CSVComment, + config.CSVTrimSpace, config.CSVDataColumns, config.CSVTagColumns, config.CSVFieldColumns, @@ -170,6 +174,8 @@ func NewParser(config *Config) (Parser, error) { func newCSVParser(metricName string, header bool, delimiter string, + comment string, + trimSpace bool, dataColumns []string, tagColumns []string, fieldColumns []string, @@ -181,6 +187,8 @@ func newCSVParser(metricName string, MetricName: metricName, Header: header, Delimiter: delimiter, + Comment: comment, + TrimSpace: trimSpace, DataColumns: dataColumns, TagColumns: tagColumns, FieldColumns: fieldColumns, From 7704f3ef3f2c5b9b3e3c59e15299ce415ba7f594 Mon Sep 17 00:00:00 2001 From: Max U Date: Fri, 27 Jul 2018 16:10:09 -0700 Subject: [PATCH 11/23] get rid of grok changes on branch --- docs/DATA_FORMATS_INPUT.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index 7741877f76242..7c26b9e130873 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -661,7 +661,7 @@ For more information about the dropwizard json format see # tag2 = "tags.tag2" ``` -# Grok +#### Grok Parse logstash-style "grok" patterns. Patterns can be added to patterns, or custom patterns read from custom_pattern_files. 
# View logstash grok pattern docs here: From 80135eed72ae1055032a2daa8e56a2101260311c Mon Sep 17 00:00:00 2001 From: Max U Date: Mon, 20 Aug 2018 09:50:40 -0700 Subject: [PATCH 12/23] initial config changes --- internal/config/config.go | 21 +++++---------------- plugins/parsers/csv/parser.go | 2 +- plugins/parsers/registry.go | 8 ++++---- 3 files changed, 10 insertions(+), 21 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index f1238b5cb9252..ccec0e7e9860d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1400,7 +1400,7 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { } //for csv parser - if node, ok := tbl.Fields["csv_data_columns"]; ok { + if node, ok := tbl.Fields["csv_name_columns"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { @@ -1452,7 +1452,7 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { } } - if node, ok := tbl.Fields["csv_name_column"]; ok { + if node, ok := tbl.Fields["csv_measurement_column"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { c.CSVNameColumn = str.Value @@ -1476,21 +1476,10 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { } } - if node, ok := tbl.Fields["csv_header"]; ok { + if node, ok := tbl.Fields["csv_header_row_count"]; ok { if kv, ok := node.(*ast.KeyValue); ok { - if str, ok := kv.Value.(*ast.Boolean); ok { - //for config with no quotes - val, _ := strconv.ParseBool(str.Value) - c.CSVHeader = val - } else { - //for config with quotes - strVal := kv.Value.(*ast.String) - val, err := strconv.ParseBool(strVal.Value) - if err != nil { - log.Printf("E! parsing to bool: %v", err) - } else { - c.CSVHeader = val - } + if str, ok := kv.Value.(*ast.Value); ok { + c.CSVHeaderRowCount = str } } } diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 56c8eea8c3b3a..8221b02407af5 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -13,7 +13,7 @@ import ( type CSVParser struct { MetricName string - Header bool + HeaderRowCount int Delimiter string Comment string TrimSpace bool diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 4eaff9d284d22..f01e687396a2f 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -102,7 +102,6 @@ type Config struct { GrokTimeZone string //csv configuration - CSVHeader bool CSVDelimiter string CSVComment string CSVTrimSpace bool @@ -112,6 +111,7 @@ type Config struct { CSVNameColumn string CSVTimestampColumn string CSVTimestampFormat string + CSVHeaderRowCount int } // NewParser returns a Parser interface based on the given config. 
@@ -157,7 +157,7 @@ func NewParser(config *Config) (Parser, error) { config.GrokTimeZone) case "csv": parser, err = newCSVParser(config.MetricName, - config.CSVHeader, + config.CSVHeaderRowCount, config.CSVDelimiter, config.CSVComment, config.CSVTrimSpace, @@ -175,7 +175,7 @@ func NewParser(config *Config) (Parser, error) { } func newCSVParser(metricName string, - header bool, + header int, delimiter string, comment string, trimSpace bool, @@ -188,7 +188,7 @@ func newCSVParser(metricName string, defaultTags map[string]string) (Parser, error) { parser := &csv.CSVParser{ MetricName: metricName, - Header: header, + HeaderRowCount: header, Delimiter: delimiter, Comment: comment, TrimSpace: trimSpace, From 60761d7a9df2e6bd477c0b05bd9cf67cf2ee9bb7 Mon Sep 17 00:00:00 2001 From: Max U Date: Mon, 20 Aug 2018 13:45:38 -0700 Subject: [PATCH 13/23] additional config options --- docs/DATA_FORMATS_INPUT.md | 14 +-- internal/config/config.go | 29 +++++- plugins/parsers/csv/parser.go | 82 ++++++++++------ plugins/parsers/csv/parser_test.go | 146 ++++++++++++++++++++--------- plugins/parsers/registry.go | 52 +++++----- 5 files changed, 217 insertions(+), 106 deletions(-) diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index e0766bd6101fd..be3d8fe179b3d 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -874,20 +874,20 @@ The `csv_skip_rows` config indicates the number of rows to skip before looking f to parse. By default, no rows will be skipped. The `csv_skip_columns` config indicates the number of columns to be skipped before parsing data. These -columns will not be read out of the header. Naming with the `csv_name_columns` will begin at the first +columns will not be read out of the header. Naming with the `csv_column_names` will begin at the first parsed column after skipping the indicated columns. By default, no columns are skipped. -To assign custom column names, the `csv_name_columns` config is available. If the `csv_name_columns` +To assign custom column names, the `csv_column_names` config is available. If the `csv_column_names` config is used, all columns must be named as additional columns will be ignored. If `csv_header_row_count` -is set to 0, `csv_name_columns` must be specified. Names listed in `csv_name_columns` will override names extracted +is set to 0, `csv_column_names` must be specified. Names listed in `csv_column_names` will override names extracted from the header. The `csv_tag_columns` and `csv_field_columns` configs are available to add the column data to the metric. The name used to specify the column is the name in the header, or if specified, the corresponding -name assigned in `csv_name_columns`. If neither config is specified, no data will be added to the metric. +name assigned in `csv_column_names`. If neither config is specified, no data will be added to the metric. Additional configs are available to dynamically name metrics and set custom timestamps. If the -`csv_name_column` config is specified, the parser will assign the metric name to the value found +`csv_column_names` config is specified, the parser will assign the metric name to the value found in that column. If the `csv_timestamp_column` is specified, the parser will extract the timestamp from that column. If `csv_timestamp_column` is specified, the `csv_timestamp_format` must also be specified or an error will be thrown. @@ -899,7 +899,7 @@ or an error will be thrown. ## Indicates how many rows to treat as a header. 
By default, the parser assumes ## there is no header and will parse the first row as data. If set to anything more ## than 1, column names will be concatenated with the name listed in the next header row. - ## If `csv_data_columns` is specified, the column names in header will be overridden. + ## If `csv_column_names` is specified, the column names in header will be overridden. # csv_header_row_count = 0 ## Indicates the number of rows to skip before looking for header information. @@ -925,7 +925,7 @@ or an error will be thrown. ## If this is specified, all columns should have a name ## Unnamed columns will be ignored by the parser. ## If `csv_header_row_count` is set to 0, this config must be used - csv_name_columns = [] + csv_column_names = [] ## Columns listed here will be added as tags csv_tag_columns = [] diff --git a/internal/config/config.go b/internal/config/config.go index ccec0e7e9860d..564b64cf7d627 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1400,7 +1400,7 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { } //for csv parser - if node, ok := tbl.Fields["csv_name_columns"]; ok { + if node, ok := tbl.Fields["csv_column_names"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { @@ -1479,7 +1479,32 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { if node, ok := tbl.Fields["csv_header_row_count"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.Value); ok { - c.CSVHeaderRowCount = str + c.CSVHeaderRowCount, err = strconv.Aoit(str.Value) + if err != nil { + log.Printf("E! parsing to int: %v", err) + } + } + } + } + + if node, ok := tbl.Fields["csv_skip_rows"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.Value); ok { + c.CSVSkipRows, err = strconv.Aoit(str.Value) + if err != nil { + log.Printf("E! parsing to int: %v", err) + } + } + } + } + + if node, ok := tbl.Fields["csv_skip_columns"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.Value); ok { + c.CSVSkipColumns, err = strconv.Aoit(str.Value) + if err != nil { + log.Printf("E! 
parsing to int: %v", err) + } } } } diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 8221b02407af5..3c01ec90ee8ce 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -12,23 +12,26 @@ import ( ) type CSVParser struct { - MetricName string - HeaderRowCount int - Delimiter string - Comment string - TrimSpace bool - DataColumns []string - TagColumns []string - FieldColumns []string - NameColumn string - TimestampColumn string - TimestampFormat string - DefaultTags map[string]string + MetricName string + HeaderRowCount int + SkipRows int + SkipColumns int + Delimiter string + Comment string + TrimSpace bool + ColumnNames []string + TagColumns []string + FieldColumns []string + MeasurementColumn string + TimestampColumn string + TimestampFormat string + DefaultTags map[string]string } func (p *CSVParser) compile(r *bytes.Reader) (*csv.Reader, error) { csvReader := csv.NewReader(r) - csvReader.FieldsPerRecord = len(p.DataColumns) + // ensures that the reader reads records of different lengths without an error + csvReader.FieldsPerRecord = -1 if p.Delimiter != "" { runeStr := []rune(p.Delimiter) if len(runeStr) > 1 { @@ -53,20 +56,35 @@ func (p *CSVParser) Parse(buf []byte) ([]telegraf.Metric, error) { if err != nil { return nil, err } + // skip first rows + for i := 0; i < p.SkipRows; i++ { + csvReader.Read() + } // if there is a header and nothing in DataColumns // set DataColumns to names extracted from the header - if p.Header && len(p.DataColumns) == 0 { - header, err := csvReader.Read() - if err != nil { - return nil, err + headerNames := make([]string, 0) + if len(p.ColumnNames) == 0 { + for i := 0; i < p.HeaderRowCount; i++ { + header, err := csvReader.Read() + if err != nil { + return nil, err + } + //concatenate header names + for i := range header { + if len(headerNames) <= i { + headerNames = append(headerNames, header[i]) + } else { + headerNames[i] = headerNames[i] + header[i] + } + } } - p.DataColumns = header - - } else if p.Header { - // if there is a header and DataColumns is specified, just skip header - csvReader.Read() - - } else if !p.Header && len(p.DataColumns) == 0 { + p.ColumnNames = headerNames[p.SkipColumns:] + } else if len(p.ColumnNames) > 0 { + // if columns are named, just skip header rows + for i := 0; i < p.HeaderRowCount; i++ { + csvReader.Read() + } + } else if p.HeaderRowCount == 0 && len(p.ColumnNames) == 0 { // if there is no header and no DataColumns, that's an error return nil, fmt.Errorf("there must be a header if `csv_data_columns` is not specified") } @@ -88,6 +106,7 @@ func (p *CSVParser) Parse(buf []byte) ([]telegraf.Metric, error) { } // ParseLine does not use any information in header and assumes DataColumns is set +// it will also not skip any rows func (p *CSVParser) ParseLine(line string) (telegraf.Metric, error) { r := bytes.NewReader([]byte(line)) csvReader, err := p.compile(r) @@ -96,7 +115,7 @@ func (p *CSVParser) ParseLine(line string) (telegraf.Metric, error) { } // if there is nothing in DataColumns, ParseLine will fail - if len(p.DataColumns) == 0 { + if len(p.ColumnNames) == 0 { return nil, fmt.Errorf("[parsers.csv] data columns must be specified") } @@ -115,8 +134,13 @@ func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { recordFields := make(map[string]string) tags := make(map[string]string) fields := make(map[string]interface{}) - for i, fieldName := range p.DataColumns { - recordFields[fieldName] = record[i] + + // skip columns in record + record = 
record[p.SkipColumns:] + for i, fieldName := range p.ColumnNames { + if i < len(record) { + recordFields[fieldName] = record[i] + } } // add default tags @@ -151,8 +175,8 @@ func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { // will default to plugin name measurementName := p.MetricName - if recordFields[p.NameColumn] != "" { - measurementName = recordFields[p.NameColumn] + if recordFields[p.MeasurementColumn] != "" { + measurementName = recordFields[p.MeasurementColumn] } metricTime := time.Now() diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index 1de96be8d8d3b..321406e578481 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -13,7 +13,7 @@ import ( func TestBasicCSV(t *testing.T) { p := CSVParser{ - DataColumns: []string{"first", "second", "third"}, + ColumnNames: []string{"first", "second", "third"}, FieldColumns: []string{"first", "second"}, TagColumns: []string{"third"}, } @@ -22,13 +22,14 @@ func TestBasicCSV(t *testing.T) { require.NoError(t, err) } -func TestHeaderCSV(t *testing.T) { +func TestHeaderConcatenationCSV(t *testing.T) { p := CSVParser{ - Header: true, - FieldColumns: []string{"first", "second"}, - NameColumn: "third", + HeaderRowCount: 2, + FieldColumns: []string{"first1", "second2"}, + MeasurementColumn: "3", } - testCSV := `first,second,third + testCSV := `first,second +1,2,3 3.4,70,test_name` metrics, err := p.Parse([]byte(testCSV)) @@ -38,10 +39,10 @@ func TestHeaderCSV(t *testing.T) { func TestHeaderOverride(t *testing.T) { p := CSVParser{ - Header: true, - DataColumns: []string{"first", "second", "third"}, - FieldColumns: []string{"first", "second"}, - NameColumn: "third", + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + FieldColumns: []string{"first", "second"}, + MeasurementColumn: "third", } testCSV := `line1,line2,line3 3.4,70,test_name` @@ -52,12 +53,12 @@ func TestHeaderOverride(t *testing.T) { func TestTimestamp(t *testing.T) { p := CSVParser{ - Header: true, - DataColumns: []string{"first", "second", "third"}, - FieldColumns: []string{"second"}, - NameColumn: "third", - TimestampColumn: "first", - TimestampFormat: "02/01/06 03:04:05 PM", + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + FieldColumns: []string{"second"}, + MeasurementColumn: "third", + TimestampColumn: "first", + TimestampFormat: "02/01/06 03:04:05 PM", } testCSV := `line1,line2,line3 23/05/09 04:05:06 PM,70,test_name @@ -69,11 +70,11 @@ func TestTimestamp(t *testing.T) { func TestTimestampError(t *testing.T) { p := CSVParser{ - Header: true, - DataColumns: []string{"first", "second", "third"}, - FieldColumns: []string{"second"}, - NameColumn: "third", - TimestampColumn: "first", + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + FieldColumns: []string{"second"}, + MeasurementColumn: "third", + TimestampColumn: "first", } testCSV := `line1,line2,line3 23/05/09 04:05:06 PM,70,test_name @@ -84,10 +85,10 @@ func TestTimestampError(t *testing.T) { func TestQuotedCharacter(t *testing.T) { p := CSVParser{ - Header: true, - DataColumns: []string{"first", "second", "third"}, - FieldColumns: []string{"second", "first"}, - NameColumn: "third", + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + FieldColumns: []string{"second", "first"}, + MeasurementColumn: "third", } testCSV := `line1,line2,line3 @@ -99,11 +100,11 @@ func TestQuotedCharacter(t *testing.T) { func TestDelimiter(t *testing.T) { p := 
CSVParser{ - Header: true, - Delimiter: "%", - DataColumns: []string{"first", "second", "third"}, - FieldColumns: []string{"second", "first"}, - NameColumn: "third", + HeaderRowCount: 1, + Delimiter: "%", + ColumnNames: []string{"first", "second", "third"}, + FieldColumns: []string{"second", "first"}, + MeasurementColumn: "third", } testCSV := `line1%line2%line3 @@ -115,14 +116,15 @@ func TestDelimiter(t *testing.T) { func TestValueConversion(t *testing.T) { p := CSVParser{ - Header: false, - Delimiter: ",", - DataColumns: []string{"first", "second", "third", "fourth"}, - FieldColumns: []string{"second", "first", "third", "fourth"}, - MetricName: "test_value", + HeaderRowCount: 0, + Delimiter: ",", + ColumnNames: []string{"first", "second", "third", "fourth"}, + FieldColumns: []string{"second", "first", "third", "fourth"}, + MetricName: "test_value", } testCSV := `3.3,4,true,hello` + expectedTags := make(map[string]string) expectedFields := map[string]interface{}{ "first": 3.3, "second": 4, @@ -133,7 +135,6 @@ func TestValueConversion(t *testing.T) { metrics, err := p.Parse([]byte(testCSV)) require.NoError(t, err) - expectedTags := make(map[string]string) goodMetric, err1 := metric.New("test_value", expectedTags, expectedFields, time.Unix(0, 0)) returnedMetric, err2 := metric.New(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields(), time.Unix(0, 0)) require.NoError(t, err1) @@ -145,11 +146,11 @@ func TestValueConversion(t *testing.T) { func TestSkipComment(t *testing.T) { p := CSVParser{ - Header: false, - Comment: "#", - DataColumns: []string{"first", "second", "third", "fourth"}, - FieldColumns: []string{"second", "first", "third", "fourth"}, - MetricName: "test_value", + HeaderRowCount: 0, + Comment: "#", + ColumnNames: []string{"first", "second", "third", "fourth"}, + FieldColumns: []string{"second", "first", "third", "fourth"}, + MetricName: "test_value", } testCSV := `#3.3,4,true,hello 4,9.9,true,name_this` @@ -168,11 +169,11 @@ func TestSkipComment(t *testing.T) { func TestTrimSpace(t *testing.T) { p := CSVParser{ - Header: false, - TrimSpace: true, - DataColumns: []string{"first", "second", "third", "fourth"}, - FieldColumns: []string{"second", "first", "third", "fourth"}, - MetricName: "test_value", + HeaderRowCount: 0, + TrimSpace: true, + ColumnNames: []string{"first", "second", "third", "fourth"}, + FieldColumns: []string{"second", "first", "third", "fourth"}, + MetricName: "test_value", } testCSV := ` 3.3, 4, true,hello` @@ -191,3 +192,56 @@ func TestTrimSpace(t *testing.T) { require.NoError(t, err) require.Equal(t, true, reflect.DeepEqual(expectedFields, metrics[0].Fields())) } + +func TestSkipRows(t *testing.T) { + p := CSVParser{ + HeaderRowCount: 1, + SkipRows: 1, + FieldColumns: []string{"line2", "line3"}, + MeasurementColumn: "line3", + } + testCSV := `garbage nonsense +line1,line2,line3 +hello,80,test_name2` + + expectedFields := map[string]interface{}{ + "line2": int64(80), + "line3": "test_name2", + } + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, expectedFields, metrics[0].Fields()) +} + +func TestSkipColumns(t *testing.T) { + p := CSVParser{ + SkipColumns: 1, + ColumnNames: []string{"line1", "line2"}, + FieldColumns: []string{"line1", "line2"}, + } + testCSV := `hello,80,test_name` + + expectedFields := map[string]interface{}{ + "line1": int64(80), + "line2": "test_name", + } + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, expectedFields, metrics[0].Fields()) +} + +func 
TestSkipColumnsWithHeader(t *testing.T) { + p := CSVParser{ + SkipColumns: 1, + HeaderRowCount: 2, + FieldColumns: []string{"col1"}, + } + testCSV := `col,col,col + 1,2,3 + trash,80,test_name` + + // we should expect an error if we try to get col1 + _, err := p.Parse([]byte(testCSV)) + log.Printf("%v", err) + require.Error(t, err) +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index f01e687396a2f..5397a6443079e 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -102,16 +102,18 @@ type Config struct { GrokTimeZone string //csv configuration - CSVDelimiter string - CSVComment string - CSVTrimSpace bool - CSVDataColumns []string - CSVTagColumns []string - CSVFieldColumns []string - CSVNameColumn string - CSVTimestampColumn string - CSVTimestampFormat string - CSVHeaderRowCount int + CSVDelimiter string + CSVComment string + CSVTrimSpace bool + CSVNameColumns []string + CSVTagColumns []string + CSVFieldColumns []string + CSVMeasurementColumn string + CSVTimestampColumn string + CSVTimestampFormat string + CSVHeaderRowCount int + CSVSkipRows int + CSVSkipHeaders int } // NewParser returns a Parser interface based on the given config. @@ -158,6 +160,8 @@ func NewParser(config *Config) (Parser, error) { case "csv": parser, err = newCSVParser(config.MetricName, config.CSVHeaderRowCount, + config.CSVSkipRows, + config.CSVSkipColumns, config.CSVDelimiter, config.CSVComment, config.CSVTrimSpace, @@ -176,6 +180,8 @@ func NewParser(config *Config) (Parser, error) { func newCSVParser(metricName string, header int, + skipRows int, + skipColumns int, delimiter string, comment string, trimSpace bool, @@ -187,18 +193,20 @@ func newCSVParser(metricName string, timestampFormat string, defaultTags map[string]string) (Parser, error) { parser := &csv.CSVParser{ - MetricName: metricName, - HeaderRowCount: header, - Delimiter: delimiter, - Comment: comment, - TrimSpace: trimSpace, - DataColumns: dataColumns, - TagColumns: tagColumns, - FieldColumns: fieldColumns, - NameColumn: nameColumn, - TimestampColumn: timestampColumn, - TimestampFormat: timestampFormat, - DefaultTags: defaultTags, + MetricName: metricName, + HeaderRowCount: header, + SkipRows: skipRows, + SkipColumns: skipColumns, + Delimiter: delimiter, + Comment: comment, + TrimSpace: trimSpace, + DataColumns: dataColumns, + TagColumns: tagColumns, + FieldColumns: fieldColumns, + MeasurementColumn: nameColumn, + TimestampColumn: timestampColumn, + TimestampFormat: timestampFormat, + DefaultTags: defaultTags, } return parser, nil From 24e38f30fd461f04e812f5a373ecfe66547fa284 Mon Sep 17 00:00:00 2001 From: Max U Date: Mon, 20 Aug 2018 14:36:56 -0700 Subject: [PATCH 14/23] start to remove field_column config --- docs/DATA_FORMATS_INPUT.md | 7 ++---- internal/config/config.go | 45 ++++++++++++----------------------- plugins/parsers/csv/parser.go | 1 - plugins/parsers/registry.go | 14 ++++------- 4 files changed, 22 insertions(+), 45 deletions(-) diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index be3d8fe179b3d..9faad8bf3c2ac 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -927,13 +927,10 @@ or an error will be thrown. ## If `csv_header_row_count` is set to 0, this config must be used csv_column_names = [] - ## Columns listed here will be added as tags + ## Columns listed here will be added as tags. Any other columns + ## will be added as fields. 
csv_tag_columns = [] - ## Columns listed here will be added as fields - ## the field type is infered from the value of the field - csv_field_columns = [] - ## The column to extract the name of the metric from ## By default, this is the name of the plugin ## the `name_override` config overrides this diff --git a/internal/config/config.go b/internal/config/config.go index 564b64cf7d627..b562c9754450f 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1424,18 +1424,6 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { } } - if node, ok := tbl.Fields["csv_field_columns"]; ok { - if kv, ok := node.(*ast.KeyValue); ok { - if ary, ok := kv.Value.(*ast.Array); ok { - for _, elem := range ary.Value { - if str, ok := elem.(*ast.String); ok { - c.CSVFieldColumns = append(c.CSVFieldColumns, str.Value) - } - } - } - } - } - if node, ok := tbl.Fields["csv_delimiter"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { @@ -1455,7 +1443,7 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { if node, ok := tbl.Fields["csv_measurement_column"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { - c.CSVNameColumn = str.Value + c.CSVMeasurementColumn = str.Value } } } @@ -1478,10 +1466,11 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { if node, ok := tbl.Fields["csv_header_row_count"]; ok { if kv, ok := node.(*ast.KeyValue); ok { - if str, ok := kv.Value.(*ast.Value); ok { - c.CSVHeaderRowCount, err = strconv.Aoit(str.Value) + if str, ok := kv.Value.(*ast.String); ok { + iVal, err := strconv.Atoi(str.Value) + c.CSVHeaderRowCount = iVal if err != nil { - log.Printf("E! parsing to int: %v", err) + return nil, fmt.Errorf("E! parsing to int: %v", err) } } } @@ -1489,10 +1478,11 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { if node, ok := tbl.Fields["csv_skip_rows"]; ok { if kv, ok := node.(*ast.KeyValue); ok { - if str, ok := kv.Value.(*ast.Value); ok { - c.CSVSkipRows, err = strconv.Aoit(str.Value) + if str, ok := kv.Value.(*ast.String); ok { + iVal, err := strconv.Atoi(str.Value) + c.CSVSkipRows = iVal if err != nil { - log.Printf("E! parsing to int: %v", err) + return nil, fmt.Errorf("E! parsing to int: %v", err) } } } @@ -1500,10 +1490,11 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { if node, ok := tbl.Fields["csv_skip_columns"]; ok { if kv, ok := node.(*ast.KeyValue); ok { - if str, ok := kv.Value.(*ast.Value); ok { - c.CSVSkipColumns, err = strconv.Aoit(str.Value) + if str, ok := kv.Value.(*ast.String); ok { + iVal, err := strconv.Atoi(str.Value) + c.CSVSkipColumns = iVal if err != nil { - log.Printf("E! parsing to int: %v", err) + return nil, fmt.Errorf("E! parsing to int: %v", err) } } } @@ -1513,16 +1504,10 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.Boolean); ok { //for config with no quotes - val, _ := strconv.ParseBool(str.Value) + val, err := strconv.ParseBool(str.Value) c.CSVTrimSpace = val - } else { - //for config with quotes - strVal := kv.Value.(*ast.String) - val, err := strconv.ParseBool(strVal.Value) if err != nil { - log.Printf("E! parsing to bool: %v", err) - } else { - c.CSVTrimSpace = val + return nil, fmt.Errorf("E! 
parsing to bool: %v", err) } } } diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 3c01ec90ee8ce..57446adfc7ed6 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -21,7 +21,6 @@ type CSVParser struct { TrimSpace bool ColumnNames []string TagColumns []string - FieldColumns []string MeasurementColumn string TimestampColumn string TimestampFormat string diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 5397a6443079e..4cbd30529fff7 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -105,15 +105,14 @@ type Config struct { CSVDelimiter string CSVComment string CSVTrimSpace bool - CSVNameColumns []string + CSVColumnNames []string CSVTagColumns []string - CSVFieldColumns []string CSVMeasurementColumn string CSVTimestampColumn string CSVTimestampFormat string CSVHeaderRowCount int CSVSkipRows int - CSVSkipHeaders int + CSVSkipColumns int } // NewParser returns a Parser interface based on the given config. @@ -165,10 +164,9 @@ func NewParser(config *Config) (Parser, error) { config.CSVDelimiter, config.CSVComment, config.CSVTrimSpace, - config.CSVDataColumns, + config.CSVColumnNames, config.CSVTagColumns, - config.CSVFieldColumns, - config.CSVNameColumn, + config.CSVMeasurementColumn, config.CSVTimestampColumn, config.CSVTimestampFormat, config.DefaultTags) @@ -187,7 +185,6 @@ func newCSVParser(metricName string, trimSpace bool, dataColumns []string, tagColumns []string, - fieldColumns []string, nameColumn string, timestampColumn string, timestampFormat string, @@ -200,9 +197,8 @@ func newCSVParser(metricName string, Delimiter: delimiter, Comment: comment, TrimSpace: trimSpace, - DataColumns: dataColumns, + ColumnNames: dataColumns, TagColumns: tagColumns, - FieldColumns: fieldColumns, MeasurementColumn: nameColumn, TimestampColumn: timestampColumn, TimestampFormat: timestampFormat, From fc36fd54db237707a14838b266665d16a6010d3f Mon Sep 17 00:00:00 2001 From: Max U Date: Tue, 21 Aug 2018 10:11:35 -0700 Subject: [PATCH 15/23] just broke a lot. 
lovely --- plugins/parsers/csv/parser.go | 58 ++++++++++++++---------------- plugins/parsers/csv/parser_test.go | 21 +++-------- 2 files changed, 31 insertions(+), 48 deletions(-) diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 57446adfc7ed6..31d601a7813f0 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -34,14 +34,14 @@ func (p *CSVParser) compile(r *bytes.Reader) (*csv.Reader, error) { if p.Delimiter != "" { runeStr := []rune(p.Delimiter) if len(runeStr) > 1 { - return csvReader, fmt.Errorf("delimiter must be a single character, got: %s", p.Delimiter) + return nil, fmt.Errorf("delimiter must be a single character, got: %s", p.Delimiter) } csvReader.Comma = runeStr[0] } if p.Comment != "" { runeStr := []rune(p.Comment) if len(runeStr) > 1 { - return csvReader, fmt.Errorf("comment must be a single character, got: %s", p.Comment) + return nil, fmt.Errorf("comment must be a single character, got: %s", p.Comment) } csvReader.Comment = runeStr[0] } @@ -130,7 +130,7 @@ func (p *CSVParser) ParseLine(line string) (telegraf.Metric, error) { } func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { - recordFields := make(map[string]string) + recordFields := make(map[string]interface{}) tags := make(map[string]string) fields := make(map[string]interface{}) @@ -138,7 +138,17 @@ func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { record = record[p.SkipColumns:] for i, fieldName := range p.ColumnNames { if i < len(record) { - recordFields[fieldName] = record[i] + value := record[i] + // attempt type conversions + if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { + recordFields[fieldName] = iValue + } else if fValue, err := strconv.ParseFloat(value, 64); err == nil { + recordFields[fieldName] = fValue + } else if bValue, err := strconv.ParseBool(value); err == nil { + recordFields[fieldName] = bValue + } else { + recordFields[fieldName] = value + } } } @@ -147,40 +157,26 @@ func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { tags[k] = v } - for _, tagName := range p.TagColumns { - if recordFields[tagName] == "" { - return nil, fmt.Errorf("could not find field: %v", tagName) - } - tags[tagName] = recordFields[tagName] + // will default to plugin name + measurementName := p.MetricName + if recordFields[p.MeasurementColumn] != nil { + measurementName = recordFields[p.MeasurementColumn].(string) } - for _, fieldName := range p.FieldColumns { - value, ok := recordFields[fieldName] - if !ok { - return nil, fmt.Errorf("could not find field: %v", fieldName) - } - - // attempt type conversions - if iValue, err := strconv.Atoi(value); err == nil { - fields[fieldName] = iValue - } else if fValue, err := strconv.ParseFloat(value, 64); err == nil { - fields[fieldName] = fValue - } else if bValue, err := strconv.ParseBool(value); err == nil { - fields[fieldName] = bValue - } else { - fields[fieldName] = value + for _, tagName := range p.TagColumns { + if recordFields[tagName] == nil { + return nil, fmt.Errorf("could not find field: %v", tagName) } - } - - // will default to plugin name - measurementName := p.MetricName - if recordFields[p.MeasurementColumn] != "" { - measurementName = recordFields[p.MeasurementColumn] + tags[tagName] = recordFields[tagName].(string) + delete(recordFields, tagName) } metricTime := time.Now() if p.TimestampColumn != "" { - tStr := recordFields[p.TimestampColumn] + if recordFields[p.TimestampColumn] == nil { + return nil, 
fmt.Errorf("timestamp column: %v could not be found", p.TimestampColumn) + } + tStr := recordFields[p.TimestampColumn].(string) if p.TimestampFormat == "" { return nil, fmt.Errorf("timestamp format must be specified") } diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index 321406e578481..612d1e861194d 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -13,9 +13,8 @@ import ( func TestBasicCSV(t *testing.T) { p := CSVParser{ - ColumnNames: []string{"first", "second", "third"}, - FieldColumns: []string{"first", "second"}, - TagColumns: []string{"third"}, + ColumnNames: []string{"first", "second", "third"}, + TagColumns: []string{"third"}, } _, err := p.ParseLine("1.4,true,hi") @@ -25,7 +24,6 @@ func TestBasicCSV(t *testing.T) { func TestHeaderConcatenationCSV(t *testing.T) { p := CSVParser{ HeaderRowCount: 2, - FieldColumns: []string{"first1", "second2"}, MeasurementColumn: "3", } testCSV := `first,second @@ -41,7 +39,6 @@ func TestHeaderOverride(t *testing.T) { p := CSVParser{ HeaderRowCount: 1, ColumnNames: []string{"first", "second", "third"}, - FieldColumns: []string{"first", "second"}, MeasurementColumn: "third", } testCSV := `line1,line2,line3 @@ -55,7 +52,6 @@ func TestTimestamp(t *testing.T) { p := CSVParser{ HeaderRowCount: 1, ColumnNames: []string{"first", "second", "third"}, - FieldColumns: []string{"second"}, MeasurementColumn: "third", TimestampColumn: "first", TimestampFormat: "02/01/06 03:04:05 PM", @@ -72,7 +68,6 @@ func TestTimestampError(t *testing.T) { p := CSVParser{ HeaderRowCount: 1, ColumnNames: []string{"first", "second", "third"}, - FieldColumns: []string{"second"}, MeasurementColumn: "third", TimestampColumn: "first", } @@ -87,7 +82,6 @@ func TestQuotedCharacter(t *testing.T) { p := CSVParser{ HeaderRowCount: 1, ColumnNames: []string{"first", "second", "third"}, - FieldColumns: []string{"second", "first"}, MeasurementColumn: "third", } @@ -103,7 +97,6 @@ func TestDelimiter(t *testing.T) { HeaderRowCount: 1, Delimiter: "%", ColumnNames: []string{"first", "second", "third"}, - FieldColumns: []string{"second", "first"}, MeasurementColumn: "third", } @@ -119,7 +112,6 @@ func TestValueConversion(t *testing.T) { HeaderRowCount: 0, Delimiter: ",", ColumnNames: []string{"first", "second", "third", "fourth"}, - FieldColumns: []string{"second", "first", "third", "fourth"}, MetricName: "test_value", } testCSV := `3.3,4,true,hello` @@ -149,7 +141,6 @@ func TestSkipComment(t *testing.T) { HeaderRowCount: 0, Comment: "#", ColumnNames: []string{"first", "second", "third", "fourth"}, - FieldColumns: []string{"second", "first", "third", "fourth"}, MetricName: "test_value", } testCSV := `#3.3,4,true,hello @@ -172,7 +163,6 @@ func TestTrimSpace(t *testing.T) { HeaderRowCount: 0, TrimSpace: true, ColumnNames: []string{"first", "second", "third", "fourth"}, - FieldColumns: []string{"second", "first", "third", "fourth"}, MetricName: "test_value", } testCSV := ` 3.3, 4, true,hello` @@ -197,7 +187,6 @@ func TestSkipRows(t *testing.T) { p := CSVParser{ HeaderRowCount: 1, SkipRows: 1, - FieldColumns: []string{"line2", "line3"}, MeasurementColumn: "line3", } testCSV := `garbage nonsense @@ -215,9 +204,8 @@ hello,80,test_name2` func TestSkipColumns(t *testing.T) { p := CSVParser{ - SkipColumns: 1, - ColumnNames: []string{"line1", "line2"}, - FieldColumns: []string{"line1", "line2"}, + SkipColumns: 1, + ColumnNames: []string{"line1", "line2"}, } testCSV := `hello,80,test_name` @@ -234,7 +222,6 @@ func 
TestSkipColumnsWithHeader(t *testing.T) { p := CSVParser{ SkipColumns: 1, HeaderRowCount: 2, - FieldColumns: []string{"col1"}, } testCSV := `col,col,col 1,2,3 From 6e7ec3e2ee066f92ff77dc10ec69bdf92ffc52ce Mon Sep 17 00:00:00 2001 From: Max U Date: Wed, 22 Aug 2018 13:19:16 -0700 Subject: [PATCH 16/23] fixed it --- plugins/parsers/csv/parser.go | 3 +-- plugins/parsers/csv/parser_test.go | 7 ++++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 31d601a7813f0..22eac719ba1dd 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -132,7 +132,6 @@ func (p *CSVParser) ParseLine(line string) (telegraf.Metric, error) { func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { recordFields := make(map[string]interface{}) tags := make(map[string]string) - fields := make(map[string]interface{}) // skip columns in record record = record[p.SkipColumns:] @@ -188,7 +187,7 @@ func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { } } - m, err := metric.New(measurementName, tags, fields, metricTime) + m, err := metric.New(measurementName, tags, recordFields, metricTime) if err != nil { return nil, err } diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index 612d1e861194d..d4864998e51b2 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -187,6 +187,7 @@ func TestSkipRows(t *testing.T) { p := CSVParser{ HeaderRowCount: 1, SkipRows: 1, + TagColumns: []string{"line1"}, MeasurementColumn: "line3", } testCSV := `garbage nonsense @@ -228,7 +229,7 @@ func TestSkipColumnsWithHeader(t *testing.T) { trash,80,test_name` // we should expect an error if we try to get col1 - _, err := p.Parse([]byte(testCSV)) - log.Printf("%v", err) - require.Error(t, err) + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, map[string]interface{}{"col2": int64(80), "col3": "test_name"}, metrics[0].Fields()) } From 0d7b236bad84fe4b14e710dbf2dd35b2c3e479a5 Mon Sep 17 00:00:00 2001 From: Max U Date: Wed, 22 Aug 2018 13:29:08 -0700 Subject: [PATCH 17/23] address some of daniel's comments --- plugins/parsers/csv/parser.go | 12 ++++----- plugins/parsers/csv/parser_test.go | 39 +++++++++++++++--------------- plugins/parsers/registry.go | 2 +- 3 files changed, 27 insertions(+), 26 deletions(-) diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 22eac719ba1dd..33e20491b68da 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -11,7 +11,7 @@ import ( "github.com/influxdata/telegraf/metric" ) -type CSVParser struct { +type Parser struct { MetricName string HeaderRowCount int SkipRows int @@ -27,7 +27,7 @@ type CSVParser struct { DefaultTags map[string]string } -func (p *CSVParser) compile(r *bytes.Reader) (*csv.Reader, error) { +func (p *Parser) compile(r *bytes.Reader) (*csv.Reader, error) { csvReader := csv.NewReader(r) // ensures that the reader reads records of different lengths without an error csvReader.FieldsPerRecord = -1 @@ -49,7 +49,7 @@ func (p *CSVParser) compile(r *bytes.Reader) (*csv.Reader, error) { return csvReader, nil } -func (p *CSVParser) Parse(buf []byte) ([]telegraf.Metric, error) { +func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { r := bytes.NewReader(buf) csvReader, err := p.compile(r) if err != nil { @@ -106,7 +106,7 @@ func (p *CSVParser) Parse(buf []byte) ([]telegraf.Metric, error) { // 
ParseLine does not use any information in header and assumes DataColumns is set // it will also not skip any rows -func (p *CSVParser) ParseLine(line string) (telegraf.Metric, error) { +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { r := bytes.NewReader([]byte(line)) csvReader, err := p.compile(r) if err != nil { @@ -129,7 +129,7 @@ func (p *CSVParser) ParseLine(line string) (telegraf.Metric, error) { return m, nil } -func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { +func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) { recordFields := make(map[string]interface{}) tags := make(map[string]string) @@ -194,6 +194,6 @@ func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) { return m, nil } -func (p *CSVParser) SetDefaultTags(tags map[string]string) { +func (p *Parser) SetDefaultTags(tags map[string]string) { p.DefaultTags = tags } diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index d4864998e51b2..c346ac2c96d44 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -3,7 +3,6 @@ package csv import ( "fmt" "log" - "reflect" "testing" "time" @@ -12,7 +11,7 @@ import ( ) func TestBasicCSV(t *testing.T) { - p := CSVParser{ + p := Parser{ ColumnNames: []string{"first", "second", "third"}, TagColumns: []string{"third"}, } @@ -22,7 +21,7 @@ func TestBasicCSV(t *testing.T) { } func TestHeaderConcatenationCSV(t *testing.T) { - p := CSVParser{ + p := Parser{ HeaderRowCount: 2, MeasurementColumn: "3", } @@ -36,7 +35,7 @@ func TestHeaderConcatenationCSV(t *testing.T) { } func TestHeaderOverride(t *testing.T) { - p := CSVParser{ + p := Parser{ HeaderRowCount: 1, ColumnNames: []string{"first", "second", "third"}, MeasurementColumn: "third", @@ -49,7 +48,7 @@ func TestHeaderOverride(t *testing.T) { } func TestTimestamp(t *testing.T) { - p := CSVParser{ + p := Parser{ HeaderRowCount: 1, ColumnNames: []string{"first", "second", "third"}, MeasurementColumn: "third", @@ -60,12 +59,14 @@ func TestTimestamp(t *testing.T) { 23/05/09 04:05:06 PM,70,test_name 07/11/09 04:05:06 PM,80,test_name2` metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) - require.NotEqual(t, metrics[1].Time(), metrics[0].Time()) + require.Equal(t, metrics[0].Time().UnixNano(), int64(1243094706000000000)) + require.Equal(t, metrics[1].Time().UnixNano(), int64(1257609906000000000)) } func TestTimestampError(t *testing.T) { - p := CSVParser{ + p := Parser{ HeaderRowCount: 1, ColumnNames: []string{"first", "second", "third"}, MeasurementColumn: "third", @@ -79,7 +80,7 @@ func TestTimestampError(t *testing.T) { } func TestQuotedCharacter(t *testing.T) { - p := CSVParser{ + p := Parser{ HeaderRowCount: 1, ColumnNames: []string{"first", "second", "third"}, MeasurementColumn: "third", @@ -93,7 +94,7 @@ func TestQuotedCharacter(t *testing.T) { } func TestDelimiter(t *testing.T) { - p := CSVParser{ + p := Parser{ HeaderRowCount: 1, Delimiter: "%", ColumnNames: []string{"first", "second", "third"}, @@ -108,7 +109,7 @@ func TestDelimiter(t *testing.T) { } func TestValueConversion(t *testing.T) { - p := CSVParser{ + p := Parser{ HeaderRowCount: 0, Delimiter: ",", ColumnNames: []string{"first", "second", "third", "fourth"}, @@ -127,17 +128,17 @@ func TestValueConversion(t *testing.T) { metrics, err := p.Parse([]byte(testCSV)) require.NoError(t, err) - goodMetric, err1 := metric.New("test_value", expectedTags, expectedFields, time.Unix(0, 0)) + expectedMetric, err1 := 
metric.New("test_value", expectedTags, expectedFields, time.Unix(0, 0)) returnedMetric, err2 := metric.New(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields(), time.Unix(0, 0)) require.NoError(t, err1) require.NoError(t, err2) //deep equal fields - require.True(t, reflect.DeepEqual(goodMetric.Fields(), returnedMetric.Fields())) + require.Equal(t, expectedMetric.Fields(), returnedMetric.Fields()) } func TestSkipComment(t *testing.T) { - p := CSVParser{ + p := Parser{ HeaderRowCount: 0, Comment: "#", ColumnNames: []string{"first", "second", "third", "fourth"}, @@ -155,11 +156,11 @@ func TestSkipComment(t *testing.T) { metrics, err := p.Parse([]byte(testCSV)) require.NoError(t, err) - require.Equal(t, true, reflect.DeepEqual(expectedFields, metrics[0].Fields())) + require.Equal(t, expectedFields, metrics[0].Fields()) } func TestTrimSpace(t *testing.T) { - p := CSVParser{ + p := Parser{ HeaderRowCount: 0, TrimSpace: true, ColumnNames: []string{"first", "second", "third", "fourth"}, @@ -180,11 +181,11 @@ func TestTrimSpace(t *testing.T) { log.Printf("got: %v, %T", metrics[0].Fields()[k], metrics[0].Fields()[k]) } require.NoError(t, err) - require.Equal(t, true, reflect.DeepEqual(expectedFields, metrics[0].Fields())) + require.Equal(t, expectedFields, metrics[0].Fields()) } func TestSkipRows(t *testing.T) { - p := CSVParser{ + p := Parser{ HeaderRowCount: 1, SkipRows: 1, TagColumns: []string{"line1"}, @@ -204,7 +205,7 @@ hello,80,test_name2` } func TestSkipColumns(t *testing.T) { - p := CSVParser{ + p := Parser{ SkipColumns: 1, ColumnNames: []string{"line1", "line2"}, } @@ -220,7 +221,7 @@ func TestSkipColumns(t *testing.T) { } func TestSkipColumnsWithHeader(t *testing.T) { - p := CSVParser{ + p := Parser{ SkipColumns: 1, HeaderRowCount: 2, } diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 4cbd30529fff7..b069b32a747c8 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -189,7 +189,7 @@ func newCSVParser(metricName string, timestampColumn string, timestampFormat string, defaultTags map[string]string) (Parser, error) { - parser := &csv.CSVParser{ + parser := &csv.Parser{ MetricName: metricName, HeaderRowCount: header, SkipRows: skipRows, From 20ed8198c483a6e0a0ee98ff23f2e092c15f6221 Mon Sep 17 00:00:00 2001 From: Max U Date: Wed, 22 Aug 2018 13:34:37 -0700 Subject: [PATCH 18/23] trim space manually --- plugins/parsers/csv/parser.go | 9 ++++++--- plugins/parsers/csv/parser_test.go | 5 ----- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 33e20491b68da..2bbf2ce0df858 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -5,6 +5,7 @@ import ( "encoding/csv" "fmt" "strconv" + "strings" "time" "github.com/influxdata/telegraf" @@ -45,7 +46,6 @@ func (p *Parser) compile(r *bytes.Reader) (*csv.Reader, error) { } csvReader.Comment = runeStr[0] } - csvReader.TrimLeadingSpace = p.TrimSpace return csvReader, nil } @@ -70,10 +70,12 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { } //concatenate header names for i := range header { + name := header[i] + name = strings.Trim(name, " ") if len(headerNames) <= i { - headerNames = append(headerNames, header[i]) + headerNames = append(headerNames, name) } else { - headerNames[i] = headerNames[i] + header[i] + headerNames[i] = headerNames[i] + name } } } @@ -138,6 +140,7 @@ func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) { for i, fieldName := range 
p.ColumnNames { if i < len(record) { value := record[i] + value = strings.Trim(value, " ") // attempt type conversions if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { recordFields[fieldName] = iValue diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index c346ac2c96d44..b488a1f16e321 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -2,7 +2,6 @@ package csv import ( "fmt" - "log" "testing" "time" @@ -176,10 +175,6 @@ func TestTrimSpace(t *testing.T) { } metrics, err := p.Parse([]byte(testCSV)) - for k := range metrics[0].Fields() { - log.Printf("want: %v, %T", expectedFields[k], expectedFields[k]) - log.Printf("got: %v, %T", metrics[0].Fields()[k], metrics[0].Fields()[k]) - } require.NoError(t, err) require.Equal(t, expectedFields, metrics[0].Fields()) } From 50168993426a7a99c5f7d6898520783f8d21d012 Mon Sep 17 00:00:00 2001 From: Max U Date: Wed, 22 Aug 2018 14:31:48 -0700 Subject: [PATCH 19/23] fix config --- internal/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/config/config.go b/internal/config/config.go index b562c9754450f..77d0b5b138d4c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1405,7 +1405,7 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { - c.CSVDataColumns = append(c.CSVDataColumns, str.Value) + c.CSVColumnNames = append(c.CSVColumnNames, str.Value) } } } From c058db6b893109430c7dd646ce0ba06f92954c38 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Thu, 23 Aug 2018 10:47:13 -0700 Subject: [PATCH 20/23] Remerge data format docs --- docs/DATA_FORMATS_INPUT.md | 269 ++++++++++++++++++------------------- 1 file changed, 133 insertions(+), 136 deletions(-) diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index ea6d2f7670a31..7e57d9657aae1 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -12,7 +12,7 @@ Telegraf is able to parse the following input data formats into metrics: 1. [Grok](#grok) 1. [Logfmt](#logfmt) 1. [Wavefront](#wavefront) -1. [CSV] (#CSV) +1. [CSV](#csv) Telegraf metrics, like InfluxDB [points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/), @@ -108,28 +108,28 @@ but can be overridden using the `name_override` config option. #### JSON Configuration: -The JSON data format supports specifying "tag_keys", "string_keys", and "json_query". -If specified, keys in "tag_keys" and "string_keys" will be searched for in the root-level -and any nested lists of the JSON blob. All int and float values are added to fields by default. -If the key(s) exist, they will be applied as tags or fields to the Telegraf metrics. +The JSON data format supports specifying "tag_keys", "string_keys", and "json_query". +If specified, keys in "tag_keys" and "string_keys" will be searched for in the root-level +and any nested lists of the JSON blob. All int and float values are added to fields by default. +If the key(s) exist, they will be applied as tags or fields to the Telegraf metrics. If "string_keys" is specified, the string will be added as a field. -The "json_query" configuration is a gjson path to an JSON object or -list of JSON objects. If this path leads to an array of values or -single data point an error will be thrown. 
If this configuration +The "json_query" configuration is a gjson path to an JSON object or +list of JSON objects. If this path leads to an array of values or +single data point an error will be thrown. If this configuration is specified, only the result of the query will be parsed and returned as metrics. The "json_name_key" configuration specifies the key of the field whos value will be added as the metric name. -Object paths are specified using gjson path format, which is denoted by object keys -concatenated with "." to go deeper in nested JSON objects. +Object paths are specified using gjson path format, which is denoted by object keys +concatenated with "." to go deeper in nested JSON objects. Additional information on gjson paths can be found here: https://github.com/tidwall/gjson#path-syntax -The JSON data format also supports extracting time values through the -config "json_time_key" and "json_time_format". If "json_time_key" is set, -"json_time_format" must be specified. The "json_time_key" describes the -name of the field containing time information. The "json_time_format" +The JSON data format also supports extracting time values through the +config "json_time_key" and "json_time_format". If "json_time_key" is set, +"json_time_format" must be specified. The "json_time_key" describes the +name of the field containing time information. The "json_time_format" must be a recognized Go time format. If there is no year provided, the metrics will have the current year. More info on time formats can be found here: https://golang.org/pkg/time/#Parse @@ -162,8 +162,8 @@ For example, if you had this configuration: ## List of field names to extract from JSON and add as string fields # json_string_fields = [] - ## gjson query path to specify a specific chunk of JSON to be parsed with - ## the above configuration. If not specified, the whole file will be parsed. + ## gjson query path to specify a specific chunk of JSON to be parsed with + ## the above configuration. If not specified, the whole file will be parsed. ## gjson query paths are described here: https://github.com/tidwall/gjson#path-syntax # json_query = "" @@ -192,8 +192,8 @@ Your Telegraf metrics would get tagged with "my_tag_1" exec_mycollector,my_tag_1=foo a=5,b_c=6 ``` -If the JSON data is an array, then each element of the array is -parsed with the configured settings. Each resulting metric will +If the JSON data is an array, then each element of the array is +parsed with the configured settings. Each resulting metric will be output with the same timestamp. For example, if the following configuration: @@ -221,7 +221,7 @@ For example, if the following configuration: ## List of field names to extract from JSON and add as string fields # string_fields = [] - ## gjson query path to specify a specific chunk of JSON to be parsed with + ## gjson query path to specify a specific chunk of JSON to be parsed with ## the above configuration. If not specified, the whole file will be parsed # json_query = "" @@ -265,7 +265,7 @@ exec_mycollector,my_tag_1=foo,my_tag_2=baz b_c=6 1136387040000000000 exec_mycollector,my_tag_1=bar,my_tag_2=baz b_c=8 1168527840000000000 ``` -If you want to only use a specific portion of your JSON, use the "json_query" +If you want to only use a specific portion of your JSON, use the "json_query" configuration to specify a path to a JSON object. 
For example, with the following config: @@ -289,7 +289,7 @@ For example, with the following config: ## List of field names to extract from JSON and add as string fields string_fields = ["last"] - ## gjson query path to specify a specific chunk of JSON to be parsed with + ## gjson query path to specify a specific chunk of JSON to be parsed with ## the above configuration. If not specified, the whole file will be parsed json_query = "obj.friends" @@ -787,50 +787,6 @@ The best way to get acquainted with grok patterns is to read the logstash docs, which are available here: https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html -#### Grok Configuration: -```toml -[[inputs.file]] - ## Files to parse each interval. - ## These accept standard unix glob matching rules, but with the addition of - ## ** as a "super asterisk". ie: - ## /var/log/**.log -> recursively find all .log files in /var/log - ## /var/log/*/*.log -> find all .log files with a parent dir in /var/log - ## /var/log/apache.log -> only tail the apache log file - files = ["/var/log/apache/access.log"] - - ## The dataformat to be read from files - ## Each data format has its own unique set of configuration options, read - ## more about them here: - ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md - data_format = "grok" - - ## This is a list of patterns to check the given log file(s) for. - ## Note that adding patterns here increases processing time. The most - ## efficient configuration is to have one pattern. - ## Other common built-in patterns are: - ## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs) - ## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent) - grok_patterns = ["%{COMBINED_LOG_FORMAT}"] - - ## Full path(s) to custom pattern files. - grok_custom_pattern_files = [] - - ## Custom patterns can also be defined here. Put one pattern per line. - grok_custom_patterns = ''' - ''' - - ## Timezone allows you to provide an override for timestamps that - ## don't already include an offset - ## e.g. 04/06/2016 12:41:45 data one two 5.43µs - ## - ## Default: "" which renders UTC - ## Options are as follows: - ## 1. Local -- interpret based on machine localtime - ## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones - ## 3. UTC -- or blank/unspecified, will return timestamp in UTC - grok_timezone = "Canada/Eastern" -``` - The grok parser uses a slightly modified version of logstash "grok" patterns, with the format: @@ -981,30 +937,133 @@ will be processed based on the current machine timezone configuration. Lastly, i timezone from the list of Unix [timezones](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones), grok will offset the timestamp accordingly. +#### TOML Escaping + +When saving patterns to the configuration file, keep in mind the different TOML +[string](https://github.com/toml-lang/toml#string) types and the escaping +rules for each. These escaping rules must be applied in addition to the +escaping required by the grok syntax. Using the Multi-line line literal +syntax with `'''` may be useful. + +The following config examples will parse this input file: + +``` +|42|\uD83D\uDC2F|'telegraf'| +``` + +Since `|` is a special character in the grok language, we must escape it to +get a literal `|`. With a basic TOML string, special characters such as +backslash must be escaped, requiring us to escape the backslash a second time. 
+
```toml
[[inputs.file]]
  grok_patterns = ["\\|%{NUMBER:value:int}\\|%{UNICODE_ESCAPE:escape}\\|'%{WORD:name}'\\|"]
  grok_custom_patterns = "UNICODE_ESCAPE (?:\\\\u[0-9A-F]{4})+"
```

We cannot use a literal TOML string for the pattern, because we cannot match a
`'` within it. However, it works well for the custom pattern.
```toml
[[inputs.file]]
  grok_patterns = ["\\|%{NUMBER:value:int}\\|%{UNICODE_ESCAPE:escape}\\|'%{WORD:name}'\\|"]
  grok_custom_patterns = 'UNICODE_ESCAPE (?:\\u[0-9A-F]{4})+'
```

A multi-line literal string allows us to encode the pattern:
```toml
[[inputs.file]]
  grok_patterns = ['''
    \|%{NUMBER:value:int}\|%{UNICODE_ESCAPE:escape}\|'%{WORD:name}'\|
  ''']
  grok_custom_patterns = 'UNICODE_ESCAPE (?:\\u[0-9A-F]{4})+'
```

#### Tips for creating patterns

Writing complex patterns can be difficult; here is some advice for writing a
new pattern or testing a pattern developed [online](https://grokdebug.herokuapp.com).

Create a file output that writes to stdout, and disable other outputs while
testing. This will allow you to see the captured metrics. Keep in mind that
the file output will only print once per `flush_interval`.

```toml
[[outputs.file]]
  files = ["stdout"]
```

- Start with a file containing only a single line of your input.
- Remove all but the first token or piece of the line.
- Add the section of your pattern to match this piece to your configuration file.
- Verify that the metric is parsed successfully by running Telegraf.
- If successful, add the next token, update the pattern and retest.
- Continue one token at a time until the entire line is successfully parsed.

# Logfmt
This parser implements the logfmt format by extracting and converting key-value pairs from log text in the form `<key>=<value>`.
At the moment, the plugin will produce one metric per line and all keys
are added as fields.
A typical log
```
method=GET host=influxdata.org ts=2018-07-24T19:43:40.275Z
connect=4ms service=8ms status=200 bytes=1653
```
will be converted into
```
logfmt method="GET",host="influxdata.org",ts="2018-07-24T19:43:40.275Z",connect="4ms",service="8ms",status=200i,bytes=1653i
```
Additional information about the logfmt format can be found [here](https://brandur.org/logfmt).

# Wavefront:

Wavefront Data Format metrics are parsed directly into Telegraf metrics.
For more information about the Wavefront Data Format see
[here](https://docs.wavefront.com/wavefront_data_format.html).

There are no additional configuration options for Wavefront Data Format line-protocol.

#### Wavefront Configuration:

```toml
[[inputs.exec]]
  ## Commands array
  commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]

  ## measurement name suffix (for separating different commands)
  name_suffix = "_mycollector"

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "wavefront"
```

# CSV
Parse out metrics from a CSV formatted table. By default, the parser assumes there is no header and
will read data from the first line. If `csv_header_row_count` is set to anything besides 0, the parser
will extract column names from that number of rows. Headers spanning more than one row will have their
names concatenated together. Any unnamed columns will be ignored by the parser.

The `csv_skip_rows` config indicates the number of rows to skip before looking for header information or data
to parse. By default, no rows will be skipped.

The `csv_skip_columns` config indicates the number of columns to be skipped before parsing data. These
columns will not be read out of the header. Naming with `csv_column_names` will begin at the first
parsed column after skipping the indicated columns. By default, no columns are skipped.

To assign custom column names, the `csv_column_names` config is available. If the `csv_column_names`
config is used, all columns must be named, as additional columns will be ignored. If `csv_header_row_count`
is set to 0, `csv_column_names` must be specified. Names listed in `csv_column_names` will override names
extracted from the header.

The `csv_tag_columns` config lists the columns whose values are added to the metric as tags; all
remaining columns are added as fields, with the field type inferred from the value. The name used to
specify a column is the name in the header, or, if specified, the corresponding name assigned in
`csv_column_names`.

Additional configs are available to dynamically name metrics and set custom timestamps. If the
`csv_measurement_column` config is specified, the parser will assign the metric name to the value found
in that column. If the `csv_timestamp_column` is specified, the parser will extract the timestamp from
that column. If `csv_timestamp_column` is specified, the `csv_timestamp_format` must also be specified
or an error will be thrown.
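For example, given the configuration and input below, the parser would emit roughly the line
protocol shown after them. This is an illustrative sketch only: the metric name `csv_example`
stands in for whatever the plugin (or `name_override`) provides, the input rows are invented,
and the timestamp defaults to the time of parsing because no `csv_timestamp_column` is set.

```toml
data_format = "csv"
csv_header_row_count = 1
csv_tag_columns = ["host"]
```

```
host,usage_idle,enabled
server01,42.5,true
```

```
csv_example,host=server01 usage_idle=42.5,enabled=true
```

Note how `host` becomes a tag while the remaining columns become fields, with `usage_idle`
inferred as a float and `enabled` as a boolean.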
Any unnamed columns will be ignored by the parser. The `csv_skip_rows` config indicates the number of rows to skip before looking for header information or data to parse. By default, no rows will be skipped. -The `csv_skip_columns` config indicates the number of columns to be skipped before parsing data. These +The `csv_skip_columns` config indicates the number of columns to be skipped before parsing data. These columns will not be read out of the header. Naming with the `csv_column_names` will begin at the first parsed column after skipping the indicated columns. By default, no columns are skipped. -To assign custom column names, the `csv_column_names` config is available. If the `csv_column_names` -config is used, all columns must be named as additional columns will be ignored. If `csv_header_row_count` +To assign custom column names, the `csv_column_names` config is available. If the `csv_column_names` +config is used, all columns must be named as additional columns will be ignored. If `csv_header_row_count` is set to 0, `csv_column_names` must be specified. Names listed in `csv_column_names` will override names extracted from the header. The `csv_tag_columns` and `csv_field_columns` configs are available to add the column data to the metric. -The name used to specify the column is the name in the header, or if specified, the corresponding +The name used to specify the column is the name in the header, or if specified, the corresponding name assigned in `csv_column_names`. If neither config is specified, no data will be added to the metric. -Additional configs are available to dynamically name metrics and set custom timestamps. If the -`csv_column_names` config is specified, the parser will assign the metric name to the value found +Additional configs are available to dynamically name metrics and set custom timestamps. If the +`csv_column_names` config is specified, the parser will assign the metric name to the value found in that column. If the `csv_timestamp_column` is specified, the parser will extract the timestamp from that column. If `csv_timestamp_column` is specified, the `csv_timestamp_format` must also be specified or an error will be thrown. @@ -1013,12 +1072,12 @@ or an error will be thrown. ```toml data_format = "csv" - ## Indicates how many rows to treat as a header. By default, the parser assumes + ## Indicates how many rows to treat as a header. By default, the parser assumes ## there is no header and will parse the first row as data. If set to anything more ## than 1, column names will be concatenated with the name listed in the next header row. ## If `csv_column_names` is specified, the column names in header will be overridden. # csv_header_row_count = 0 - + ## Indicates the number of rows to skip before looking for header information. # csv_skip_rows = 0 @@ -1061,65 +1120,3 @@ or an error will be thrown. ## this must be specified if `csv_timestamp_column` is specified # csv_timestamp_format = "" ``` - -#### Tips for creating patterns - -Writing complex patterns can be difficult, here is some advice for writing a -new pattern or testing a pattern developed [online](https://grokdebug.herokuapp.com). - -Create a file output that writes to stdout, and disable other outputs while -testing. This will allow you to see the captured metrics. Keep in mind that -the file output will only print once per `flush_interval`. - -```toml -[[outputs.file]] - files = ["stdout"] -``` - -- Start with a file containing only a single line of your input. 
-- Remove all but the first token or piece of the line. -- Add the section of your pattern to match this piece to your configuration file. -- Verify that the metric is parsed successfully by running Telegraf. -- If successful, add the next token, update the pattern and retest. -- Continue one token at a time until the entire line is successfully parsed. - -# Logfmt -This parser implements the logfmt format by extracting and converting key-value pairs from log text in the form `=`. -At the moment, the plugin will produce one metric per line and all keys -are added as fields. -A typical log -``` -method=GET host=influxdata.org ts=2018-07-24T19:43:40.275Z -connect=4ms service=8ms status=200 bytes=1653 -``` -will be converted into -``` -logfmt method="GET",host="influxdata.org",ts="2018-07-24T19:43:40.275Z",connect="4ms",service="8ms",status=200i,bytes=1653i - -``` -Additional information about the logfmt format can be found [here](https://brandur.org/logfmt). - -# Wavefront: - -Wavefront Data Format is metrics are parsed directly into Telegraf metrics. -For more information about the Wavefront Data Format see -[here](https://docs.wavefront.com/wavefront_data_format.html). - -There are no additional configuration options for Wavefront Data Format line-protocol. - -#### Wavefront Configuration: - -```toml -[[inputs.exec]] - ## Commands array - commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"] - - ## measurement name suffix (for separating different commands) - name_suffix = "_mycollector" - - ## Data format to consume. - ## Each data format has its own unique set of configuration options, read - ## more about them here: - ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md - data_format = "wavefront" -``` From 4847a59d4c427d2efd9fb61a09ed8bbdb21f8da9 Mon Sep 17 00:00:00 2001 From: Max U Date: Fri, 24 Aug 2018 14:18:52 -0700 Subject: [PATCH 21/23] finally fixes hopefully. 
error checks in registry.go --- plugins/parsers/csv/parser.go | 31 ++++++++++++------------------- plugins/parsers/registry.go | 22 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 2bbf2ce0df858..4a86a33de2e30 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -33,18 +33,10 @@ func (p *Parser) compile(r *bytes.Reader) (*csv.Reader, error) { // ensures that the reader reads records of different lengths without an error csvReader.FieldsPerRecord = -1 if p.Delimiter != "" { - runeStr := []rune(p.Delimiter) - if len(runeStr) > 1 { - return nil, fmt.Errorf("delimiter must be a single character, got: %s", p.Delimiter) - } - csvReader.Comma = runeStr[0] + csvReader.Comma = []rune(p.Delimiter)[0] } if p.Comment != "" { - runeStr := []rune(p.Comment) - if len(runeStr) > 1 { - return nil, fmt.Errorf("comment must be a single character, got: %s", p.Comment) - } - csvReader.Comment = runeStr[0] + csvReader.Comment = []rune(p.Comment)[0] } return csvReader, nil } @@ -71,7 +63,9 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { //concatenate header names for i := range header { name := header[i] - name = strings.Trim(name, " ") + if p.TrimSpace { + name = strings.Trim(name, " ") + } if len(headerNames) <= i { headerNames = append(headerNames, name) } else { @@ -80,14 +74,11 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { } } p.ColumnNames = headerNames[p.SkipColumns:] - } else if len(p.ColumnNames) > 0 { + } else { // if columns are named, just skip header rows for i := 0; i < p.HeaderRowCount; i++ { csvReader.Read() } - } else if p.HeaderRowCount == 0 && len(p.ColumnNames) == 0 { - // if there is no header and no DataColumns, that's an error - return nil, fmt.Errorf("there must be a header if `csv_data_columns` is not specified") } table, err := csvReader.ReadAll() @@ -140,7 +131,9 @@ func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) { for i, fieldName := range p.ColumnNames { if i < len(record) { value := record[i] - value = strings.Trim(value, " ") + if p.TrimSpace { + value = strings.Trim(value, " ") + } // attempt type conversions if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { recordFields[fieldName] = iValue @@ -162,14 +155,14 @@ func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) { // will default to plugin name measurementName := p.MetricName if recordFields[p.MeasurementColumn] != nil { - measurementName = recordFields[p.MeasurementColumn].(string) + measurementName = fmt.Sprintf("%v", recordFields[p.MeasurementColumn]) } for _, tagName := range p.TagColumns { if recordFields[tagName] == nil { return nil, fmt.Errorf("could not find field: %v", tagName) } - tags[tagName] = recordFields[tagName].(string) + tags[tagName] = fmt.Sprintf("%v", recordFields[tagName]) delete(recordFields, tagName) } @@ -178,7 +171,7 @@ func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) { if recordFields[p.TimestampColumn] == nil { return nil, fmt.Errorf("timestamp column: %v could not be found", p.TimestampColumn) } - tStr := recordFields[p.TimestampColumn].(string) + tStr := fmt.Sprintf("%v", recordFields[p.TimestampColumn]) if p.TimestampFormat == "" { return nil, fmt.Errorf("timestamp format must be specified") } diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index eb1269da81532..32027e417a878 100644 --- a/plugins/parsers/registry.go +++ 
b/plugins/parsers/registry.go
@@ -211,6 +211,28 @@ func newCSVParser(metricName string,
 	timestampColumn string,
 	timestampFormat string,
 	defaultTags map[string]string) (Parser, error) {
+
+	if header == 0 && len(dataColumns) == 0 {
+		// if there is no header and no dataColumns, that's an error
+		return nil, fmt.Errorf("there must be a header if `csv_column_names` is not specified")
+	}
+
+	if delimiter != "" {
+		runeStr := []rune(delimiter)
+		if len(runeStr) > 1 {
+			return nil, fmt.Errorf("delimiter must be a single character, got: %s", delimiter)
+		}
+		delimiter = string(runeStr[0])
+	}
+
+	if comment != "" {
+		runeStr := []rune(comment)
+		if len(runeStr) > 1 {
+			return nil, fmt.Errorf("comment must be a single character, got: %s", comment)
+		}
+		comment = string(runeStr[0])
+	}
+
 	parser := &csv.Parser{
 		MetricName:        metricName,
 		HeaderRowCount:    header,

From b408ac40f892702d462a375f71d5d13544fa4174 Mon Sep 17 00:00:00 2001
From: Max U
Date: Fri, 24 Aug 2018 14:54:05 -0700
Subject: [PATCH 22/23] tags are added before default tags

---
 plugins/parsers/csv/parser.go | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go
index 4a86a33de2e30..5908f773670a2 100644
--- a/plugins/parsers/csv/parser.go
+++ b/plugins/parsers/csv/parser.go
@@ -134,6 +134,14 @@ func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) {
 			if p.TrimSpace {
 				value = strings.Trim(value, " ")
 			}
+
+			for _, tagName := range p.TagColumns {
+				if tagName == fieldName {
+					tags[tagName] = record[i]
+					continue
+				}
+			}
+
 			// attempt type conversions
 			if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
 				recordFields[fieldName] = iValue
@@ -158,14 +166,6 @@ func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) {
 		measurementName = fmt.Sprintf("%v", recordFields[p.MeasurementColumn])
 	}
 
-	for _, tagName := range p.TagColumns {
-		if recordFields[tagName] == nil {
-			return nil, fmt.Errorf("could not find field: %v", tagName)
-		}
-		tags[tagName] = fmt.Sprintf("%v", recordFields[tagName])
-		delete(recordFields, tagName)
-	}
-
 	metricTime := time.Now()
 	if p.TimestampColumn != "" {
 		if recordFields[p.TimestampColumn] == nil {

From acc5ea76cb11aea054eae81b6c445a9c30166b92 Mon Sep 17 00:00:00 2001
From: Max U
Date: Fri, 24 Aug 2018 15:17:50 -0700
Subject: [PATCH 23/23] fix tags being removed from fields

---
 plugins/parsers/csv/parser.go | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go
index 5908f773670a2..9193fbf5bff39 100644
--- a/plugins/parsers/csv/parser.go
+++ b/plugins/parsers/csv/parser.go
@@ -128,6 +128,7 @@ func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) {
 
 	// skip columns in record
 	record = record[p.SkipColumns:]
+outer:
 	for i, fieldName := range p.ColumnNames {
 		if i < len(record) {
 			value := record[i]
@@ -137,8 +138,8 @@ func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) {
 
 			for _, tagName := range p.TagColumns {
 				if tagName == fieldName {
-					tags[tagName] = record[i]
-					continue
+					tags[tagName] = value
+					continue outer
 				}
 			}
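As a closing illustration (not part of the patch series itself), here is a minimal sketch of
how the finished `csv.Parser` can be exercised, mirroring the unit tests above. The metric
name `demo` and the input rows are invented for the example; everything else follows the
final state of the code in these patches.

```go
package main

import (
	"fmt"
	"log"

	"github.com/influxdata/telegraf/plugins/parsers/csv"
)

func main() {
	// Construct the parser directly, as the unit tests do. The first row
	// is treated as a header; "host" is routed to tags, and every other
	// column becomes a field with its type inferred by parseRecord.
	p := csv.Parser{
		MetricName:     "demo", // stands in for the plugin name
		HeaderRowCount: 1,
		TagColumns:     []string{"host"},
	}

	metrics, err := p.Parse([]byte("host,usage_idle,enabled\nserver01,42.5,true\n"))
	if err != nil {
		log.Fatal(err)
	}

	m := metrics[0]
	// Expected, per the final patches: tags {host=server01}, fields
	// {usage_idle=42.5 (float64), enabled=true (bool)}; the tag column
	// stays out of the fields thanks to the labeled `continue outer`.
	fmt.Println(m.Name(), m.Tags(), m.Fields())
}
```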