Skip to content

Commit

Permalink
perf(go/adbc/driver/snowflake): Implement concurrency limit (#974)
Browse files Browse the repository at this point in the history
In order to improve the performance of the ADBC Snowflake driver,
particularly for larger size result sets, we should limit the concurrent
connections we make the snowflake during the prefetching in order to
avoid excessive blocking / throttling.
  • Loading branch information
zeroshade authored Aug 10, 2023
1 parent da33e6c commit 36f7233
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 36 deletions.
11 changes: 9 additions & 2 deletions docs/source/driver/snowflake.rst
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,17 @@ In addition, results are potentially fetched in parallel from multiple endpoints
A limited number of batches are queued per endpoint, though data is always
returned to the client in the order of the endpoints.

The queue size can be changed by setting an option on the :cpp:class:`AdbcStatement`:
To manage the performance of result fetching there are two options to control
buffering and concurrency behavior. These options are only available to be set
on the :cpp:class:`AdbcStatement` object:

``adbc.rpc.result_queue_size``
The number of batches to queue per endpoint. Defaults to 5.
The number of batches to queue in the record reader. Defaults to 200.
Must be an integer > 0.

``adbc.snowflake.rpc.prefetch_concurrency``
The number of concurrent streams being fetched from snowflake at a time.
Defaults to 10. Must be an integer > 0.

Transactions
------------
Expand Down
11 changes: 9 additions & 2 deletions go/adbc/driver/snowflake/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ import (
"github.com/snowflakedb/gosnowflake"
)

const (
defaultStatementQueueSize = 200
defaultPrefetchConcurrency = 10
)

type snowflakeConn interface {
driver.Conn
driver.ConnBeginTx
Expand Down Expand Up @@ -777,8 +782,10 @@ func (c *cnxn) Rollback(_ context.Context) error {
// NewStatement initializes a new statement object tied to this connection
func (c *cnxn) NewStatement() (adbc.Statement, error) {
return &statement{
alloc: c.db.alloc,
cnxn: c,
alloc: c.db.alloc,
cnxn: c,
queueSize: defaultStatementQueueSize,
prefetchConcurrency: defaultPrefetchConcurrency,
}, nil
}

Expand Down
3 changes: 2 additions & 1 deletion go/adbc/driver/snowflake/record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ type reader struct {
cancelFn context.CancelFunc
}

func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake.ArrowStreamLoader, bufferSize int) (array.RecordReader, error) {
func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake.ArrowStreamLoader, bufferSize, prefetchConcurrency int) (array.RecordReader, error) {
batches, err := ld.GetBatches()
if err != nil {
return nil, errToAdbcErr(adbc.StatusInternal, err)
Expand Down Expand Up @@ -478,6 +478,7 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
}
}()

group.SetLimit(prefetchConcurrency)
group.Go(func() error {
defer rr.Release()
defer r.Close()
Expand Down
33 changes: 28 additions & 5 deletions go/adbc/driver/snowflake/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ import (
)

const (
OptionStatementQueueSize = "adbc.rpc.result_queue_size"
OptionStatementQueueSize = "adbc.rpc.result_queue_size"
OptionStatementPrefetchConcurrency = "adbc.snowflake.rpc.prefetch_concurrency"
)

type statement struct {
cnxn *cnxn
alloc memory.Allocator
queueSize int
cnxn *cnxn
alloc memory.Allocator
queueSize int
prefetchConcurrency int

query string
targetTable string
Expand Down Expand Up @@ -97,7 +99,28 @@ func (st *statement) SetOption(key string, val string) error {
Code: adbc.StatusInvalidArgument,
}
}
if sz <= 0 {
return adbc.Error{
Msg: fmt.Sprintf("invalid value ('%d') for option '%s', must be > 0", sz, key),
Code: adbc.StatusInvalidArgument,
}
}
st.queueSize = sz
case OptionStatementPrefetchConcurrency:
concurrency, err := strconv.Atoi(val)
if err != nil {
return adbc.Error{
Msg: fmt.Sprintf("could not parse '%s' as int for option '%s'", val, key),
Code: adbc.StatusInvalidArgument,
}
}
if concurrency <= 0 {
return adbc.Error{
Msg: fmt.Sprintf("invalid value ('%d') for option '%s', must be > 0", concurrency, key),
Code: adbc.StatusInvalidArgument,
}
}
st.prefetchConcurrency = concurrency
default:
return adbc.Error{
Msg: fmt.Sprintf("invalid statement option %s=%s", key, val),
Expand Down Expand Up @@ -436,7 +459,7 @@ func (st *statement) ExecuteQuery(ctx context.Context) (array.RecordReader, int6
return nil, -1, errToAdbcErr(adbc.StatusInternal, err)
}

rdr, err := newRecordReader(ctx, st.alloc, loader, st.queueSize)
rdr, err := newRecordReader(ctx, st.alloc, loader, st.queueSize, st.prefetchConcurrency)
nrec := loader.TotalRows()
return rdr, nrec, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/adbc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/bluele/gcache v0.0.2
github.com/google/uuid v1.3.0
github.com/snowflakedb/gosnowflake v1.6.22
github.com/stretchr/testify v1.8.2
github.com/stretchr/testify v1.8.4
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
golang.org/x/sync v0.3.0
golang.org/x/tools v0.11.0
Expand All @@ -41,7 +41,7 @@ require (
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/arrow/go/v12 v12.0.1 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/apache/thrift v0.17.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect
Expand Down
25 changes: 4 additions & 21 deletions go/adbc/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ github.com/apache/arrow/go/v12 v12.0.1 h1:JsR2+hzYYjgSUkBSaahpqCetqZMr76djX80fF/
github.com/apache/arrow/go/v12 v12.0.1/go.mod h1:weuTY7JvTG/HDPtMQxEUp7pU73vkLWMLpY67QwZ/WWw=
github.com/apache/arrow/go/v13 v13.0.0-20230713180941-b97597765355 h1:QuXqLb2HzL5EjY99fFp+iG9NagAruvQIbU/2++x+2VY=
github.com/apache/arrow/go/v13 v13.0.0-20230713180941-b97597765355/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY=
github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo=
github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q=
github.com/aws/aws-sdk-go-v2 v1.19.0 h1:klAT+y3pGFBU/qVf1uzwttpBbiuozJYWzNLHioyDJ+k=
github.com/aws/aws-sdk-go-v2 v1.19.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 h1:dK82zF6kkPeCo8J1e+tGx4JdvDIQzj7ygIoLg8WMuGs=
Expand Down Expand Up @@ -80,7 +80,6 @@ github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg=
github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
Expand Down Expand Up @@ -138,35 +137,23 @@ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs
github.com/snowflakedb/gosnowflake v1.6.23-0.20230717195239-fec38ba82d2a h1:F7fKVj3t12jr3Bopzngsp/PZDm1or8zpk+29NN4YFGk=
github.com/snowflakedb/gosnowflake v1.6.23-0.20230717195239-fec38ba82d2a/go.mod h1:KfO4F7bk+aXPUIvBqYxvPhxLlu2/w4TtSC8Rw/yr5Mg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw=
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50=
golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210616045830-e2b7044e8c71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand All @@ -175,14 +162,10 @@ golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.10.0 h1:3R7pNqamzBraeqj/Tj8qt1aQ2HpmlC+Cx/qL/7hn4/c=
golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.11.0 h1:EMCa6U9S2LtZXLAMoWiR/R8dAQFRqbAitmbJ2UKhoi8=
golang.org/x/tools v0.11.0/go.mod h1:anzJrxPjNtfgiYQYirP2CPGzGLxrH2u2QBhn6Bf3qY8=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,11 @@ class DatabaseOptions(enum.Enum):
class StatementOptions(enum.Enum):
"""Statement options specific to the Snowflake driver."""

#: The number of rows per batch. Defaults to 1024.
BATCH_ROWS = "adbc.rpc.result_queue_size"
#: The number of batches queued up at a time. Defaults to 200.
RESULT_QUEUE_SIZE = "adbc.rpc.result_queue_size"
#: Number of concurrent streams being prefetched for a result set.
#: Defaults to 10.
PREFETCH_CONCURRENCY = "adbc.snowflake.rpc.prefetch_concurrency"


def connect(
Expand Down
2 changes: 1 addition & 1 deletion python/adbc_driver_snowflake/tests/test_lowlevel.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def test_options(snowflake):
with adbc_driver_manager.AdbcStatement(snowflake) as stmt:
stmt.set_options(
**{
adbc_driver_snowflake.StatementOptions.BATCH_ROWS.value: "1",
adbc_driver_snowflake.StatementOptions.RESULT_QUEUE_SIZE.value: "1",
}
)
stmt.set_sql_query("SELECT 1")
Expand Down

0 comments on commit 36f7233

Please sign in to comment.