Skip to content

Commit

Permalink
fix(go/adbc/driver/snowflake): handle non-arrow result sets (#909)
Browse files Browse the repository at this point in the history
Closes #863
  • Loading branch information
zeroshade authored Jul 18, 2023
1 parent 1a07f24 commit 78d44ac
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 107 deletions.
15 changes: 10 additions & 5 deletions go/adbc/driver/snowflake/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,15 @@ func (suite *SnowflakeTests) TestStatementEmptyResultSet() {

// XXX: there IS data in this result set, but Snowflake doesn't
// appear to support getting the results as Arrow
_, _, err := suite.stmt.ExecuteQuery(suite.ctx)
var adbcErr adbc.Error
suite.ErrorAs(err, &adbcErr)
rdr, n, err := suite.stmt.ExecuteQuery(suite.ctx)
suite.Require().NoError(err)
defer rdr.Release()

suite.Equal(adbc.StatusInternal, adbcErr.Code)
suite.Contains(adbcErr.Msg, "Cannot get Arrow data from this result set")
suite.True(rdr.Next())
rec := rdr.Record()
suite.Equal(n, rec.NumRows())
suite.EqualValues(25, rec.NumCols())

suite.False(rdr.Next())
suite.NoError(rdr.Err())
}
179 changes: 176 additions & 3 deletions go/adbc/driver/snowflake/record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package snowflake

import (
"context"
"encoding/hex"
"math"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -243,6 +245,163 @@ func getTransformer(sc *arrow.Schema, ld gosnowflake.ArrowStreamLoader) (*arrow.
return out, getRecTransformer(out, transformers)
}

func rowTypesToArrowSchema(ctx context.Context, ld gosnowflake.ArrowStreamLoader) (*arrow.Schema, error) {
var loc *time.Location

metadata := ld.RowTypes()
fields := make([]arrow.Field, len(metadata))
for i, srcMeta := range metadata {
fields[i] = arrow.Field{
Name: srcMeta.Name,
Nullable: srcMeta.Nullable,
Metadata: arrow.MetadataFrom(map[string]string{
"SNOWFLAKE_TYPE": srcMeta.Type,
}),
}
switch srcMeta.Type {
case "fixed":
fields[i].Type = arrow.PrimitiveTypes.Int64
case "real":
fields[i].Type = arrow.PrimitiveTypes.Float64
case "date":
fields[i].Type = arrow.PrimitiveTypes.Date32
case "time":
fields[i].Type = arrow.FixedWidthTypes.Time64ns
case "timestamp_ntz", "timestamp_tz":
fields[i].Type = arrow.FixedWidthTypes.Timestamp_ns
case "timestamp_ltz":
if loc == nil {
loc = time.Now().Location()
}
fields[i].Type = &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: loc.String()}
case "binary":
fields[i].Type = arrow.BinaryTypes.Binary
default:
fields[i].Type = arrow.BinaryTypes.String
}
}
return arrow.NewSchema(fields, nil), nil
}

func extractTimestamp(src *string) (sec, nsec int64, err error) {
s, ms, hasFraction := strings.Cut(*src, ".")
sec, err = strconv.ParseInt(s, 10, 64)
if err != nil {
return
}

if !hasFraction {
return
}

nsec, err = strconv.ParseInt(ms+strings.Repeat("0", 9-len(ms)), 10, 64)
return
}

func jsonDataToArrow(ctx context.Context, bldr *array.RecordBuilder, ld gosnowflake.ArrowStreamLoader) (arrow.Record, error) {
rawData := ld.JSONData()
fieldBuilders := bldr.Fields()
for _, rec := range rawData {
for i, col := range rec {
field := fieldBuilders[i]

if col == nil {
field.AppendNull()
continue
}

switch fb := field.(type) {
case *array.Time64Builder:
sec, nsec, err := extractTimestamp(col)
if err != nil {
return nil, err
}

fb.Append(arrow.Time64(sec*1e9 + nsec))
case *array.TimestampBuilder:
tz, err := fb.Type().(*arrow.TimestampType).GetZone()
if err != nil {
return nil, err
}

if tz != time.UTC {
sec, nsec, err := extractTimestamp(col)
if err != nil {
return nil, err
}
val := time.Unix(sec, nsec).In(loc)
ts, err := arrow.TimestampFromTime(val, arrow.Nanosecond)
if err != nil {
return nil, err
}
fb.Append(ts)
break
}

snowflakeType, _ := bldr.Schema().Field(i).Metadata.GetValue("SNOWFLAKE_TYPE")
if snowflakeType == "timestamp_ntz" {
sec, nsec, err := extractTimestamp(col)
if err != nil {
return nil, err
}

fb.Append(arrow.Timestamp(sec*1e9 + nsec))
break
}

// "timestamp_tz" should be value + offset separated by space
tm := strings.Split(*col, " ")
if len(tm) != 2 {
return nil, adbc.Error{
Msg: "invalid TIMESTAMP_TZ data. value doesn't consist of two numeric values separated by a space: " + *col,
SqlState: [5]byte{'2', '2', '0', '0', '7'},
VendorCode: 268000,
Code: adbc.StatusInvalidData,
}
}

sec, nsec, err := extractTimestamp(&tm[0])
if err != nil {
return nil, err
}
offset, err := strconv.ParseInt(tm[1], 10, 64)
if err != nil {
return nil, adbc.Error{
Msg: "invalid TIMESTAMP_TZ data. offset value is not an integer: " + tm[1],
SqlState: [5]byte{'2', '2', '0', '0', '7'},
VendorCode: 268000,
Code: adbc.StatusInvalidData,
}
}

loc := gosnowflake.Location(int(offset) - 1440)
tt := time.Unix(sec, nsec).In(loc)
ts, err := arrow.TimestampFromTime(tt, arrow.Nanosecond)
if err != nil {
return nil, err
}
fb.Append(ts)
case *array.BinaryBuilder:
b, err := hex.DecodeString(*col)
if err != nil {
return nil, adbc.Error{
Msg: err.Error(),
VendorCode: 268002,
SqlState: [5]byte{'2', '2', '0', '0', '3'},
Code: adbc.StatusInvalidData,
}
}
fb.Append(b)
default:
if err := fb.AppendValueFromString(*col); err != nil {
return nil, err
}
}
}
}
return bldr.NewRecord(), nil
}

type reader struct {
refCount int64
schema *arrow.Schema
Expand All @@ -263,10 +422,24 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
if len(batches) == 0 {
if ld.TotalRows() != 0 {
// XXX(https://github.com/apache/arrow-adbc/issues/863): Snowflake won't return Arrow data for certain queries
return nil, adbc.Error{
Msg: "[Snowflake] Cannot get Arrow data from this result set (see apache/arrow-adbc#863)",
Code: adbc.StatusInternal,
schema, err := rowTypesToArrowSchema(ctx, ld)
if err != nil {
return nil, adbc.Error{
Msg: err.Error(),
Code: adbc.StatusInternal,
}
}

bldr := array.NewRecordBuilder(alloc, schema)
defer bldr.Release()

rec, err := jsonDataToArrow(ctx, bldr, ld)
if err != nil {
return nil, err
}
defer rec.Release()

return array.NewRecordReader(schema, []arrow.Record{rec})
}
schema := arrow.NewSchema([]arrow.Field{}, nil)
reader, err := array.NewRecordReader(schema, []arrow.Record{})
Expand Down
56 changes: 29 additions & 27 deletions go/adbc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,39 @@ require (
github.com/apache/arrow/go/v13 v13.0.0-20230713180941-b97597765355
github.com/bluele/gcache v0.0.2
github.com/google/uuid v1.3.0
github.com/snowflakedb/gosnowflake v1.6.21
github.com/snowflakedb/gosnowflake v1.6.22
github.com/stretchr/testify v1.8.2
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
golang.org/x/sync v0.3.0
golang.org/x/tools v0.10.0
golang.org/x/tools v0.11.0
google.golang.org/grpc v1.54.0
google.golang.org/protobuf v1.30.0
)

require (
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.6.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/arrow/go/v12 v12.0.0 // indirect
github.com/apache/arrow/go/v12 v12.0.1 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.18.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.24 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.67 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.25 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.72 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.27 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.28 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.2 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.33.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.30 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.4 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.37.0 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/danieljoos/wincred v1.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/dvsekhvalnov/jose2go v1.5.0 // indirect
Expand All @@ -70,24 +70,24 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/mattn/go-isatty v0.0.18 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/crypto v0.10.0 // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/net v0.11.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/term v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/term v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand All @@ -102,3 +102,5 @@ require (
modernc.org/strutil v1.1.3 // indirect
modernc.org/token v1.1.0 // indirect
)

replace github.com/snowflakedb/gosnowflake => github.com/snowflakedb/gosnowflake v1.6.23-0.20230717195239-fec38ba82d2a
Loading

0 comments on commit 78d44ac

Please sign in to comment.