Skip to content

Commit

Permalink
api: add context to connection create
Browse files Browse the repository at this point in the history
`connection.Connect` and `pool.Connect` no longer return non-working
connection objects. Those functions now accept context as their first
arguments, which user may cancel in process.

`connection.Connect` will block until either the working connection
created (and returned), `opts.MaxReconnects` creation attempts
were made (returns error) or the context is canceled by user
(returns error too).

Closes #136
  • Loading branch information
DerekBum committed Sep 26, 2023
1 parent d8df65d commit ff7346d
Show file tree
Hide file tree
Showing 24 changed files with 268 additions and 140 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
decoded to a varbinary object (#313).
- Use objects of the Decimal type instead of pointers (#238)
- Use objects of the Datetime type instead of pointers (#238)
- `connection.Connect` and `pool.Connect` no longer return non-working
connection objects (#136). Those functions now accept context as their first
arguments, which user may cancel in process.

### Deprecated

Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,15 @@ about what it does.
package tarantool

import (
"context"
"fmt"
"github.com/tarantool/go-tarantool/v2"
)

func main() {
opts := tarantool.Opts{User: "guest"}
conn, err := tarantool.Connect("127.0.0.1:3301", opts)
ctx := context.Background()
conn, err := tarantool.Connect(ctx, "127.0.0.1:3301", opts)
if err != nil {
fmt.Println("Connection refused:", err)
}
Expand Down
46 changes: 29 additions & 17 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,11 @@ func (opts Opts) Clone() Opts {
// - If opts.Reconnect is zero (default), then connection either already connected
// or error is returned.
//
// - If opts.Reconnect is non-zero, then error will be returned only if authorization
// fails. But if Tarantool is not reachable, then it will make an attempt to reconnect later
// and will not finish to make attempts on authorization failures.
func Connect(addr string, opts Opts) (conn *Connection, err error) {
// - If opts.Reconnect is non-zero, then error will be returned if authorization
// fails, or user has canceled context. If Tarantool is not reachable, then it
// will make attempts to reconnect and will not finish to make attempts on
// authorization failures.
func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err error) {
conn = &Connection{
addr: addr,
requestId: 0,
Expand Down Expand Up @@ -432,7 +433,7 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {

conn.cond = sync.NewCond(&conn.mutex)

if err = conn.createConnection(false); err != nil {
if err = conn.createConnection(ctx, false); err != nil {
ter, ok := err.(Error)
if conn.opts.Reconnect <= 0 {
return nil, err
Expand All @@ -441,15 +442,12 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
// Reported auth errors immediately.
return nil, err
} else {
// Without SkipSchema it is useless.
go func(conn *Connection) {
conn.mutex.Lock()
defer conn.mutex.Unlock()
if err := conn.createConnection(true); err != nil {
conn.closeConnection(err, true)
}
}(conn)
err = nil
conn.mutex.Lock()
defer conn.mutex.Unlock()
if err := conn.createConnection(ctx, true); err != nil {
conn.closeConnection(err, true)
return nil, err
}
}
}

Expand Down Expand Up @@ -658,7 +656,8 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
return
}

func (conn *Connection) createConnection(reconnect bool) (err error) {
func (conn *Connection) createConnection(ctx context.Context,
reconnect bool) (err error) {
var reconnects uint
for conn.c == nil && conn.state == connDisconnected {
now := time.Now()
Expand All @@ -679,7 +678,20 @@ func (conn *Connection) createConnection(reconnect bool) (err error) {
conn.notify(ReconnectFailed)
reconnects++
conn.mutex.Unlock()
time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))

timer := time.NewTimer(time.Until(now.Add(conn.opts.Reconnect)))
waitLoop:
for {
select {
case <-ctx.Done():
err = ClientError{ErrConnectionClosed, "context is canceled"}
conn.mutex.Lock()
return
case <-timer.C:
break waitLoop
}
}

conn.mutex.Lock()
}
if conn.state == connClosed {
Expand Down Expand Up @@ -731,7 +743,7 @@ func (conn *Connection) reconnectImpl(neterr error, c Conn) {
if conn.opts.Reconnect > 0 {
if c == conn.c {
conn.closeConnection(neterr, false)
if err := conn.createConnection(true); err != nil {
if err := conn.createConnection(context.Background(), true); err != nil {
conn.closeConnection(err, true)
}
}
Expand Down
3 changes: 2 additions & 1 deletion crud/example_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package crud_test

import (
"context"
"fmt"
"reflect"
"time"
Expand All @@ -21,7 +22,7 @@ var exampleOpts = tarantool.Opts{
}

func exampleConnect() *tarantool.Connection {
conn, err := tarantool.Connect(exampleServer, exampleOpts)
conn, err := tarantool.Connect(context.Background(), exampleServer, exampleOpts)
if err != nil {
panic("Connection is not established: " + err.Error())
}
Expand Down
3 changes: 2 additions & 1 deletion crud/tarantool_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package crud_test

import (
"context"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -108,7 +109,7 @@ var object = crud.MapObject{

func connect(t testing.TB) *tarantool.Connection {
for i := 0; i < 10; i++ {
conn, err := tarantool.Connect(server, opts)
conn, err := tarantool.Connect(context.Background(), server, opts)
if err != nil {
t.Fatalf("Failed to connect: %s", err)
}
Expand Down
3 changes: 2 additions & 1 deletion datetime/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package datetime_test

import (
"context"
"fmt"
"time"

Expand All @@ -23,7 +24,7 @@ func Example() {
User: "test",
Pass: "test",
}
conn, err := tarantool.Connect("127.0.0.1:3013", opts)
conn, err := tarantool.Connect(context.Background(), "127.0.0.1:3013", opts)
if err != nil {
fmt.Printf("Error in connect is %v", err)
return
Expand Down
3 changes: 2 additions & 1 deletion decimal/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package decimal_test

import (
"context"
"log"
"time"

Expand All @@ -28,7 +29,7 @@ func Example() {
User: "test",
Pass: "test",
}
client, err := tarantool.Connect(server, opts)
client, err := tarantool.Connect(context.Background(), server, opts)
if err != nil {
log.Fatalf("Failed to connect: %s", err.Error())
}
Expand Down
21 changes: 12 additions & 9 deletions dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tarantool_test

import (
"bytes"
"context"
"errors"
"net"
"sync"
Expand Down Expand Up @@ -29,9 +30,10 @@ func TestDialer_Dial_error(t *testing.T) {
err: errors.New(errMsg),
}

conn, err := tarantool.Connect("any", tarantool.Opts{
Dialer: dialer,
})
conn, err := tarantool.Connect(context.Background(), "any",
tarantool.Opts{
Dialer: dialer,
})
assert.Nil(t, conn)
assert.ErrorContains(t, err, errMsg)
}
Expand Down Expand Up @@ -73,7 +75,7 @@ func TestDialer_Dial_passedOpts(t *testing.T) {
}

dialer := &mockPassedDialer{}
conn, err := tarantool.Connect(addr, tarantool.Opts{
conn, err := tarantool.Connect(context.Background(), addr, tarantool.Opts{
Dialer: dialer,
Timeout: opts.IoTimeout,
Transport: opts.Transport,
Expand Down Expand Up @@ -203,11 +205,12 @@ func dialIo(t *testing.T,
dialer := mockIoDialer{
init: init,
}
conn, err := tarantool.Connect("any", tarantool.Opts{
Dialer: &dialer,
Timeout: 1000 * time.Second, // Avoid pings.
SkipSchema: true,
})
conn, err := tarantool.Connect(context.Background(), "any",
tarantool.Opts{
Dialer: &dialer,
Timeout: 1000 * time.Second, // Avoid pings.
SkipSchema: true,
})
require.Nil(t, err)
require.NotNil(t, conn)

Expand Down
3 changes: 2 additions & 1 deletion example_custom_unpacking_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tarantool_test

import (
"context"
"fmt"
"log"
"time"
Expand Down Expand Up @@ -84,7 +85,7 @@ func Example_customUnpacking() {
User: "test",
Pass: "test",
}
conn, err := tarantool.Connect(server, opts)
conn, err := tarantool.Connect(context.Background(), server, opts)
if err != nil {
log.Fatalf("Failed to connect: %s", err.Error())
}
Expand Down
21 changes: 11 additions & 10 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Tuple struct {
}

func exampleConnect(opts tarantool.Opts) *tarantool.Connection {
conn, err := tarantool.Connect(server, opts)
conn, err := tarantool.Connect(context.Background(), server, opts)
if err != nil {
panic("Connection is not established: " + err.Error())
}
Expand All @@ -38,7 +38,7 @@ func ExampleSslOpts() {
CaFile: "testdata/ca.crt",
},
}
_, err := tarantool.Connect("127.0.0.1:3013", opts)
_, err := tarantool.Connect(context.Background(), "127.0.0.1:3013", opts)
if err != nil {
panic("Connection is not established: " + err.Error())
}
Expand Down Expand Up @@ -913,12 +913,13 @@ func ExampleFuture_GetIterator() {
}

func ExampleConnect() {
conn, err := tarantool.Connect("127.0.0.1:3013", tarantool.Opts{
Timeout: 5 * time.Second,
User: "test",
Pass: "test",
Concurrency: 32,
})
conn, err := tarantool.Connect(context.Background(), "127.0.0.1:3013",
tarantool.Opts{
Timeout: 5 * time.Second,
User: "test",
Pass: "test",
Concurrency: 32,
})
if err != nil {
fmt.Println("No connection available")
return
Expand Down Expand Up @@ -1081,7 +1082,7 @@ func ExampleConnection_NewPrepared() {
User: "test",
Pass: "test",
}
conn, err := tarantool.Connect(server, opts)
conn, err := tarantool.Connect(context.Background(), server, opts)
if err != nil {
fmt.Printf("Failed to connect: %s", err.Error())
}
Expand Down Expand Up @@ -1127,7 +1128,7 @@ func ExampleConnection_NewWatcher() {
Features: []tarantool.ProtocolFeature{tarantool.WatchersFeature},
},
}
conn, err := tarantool.Connect(server, opts)
conn, err := tarantool.Connect(context.Background(), server, opts)
if err != nil {
fmt.Printf("Failed to connect: %s\n", err)
return
Expand Down
Loading

0 comments on commit ff7346d

Please sign in to comment.