Skip to content

Commit

Permalink
addresses greg and chris's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
maxunt committed Jul 27, 2018
1 parent fe6d641 commit e5c875c
Showing 1 changed file with 30 additions and 15 deletions.
45 changes: 30 additions & 15 deletions plugins/parsers/csv/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,27 @@ type CSVParser struct {
DefaultTags map[string]string
}

func (p *CSVParser) Parse(buf []byte) ([]telegraf.Metric, error) {
r := bytes.NewReader(buf)
func (p *CSVParser) compile(r *bytes.Reader) (*csv.Reader, error) {
csvReader := csv.NewReader(r)
csvReader.FieldsPerRecord = len(p.DataColumns)
if p.Delimiter != "" {
runeStr := []rune(p.Delimiter)
if len(runeStr) > 1 {
return nil, fmt.Errorf("delimiter must be a single character, got: %v", p.Delimiter)
return csvReader, fmt.Errorf("delimiter must be a single character, got: %s", p.Delimiter)
}
csvReader.Comma = runeStr[0]
}
return csvReader, nil
}

//if there is a header and nothing in DataColumns
//set DataColumns to names extracted from the header
func (p *CSVParser) Parse(buf []byte) ([]telegraf.Metric, error) {
r := bytes.NewReader(buf)
csvReader, err := p.compile(r)
if err != nil {
return nil, err
}
// if there is a header and nothing in DataColumns
// set DataColumns to names extracted from the header
if p.Header && len(p.DataColumns) == 0 {
header, err := csvReader.Read()
if err != nil {
Expand All @@ -46,11 +53,11 @@ func (p *CSVParser) Parse(buf []byte) ([]telegraf.Metric, error) {
p.DataColumns = header

} else if p.Header {
//if there is a header and DataColumns is specified, just skip header
// if there is a header and DataColumns is specified, just skip header
csvReader.Read()

} else if !p.Header && len(p.DataColumns) == 0 {
//if there is no header and no DataColumns, that's an error
// if there is no header and no DataColumns, that's an error
return nil, fmt.Errorf("there must be a header if `csv_data_columns` is not specified")
}

Expand All @@ -70,11 +77,19 @@ func (p *CSVParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return metrics, nil
}

//does not use any information in header and assumes DataColumns is set
// ParseLine does not use any information in header and assumes DataColumns is set
func (p *CSVParser) ParseLine(line string) (telegraf.Metric, error) {
r := bytes.NewReader([]byte(line))
csvReader := csv.NewReader(r)
csvReader.FieldsPerRecord = len(p.DataColumns)
csvReader, err := p.compile(r)
if err != nil {
return nil, err
}

// if there is nothing in DataColumns, ParseLine will fail
if len(p.DataColumns) == 0 {
return nil, fmt.Errorf("[parsers.csv] data columns must be specified")
}

record, err := csvReader.Read()
if err != nil {
return nil, err
Expand All @@ -94,7 +109,7 @@ func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) {
recordFields[fieldName] = record[i]
}

//add default tags
// add default tags
for k, v := range p.DefaultTags {
tags[k] = v
}
Expand All @@ -107,12 +122,12 @@ func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) {
}

for _, fieldName := range p.FieldColumns {
if recordFields[fieldName] == "" {
value, ok := recordFields[fieldName]
if !ok {
return nil, fmt.Errorf("could not find field: %v", fieldName)
}

//attempt type conversions
value := recordFields[fieldName]
// attempt type conversions
if iValue, err := strconv.Atoi(value); err == nil {
fields[fieldName] = iValue
} else if fValue, err := strconv.ParseFloat(value, 64); err == nil {
Expand All @@ -124,7 +139,7 @@ func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) {
}
}

//will default to plugin name
// will default to plugin name
measurementName := p.MetricName
if recordFields[p.NameColumn] != "" {
measurementName = recordFields[p.NameColumn]
Expand Down

0 comments on commit e5c875c

Please sign in to comment.