diff --git a/client.go b/client.go index 866202bb..b3185562 100644 --- a/client.go +++ b/client.go @@ -1,9 +1,9 @@ package filetypes import ( - csvFile "github.com/cloudquery/filetypes/v2/csv" - jsonFile "github.com/cloudquery/filetypes/v2/json" - "github.com/cloudquery/filetypes/v2/parquet" + csvFile "github.com/cloudquery/filetypes/v3/csv" + jsonFile "github.com/cloudquery/filetypes/v3/json" + "github.com/cloudquery/filetypes/v3/parquet" ) type Client struct { diff --git a/csv/read.go b/csv/read.go index 0d8acf20..94bdd336 100644 --- a/csv/read.go +++ b/csv/read.go @@ -5,10 +5,11 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/csv" + "github.com/cloudquery/plugin-sdk/v3/schema" ) -func (cl *Client) Read(r io.Reader, arrowSchema *arrow.Schema, _ string, res chan<- arrow.Record) error { - reader := csv.NewReader(r, arrowSchema, +func (cl *Client) Read(r io.Reader, table *schema.Table, _ string, res chan<- arrow.Record) error { + reader := csv.NewReader(r, table.ToArrowSchema(), csv.WithComma(cl.Delimiter), csv.WithHeader(cl.IncludeHeaders), csv.WithNullReader(true, ""), diff --git a/csv/write.go b/csv/write.go index d0169a46..79cdac26 100644 --- a/csv/write.go +++ b/csv/write.go @@ -7,10 +7,11 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/csv" + "github.com/cloudquery/plugin-sdk/v3/schema" ) -func (cl *Client) WriteTableBatch(w io.Writer, arrowSchema *arrow.Schema, records []arrow.Record) error { - writer := csv.NewWriter(w, arrowSchema, +func (cl *Client) WriteTableBatch(w io.Writer, table *schema.Table, records []arrow.Record) error { + writer := csv.NewWriter(w, table.ToArrowSchema(), csv.WithComma(cl.Delimiter), csv.WithHeader(cl.IncludeHeaders), csv.WithNullWriter(""), diff --git a/csv/write_read_test.go b/csv/write_read_test.go index d8eaa93d..dc50a9a0 100644 --- a/csv/write_read_test.go +++ b/csv/write_read_test.go @@ -9,8 +9,8 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/bradleyjkemp/cupaloy/v2" - "github.com/cloudquery/plugin-sdk/v2/plugins/destination" - "github.com/cloudquery/plugin-sdk/v2/testdata" + "github.com/cloudquery/plugin-sdk/v3/plugins/destination" + "github.com/cloudquery/plugin-sdk/v3/schema" "github.com/google/uuid" ) @@ -28,18 +28,17 @@ func TestWriteRead(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - table := testdata.TestTable("test") - arrowSchema := table.ToArrowSchema() + table := schema.TestTable("test") sourceName := "test-source" syncTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) - opts := testdata.GenTestDataOptions{ + opts := schema.GenTestDataOptions{ SourceName: sourceName, SyncTime: syncTime, MaxRows: 2, StableUUID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), StableTime: time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), } - records := testdata.GenTestData(arrowSchema, opts) + records := schema.GenTestData(table, opts) cl, err := NewClient(tc.options...) if err != nil { t.Fatal(err) @@ -49,7 +48,7 @@ func TestWriteRead(t *testing.T) { writer := bufio.NewWriter(&b) reader := bufio.NewReader(&b) - if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil { + if err := cl.WriteTableBatch(writer, table, records); err != nil { t.Fatal(err) } writer.Flush() @@ -69,7 +68,7 @@ func TestWriteRead(t *testing.T) { ch := make(chan arrow.Record) var readErr error go func() { - readErr = cl.Read(byteReader, arrowSchema, "test-source", ch) + readErr = cl.Read(byteReader, table, "test-source", ch) close(ch) }() totalCount := 0 @@ -90,16 +89,15 @@ func TestWriteRead(t *testing.T) { } func BenchmarkWrite(b *testing.B) { - table := testdata.TestTable("test") - arrowSchema := table.ToArrowSchema() + table := schema.TestTable("test") sourceName := "test-source" - syncTime := time.Now().UTC().Round(1 * time.Second) - opts := testdata.GenTestDataOptions{ + syncTime := time.Now().UTC().Round(time.Second) + opts := schema.GenTestDataOptions{ SourceName: sourceName, SyncTime: syncTime, MaxRows: 1000, } - records := testdata.GenTestData(arrowSchema, opts) + records := schema.GenTestData(table, opts) cl, err := NewClient() if err != nil { @@ -109,7 +107,7 @@ func BenchmarkWrite(b *testing.B) { writer := bufio.NewWriter(&buf) b.ResetTimer() for i := 0; i < b.N; i++ { - if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil { + if err := cl.WriteTableBatch(writer, table, records); err != nil { b.Fatal(err) } err = writer.Flush() diff --git a/go.mod b/go.mod index 4aa78d0e..9fe6d16d 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,11 @@ -module github.com/cloudquery/filetypes/v2 +module github.com/cloudquery/filetypes/v3 go 1.19 require ( github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604 github.com/bradleyjkemp/cupaloy/v2 v2.8.0 - github.com/cloudquery/plugin-sdk/v2 v2.7.0 + github.com/cloudquery/plugin-sdk/v3 v3.0.1 github.com/stretchr/testify v1.8.2 ) diff --git a/go.sum b/go.sum index 67730845..c8d5bca5 100644 --- a/go.sum +++ b/go.sum @@ -10,8 +10,8 @@ github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8 h1:CmgLSE github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8/go.mod h1:/XatdE3kDIBqZKhZ7OBUHwP2jaASDFZHqF4puOWM8po= github.com/cloudquery/plugin-pb-go v1.0.5 h1:Du6pXI2JZRtgWfc0K69/gtNcyHICqEbAmfJXTARAqCc= github.com/cloudquery/plugin-pb-go v1.0.5/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc= -github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U= -github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug= +github.com/cloudquery/plugin-sdk/v3 v3.0.1 h1:5l3dG4AIrAWadc0aEiht5au2gM/wHLRSK2qSzao1Sm0= +github.com/cloudquery/plugin-sdk/v3 v3.0.1/go.mod h1:cJP020H448wknQfjCDo0HR0b3vt9kYcjrEWtmV3YIgc= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/json/read.go b/json/read.go index 5be1549c..d3b5b0a8 100644 --- a/json/read.go +++ b/json/read.go @@ -7,14 +7,15 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" "github.com/apache/arrow/go/v13/arrow/memory" + "github.com/cloudquery/plugin-sdk/v3/schema" ) const maxJSONSize = 1024 * 1024 * 20 -func (*Client) Read(r io.Reader, arrowSchema *arrow.Schema, _ string, res chan<- arrow.Record) error { +func (*Client) Read(r io.Reader, table *schema.Table, _ string, res chan<- arrow.Record) error { scanner := bufio.NewScanner(r) scanner.Buffer(make([]byte, maxJSONSize), maxJSONSize) - rb := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema) + rb := array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema()) for scanner.Scan() { b := scanner.Bytes() err := rb.UnmarshalJSON(b) diff --git a/json/write.go b/json/write.go index 27690ebf..a60b9786 100644 --- a/json/write.go +++ b/json/write.go @@ -6,9 +6,10 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/apache/arrow/go/v13/arrow/array" + "github.com/cloudquery/plugin-sdk/v3/schema" ) -func (c *Client) WriteTableBatch(w io.Writer, _ *arrow.Schema, records []arrow.Record) error { +func (c *Client) WriteTableBatch(w io.Writer, _ *schema.Table, records []arrow.Record) error { for _, r := range records { err := c.writeRecord(w, r) if err != nil { diff --git a/json/write_read_test.go b/json/write_read_test.go index d449f12c..7a05ce61 100644 --- a/json/write_read_test.go +++ b/json/write_read_test.go @@ -9,46 +9,44 @@ import ( "github.com/apache/arrow/go/v13/arrow" "github.com/bradleyjkemp/cupaloy/v2" - "github.com/cloudquery/plugin-sdk/v2/plugins/destination" - "github.com/cloudquery/plugin-sdk/v2/testdata" + "github.com/cloudquery/plugin-sdk/v3/plugins/destination" + "github.com/cloudquery/plugin-sdk/v3/schema" "github.com/google/uuid" ) func TestWrite(t *testing.T) { var b bytes.Buffer - table := testdata.TestTable("test") - arrowSchema := table.ToArrowSchema() + table := schema.TestTable("test") sourceName := "test-source" - syncTime := time.Now().UTC().Round(1 * time.Second) - opts := testdata.GenTestDataOptions{ + syncTime := time.Now().UTC().Round(time.Second) + opts := schema.GenTestDataOptions{ SourceName: sourceName, SyncTime: syncTime, MaxRows: 1, } - records := testdata.GenTestData(arrowSchema, opts) + records := schema.GenTestData(table, opts) cl, err := NewClient() if err != nil { t.Fatal(err) } - if err := cl.WriteTableBatch(&b, arrowSchema, records); err != nil { + if err := cl.WriteTableBatch(&b, table, records); err != nil { t.Fatal(err) } t.Log(b.String()) } func TestWriteRead(t *testing.T) { - table := testdata.TestTable("test") - arrowSchema := table.ToArrowSchema() + table := schema.TestTable("test") sourceName := "test-source" syncTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) - opts := testdata.GenTestDataOptions{ + opts := schema.GenTestDataOptions{ SourceName: sourceName, SyncTime: syncTime, MaxRows: 2, StableUUID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), StableTime: time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), } - records := testdata.GenTestData(arrowSchema, opts) + records := schema.GenTestData(table, opts) cl, err := NewClient() if err != nil { t.Fatal(err) @@ -58,7 +56,7 @@ func TestWriteRead(t *testing.T) { writer := bufio.NewWriter(&b) reader := bufio.NewReader(&b) - if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil { + if err := cl.WriteTableBatch(writer, table, records); err != nil { t.Fatal(err) } writer.Flush() @@ -78,7 +76,7 @@ func TestWriteRead(t *testing.T) { ch := make(chan arrow.Record) var readErr error go func() { - readErr = cl.Read(byteReader, arrowSchema, "test-source", ch) + readErr = cl.Read(byteReader, table, "test-source", ch) close(ch) }() totalCount := 0 @@ -97,16 +95,15 @@ func TestWriteRead(t *testing.T) { } func BenchmarkWrite(b *testing.B) { - table := testdata.TestTable("test") - arrowSchema := table.ToArrowSchema() + table := schema.TestTable("test") sourceName := "test-source" - syncTime := time.Now().UTC().Round(1 * time.Second) - opts := testdata.GenTestDataOptions{ + syncTime := time.Now().UTC().Round(time.Second) + opts := schema.GenTestDataOptions{ SourceName: sourceName, SyncTime: syncTime, MaxRows: 1000, } - records := testdata.GenTestData(arrowSchema, opts) + records := schema.GenTestData(table, opts) cl, err := NewClient() if err != nil { @@ -116,7 +113,7 @@ func BenchmarkWrite(b *testing.B) { writer := bufio.NewWriter(&buf) b.ResetTimer() for i := 0; i < b.N; i++ { - if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil { + if err := cl.WriteTableBatch(writer, table, records); err != nil { b.Fatal(err) } err = writer.Flush() diff --git a/parquet/read.go b/parquet/read.go index 0d37a7ea..f707af91 100644 --- a/parquet/read.go +++ b/parquet/read.go @@ -11,7 +11,8 @@ import ( "github.com/apache/arrow/go/v13/arrow/memory" "github.com/apache/arrow/go/v13/parquet/file" "github.com/apache/arrow/go/v13/parquet/pqarrow" - "github.com/cloudquery/plugin-sdk/v2/types" + "github.com/cloudquery/plugin-sdk/v3/schema" + "github.com/cloudquery/plugin-sdk/v3/types" ) type ReaderAtSeeker interface { @@ -20,7 +21,7 @@ type ReaderAtSeeker interface { io.Seeker } -func (*Client) Read(f ReaderAtSeeker, arrowSchema *arrow.Schema, _ string, res chan<- arrow.Record) error { +func (*Client) Read(f ReaderAtSeeker, table *schema.Table, _ string, res chan<- arrow.Record) error { ctx := context.Background() rdr, err := file.NewParquetReader(f) if err != nil { @@ -39,6 +40,7 @@ func (*Client) Read(f ReaderAtSeeker, arrowSchema *arrow.Schema, _ string, res c return fmt.Errorf("failed to get parquet record reader: %w", err) } + arrowSchema := table.ToArrowSchema() for rr.Next() { rec := rr.Record() castRec, err := castStringsToExtensions(rec, arrowSchema) diff --git a/parquet/write.go b/parquet/write.go index e5b2e6aa..76694cc6 100644 --- a/parquet/write.go +++ b/parquet/write.go @@ -11,16 +11,17 @@ import ( "github.com/apache/arrow/go/v13/parquet" "github.com/apache/arrow/go/v13/parquet/compress" "github.com/apache/arrow/go/v13/parquet/pqarrow" - "github.com/cloudquery/plugin-sdk/v2/types" + "github.com/cloudquery/plugin-sdk/v3/schema" + "github.com/cloudquery/plugin-sdk/v3/types" ) -func (c *Client) WriteTableBatch(w io.Writer, arrowSchema *arrow.Schema, records []arrow.Record) error { +func (c *Client) WriteTableBatch(w io.Writer, table *schema.Table, records []arrow.Record) error { props := parquet.NewWriterProperties( parquet.WithMaxRowGroupLength(128*1024*1024), // 128M parquet.WithCompression(compress.Codecs.Snappy), ) arrprops := pqarrow.DefaultWriterProps() - newSchema := convertSchema(arrowSchema) + newSchema := convertSchema(table.ToArrowSchema()) fw, err := pqarrow.NewFileWriter(newSchema, w, props, arrprops) if err != nil { return err diff --git a/parquet/write_read_test.go b/parquet/write_read_test.go index 18165a5f..3aa75a03 100644 --- a/parquet/write_read_test.go +++ b/parquet/write_read_test.go @@ -8,22 +8,21 @@ import ( "time" "github.com/apache/arrow/go/v13/arrow" - "github.com/cloudquery/plugin-sdk/v2/plugins/destination" - "github.com/cloudquery/plugin-sdk/v2/testdata" + "github.com/cloudquery/plugin-sdk/v3/plugins/destination" + "github.com/cloudquery/plugin-sdk/v3/schema" ) func TestWriteRead(t *testing.T) { var b bytes.Buffer - table := testdata.TestTable("test") - arrowSchema := table.ToArrowSchema() + table := schema.TestTable("test") sourceName := "test-source" - syncTime := time.Now().UTC().Round(1 * time.Second) - opts := testdata.GenTestDataOptions{ + syncTime := time.Now().UTC().Round(time.Second) + opts := schema.GenTestDataOptions{ SourceName: sourceName, SyncTime: syncTime, MaxRows: 2, } - records := testdata.GenTestData(arrowSchema, opts) + records := schema.GenTestData(table, opts) writer := bufio.NewWriter(&b) reader := bufio.NewReader(&b) @@ -31,7 +30,7 @@ func TestWriteRead(t *testing.T) { if err != nil { t.Fatal(err) } - if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil { + if err := cl.WriteTableBatch(writer, table, records); err != nil { t.Fatal(err) } err = writer.Flush() @@ -47,7 +46,7 @@ func TestWriteRead(t *testing.T) { ch := make(chan arrow.Record) var readErr error go func() { - readErr = cl.Read(byteReader, arrowSchema, "test-source", ch) + readErr = cl.Read(byteReader, table, "test-source", ch) close(ch) }() totalCount := 0 @@ -66,16 +65,15 @@ func TestWriteRead(t *testing.T) { } func BenchmarkWrite(b *testing.B) { - table := testdata.TestTable("test") - arrowSchema := table.ToArrowSchema() + table := schema.TestTable("test") sourceName := "test-source" - syncTime := time.Now().UTC().Round(1 * time.Second) - opts := testdata.GenTestDataOptions{ + syncTime := time.Now().UTC().Round(time.Second) + opts := schema.GenTestDataOptions{ SourceName: sourceName, SyncTime: syncTime, MaxRows: 1000, } - records := testdata.GenTestData(arrowSchema, opts) + records := schema.GenTestData(table, opts) cl, err := NewClient() if err != nil { @@ -85,7 +83,7 @@ func BenchmarkWrite(b *testing.B) { writer := bufio.NewWriter(&buf) b.ResetTimer() for i := 0; i < b.N; i++ { - if err := cl.WriteTableBatch(writer, arrowSchema, records); err != nil { + if err := cl.WriteTableBatch(writer, table, records); err != nil { b.Fatal(err) } err = writer.Flush() diff --git a/read.go b/read.go index 0d1ed22e..517f60ab 100644 --- a/read.go +++ b/read.go @@ -4,6 +4,7 @@ import ( "io" "github.com/apache/arrow/go/v13/arrow" + "github.com/cloudquery/plugin-sdk/v3/schema" ) type ReaderAtSeeker interface { @@ -12,18 +13,18 @@ type ReaderAtSeeker interface { io.Seeker } -func (cl *Client) Read(f ReaderAtSeeker, sc *arrow.Schema, sourceName string, res chan<- arrow.Record) error { +func (cl *Client) Read(f ReaderAtSeeker, table *schema.Table, sourceName string, res chan<- arrow.Record) error { switch cl.spec.Format { case FormatTypeCSV: - if err := cl.csv.Read(f, sc, sourceName, res); err != nil { + if err := cl.csv.Read(f, table, sourceName, res); err != nil { return err } case FormatTypeJSON: - if err := cl.json.Read(f, sc, sourceName, res); err != nil { + if err := cl.json.Read(f, table, sourceName, res); err != nil { return err } case FormatTypeParquet: - if err := cl.parquet.Read(f, sc, sourceName, res); err != nil { + if err := cl.parquet.Read(f, table, sourceName, res); err != nil { return err } default: diff --git a/spec.go b/spec.go index 6343219f..c9fe75ec 100644 --- a/spec.go +++ b/spec.go @@ -5,9 +5,9 @@ import ( "encoding/json" "fmt" - "github.com/cloudquery/filetypes/v2/csv" - jsonFile "github.com/cloudquery/filetypes/v2/json" - "github.com/cloudquery/filetypes/v2/parquet" + "github.com/cloudquery/filetypes/v3/csv" + jsonFile "github.com/cloudquery/filetypes/v3/json" + "github.com/cloudquery/filetypes/v3/parquet" ) type FormatType string diff --git a/spec_test.go b/spec_test.go index 54eebcdc..8baefc47 100644 --- a/spec_test.go +++ b/spec_test.go @@ -3,8 +3,8 @@ package filetypes import ( "testing" - "github.com/cloudquery/filetypes/v2/csv" - "github.com/cloudquery/filetypes/v2/json" + "github.com/cloudquery/filetypes/v3/csv" + "github.com/cloudquery/filetypes/v3/json" "github.com/stretchr/testify/assert" ) diff --git a/write.go b/write.go index 8f29ac82..526f6a7d 100644 --- a/write.go +++ b/write.go @@ -4,20 +4,21 @@ import ( "io" "github.com/apache/arrow/go/v13/arrow" + "github.com/cloudquery/plugin-sdk/v3/schema" ) -func (cl *Client) WriteTableBatchFile(w io.Writer, arrowSchema *arrow.Schema, records []arrow.Record) error { +func (cl *Client) WriteTableBatchFile(w io.Writer, table *schema.Table, records []arrow.Record) error { switch cl.spec.Format { case FormatTypeCSV: - if err := cl.csv.WriteTableBatch(w, arrowSchema, records); err != nil { + if err := cl.csv.WriteTableBatch(w, table, records); err != nil { return err } case FormatTypeJSON: - if err := cl.json.WriteTableBatch(w, arrowSchema, records); err != nil { + if err := cl.json.WriteTableBatch(w, table, records); err != nil { return err } case FormatTypeParquet: - if err := cl.parquet.WriteTableBatch(w, arrowSchema, records); err != nil { + if err := cl.parquet.WriteTableBatch(w, table, records); err != nil { return err } default: