Skip to content

Commit

Permalink
feat: Expose Parquet version and root repetition spec options (#567)
Browse files Browse the repository at this point in the history
#### Summary

Some old parquet readers like Snowflake imported need the root repetition to be undefined, and they also only support Parquet v1.

This PR exposes both options.

Related to cloudquery/cloudquery-issues#2106 (internal issue)

---
  • Loading branch information
erezrokah authored Aug 12, 2024
1 parent 26c4fa0 commit 0bf397a
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 5 deletions.
85 changes: 81 additions & 4 deletions parquet/spec.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,98 @@
package parquet

import "github.com/invopop/jsonschema"
import (
"fmt"
"slices"
"strings"

"github.com/apache/arrow/go/v17/parquet"
"github.com/invopop/jsonschema"
)

var allowedVersions = []string{"v1.0", "v2.4", "v2.6", "v2Latest"}
var allowedRootRepetitions = []string{"undefined", "required", "optional", "repeated"}

// nolint:revive
type ParquetSpec struct{}
type ParquetSpec struct {
Version string `json:"version,omitempty"`
RootRepetition string `json:"root_repetition,omitempty"`
}

func (s *ParquetSpec) GetVersion() parquet.Version {
switch s.Version {
case "v1.0":
return parquet.V1_0
case "v2.4":
return parquet.V2_4
case "v2.6":
return parquet.V2_6
case "v2Latest":
return parquet.V2_LATEST
}
return parquet.V2_LATEST
}

func (s *ParquetSpec) GetRootRepetition() parquet.Repetition {
switch s.RootRepetition {
case "undefined":
return parquet.Repetitions.Undefined
case "required":
return parquet.Repetitions.Required
case "optional":
return parquet.Repetitions.Optional
case "repeated":
return parquet.Repetitions.Repeated
}
return parquet.Repetitions.Repeated
}

func (ParquetSpec) JSONSchema() *jsonschema.Schema {
properties := jsonschema.NewProperties()
allowedVersionsAsAny := make([]any, len(allowedVersions))
for i, v := range allowedVersions {
allowedVersionsAsAny[i] = v
}
properties.Set("version", &jsonschema.Schema{
Type: "string",
Description: "Parquet format version",
Enum: allowedVersionsAsAny,
Default: "v2Latest",
})

allowedRootRepetitionsAsAny := make([]any, len(allowedRootRepetitions))
for i, v := range allowedRootRepetitions {
allowedRootRepetitionsAsAny[i] = v
}
properties.Set("root_repetition", &jsonschema.Schema{
Type: "string",
Description: "Root repetition",
Enum: allowedRootRepetitionsAsAny,
Default: "repeated",
})

return &jsonschema.Schema{
Description: "CloudQuery Parquet file output spec.",
Properties: properties,
Type: "object",
AdditionalProperties: jsonschema.FalseSchema, // "additionalProperties": false
}
}

func (*ParquetSpec) SetDefaults() {
func (s *ParquetSpec) SetDefaults() {
if s.Version == "" {
s.Version = "v2Latest"
}
if s.RootRepetition == "" {
s.RootRepetition = "repeated"
}
}

func (*ParquetSpec) Validate() error {
func (s *ParquetSpec) Validate() error {
if !slices.Contains(allowedVersions, s.Version) {
return fmt.Errorf("invalid version: %s. Allowed values are %s", s.Version, strings.Join(allowedVersions, ", "))
}
if !slices.Contains(allowedRootRepetitions, s.RootRepetition) {
return fmt.Errorf("invalid rootRepetition: %s. Allowed values are %s", s.RootRepetition, strings.Join(allowedRootRepetitions, ", "))
}
return nil
}
18 changes: 18 additions & 0 deletions parquet/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,23 @@ func TestSpec_JSONSchema(t *testing.T) {
Err: true,
Spec: `{"extra":true}`,
},
{
Name: "invalid version",
ErrorMessage: "at '/version': value must be one of 'v1.0', 'v2.4', 'v2.6', 'v2Latest'",
Spec: `{"version":"invalid"}`,
},
{
Name: "valid version",
Spec: `{"version":"v1.0"}`,
},
{
Name: "valid root_repetition",
Spec: `{"root_repetition":"undefined"}`,
},
{
Name: "invalid root_repetition",
ErrorMessage: "at '/root_repetition': value must be one of 'undefined', 'required', 'optional', 'repeated'",
Spec: `{"root_repetition":"invalid"}`,
},
})
}
4 changes: 3 additions & 1 deletion parquet/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ type Handle struct {

var _ ftypes.Handle = (*Handle)(nil)

func (*Client) WriteHeader(w io.Writer, t *schema.Table) (ftypes.Handle, error) {
func (c *Client) WriteHeader(w io.Writer, t *schema.Table) (ftypes.Handle, error) {
props := parquet.NewWriterProperties(
parquet.WithMaxRowGroupLength(128*1024*1024), // 128M
parquet.WithCompression(compress.Codecs.Snappy),
parquet.WithVersion(c.spec.GetVersion()),
parquet.WithRootRepetition(c.spec.GetRootRepetition()),
)
arrprops := pqarrow.DefaultWriterProps()
newSchema := convertSchema(t.ToArrowSchema())
Expand Down
24 changes: 24 additions & 0 deletions schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,30 @@
"description": "CloudQuery JSON file output spec."
},
"ParquetSpec": {
"properties": {
"version": {
"type": "string",
"enum": [
"v1.0",
"v2.4",
"v2.6",
"v2Latest"
],
"description": "Parquet format version",
"default": "v2Latest"
},
"root_repetition": {
"type": "string",
"enum": [
"undefined",
"required",
"optional",
"repeated"
],
"description": "Root repetition",
"default": "repeated"
}
},
"additionalProperties": false,
"type": "object",
"description": "CloudQuery Parquet file output spec."
Expand Down

0 comments on commit 0bf397a

Please sign in to comment.