Skip to content

Commit

Permalink
* Enabled by default usage of internal/pool in internal/query.Client
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Sep 6, 2024
1 parent e563693 commit 68f04d3
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 52 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Enabled by default usage of `internal/pool` in `internal/query.Client`

## v3.79.1
* Changed `trace.Table` and `trace.Query` traces
* Implemented `internal/pool` the same as table client pool from `internal/table.Client`
Expand Down
60 changes: 15 additions & 45 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,11 +559,19 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options
return nil
}

func newPool(
ctx context.Context, cfg *config.Config, createSession func(ctx context.Context) (*Session, error),
) sessionPool {
if cfg.UseSessionPool() {
return pool.New(ctx,
func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Config) *Client {
onDone := trace.QueryOnNew(cfg.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.New"),
)
defer onDone()

grpcClient := Ydb_Query_V1.NewQueryServiceClient(balancer)

client := &Client{
config: cfg,
queryServiceClient: grpcClient,
done: make(chan struct{}),
pool: pool.New(ctx,
pool.WithLimit[*Session, Session](cfg.PoolLimit()),
pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())),
pool.WithCreateItemTimeout[*Session, Session](cfg.SessionCreateTimeout()),
Expand All @@ -580,52 +588,14 @@ func newPool(
}
defer cancelCreate()

s, err := createSession(createCtx)
s, err := createSession(createCtx, grpcClient, cfg)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

return s, nil
}),
)
}

return &poolStub{
createSession: createSession,
}
}

func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Config) *Client {
onDone := trace.QueryOnNew(cfg.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.New"),
)
defer onDone()

grpcClient := Ydb_Query_V1.NewQueryServiceClient(balancer)

client := &Client{
config: cfg,
queryServiceClient: grpcClient,
done: make(chan struct{}),
pool: newPool(ctx, cfg, func(ctx context.Context) (_ *Session, err error) {
var (
createCtx context.Context
cancelCreate context.CancelFunc
)
if d := cfg.SessionCreateTimeout(); d > 0 {
createCtx, cancelCreate = xcontext.WithTimeout(ctx, d)
} else {
createCtx, cancelCreate = xcontext.WithCancel(ctx)
}
defer cancelCreate()

s, err := createSession(createCtx, grpcClient, cfg)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

return s, nil
}),
),
}

return client
Expand Down
7 changes: 0 additions & 7 deletions internal/query/config/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package config

import (
"os"
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
Expand All @@ -20,7 +19,6 @@ type Config struct {

poolLimit int

useSessionPool bool
sessionCreateTimeout time.Duration
sessionDeleteTimeout time.Duration

Expand All @@ -44,7 +42,6 @@ func defaults() *Config {
sessionCreateTimeout: DefaultSessionCreateTimeout,
sessionDeleteTimeout: DefaultSessionDeleteTimeout,
trace: &trace.Query{},
useSessionPool: os.Getenv("YDB_GO_SDK_QUERY_SERVICE_USE_SESSION_POOL") != "",
}
}

Expand All @@ -71,7 +68,3 @@ func (c *Config) SessionCreateTimeout() time.Duration {
func (c *Config) SessionDeleteTimeout() time.Duration {
return c.sessionDeleteTimeout
}

func (c *Config) UseSessionPool() bool {
return c.useSessionPool
}

0 comments on commit 68f04d3

Please sign in to comment.