Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: add context to connection create #333

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
decoded to a varbinary object (#313).
- Use objects of the Decimal type instead of pointers (#238)
- Use objects of the Datetime type instead of pointers (#238)
- `connection.Connect` no longer return non-working
connection objects (#136). This function now does not attempt to reconnect
and tries to establish a connection only once. Function might be canceled
via context. Context accepted as first argument.
`pool.Connect` and `pool.Add` now accept context as first argument, which
DifferentialOrange marked this conversation as resolved.
Show resolved Hide resolved
user may cancel in process. If `pool.Connect` is canceled in progress, an
error will be returned. All created connections will be closed.

### Deprecated

Expand Down
32 changes: 27 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,19 @@ about what it does.
package tarantool

import (
"context"
"fmt"
"time"

"github.com/tarantool/go-tarantool/v2"
)

func main() {
opts := tarantool.Opts{User: "guest"}
conn, err := tarantool.Connect("127.0.0.1:3301", opts)
ctx, cancel := context.WithTimeout(context.Background(),
500 * time.Millisecond)
defer cancel()
conn, err := tarantool.Connect(ctx, "127.0.0.1:3301", opts)
if err != nil {
fmt.Println("Connection refused:", err)
}
Expand All @@ -134,11 +140,17 @@ username. The structure may also contain other settings, see more in
[documentation][godoc-opts-url] for the "`Opts`" structure.

**Observation 3:** The line containing "`tarantool.Connect`" is essential for
starting a session. There are two parameters:
starting a session. There are three parameters:

* a string with `host:port` format, and
* a context,
* a string with `host:port` format,
* the option structure that was set up earlier.

There will be only one attempt to connect. If multiple attempts needed,
"`tarantool.Connect`" could be placed inside the loop with some timeout
between each try. Example could be found in the [example_test](./example_test.go),
name - `ExampleConnect_reconnects`.

**Observation 4:** The `err` structure will be `nil` if there is no error,
otherwise it will have a description which can be retrieved with `err.Error()`.

Expand Down Expand Up @@ -167,10 +179,13 @@ The subpackage has been deleted. You could use `pool` instead.

#### pool package

The logic has not changed, but there are a few renames:

* The `connection_pool` subpackage has been renamed to `pool`.
* The type `PoolOpts` has been renamed to `Opts`.
* `pool.Connect` now accepts context as first argument, which user may cancel
in process. If it is canceled in progress, an error will be returned.
All created connections will be closed.
* `pool.Add` now accepts context as first argument, which user may cancel in
process.

#### msgpack.v5

Expand Down Expand Up @@ -212,6 +227,13 @@ IPROTO constants have been moved to a separate package [go-iproto](https://githu

* The method `Code() uint32` replaced by the `Type() iproto.Type`.

#### Connect function

`connection.Connect` no longer return non-working connection objects. This function
now does not attempt to reconnect and tries to establish a connection only once.
Function might be canceled via context. Context accepted as first argument,
and user may cancel it in process.

## Contributing

See [the contributing guide](CONTRIBUTING.md) for detailed instructions on how
Expand Down
125 changes: 61 additions & 64 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,16 +375,7 @@ func (opts Opts) Clone() Opts {
// - Unix socket, first '/' or '.' indicates Unix socket
// (unix:///abs/path/tnt.sock, unix:path/tnt.sock, /abs/path/tnt.sock,
// ./rel/path/tnt.sock, unix/:path/tnt.sock)
//
// Notes:
//
// - If opts.Reconnect is zero (default), then connection either already connected
// or error is returned.
//
// - If opts.Reconnect is non-zero, then error will be returned only if authorization
// fails. But if Tarantool is not reachable, then it will make an attempt to reconnect later
// and will not finish to make attempts on authorization failures.
func Connect(addr string, opts Opts) (conn *Connection, err error) {
func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err error) {
conn = &Connection{
addr: addr,
requestId: 0,
Expand Down Expand Up @@ -432,25 +423,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {

conn.cond = sync.NewCond(&conn.mutex)

if err = conn.createConnection(false); err != nil {
ter, ok := err.(Error)
if conn.opts.Reconnect <= 0 {
return nil, err
} else if ok && (ter.Code == iproto.ER_NO_SUCH_USER ||
ter.Code == iproto.ER_CREDS_MISMATCH) {
// Reported auth errors immediately.
return nil, err
} else {
// Without SkipSchema it is useless.
go func(conn *Connection) {
conn.mutex.Lock()
defer conn.mutex.Unlock()
if err := conn.createConnection(true); err != nil {
conn.closeConnection(err, true)
}
}(conn)
err = nil
}
if err = conn.createConnection(ctx); err != nil {
return nil, err
}

go conn.pinger()
Expand Down Expand Up @@ -534,18 +508,11 @@ func (conn *Connection) cancelFuture(fut *Future, err error) {
}
}

func (conn *Connection) dial() (err error) {
func (conn *Connection) dial(ctx context.Context) error {
opts := conn.opts
dialTimeout := opts.Reconnect / 2
if dialTimeout == 0 {
dialTimeout = 500 * time.Millisecond
} else if dialTimeout > 5*time.Second {
dialTimeout = 5 * time.Second
}

var c Conn
c, err = conn.opts.Dialer.Dial(conn.addr, DialOpts{
DialTimeout: dialTimeout,
c, err := conn.opts.Dialer.Dial(ctx, conn.addr, DialOpts{
IoTimeout: opts.Timeout,
Transport: opts.Transport,
Ssl: opts.Ssl,
Expand All @@ -555,7 +522,7 @@ func (conn *Connection) dial() (err error) {
Password: opts.Pass,
})
if err != nil {
return
return err
}

conn.Greeting.Version = c.Greeting().Version
Expand Down Expand Up @@ -605,7 +572,7 @@ func (conn *Connection) dial() (err error) {
conn.shutdownWatcher = watcher
}

return
return nil
}

func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
Expand Down Expand Up @@ -658,34 +625,18 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
return
}

func (conn *Connection) createConnection(reconnect bool) (err error) {
var reconnects uint
for conn.c == nil && conn.state == connDisconnected {
now := time.Now()
err = conn.dial()
if err == nil || !reconnect {
if err == nil {
conn.notify(Connected)
}
return
}
if conn.opts.MaxReconnects > 0 && reconnects > conn.opts.MaxReconnects {
conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
err = ClientError{ErrConnectionClosed, "last reconnect failed"}
// mark connection as closed to avoid reopening by another goroutine
return
func (conn *Connection) createConnection(ctx context.Context) error {
var err error
if conn.c == nil && conn.state == connDisconnected {
if err = conn.dial(ctx); err == nil {
conn.notify(Connected)
return nil
}
conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
conn.notify(ReconnectFailed)
reconnects++
conn.mutex.Unlock()
time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))
conn.mutex.Lock()
}
if conn.state == connClosed {
err = ClientError{ErrConnectionClosed, "using closed connection"}
}
return
return err
}

func (conn *Connection) closeConnection(neterr error, forever bool) (err error) {
Expand Down Expand Up @@ -727,11 +678,57 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
return
}

func (conn *Connection) getDialTimeout() time.Duration {
dialTimeout := conn.opts.Reconnect / 2
if dialTimeout == 0 {
dialTimeout = 500 * time.Millisecond
} else if dialTimeout > 5*time.Second {
dialTimeout = 5 * time.Second
}
return dialTimeout
}

func (conn *Connection) runReconnects() error {
dialTimeout := conn.getDialTimeout()
var reconnects uint
var err error

for conn.opts.MaxReconnects == 0 || reconnects <= conn.opts.MaxReconnects {
now := time.Now()

ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
err = conn.createConnection(ctx)
cancel()

if err != nil {
if clientErr, ok := err.(ClientError); ok &&
clientErr.Code == ErrConnectionClosed {
return err
}
} else {
return nil
}

conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
conn.notify(ReconnectFailed)
reconnects++
conn.mutex.Unlock()

time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))

conn.mutex.Lock()
}

conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
// mark connection as closed to avoid reopening by another goroutine
return ClientError{ErrConnectionClosed, "last reconnect failed"}
}

func (conn *Connection) reconnectImpl(neterr error, c Conn) {
if conn.opts.Reconnect > 0 {
if c == conn.c {
conn.closeConnection(neterr, false)
if err := conn.createConnection(true); err != nil {
if err := conn.runReconnects(); err != nil {
conn.closeConnection(err, true)
}
}
Expand Down
5 changes: 4 additions & 1 deletion crud/example_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package crud_test

import (
"context"
"fmt"
"reflect"
"time"
Expand All @@ -21,7 +22,9 @@ var exampleOpts = tarantool.Opts{
}

func exampleConnect() *tarantool.Connection {
conn, err := tarantool.Connect(exampleServer, exampleOpts)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
conn, err := tarantool.Connect(ctx, exampleServer, exampleOpts)
if err != nil {
panic("Connection is not established: " + err.Error())
}
Expand Down
4 changes: 3 additions & 1 deletion crud/tarantool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ var object = crud.MapObject{

func connect(t testing.TB) *tarantool.Connection {
for i := 0; i < 10; i++ {
conn, err := tarantool.Connect(server, opts)
ctx, cancel := test_helpers.GetConnectContext()
conn, err := tarantool.Connect(ctx, server, opts)
DifferentialOrange marked this conversation as resolved.
Show resolved Hide resolved
cancel()
if err != nil {
t.Fatalf("Failed to connect: %s", err)
}
Expand Down
5 changes: 4 additions & 1 deletion datetime/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package datetime_test

import (
"context"
"fmt"
"time"

Expand All @@ -23,7 +24,9 @@ func Example() {
User: "test",
Pass: "test",
}
conn, err := tarantool.Connect("127.0.0.1:3013", opts)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
conn, err := tarantool.Connect(ctx, "127.0.0.1:3013", opts)
if err != nil {
fmt.Printf("Error in connect is %v", err)
return
Expand Down
13 changes: 7 additions & 6 deletions decimal/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package decimal_test

import (
"context"
"log"
"time"

Expand All @@ -22,13 +23,13 @@ import (
func Example() {
server := "127.0.0.1:3013"
opts := tarantool.Opts{
Timeout: 5 * time.Second,
Reconnect: 1 * time.Second,
MaxReconnects: 3,
User: "test",
Pass: "test",
Timeout: 5 * time.Second,
User: "test",
Pass: "test",
}
client, err := tarantool.Connect(server, opts)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
client, err := tarantool.Connect(ctx, server, opts)
cancel()
if err != nil {
log.Fatalf("Failed to connect: %s", err.Error())
}
Expand Down
Loading
Loading