Skip to content

Commit

Permalink
refactor: Use underlying types.FileType client for all formats (#344)
Browse files Browse the repository at this point in the history
Extracted from #343
  • Loading branch information
candiduslynx authored Oct 23, 2023
1 parent b0fda4f commit 5b0754f
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 63 deletions.
42 changes: 11 additions & 31 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ import (
type Client struct {
spec *FileSpec
filetype types.FileType

csv *csvFile.Client
json *jsonFile.Client
parquet *parquet.Client
}

var (
Expand All @@ -35,6 +31,7 @@ func NewClient(spec *FileSpec) (*Client, error) {
return &Client{}, err
}

var client types.FileType
switch spec.Format {
case FormatTypeCSV:
opts := []csvFile.Options{
Expand All @@ -44,39 +41,22 @@ func NewClient(spec *FileSpec) (*Client, error) {
opts = append(opts, csvFile.WithHeader())
}

client, err := csvFile.NewClient(opts...)
if err != nil {
return &Client{}, err
}
return &Client{
spec: spec,
csv: client,
filetype: client,
}, nil
client, err = csvFile.NewClient(opts...)

case FormatTypeJSON:
client, err := jsonFile.NewClient()
if err != nil {
return &Client{}, err
}
return &Client{
spec: spec,
json: client,
filetype: client,
}, nil
client, err = jsonFile.NewClient()

case FormatTypeParquet:
client, err := parquet.NewClient(parquet.WithSpec(*spec.parquetSpec))
if err != nil {
return &Client{}, err
}
return &Client{
spec: spec,
parquet: client,
filetype: client,
}, nil
client, err = parquet.NewClient(parquet.WithSpec(*spec.parquetSpec))

default:
// shouldn't be possible as Validate checks for type
panic("unknown format " + spec.Format)
}

if err != nil {
return &Client{}, err
}

return &Client{spec: spec, filetype: client}, nil
}
4 changes: 2 additions & 2 deletions csv/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ package csv

import (
"fmt"
"io"

"github.com/apache/arrow/go/v14/arrow"
"github.com/apache/arrow/go/v14/arrow/array"
"github.com/apache/arrow/go/v14/arrow/csv"
"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/cloudquery/filetypes/v4/types"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

func (cl *Client) Read(r io.Reader, table *schema.Table, res chan<- arrow.Record) error {
func (cl *Client) Read(r types.ReaderAtSeeker, table *schema.Table, res chan<- arrow.Record) error {
arrowSchema := table.ToArrowSchema()
newSchema := convertSchema(arrowSchema)
reader := csv.NewReader(r, newSchema,
Expand Down
4 changes: 2 additions & 2 deletions json/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ package json

import (
"bufio"
"io"

"github.com/apache/arrow/go/v14/arrow"
"github.com/apache/arrow/go/v14/arrow/array"
"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/cloudquery/filetypes/v4/types"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

const maxJSONSize = 1024 * 1024 * 20

func (*Client) Read(r io.Reader, table *schema.Table, res chan<- arrow.Record) error {
func (*Client) Read(r types.ReaderAtSeeker, table *schema.Table, res chan<- arrow.Record) error {
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, maxJSONSize), maxJSONSize)
rb := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
Expand Down
4 changes: 2 additions & 2 deletions parquet/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"github.com/apache/arrow/go/v14/arrow"
"github.com/apache/arrow/go/v14/arrow/array"
"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/file"
"github.com/apache/arrow/go/v14/parquet/pqarrow"
"github.com/cloudquery/filetypes/v4/types"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

func (*Client) Read(f parquet.ReaderAtSeeker, table *schema.Table, res chan<- arrow.Record) error {
func (*Client) Read(f types.ReaderAtSeeker, table *schema.Table, res chan<- arrow.Record) error {
ctx := context.Background()
rdr, err := file.NewParquetReader(f)
if err != nil {
Expand Down
27 changes: 3 additions & 24 deletions read.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,11 @@ import (
"io"

"github.com/apache/arrow/go/v14/arrow"
"github.com/cloudquery/filetypes/v4/types"
"github.com/cloudquery/plugin-sdk/v4/schema"
)

type ReaderAtSeeker interface {
io.Reader
io.ReaderAt
io.Seeker
}

func (cl *Client) Read(f ReaderAtSeeker, table *schema.Table, res chan<- arrow.Record) error {
func (cl *Client) Read(f types.ReaderAtSeeker, table *schema.Table, res chan<- arrow.Record) error {
if cl.spec.Compression == CompressionTypeGZip {
rr, err := gzip.NewReader(f)
if err != nil {
Expand All @@ -29,21 +24,5 @@ func (cl *Client) Read(f ReaderAtSeeker, table *schema.Table, res chan<- arrow.R
f = bytes.NewReader(b)
}

switch cl.spec.Format {
case FormatTypeCSV:
if err := cl.csv.Read(f, table, res); err != nil {
return err
}
case FormatTypeJSON:
if err := cl.json.Read(f, table, res); err != nil {
return err
}
case FormatTypeParquet:
if err := cl.parquet.Read(f, table, res); err != nil {
return err
}
default:
panic("unknown format " + cl.spec.Format)
}
return nil
return cl.filetype.Read(f, table, res)
}
4 changes: 2 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (w *writeCloser) Close() error {
}

// StartStream starts a streaming upload using the provided uploadFunc.
func (c *Client) StartStream(table *schema.Table, uploadFunc func(io.Reader) error) (*Stream, error) {
func (cl *Client) StartStream(table *schema.Table, uploadFunc func(io.Reader) error) (*Stream, error) {
pr, pw := io.Pipe()
doneCh := make(chan error)

Expand All @@ -39,7 +39,7 @@ func (c *Client) StartStream(table *schema.Table, uploadFunc func(io.Reader) err
}()

wc := &writeCloser{PipeWriter: pw}
h, err := c.WriteHeader(wc, table)
h, err := cl.WriteHeader(wc, table)
if err != nil {
_ = pw.CloseWithError(err)
<-doneCh
Expand Down
7 changes: 7 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,15 @@ import (
"github.com/cloudquery/plugin-sdk/v4/schema"
)

type ReaderAtSeeker interface {
io.Reader
io.ReaderAt
io.Seeker
}

type FileType interface {
WriteHeader(io.Writer, *schema.Table) (Handle, error)
Read(ReaderAtSeeker, *schema.Table, chan<- arrow.Record) error
}

type Handle interface {
Expand Down

0 comments on commit 5b0754f

Please sign in to comment.