Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new config for csv column explicit type conversion #4781

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1460,6 +1460,18 @@ func getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) {
}
}

if node, ok := tbl.Fields["csv_column_types"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.CSVColumnTypes = append(c.CSVColumnTypes, str.Value)
}
}
}
}
}

if node, ok := tbl.Fields["csv_tag_columns"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
Expand Down Expand Up @@ -1588,6 +1600,7 @@ func getParserConfig(name string, tbl *ast.Table) (*parsers.Config, error) {
delete(tbl.Fields, "grok_custom_pattern_files")
delete(tbl.Fields, "grok_timezone")
delete(tbl.Fields, "csv_column_names")
delete(tbl.Fields, "csv_column_types")
delete(tbl.Fields, "csv_comment")
delete(tbl.Fields, "csv_delimiter")
delete(tbl.Fields, "csv_field_columns")
Expand Down
5 changes: 5 additions & 0 deletions plugins/parsers/csv/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ values.
## If `csv_header_row_count` is set to 0, this config must be used
csv_column_names = []

## For assigning explicit data types to columns.
## Supported types: "int", "float", "bool", "string".
## If this is not specified, type conversion will be done on the types above.
csv_column_types = []

## Indicates the number of rows to skip before looking for header information.
csv_skip_rows = 0

Expand Down
35 changes: 35 additions & 0 deletions plugins/parsers/csv/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Parser struct {
Comment string
TrimSpace bool
ColumnNames []string
ColumnTypes []string
TagColumns []string
MeasurementColumn string
TimestampColumn string
Expand Down Expand Up @@ -148,6 +149,40 @@ outer:
}
}

// Try explicit conversion only when column types is defined.
if len(p.ColumnTypes) > 0 {
// Throw error if current column count exceeds defined types.
if i >= len(p.ColumnTypes) {
return nil, fmt.Errorf("column type: column count exceeded")
}

var val interface{}
var err error

switch p.ColumnTypes[i] {
case "int":
val, err = strconv.ParseInt(value, 10, 64)
if err != nil {
return nil, fmt.Errorf("column type: parse int error %s", err)
}
case "float":
val, err = strconv.ParseFloat(value, 64)
if err != nil {
return nil, fmt.Errorf("column type: parse float error %s", err)
}
case "bool":
val, err = strconv.ParseBool(value)
if err != nil {
return nil, fmt.Errorf("column type: parse bool error %s", err)
}
default:
val = value
}

recordFields[fieldName] = val
continue
}

// attempt type conversions
if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
recordFields[fieldName] = iValue
Expand Down
12 changes: 12 additions & 0 deletions plugins/parsers/csv/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,18 @@ func TestValueConversion(t *testing.T) {

//deep equal fields
require.Equal(t, expectedMetric.Fields(), returnedMetric.Fields())

// Test explicit type conversion.
p.ColumnTypes = []string{"float", "int", "bool", "string"}

metrics, err = p.Parse([]byte(testCSV))
require.NoError(t, err)

returnedMetric, err2 = metric.New(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields(), time.Unix(0, 0))
require.NoError(t, err2)

//deep equal fields
require.Equal(t, expectedMetric.Fields(), returnedMetric.Fields())
}

func TestSkipComment(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ type Config struct {

//csv configuration
CSVColumnNames []string `toml:"csv_column_names"`
CSVColumnTypes []string `toml:"csv_column_types"`
CSVComment string `toml:"csv_comment"`
CSVDelimiter string `toml:"csv_delimiter"`
CSVHeaderRowCount int `toml:"csv_header_row_count"`
Expand Down Expand Up @@ -195,6 +196,7 @@ func NewParser(config *Config) (Parser, error) {
config.CSVComment,
config.CSVTrimSpace,
config.CSVColumnNames,
config.CSVColumnTypes,
config.CSVTagColumns,
config.CSVMeasurementColumn,
config.CSVTimestampColumn,
Expand All @@ -216,6 +218,7 @@ func newCSVParser(metricName string,
comment string,
trimSpace bool,
columnNames []string,
columnTypes []string,
tagColumns []string,
nameColumn string,
timestampColumn string,
Expand All @@ -240,6 +243,10 @@ func newCSVParser(metricName string,
}
}

if len(columnNames) > 0 && len(columnTypes) > 0 && len(columnNames) != len(columnTypes) {
return nil, fmt.Errorf("csv_column_names field count doesn't match with csv_column_types")
}

parser := &csv.Parser{
MetricName: metricName,
HeaderRowCount: headerRowCount,
Expand All @@ -249,6 +256,7 @@ func newCSVParser(metricName string,
Comment: comment,
TrimSpace: trimSpace,
ColumnNames: columnNames,
ColumnTypes: columnTypes,
TagColumns: tagColumns,
MeasurementColumn: nameColumn,
TimestampColumn: timestampColumn,
Expand Down