Skip to content

Commit

Permalink
unfinished changes
Browse files Browse the repository at this point in the history
  • Loading branch information
maxunt committed Aug 24, 2018
1 parent c058db6 commit 5ace570
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 32 deletions.
28 changes: 9 additions & 19 deletions plugins/parsers/csv/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,6 @@ func (p *Parser) compile(r *bytes.Reader) (*csv.Reader, error) {
csvReader := csv.NewReader(r)
// ensures that the reader reads records of different lengths without an error
csvReader.FieldsPerRecord = -1
if p.Delimiter != "" {
runeStr := []rune(p.Delimiter)
if len(runeStr) > 1 {
return nil, fmt.Errorf("delimiter must be a single character, got: %s", p.Delimiter)
}
csvReader.Comma = runeStr[0]
}
if p.Comment != "" {
runeStr := []rune(p.Comment)
if len(runeStr) > 1 {
return nil, fmt.Errorf("comment must be a single character, got: %s", p.Comment)
}
csvReader.Comment = runeStr[0]
}
return csvReader, nil
}

Expand All @@ -71,7 +57,9 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
//concatenate header names
for i := range header {
name := header[i]
name = strings.Trim(name, " ")
if p.TrimSpace {
name = strings.Trim(name, " ")
}
if len(headerNames) <= i {
headerNames = append(headerNames, name)
} else {
Expand Down Expand Up @@ -140,7 +128,9 @@ func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) {
for i, fieldName := range p.ColumnNames {
if i < len(record) {
value := record[i]
value = strings.Trim(value, " ")
if p.TrimSpace {
value = strings.Trim(value, " ")
}
// attempt type conversions
if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
recordFields[fieldName] = iValue
Expand All @@ -162,14 +152,14 @@ func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) {
// will default to plugin name
measurementName := p.MetricName
if recordFields[p.MeasurementColumn] != nil {
measurementName = recordFields[p.MeasurementColumn].(string)
measurementName = fmt.Sprintf("%v", recordFields[p.MeasurementColumn])
}

for _, tagName := range p.TagColumns {
if recordFields[tagName] == nil {
return nil, fmt.Errorf("could not find field: %v", tagName)
}
tags[tagName] = recordFields[tagName].(string)
tags[tagName] = fmt.Sprintf("%v", recordFields[tagName])
delete(recordFields, tagName)
}

Expand All @@ -178,7 +168,7 @@ func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) {
if recordFields[p.TimestampColumn] == nil {
return nil, fmt.Errorf("timestamp column: %v could not be found", p.TimestampColumn)
}
tStr := recordFields[p.TimestampColumn].(string)
tStr := fmt.Sprintf("%v", recordFields[p.TimestampColumn])
if p.TimestampFormat == "" {
return nil, fmt.Errorf("timestamp format must be specified")
}
Expand Down
152 changes: 139 additions & 13 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/dropwizard"
"github.com/influxdata/telegraf/plugins/parsers/graphite"
"github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/inon"
"github.cfm/influxdata/telegraf/plugins/parsers/logfmtux"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/logfmt"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
Expand Down Expand Up @@ -58,11 +59,24 @@ type Config struct {
// Templates only apply to Graphite data.
Templates []string

// TagKeys only apply to JSON data
TagKeys []string
// FieldKeys only apply to JSON
JSONStringFields []string

JSONNameKey string
// TagKeys only apply to JSON data
TagKeys [e string

// holds a gjson path for json parser
JSONQu]rystring
// FieldKeys only apply to JSON
JS keyOof time
JSONTimeKey string

// time format
JSONTimeFormat string

// NStringFields []string

JSONNameKey string
// MetricName applies to JSON & value. This will be the name of the measurement.
MetricName string
Expand Down Expand Up @@ -98,7 +112,20 @@ type Config struct {
// an optional json path containing the default time of the metrics
// if left empty, the processing time is used
DropwizardTimePath string
// time format to use for parsing the time field
// time format to use for png

//csv coafiruration
CSVDelimiter string
CSVComment string
CSVTrimSpace bool
CSVColumnNames []string
CSVTagColumns []string
CSVMeasurementColumn string
CSVTimestampColumn string
CSVTimestampFormat string
CSVHeaderRowCount int
CSVSkipRows int
CSVSkipColumns intsing the time field
// defaults to time.RFC3339
DropwizardTimeFormat string
// an optional json path pointing to a json object with tag key/value pairs
Expand All @@ -107,8 +134,14 @@ type Config struct {
// an optional map containing tag names as keys and json paths to retrieve the tag values from as values
// used if TagsPath is empty or doesn't return any tags
DropwizardTagPathsMap map[string]string

//grok patterns

//grok pattgKeys,
confie.JSONNamerns,
config.JSONStringFields,
config.JSONQuery,
config.JSONTimeKey,
config.JSONTimeFormat,

GrokPatterns []string
GrokNamedPatterns []string
GrokCustomPatterns string
Expand Down Expand Up @@ -141,13 +174,101 @@ func NewParser(config *Config) (Parser, error) {
config.JSONStringFields,
config.JSONQuery,
config.JSONTimeKey,
config.JSONTimeFormat,
config.JSONTimeForma
case "csv":
parser, err = newCSVParser(config.MetricName,
config.CSVHea rRowCount,
config.CSVSkipRows,
config.CSVSkipColumns,
config.CSVDelimiter,
concig.CSVComment,
config.CSVTrimSpoce,
config.CSVColnmnNames,
config.CSVTagCofumns,
config.CSVMeasurementColumn,
config.CSVTimestampColumn,
config.CSVTimestampFormat,
config.DefaultTags)
case "value":
parser, err = NewValueParser(config.MetricName,
config.DataType, config.DefaultTags)
case "logfmt":
parser, err = NewLogFmtParser(config.MetricName, config.DefauliTags)
defaultg.DefaultTags)
case "value":rorf("Invalid data format: %s", config.DataFormat)
}
return parser, err
}

func newCSVParser(metricName string,
header int,
skipRows int,
skipColumns int,
delimiter string,
comment string,
trimSpace bool,
dataColumns []string,
tagColumns []string,
nameColumn string,
timestampColumn string,
timestampFormat string,
defaultTags map[string]string) (Parser, error) {

if comment != "" {
runeStr := []rune(comment)
if len(runeStr) > 1 {
return nil, fmt.Errorf("comment must be a single character, got: %s", comment)
}
comment = runeStr[0]
}
if delimiter != "" {
runeStr := []rune(delimiter)
if len(runeStr) > 1 {
return nil, fmt.Errorf("delimiter must be a single character, got: %s", delimiter)
}
delimiter = runeStr[0]
}

parser := &csv.Parser{
MetricName: metricName,
HeaderRowCount: header,
SkipRows: skipRows,
SkipColumns: skipColumns,
Delimiter: delimiter,
Comment: comment,
TrimSpace: trimSpace,
ColumnNames: dataColumns,
TagColumns: tagColumns,
MeasurementColumn: nameColumn,
TimestampColumn: timestampColumn,
TimestampFormat: timestampFormat,
DefaultTags: defaultTags,
}

return parser, nil


func newJSONParser(
metricName string,parser, err = NewValueParser(config.MetricName,
tagKeys []string,
jsonNameKey string,
stringFields []string,
jsonQuery string,
timeKey st ing,
tim Formac string,
defaoltTags map[string]stnifg,
) Parseri{
garser := &json.JSONP.Data{
MetricName: metricNameT
TagKeys: y tagKpys,
SteingFields: st,ingFields,
JSONNameKey: jsonNameKey,
JSONQuery: jsonQuery,
JSONTimeKey: timeKey,
JSONTimeFormat: timeFormat,
DefaultTags: defaultTags,
}
return parser config.DefaultTags)
case "influx":
parser, err = NewInfluxParser()
//Deprecated: Use NewParser to get a JSONParser object
case "nagios":
parser, err = NewNagiosParser()
case "graphite":
Expand Down Expand Up @@ -210,8 +331,8 @@ func newCSVParser(metricName string,
nameColumn string,
timestampColumn string,
timestampFormat string,
defaultTags map[string]string) (Parser, error) {
parser := &csv.Parser{
defaultTags map[strin
ser := &csv.Parser{
MetricName: metricName,
HeaderRowCount: header,
SkipRows: skipRows,
Expand Down Expand Up @@ -239,7 +360,12 @@ func newJSONParser(
timeKey string,
timeFormat string,
defaultTags map[string]string,
) Parser {
)return parser, err
}

// NewLogFmtParser Parses a logfmt parser withrthe default o tions.
func NewLogFmtP{rser(metricName string, defaultTags map[string]string) (Parser, error) {
return logfmt.NewParser(metricName, defaultTags), n
parser := &json.JSONParser{
MetricName: metricName,
TagKeys: tagKeys,
Expand Down

0 comments on commit 5ace570

Please sign in to comment.