feat: Parquet format (take 2) (#59)
disq authored Feb 7, 2023
1 parent 2866fa0 commit b331ef6
Showing 15 changed files with 1,526 additions and 10 deletions.
25 changes: 21 additions & 4 deletions client.go
@@ -3,16 +3,23 @@ package filetypes
 import (
 	csvFile "github.com/cloudquery/filetypes/csv"
 	jsonFile "github.com/cloudquery/filetypes/json"
+	"github.com/cloudquery/filetypes/parquet"
 )
 
 type Client struct {
-	spec *FileSpec
-	csv  *csvFile.Client
+	spec *FileSpec
+
+	csv                   *csvFile.Client
+	csvTransformer        csvFile.Transformer
+	csvReverseTransformer csvFile.ReverseTransformer
+
 	json *jsonFile.Client
-	csvTransformer        csvFile.Transformer
-	csvReverseTransformer csvFile.ReverseTransformer
 	jsonTransformer        jsonFile.Transformer
 	jsonReverseTransformer jsonFile.ReverseTransformer
+
+	parquet                   *parquet.Client
+	parquetTransformer        parquet.Transformer
+	parquetReverseTransformer parquet.ReverseTransformer
 }
 
 // NewClient creates a new client for the given spec
@@ -55,6 +62,16 @@ func NewClient(spec *FileSpec) (*Client, error) {
 			json: client,
 		}, nil
 
+	case FormatTypeParquet:
+		client, err := parquet.NewClient(parquet.WithSpec(*spec.parquetSpec))
+		if err != nil {
+			return &Client{}, err
+		}
+		return &Client{
+			spec:    spec,
+			parquet: client,
+		}, nil
+
 	default:
 		panic("unknown format " + spec.Format)
 	}
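
For orientation, a minimal, hypothetical caller of the new format could look like the sketch below. Only Format and FormatTypeParquet appear in this diff; the SetDefaults and Validate calls on FileSpec are assumptions modeled on the per-format specs.

package main

import "github.com/cloudquery/filetypes"

func main() {
	// Select the new Parquet format; any other FileSpec fields are omitted here.
	spec := &filetypes.FileSpec{Format: filetypes.FormatTypeParquet}
	spec.SetDefaults() // assumed to populate the unexported parquet sub-spec
	if err := spec.Validate(); err != nil {
		panic(err)
	}

	client, err := filetypes.NewClient(spec)
	if err != nil {
		panic(err)
	}
	_ = client
}
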
12 changes: 12 additions & 0 deletions go.mod
@@ -23,7 +23,19 @@ require (
	gopkg.in/yaml.v2 v2.4.0 // indirect
)

require github.com/xitongsys/parquet-go v1.6.2

require (
	github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
	github.com/apache/thrift v0.14.2 // indirect
	github.com/golang/snappy v0.0.4 // indirect
	github.com/xitongsys/parquet-go-source v0.0.0-20221025031416-9877e685ef65 // indirect
	golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
)

require (
	github.com/getsentry/sentry-go v0.17.0 // indirect; indirect // indirect
	github.com/klauspost/compress v1.15.11 // indirect
	github.com/pierrec/lz4/v4 v4.1.9 // indirect
	golang.org/x/text v0.6.0 // indirect; indirect // indirect
)
957 changes: 957 additions & 0 deletions go.sum

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions parquet/client.go
@@ -0,0 +1,23 @@
package parquet

type Options func(*Client)

// Client is a parquet client.
type Client struct {
	spec Spec
}

func NewClient(options ...Options) (*Client, error) {
	c := &Client{}
	for _, option := range options {
		option(c)
	}

	return c, nil
}

func WithSpec(spec Spec) Options {
	return func(c *Client) {
		c.spec = spec
	}
}
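
Construction follows the functional-options pattern. A minimal sketch of configuring the client from a caller (the zero-value Spec is illustrative only, since Spec currently has no fields):

c, err := parquet.NewClient(parquet.WithSpec(parquet.Spec{}))
if err != nil {
	// handle the error; NewClient currently never fails, but the signature allows it
}
_ = c
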
41 changes: 41 additions & 0 deletions parquet/pqreader.go
@@ -0,0 +1,41 @@
package parquet

import (
	"bytes"
	"fmt"

	"github.com/xitongsys/parquet-go/source"
)

type pqReader struct {
	data []byte

	*bytes.Reader
}

var _ source.ParquetFile = (*pqReader)(nil)

func newPQReader(data []byte) *pqReader {
	bu := make([]byte, len(data))
	copy(bu, data)

	return &pqReader{
		Reader: bytes.NewReader(bu),
		data:   bu,
	}
}
func (pq *pqReader) Open(string) (source.ParquetFile, error) {
	return newPQReader(pq.data), nil
}

func (*pqReader) Close() error {
	return nil
}

func (*pqReader) Write([]byte) (n int, err error) {
	return 0, fmt.Errorf("not implemented")
}

func (*pqReader) Create(string) (source.ParquetFile, error) {
	return nil, fmt.Errorf("not implemented")
}
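
newPQReader copies the input bytes once, and Open hands back a fresh reader over its own copy, so parquet-go's internal re-opens never share a read offset. A small illustrative sketch (the byte slice is placeholder data, not a valid parquet file):

data := []byte("placeholder, not real parquet content")
pq := newPQReader(data)        // takes a private copy of data
clone, _ := pq.Open("ignored") // independent reader over the same bytes; the name argument is unused
_ = clone
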
50 changes: 50 additions & 0 deletions parquet/read.go
@@ -0,0 +1,50 @@
package parquet

import (
	"bytes"
	"fmt"
	"io"

	"github.com/cloudquery/plugin-sdk/schema"
	"github.com/xitongsys/parquet-go/reader"
)

func (*Client) Read(f io.Reader, table *schema.Table, sourceName string, res chan<- []any) error {
	sourceNameIndex := int64(table.Columns.Index(schema.CqSourceNameColumn.Name))
	if sourceNameIndex == -1 {
		return fmt.Errorf("could not find column %s in table %s", schema.CqSourceNameColumn.Name, table.Name)
	}

	buf := &bytes.Buffer{}
	if _, err := io.Copy(buf, f); err != nil {
		return err
	}

	s := makeSchema(table.Columns)
	r, err := reader.NewParquetReader(newPQReader(buf.Bytes()), s, 2)
	if err != nil {
		return fmt.Errorf("can't create parquet reader: %w", err)
	}
	defer r.ReadStop()

	for row := int64(0); row < r.GetNumRows(); row++ {
		record := make([]any, len(table.Columns))
		for col := 0; col < len(table.Columns); col++ {
			vals, _, _, err := r.ReadColumnByIndex(int64(col), 1)
			if err != nil {
				return err
			}
			if len(vals) == 1 {
				record[col] = vals[0]
			} else {
				record[col] = vals
			}
		}

		if record[sourceNameIndex] == sourceName {
			res <- record
		}
	}

	return nil
}
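
A minimal sketch of driving Read from a caller. The client, f, and table values are hypothetical and set up elsewhere (imports omitted); the result channel has to be drained concurrently, because Read sends on it synchronously:

res := make(chan []any)
go func() {
	defer close(res)
	if err := client.Read(f, table, "my-source", res); err != nil {
		panic(err) // hypothetical error handling
	}
}()
for record := range res {
	fmt.Println(record)
}
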
63 changes: 63 additions & 0 deletions parquet/schema.go
@@ -0,0 +1,63 @@
package parquet

import (
	"encoding/json"
	"strings"

	"github.com/cloudquery/plugin-sdk/schema"
	pschema "github.com/xitongsys/parquet-go/schema"
)

func makeSchema(cols schema.ColumnList) string {
	s := pschema.JSONSchemaItemType{
		Tag: `name=parquet_go_root, repetitiontype=REQUIRED`,
	}

	for i := range cols {
		tag := `name=` + cols[i].Name
		if opts := structOptsForColumn(cols[i]); len(opts) > 0 {
			tag += ", " + strings.Join(opts, ", ")
		}
		s.Fields = append(s.Fields, &pschema.JSONSchemaItemType{Tag: tag})
	}

	b, _ := json.Marshal(s)
	return string(b)
}

func structOptsForColumn(col schema.Column) []string {
	opts := []string{}

	switch col.Type {
	case schema.TypeJSON:
		opts = append(opts, "type=BYTE_ARRAY", "convertedtype=UTF8")
	case schema.TypeTimestamp:
		opts = append(opts, "type=INT64", "convertedtype=TIMESTAMP_MILLIS")
	case schema.TypeString, schema.TypeUUID, schema.TypeCIDR, schema.TypeInet, schema.TypeMacAddr,
		schema.TypeStringArray, schema.TypeUUIDArray, schema.TypeCIDRArray, schema.TypeInetArray, schema.TypeMacAddrArray:
		opts = append(opts, "type=BYTE_ARRAY", "convertedtype=UTF8")
	case schema.TypeFloat:
		opts = append(opts, "type=DOUBLE")
	case schema.TypeInt, schema.TypeIntArray:
		opts = append(opts, "type=INT64")
	case schema.TypeByteArray:
		opts = append(opts, "type=BYTE_ARRAY")
	case schema.TypeBool:
		opts = append(opts, "type=BOOLEAN")
	default:
		panic("unhandled type: " + col.Type.String())
	}

	switch col.Type {
	case schema.TypeStringArray, schema.TypeIntArray, schema.TypeUUIDArray, schema.TypeCIDRArray, schema.TypeInetArray, schema.TypeMacAddrArray:
		opts = append(opts, "repetitiontype=REPEATED")
	default:
		if col.CreationOptions.PrimaryKey || col.CreationOptions.IncrementalKey {
			opts = append(opts, "repetitiontype=REQUIRED")
		} else {
			opts = append(opts, "repetitiontype=OPTIONAL")
		}
	}

	return opts
}
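
To illustrate the tag strings this produces, here is a small hypothetical column list and the tags the two switches above would yield for it (the ColumnCreationOptions type name is inferred from the CreationOptions field used above):

cols := schema.ColumnList{
	{Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
	{Name: "tags", Type: schema.TypeStringArray},
	{Name: "count", Type: schema.TypeInt},
}
_ = makeSchema(cols)
// id    -> name=id, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=REQUIRED
// tags  -> name=tags, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=REPEATED
// count -> name=count, type=INT64, repetitiontype=OPTIONAL
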
11 changes: 11 additions & 0 deletions parquet/spec.go
@@ -0,0 +1,11 @@
package parquet

type Spec struct {
}

func (*Spec) SetDefaults() {
}

func (*Spec) Validate() error {
	return nil
}