Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Parquet format arrays #71

Merged
merged 8 commits into from
Feb 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (*Client) Read(f io.Reader, table *schema.Table, sourceName string, res cha
return err
}

s := makeSchema(table.Columns)
s := makeSchema(table.Name, table.Columns)
r, err := reader.NewParquetReader(newPQReader(buf.Bytes()), s, 2)
if err != nil {
return fmt.Errorf("can't create parquet reader: %w", err)
Expand Down
106 changes: 67 additions & 39 deletions parquet/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,56 +8,84 @@ import (
pschema "github.com/xitongsys/parquet-go/schema"
)

func makeSchema(cols schema.ColumnList) string {
func makeSchema(tableName string, cols schema.ColumnList) string {
s := pschema.JSONSchemaItemType{
Tag: `name=parquet_go_root, repetitiontype=REQUIRED`,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The item name here doesn't seem to have any significance, parquet_go_root is what the library uses but it's not a const in the parquet world, so I opted to add the table name here so that files are identifiable (with parquet-tools etc.) even if they lost their filenames.

Tag: `name=` + tableName + `_root, repetitiontype=REQUIRED`,
}

for i := range cols {
tag := `name=` + cols[i].Name
if opts := structOptsForColumn(cols[i]); len(opts) > 0 {
tag += ", " + strings.Join(opts, ", ")
for _, col := range cols {
var subFields []*pschema.JSONSchemaItemType

tag := []string{`name=` + col.Name}

switch col.Type {
case schema.TypeTimestamp:
tag = append(tag, "type=INT64", "convertedtype=TIMESTAMP_MILLIS")
case schema.TypeJSON, schema.TypeString, schema.TypeUUID, schema.TypeCIDR, schema.TypeInet, schema.TypeMacAddr:
tag = append(tag, "type=BYTE_ARRAY", "convertedtype=UTF8")
case schema.TypeFloat:
tag = append(tag, "type=DOUBLE")
case schema.TypeInt:
tag = append(tag, "type=INT64")
case schema.TypeByteArray:
tag = append(tag, "type=BYTE_ARRAY")
case schema.TypeBool:
tag = append(tag, "type=BOOLEAN")
case schema.TypeIntArray:
tag = append(tag, "type=LIST", "repetitiontype=OPTIONAL")
subFields = []*pschema.JSONSchemaItemType{
{
Tag: "name=element, type=INT64, repetitiontype=OPTIONAL",
},
}
case schema.TypeStringArray, schema.TypeUUIDArray, schema.TypeCIDRArray, schema.TypeInetArray, schema.TypeMacAddrArray:
tag = append(tag, "type=LIST", "repetitiontype=OPTIONAL")
subFields = []*pschema.JSONSchemaItemType{
{
Tag: "name=element, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL",
},
}
default:
panic("unhandled type: " + col.Type.String())
}
s.Fields = append(s.Fields, &pschema.JSONSchemaItemType{Tag: tag})

if !isArray(col.Type) { // array types are handled differently, see above
if col.CreationOptions.PrimaryKey || col.CreationOptions.IncrementalKey {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we enforce constraints on incremental keys in other destinations right now, but it seems like a good idea, so I'm fine with this

tag = append(tag, "repetitiontype=REQUIRED")
} else {
tag = append(tag, "repetitiontype=OPTIONAL")
}
}

s.Fields = append(s.Fields, &pschema.JSONSchemaItemType{
Tag: strings.Join(tag, ", "),
Fields: subFields,
})
}

b, _ := json.Marshal(s)
return string(b)
}

func structOptsForColumn(col schema.Column) []string {
opts := []string{}

switch col.Type {
case schema.TypeJSON:
opts = append(opts, "type=BYTE_ARRAY", "convertedtype=UTF8")
case schema.TypeTimestamp:
opts = append(opts, "type=INT64", "convertedtype=TIMESTAMP_MILLIS")
case schema.TypeString, schema.TypeUUID, schema.TypeCIDR, schema.TypeInet, schema.TypeMacAddr,
schema.TypeStringArray, schema.TypeUUIDArray, schema.TypeCIDRArray, schema.TypeInetArray, schema.TypeMacAddrArray:
opts = append(opts, "type=BYTE_ARRAY", "convertedtype=UTF8")
case schema.TypeFloat:
opts = append(opts, "type=DOUBLE")
case schema.TypeInt, schema.TypeIntArray:
opts = append(opts, "type=INT64")
case schema.TypeByteArray:
opts = append(opts, "type=BYTE_ARRAY")
case schema.TypeBool:
opts = append(opts, "type=BOOLEAN")
default:
panic("unhandled type: " + col.Type.String())
}
// isArray reports whether t is an array value type, i.e. whether it has
// a valid element type according to arrayElement.
func isArray(t schema.ValueType) bool {
	elem := arrayElement(t)
	return elem != schema.TypeInvalid
}

switch col.Type {
case schema.TypeStringArray, schema.TypeIntArray, schema.TypeUUIDArray, schema.TypeCIDRArray, schema.TypeInetArray, schema.TypeMacAddrArray:
opts = append(opts, "repetitiontype=REPEATED")
func arrayElement(t schema.ValueType) schema.ValueType {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use this method in the plugin-sdk in the future.

switch t {
case schema.TypeIntArray:
return schema.TypeInt
case schema.TypeStringArray:
return schema.TypeString
case schema.TypeUUIDArray:
return schema.TypeUUID
case schema.TypeCIDRArray:
return schema.TypeCIDR
case schema.TypeInetArray:
return schema.TypeInet
case schema.TypeMacAddrArray:
return schema.TypeMacAddr
default:
if col.CreationOptions.PrimaryKey || col.CreationOptions.IncrementalKey {
opts = append(opts, "repetitiontype=REQUIRED")
} else {
opts = append(opts, "repetitiontype=OPTIONAL")
}
return schema.TypeInvalid
}

return opts
}
1 change: 1 addition & 0 deletions parquet/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (ReverseTransformer) ReverseTransformValues(table *schema.Table, values []a
if err := t.Set(v); err != nil {
return nil, fmt.Errorf("failed to convert value %v to type %s: %w", v, table.Columns[i].Type, err)
}
res[i] = t
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this, the read-write test panics if there's a type error, instead of failing with a proper (and helpful) error message.

continue
}

Expand Down
2 changes: 1 addition & 1 deletion parquet/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func (*Client) WriteTableBatch(w io.Writer, table *schema.Table, resources [][]any) error {
pw, err := writer.NewJSONWriterFromWriter(makeSchema(table.Columns), w, 2)
pw, err := writer.NewJSONWriterFromWriter(makeSchema(table.Name, table.Columns), w, 2)
if err != nil {
return fmt.Errorf("can't create parquet writer: %w", err)
}
Expand Down