Skip to content

Commit

Permalink
api: add context to connection create
Browse files Browse the repository at this point in the history
`connection.Connect` and `pool.Connect` no longer return non-working
connection objects. Those functions now accept context as their first
arguments, which user may cancel in process.

`connection.Connect` will block until either the working connection
created (and returned), `opts.MaxReconnects` creation attempts
were made (returns error) or the context is canceled by user
(returns error too).

Closes #136
  • Loading branch information
DerekBum committed Oct 17, 2023
1 parent d8df65d commit b402c58
Show file tree
Hide file tree
Showing 30 changed files with 738 additions and 263 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
decoded to a varbinary object (#313).
- Use objects of the Decimal type instead of pointers (#238)
- Use objects of the Datetime type instead of pointers (#238)
- `connection.Connect` no longer return non-working
connection objects (#136). This function now does not attempt to reconnect
and tries to establish a connection only once. Function might be canceled
via context. Context accepted as first argument.
`pool.Connect` and `pool.Add` now accept context as first argument, which
user may cancel in process. If `pool.Connect` is canceled in progress, an
error will be returned. All created connections will be closed.

### Deprecated

Expand Down
32 changes: 27 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,19 @@ about what it does.
package tarantool

import (
"context"
"fmt"
"time"

"github.com/tarantool/go-tarantool/v2"
)

func main() {
opts := tarantool.Opts{User: "guest"}
conn, err := tarantool.Connect("127.0.0.1:3301", opts)
ctx, cancel := context.WithTimeout(context.Background(),
500 * time.Millisecond)
defer cancel()
conn, err := tarantool.Connect(ctx, "127.0.0.1:3301", opts)
if err != nil {
fmt.Println("Connection refused:", err)
}
Expand All @@ -134,11 +140,17 @@ username. The structure may also contain other settings, see more in
[documentation][godoc-opts-url] for the "`Opts`" structure.

**Observation 3:** The line containing "`tarantool.Connect`" is essential for
starting a session. There are two parameters:
starting a session. There are three parameters:

* a string with `host:port` format, and
* a context,
* a string with `host:port` format,
* the option structure that was set up earlier.

There will be only one attempt to connect. If multiple attempts needed,
"`tarantool.Connect`" could be placed inside the loop with some timeout
between each try. Example could be found in the [example_test](./example_test.go),
name - `ExampleConnect_reconnects`.

**Observation 4:** The `err` structure will be `nil` if there is no error,
otherwise it will have a description which can be retrieved with `err.Error()`.

Expand Down Expand Up @@ -167,10 +179,13 @@ The subpackage has been deleted. You could use `pool` instead.

#### pool package

The logic has not changed, but there are a few renames:

* The `connection_pool` subpackage has been renamed to `pool`.
* The type `PoolOpts` has been renamed to `Opts`.
* `pool.Connect` now accepts context as first argument, which user may cancel
in process. If it is canceled in progress, an error will be returned.
All created connections will be closed.
* `pool.Add` now accepts context as first argument, which user may cancel in
process.

#### msgpack.v5

Expand Down Expand Up @@ -212,6 +227,13 @@ IPROTO constants have been moved to a separate package [go-iproto](https://githu

* The method `Code() uint32` replaced by the `Type() iproto.Type`.

#### Connect function

`connection.Connect` no longer return non-working connection objects. This function
now does not attempt to reconnect and tries to establish a connection only once.
Function might be canceled via context. Context accepted as first argument,
and user may cancel it in process.

## Contributing

See [the contributing guide](CONTRIBUTING.md) for detailed instructions on how
Expand Down
125 changes: 61 additions & 64 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,16 +375,7 @@ func (opts Opts) Clone() Opts {
// - Unix socket, first '/' or '.' indicates Unix socket
// (unix:///abs/path/tnt.sock, unix:path/tnt.sock, /abs/path/tnt.sock,
// ./rel/path/tnt.sock, unix/:path/tnt.sock)
//
// Notes:
//
// - If opts.Reconnect is zero (default), then connection either already connected
// or error is returned.
//
// - If opts.Reconnect is non-zero, then error will be returned only if authorization
// fails. But if Tarantool is not reachable, then it will make an attempt to reconnect later
// and will not finish to make attempts on authorization failures.
func Connect(addr string, opts Opts) (conn *Connection, err error) {
func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err error) {
conn = &Connection{
addr: addr,
requestId: 0,
Expand Down Expand Up @@ -432,25 +423,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {

conn.cond = sync.NewCond(&conn.mutex)

if err = conn.createConnection(false); err != nil {
ter, ok := err.(Error)
if conn.opts.Reconnect <= 0 {
return nil, err
} else if ok && (ter.Code == iproto.ER_NO_SUCH_USER ||
ter.Code == iproto.ER_CREDS_MISMATCH) {
// Reported auth errors immediately.
return nil, err
} else {
// Without SkipSchema it is useless.
go func(conn *Connection) {
conn.mutex.Lock()
defer conn.mutex.Unlock()
if err := conn.createConnection(true); err != nil {
conn.closeConnection(err, true)
}
}(conn)
err = nil
}
if err = conn.createConnection(ctx); err != nil {
return nil, err
}

go conn.pinger()
Expand Down Expand Up @@ -534,18 +508,11 @@ func (conn *Connection) cancelFuture(fut *Future, err error) {
}
}

func (conn *Connection) dial() (err error) {
func (conn *Connection) dial(ctx context.Context) error {
opts := conn.opts
dialTimeout := opts.Reconnect / 2
if dialTimeout == 0 {
dialTimeout = 500 * time.Millisecond
} else if dialTimeout > 5*time.Second {
dialTimeout = 5 * time.Second
}

var c Conn
c, err = conn.opts.Dialer.Dial(conn.addr, DialOpts{
DialTimeout: dialTimeout,
c, err := conn.opts.Dialer.Dial(ctx, conn.addr, DialOpts{
IoTimeout: opts.Timeout,
Transport: opts.Transport,
Ssl: opts.Ssl,
Expand All @@ -555,7 +522,7 @@ func (conn *Connection) dial() (err error) {
Password: opts.Pass,
})
if err != nil {
return
return err
}

conn.Greeting.Version = c.Greeting().Version
Expand Down Expand Up @@ -605,7 +572,7 @@ func (conn *Connection) dial() (err error) {
conn.shutdownWatcher = watcher
}

return
return nil
}

func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
Expand Down Expand Up @@ -658,34 +625,18 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
return
}

func (conn *Connection) createConnection(reconnect bool) (err error) {
var reconnects uint
for conn.c == nil && conn.state == connDisconnected {
now := time.Now()
err = conn.dial()
if err == nil || !reconnect {
if err == nil {
conn.notify(Connected)
}
return
}
if conn.opts.MaxReconnects > 0 && reconnects > conn.opts.MaxReconnects {
conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
err = ClientError{ErrConnectionClosed, "last reconnect failed"}
// mark connection as closed to avoid reopening by another goroutine
return
func (conn *Connection) createConnection(ctx context.Context) error {
var err error
if conn.c == nil && conn.state == connDisconnected {
if err = conn.dial(ctx); err == nil {
conn.notify(Connected)
return nil
}
conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
conn.notify(ReconnectFailed)
reconnects++
conn.mutex.Unlock()
time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))
conn.mutex.Lock()
}
if conn.state == connClosed {
err = ClientError{ErrConnectionClosed, "using closed connection"}
}
return
return err
}

func (conn *Connection) closeConnection(neterr error, forever bool) (err error) {
Expand Down Expand Up @@ -727,11 +678,57 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
return
}

func (conn *Connection) getDialTimeout() time.Duration {
dialTimeout := conn.opts.Reconnect / 2
if dialTimeout == 0 {
dialTimeout = 500 * time.Millisecond
} else if dialTimeout > 5*time.Second {
dialTimeout = 5 * time.Second
}
return dialTimeout
}

func (conn *Connection) runReconnects() error {
dialTimeout := conn.getDialTimeout()
var reconnects uint
var err error

for conn.opts.MaxReconnects == 0 || reconnects <= conn.opts.MaxReconnects {
now := time.Now()

ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
err = conn.createConnection(ctx)
cancel()

if err != nil {
if clientErr, ok := err.(ClientError); ok &&
clientErr.Code == ErrConnectionClosed {
return err
}
} else {
return nil
}

conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
conn.notify(ReconnectFailed)
reconnects++
conn.mutex.Unlock()

time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))

conn.mutex.Lock()
}

conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
// mark connection as closed to avoid reopening by another goroutine
return ClientError{ErrConnectionClosed, "last reconnect failed"}
}

func (conn *Connection) reconnectImpl(neterr error, c Conn) {
if conn.opts.Reconnect > 0 {
if c == conn.c {
conn.closeConnection(neterr, false)
if err := conn.createConnection(true); err != nil {
if err := conn.runReconnects(); err != nil {
conn.closeConnection(err, true)
}
}
Expand Down
5 changes: 4 additions & 1 deletion crud/example_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package crud_test

import (
"context"
"fmt"
"reflect"
"time"
Expand All @@ -21,7 +22,9 @@ var exampleOpts = tarantool.Opts{
}

func exampleConnect() *tarantool.Connection {
conn, err := tarantool.Connect(exampleServer, exampleOpts)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
conn, err := tarantool.Connect(ctx, exampleServer, exampleOpts)
if err != nil {
panic("Connection is not established: " + err.Error())
}
Expand Down
4 changes: 3 additions & 1 deletion crud/tarantool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ var object = crud.MapObject{

func connect(t testing.TB) *tarantool.Connection {
for i := 0; i < 10; i++ {
conn, err := tarantool.Connect(server, opts)
ctx, cancel := test_helpers.GetConnectContext()
conn, err := tarantool.Connect(ctx, server, opts)
cancel()
if err != nil {
t.Fatalf("Failed to connect: %s", err)
}
Expand Down
5 changes: 4 additions & 1 deletion datetime/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package datetime_test

import (
"context"
"fmt"
"time"

Expand All @@ -23,7 +24,9 @@ func Example() {
User: "test",
Pass: "test",
}
conn, err := tarantool.Connect("127.0.0.1:3013", opts)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
conn, err := tarantool.Connect(ctx, "127.0.0.1:3013", opts)
if err != nil {
fmt.Printf("Error in connect is %v", err)
return
Expand Down
13 changes: 7 additions & 6 deletions decimal/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package decimal_test

import (
"context"
"log"
"time"

Expand All @@ -22,13 +23,13 @@ import (
func Example() {
server := "127.0.0.1:3013"
opts := tarantool.Opts{
Timeout: 5 * time.Second,
Reconnect: 1 * time.Second,
MaxReconnects: 3,
User: "test",
Pass: "test",
Timeout: 5 * time.Second,
User: "test",
Pass: "test",
}
client, err := tarantool.Connect(server, opts)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
client, err := tarantool.Connect(ctx, server, opts)
cancel()
if err != nil {
log.Fatalf("Failed to connect: %s", err.Error())
}
Expand Down
Loading

0 comments on commit b402c58

Please sign in to comment.