Skip to content

Commit

Permalink
Add new config for csv column explicit type conversion (influxdata#4781)
Browse files Browse the repository at this point in the history
  • Loading branch information
rudbast authored and otherpirate committed Mar 15, 2019
1 parent 1942f8f commit 25d5e21
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 0 deletions.
13 changes: 13 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1460,6 +1460,18 @@ func getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) {
}
}

if node, ok := tbl.Fields["csv_column_types"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.CSVColumnTypes = append(c.CSVColumnTypes, str.Value)
}
}
}
}
}

if node, ok := tbl.Fields["csv_tag_columns"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
Expand Down Expand Up @@ -1588,6 +1600,7 @@ func getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) {
delete(tbl.Fields, "grok_custom_pattern_files")
delete(tbl.Fields, "grok_timezone")
delete(tbl.Fields, "csv_column_names")
delete(tbl.Fields, "csv_column_types")
delete(tbl.Fields, "csv_comment")
delete(tbl.Fields, "csv_delimiter")
delete(tbl.Fields, "csv_field_columns")
Expand Down
5 changes: 5 additions & 0 deletions plugins/parsers/csv/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ values.
## If `csv_header_row_count` is set to 0, this config must be used
csv_column_names = []

## For assigning explicit data types to columns.
## Supported types: "int", "float", "bool", "string".
## If this is not specified, type conversion will be done on the types above.
csv_column_types = []

## Indicates the number of rows to skip before looking for header information.
csv_skip_rows = 0

Expand Down
35 changes: 35 additions & 0 deletions plugins/parsers/csv/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Parser struct {
Comment string
TrimSpace bool
ColumnNames []string
ColumnTypes []string
TagColumns []string
MeasurementColumn string
TimestampColumn string
Expand Down Expand Up @@ -148,6 +149,40 @@ outer:
}
}

// Try explicit conversion only when column types is defined.
if len(p.ColumnTypes) > 0 {
// Throw error if current column count exceeds defined types.
if i >= len(p.ColumnTypes) {
return nil, fmt.Errorf("column type: column count exceeded")
}

var val interface{}
var err error

switch p.ColumnTypes[i] {
case "int":
val, err = strconv.ParseInt(value, 10, 64)
if err != nil {
return nil, fmt.Errorf("column type: parse int error %s", err)
}
case "float":
val, err = strconv.ParseFloat(value, 64)
if err != nil {
return nil, fmt.Errorf("column type: parse float error %s", err)
}
case "bool":
val, err = strconv.ParseBool(value)
if err != nil {
return nil, fmt.Errorf("column type: parse bool error %s", err)
}
default:
val = value
}

recordFields[fieldName] = val
continue
}

// attempt type conversions
if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
recordFields[fieldName] = iValue
Expand Down
12 changes: 12 additions & 0 deletions plugins/parsers/csv/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,18 @@ func TestValueConversion(t *testing.T) {

//deep equal fields
require.Equal(t, expectedMetric.Fields(), returnedMetric.Fields())

// Test explicit type conversion.
p.ColumnTypes = []string{"float", "int", "bool", "string"}

metrics, err = p.Parse([]byte(testCSV))
require.NoError(t, err)

returnedMetric, err2 = metric.New(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields(), time.Unix(0, 0))
require.NoError(t, err2)

//deep equal fields
require.Equal(t, expectedMetric.Fields(), returnedMetric.Fields())
}

func TestSkipComment(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ type Config struct {

//csv configuration
CSVColumnNames []string `toml:"csv_column_names"`
CSVColumnTypes []string `toml:"csv_column_types"`
CSVComment string `toml:"csv_comment"`
CSVDelimiter string `toml:"csv_delimiter"`
CSVHeaderRowCount int `toml:"csv_header_row_count"`
Expand Down Expand Up @@ -195,6 +196,7 @@ func NewParser(config *Config) (Parser, error) {
config.CSVComment,
config.CSVTrimSpace,
config.CSVColumnNames,
config.CSVColumnTypes,
config.CSVTagColumns,
config.CSVMeasurementColumn,
config.CSVTimestampColumn,
Expand All @@ -216,6 +218,7 @@ func newCSVParser(metricName string,
comment string,
trimSpace bool,
columnNames []string,
columnTypes []string,
tagColumns []string,
nameColumn string,
timestampColumn string,
Expand All @@ -240,6 +243,10 @@ func newCSVParser(metricName string,
}
}

if len(columnNames) > 0 && len(columnTypes) > 0 && len(columnNames) != len(columnTypes) {
return nil, fmt.Errorf("csv_column_names field count doesn't match with csv_column_types")
}

parser := &csv.Parser{
MetricName: metricName,
HeaderRowCount: headerRowCount,
Expand All @@ -249,6 +256,7 @@ func newCSVParser(metricName string,
Comment: comment,
TrimSpace: trimSpace,
ColumnNames: columnNames,
ColumnTypes: columnTypes,
TagColumns: tagColumns,
MeasurementColumn: nameColumn,
TimestampColumn: timestampColumn,
Expand Down

0 comments on commit 25d5e21

Please sign in to comment.