Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: Upgrade to SDK v3 #155

Merged
merged 3 commits into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package filetypes

import (
csvFile "github.com/cloudquery/filetypes/v2/csv"
jsonFile "github.com/cloudquery/filetypes/v2/json"
"github.com/cloudquery/filetypes/v2/parquet"
csvFile "github.com/cloudquery/filetypes/v3/csv"
jsonFile "github.com/cloudquery/filetypes/v3/json"
"github.com/cloudquery/filetypes/v3/parquet"
)

type Client struct {
Expand Down
5 changes: 3 additions & 2 deletions csv/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/csv"
"github.com/cloudquery/plugin-sdk/v3/schema"
)

func (cl *Client) Read(r io.Reader, arrowSchema *arrow.Schema, _ string, res chan<- arrow.Record) error {
reader := csv.NewReader(r, arrowSchema,
func (cl *Client) Read(r io.Reader, table *schema.Table, _ string, res chan<- arrow.Record) error {
reader := csv.NewReader(r, table.ToArrowSchema(),
csv.WithComma(cl.Delimiter),
csv.WithHeader(cl.IncludeHeaders),
csv.WithNullReader(true, ""),
Expand Down
5 changes: 3 additions & 2 deletions csv/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/csv"
"github.com/cloudquery/plugin-sdk/v3/schema"
)

func (cl *Client) WriteTableBatch(w io.Writer, arrowSchema *arrow.Schema, records []arrow.Record) error {
writer := csv.NewWriter(w, arrowSchema,
func (cl *Client) WriteTableBatch(w io.Writer, table *schema.Table, records []arrow.Record) error {
writer := csv.NewWriter(w, table.ToArrowSchema(),
csv.WithComma(cl.Delimiter),
csv.WithHeader(cl.IncludeHeaders),
csv.WithNullWriter(""),
Expand Down
26 changes: 12 additions & 14 deletions csv/write_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/bradleyjkemp/cupaloy/v2"
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
"github.com/cloudquery/plugin-sdk/v2/testdata"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/google/uuid"
)

Expand All @@ -28,18 +28,17 @@ func TestWriteRead(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
table := testdata.TestTable("test")
arrowSchema := table.ToArrowSchema()
table := schema.TestTable("test")
sourceName := "test-source"
syncTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
opts := testdata.GenTestDataOptions{
opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 2,
StableUUID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
StableTime: time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC),
}
records := testdata.GenTestData(arrowSchema, opts)
records := schema.GenTestData(table, opts)
cl, err := NewClient(tc.options...)
if err != nil {
t.Fatal(err)
Expand All @@ -49,7 +48,7 @@ func TestWriteRead(t *testing.T) {
writer := bufio.NewWriter(&b)
reader := bufio.NewReader(&b)

if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil {
if err := cl.WriteTableBatch(writer, table, records); err != nil {
t.Fatal(err)
}
writer.Flush()
Expand All @@ -69,7 +68,7 @@ func TestWriteRead(t *testing.T) {
ch := make(chan arrow.Record)
var readErr error
go func() {
readErr = cl.Read(byteReader, arrowSchema, "test-source", ch)
readErr = cl.Read(byteReader, table, "test-source", ch)
close(ch)
}()
totalCount := 0
Expand All @@ -90,16 +89,15 @@ func TestWriteRead(t *testing.T) {
}

func BenchmarkWrite(b *testing.B) {
table := testdata.TestTable("test")
arrowSchema := table.ToArrowSchema()
table := schema.TestTable("test")
sourceName := "test-source"
syncTime := time.Now().UTC().Round(1 * time.Second)
opts := testdata.GenTestDataOptions{
syncTime := time.Now().UTC().Round(time.Second)
opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 1000,
}
records := testdata.GenTestData(arrowSchema, opts)
records := schema.GenTestData(table, opts)

cl, err := NewClient()
if err != nil {
Expand All @@ -109,7 +107,7 @@ func BenchmarkWrite(b *testing.B) {
writer := bufio.NewWriter(&buf)
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil {
if err := cl.WriteTableBatch(writer, table, records); err != nil {
b.Fatal(err)
}
err = writer.Flush()
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
module github.com/cloudquery/filetypes/v2
module github.com/cloudquery/filetypes/v3

go 1.19

require (
github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604
github.com/bradleyjkemp/cupaloy/v2 v2.8.0
github.com/cloudquery/plugin-sdk/v2 v2.7.0
github.com/cloudquery/plugin-sdk/v3 v3.0.1
github.com/stretchr/testify v1.8.2
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8 h1:CmgLSE
github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8/go.mod h1:/XatdE3kDIBqZKhZ7OBUHwP2jaASDFZHqF4puOWM8po=
github.com/cloudquery/plugin-pb-go v1.0.5 h1:Du6pXI2JZRtgWfc0K69/gtNcyHICqEbAmfJXTARAqCc=
github.com/cloudquery/plugin-pb-go v1.0.5/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc=
github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U=
github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug=
github.com/cloudquery/plugin-sdk/v3 v3.0.1 h1:5l3dG4AIrAWadc0aEiht5au2gM/wHLRSK2qSzao1Sm0=
github.com/cloudquery/plugin-sdk/v3 v3.0.1/go.mod h1:cJP020H448wknQfjCDo0HR0b3vt9kYcjrEWtmV3YIgc=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
5 changes: 3 additions & 2 deletions json/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cloudquery/plugin-sdk/v3/schema"
)

const maxJSONSize = 1024 * 1024 * 20

func (*Client) Read(r io.Reader, arrowSchema *arrow.Schema, _ string, res chan<- arrow.Record) error {
func (*Client) Read(r io.Reader, table *schema.Table, _ string, res chan<- arrow.Record) error {
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, maxJSONSize), maxJSONSize)
rb := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema)
rb := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
for scanner.Scan() {
b := scanner.Bytes()
err := rb.UnmarshalJSON(b)
Expand Down
3 changes: 2 additions & 1 deletion json/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/cloudquery/plugin-sdk/v3/schema"
)

func (c *Client) WriteTableBatch(w io.Writer, _ *arrow.Schema, records []arrow.Record) error {
func (c *Client) WriteTableBatch(w io.Writer, _ *schema.Table, records []arrow.Record) error {
for _, r := range records {
err := c.writeRecord(w, r)
if err != nil {
Expand Down
37 changes: 17 additions & 20 deletions json/write_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,46 +9,44 @@ import (

"github.com/apache/arrow/go/v13/arrow"
"github.com/bradleyjkemp/cupaloy/v2"
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
"github.com/cloudquery/plugin-sdk/v2/testdata"
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/google/uuid"
)

func TestWrite(t *testing.T) {
var b bytes.Buffer
table := testdata.TestTable("test")
arrowSchema := table.ToArrowSchema()
table := schema.TestTable("test")
sourceName := "test-source"
syncTime := time.Now().UTC().Round(1 * time.Second)
opts := testdata.GenTestDataOptions{
syncTime := time.Now().UTC().Round(time.Second)
opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 1,
}
records := testdata.GenTestData(arrowSchema, opts)
records := schema.GenTestData(table, opts)
cl, err := NewClient()
if err != nil {
t.Fatal(err)
}
if err := cl.WriteTableBatch(&b, arrowSchema, records); err != nil {
if err := cl.WriteTableBatch(&b, table, records); err != nil {
t.Fatal(err)
}
t.Log(b.String())
}

func TestWriteRead(t *testing.T) {
table := testdata.TestTable("test")
arrowSchema := table.ToArrowSchema()
table := schema.TestTable("test")
sourceName := "test-source"
syncTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
opts := testdata.GenTestDataOptions{
opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 2,
StableUUID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
StableTime: time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC),
}
records := testdata.GenTestData(arrowSchema, opts)
records := schema.GenTestData(table, opts)
cl, err := NewClient()
if err != nil {
t.Fatal(err)
Expand All @@ -58,7 +56,7 @@ func TestWriteRead(t *testing.T) {
writer := bufio.NewWriter(&b)
reader := bufio.NewReader(&b)

if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil {
if err := cl.WriteTableBatch(writer, table, records); err != nil {
t.Fatal(err)
}
writer.Flush()
Expand All @@ -78,7 +76,7 @@ func TestWriteRead(t *testing.T) {
ch := make(chan arrow.Record)
var readErr error
go func() {
readErr = cl.Read(byteReader, arrowSchema, "test-source", ch)
readErr = cl.Read(byteReader, table, "test-source", ch)
close(ch)
}()
totalCount := 0
Expand All @@ -97,16 +95,15 @@ func TestWriteRead(t *testing.T) {
}

func BenchmarkWrite(b *testing.B) {
table := testdata.TestTable("test")
arrowSchema := table.ToArrowSchema()
table := schema.TestTable("test")
sourceName := "test-source"
syncTime := time.Now().UTC().Round(1 * time.Second)
opts := testdata.GenTestDataOptions{
syncTime := time.Now().UTC().Round(time.Second)
opts := schema.GenTestDataOptions{
SourceName: sourceName,
SyncTime: syncTime,
MaxRows: 1000,
}
records := testdata.GenTestData(arrowSchema, opts)
records := schema.GenTestData(table, opts)

cl, err := NewClient()
if err != nil {
Expand All @@ -116,7 +113,7 @@ func BenchmarkWrite(b *testing.B) {
writer := bufio.NewWriter(&buf)
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil {
if err := cl.WriteTableBatch(writer, table, records); err != nil {
b.Fatal(err)
}
err = writer.Flush()
Expand Down
6 changes: 4 additions & 2 deletions parquet/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/apache/arrow/go/v13/parquet/file"
"github.com/apache/arrow/go/v13/parquet/pqarrow"
"github.com/cloudquery/plugin-sdk/v2/types"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/plugin-sdk/v3/types"
)

type ReaderAtSeeker interface {
Expand All @@ -20,7 +21,7 @@ type ReaderAtSeeker interface {
io.Seeker
}

func (*Client) Read(f ReaderAtSeeker, arrowSchema *arrow.Schema, _ string, res chan<- arrow.Record) error {
func (*Client) Read(f ReaderAtSeeker, table *schema.Table, _ string, res chan<- arrow.Record) error {
ctx := context.Background()
rdr, err := file.NewParquetReader(f)
if err != nil {
Expand All @@ -39,6 +40,7 @@ func (*Client) Read(f ReaderAtSeeker, arrowSchema *arrow.Schema, _ string, res c
return fmt.Errorf("failed to get parquet record reader: %w", err)
}

arrowSchema := table.ToArrowSchema()
for rr.Next() {
rec := rr.Record()
castRec, err := castStringsToExtensions(rec, arrowSchema)
Expand Down
7 changes: 4 additions & 3 deletions parquet/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ import (
"github.com/apache/arrow/go/v13/parquet"
"github.com/apache/arrow/go/v13/parquet/compress"
"github.com/apache/arrow/go/v13/parquet/pqarrow"
"github.com/cloudquery/plugin-sdk/v2/types"
"github.com/cloudquery/plugin-sdk/v3/schema"
"github.com/cloudquery/plugin-sdk/v3/types"
)

func (c *Client) WriteTableBatch(w io.Writer, arrowSchema *arrow.Schema, records []arrow.Record) error {
func (c *Client) WriteTableBatch(w io.Writer, table *schema.Table, records []arrow.Record) error {
props := parquet.NewWriterProperties(
parquet.WithMaxRowGroupLength(128*1024*1024), // 128M
parquet.WithCompression(compress.Codecs.Snappy),
)
arrprops := pqarrow.DefaultWriterProps()
newSchema := convertSchema(arrowSchema)
newSchema := convertSchema(table.ToArrowSchema())
fw, err := pqarrow.NewFileWriter(newSchema, w, props, arrprops)
if err != nil {
return err
Expand Down
Loading