Skip to content

Commit

Permalink
queue: add an example with connection_pool
Browse files Browse the repository at this point in the history
The example demonstrates how to use the queue package with
the connection_pool package.

Closes #176
  • Loading branch information
oleg-jukovec committed Sep 13, 2022
1 parent 3a0809c commit d0314fe
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
ConnectionPool (#178)
- Execute, ExecuteTyped and ExecuteAsync methods to ConnectionPool (#176)
- ConnectorAdapter type to use ConnectionPool as Connector interface (#176)
- An example how to use queue and connection_pool subpackages together (#176)

### Changed

Expand Down
12 changes: 4 additions & 8 deletions queue/config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,15 @@ box.cfg{
box.schema.func.create('queue.identify')
box.schema.func.create('queue.state')
box.schema.func.create('queue.statistics')
box.schema.user.grant('test', 'create', 'space')
box.schema.user.grant('test', 'write', 'space', '_schema')
box.schema.user.grant('test', 'write', 'space', '_space')
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
box.schema.user.grant('test', 'write', 'space', '_index')
box.schema.user.grant('test', 'create,read,write,drop', 'space')
box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids')
box.schema.user.grant('test', 'execute', 'universe')
box.schema.user.grant('test', 'read,write', 'space', '_queue')
box.schema.user.grant('test', 'read,write', 'space', '_schema')
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
box.schema.user.grant('test', 'read,write', 'space', '_space')
box.schema.user.grant('test', 'read,write', 'space', '_index')
box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers')
box.schema.user.grant('test', 'read,write', 'space', '_priv')
box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2')
box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions')
if box.space._trigger ~= nil then
box.schema.user.grant('test', 'read', 'space', '_trigger')
end
Expand All @@ -56,3 +50,5 @@ end)
box.cfg{
listen = os.getenv("TEST_TNT_LISTEN"),
}

require('console').start()
148 changes: 148 additions & 0 deletions queue/example_connection_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package queue_test

import (
"fmt"
"log"
"sync"
"time"

"github.com/google/uuid"
"github.com/tarantool/go-tarantool"
"github.com/tarantool/go-tarantool/connection_pool"
"github.com/tarantool/go-tarantool/queue"
)

type QueueConnectionHandler struct {
name string
cfg queue.Cfg

uuid uuid.UUID
registered bool
err error
registerMutex sync.Mutex
done chan struct{}
}

var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{}

func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler {
return &QueueConnectionHandler{
name: name,
cfg: cfg,
done: make(chan struct{}),
}
}

func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
role connection_pool.Role) error {
// The queue only works with a master instance.
if role != connection_pool.MasterRole {
return nil
}

// We need to register a shared session only once.
h.registerMutex.Lock()

if h.err != nil {
h.registerMutex.Unlock()
return h.err
}

q := queue.New(conn, h.name)
if !h.registered {
opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second}
if h.err = q.Cfg(opts); h.err == nil {
if h.uuid, h.err = q.Identify(nil); h.err == nil {
h.err = q.Create(h.cfg)
}
}
h.registered = h.err == nil
close(h.done)
h.registerMutex.Unlock()
return h.err
} else {
// We already registered. We don't need the lock any more.
h.registerMutex.Unlock()
if _, err := q.Identify(&h.uuid); err != nil {
return err
}
if err := q.Create(h.cfg); err != nil {
return err
}
}
return nil
}

func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection,
role connection_pool.Role) error {
return nil
}

// Example demonstrates how to use the queue package with the connection_pool
// package. First of all, you need to create a connection handler for the
// a ConnectionPool object to process new connections from RW-instances.
//
// You need to register a shared session UUID at a first master connection.
// It needs to be used to re-identify as the shared session on new
// RW-instances. See QueueConnectionHandler.Discovered() implementation.
//
// After that, you need to create a ConnectorAdapter object with RW mode for
// the ConnectionPool to send requests into RW-instances. This adapter can
// be used to create a ready-to-work queue object.
func Example_connectionPool() {
servers := []string{
"127.0.0.1:3014",
"127.0.0.1:3015",
"127.0.0.1:3016",
}
connOpts := tarantool.Opts{
Timeout: 500 * time.Millisecond,
User: "test",
Pass: "test",
}

cfg := queue.Cfg{
Temporary: false,
IfNotExists: true,
Kind: queue.FIFO,
Opts: queue.Opts{
Ttl: 10 * time.Second,
},
}
h := NewQueueConnectionHandler("test_queue", cfg)
poolOpts := connection_pool.OptsPool{
CheckTimeout: 1 * time.Second,
ConnectionHandler: h,
}
connPool, err := connection_pool.ConnectWithOpts(servers, connOpts, poolOpts)
if err != nil {
log.Fatalf("unable to connect to the pool: %s", err)
}

defer connPool.Close()

<-h.done

if h.err != nil {
log.Fatalf("unable to identify in the pool: %s", h.err)
}

rw := connection_pool.NewConnectorAdapter(connPool, connection_pool.RW)
q := queue.New(rw, "test_queue")

fmt.Println("A Queue object is ready to work.")

if _, err = q.Put("test_data"); err != nil {
log.Fatalf("unable to put data into the queue: %s", err)
}

if task, err := q.Take(); err != nil {
log.Fatalf("unable to take data from the queue: %s", err)
} else {
task.Ack()
fmt.Println("data:", task.Data())
}
// Output:
// A Queue object is ready to work.
// data: test_data
}
39 changes: 38 additions & 1 deletion queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ import (
)

var server = "127.0.0.1:3013"
var serversPool = []string{
"127.0.0.1:3014",
"127.0.0.1:3015",
"127.0.0.1:3016",
}

var instances []test_helpers.TarantoolInstance

var opts = Opts{
Timeout: 2500 * time.Millisecond,
User: "test",
Expand Down Expand Up @@ -899,12 +907,41 @@ func runTestMain(m *testing.M) int {
ConnectRetry: 3,
RetryTimeout: 500 * time.Millisecond,
})
defer test_helpers.StopTarantoolWithCleanup(inst)

if err != nil {
log.Fatalf("Failed to prepare test tarantool: %s", err)
}

defer test_helpers.StopTarantoolWithCleanup(inst)

workDirs := []string{"work_dir1", "work_dir2", "work_dir3"}
poolOpts := test_helpers.StartOpts{
InitScript: "config.lua",
User: opts.User,
Pass: opts.Pass,
WaitStart: 100 * time.Millisecond,
ConnectRetry: 3,
RetryTimeout: 500 * time.Millisecond,
}
instances, err = test_helpers.StartTarantoolInstances(serversPool, workDirs, poolOpts)

if err != nil {
log.Fatalf("Failed to prepare test tarantool pool: %s", err)
}

defer test_helpers.StopTarantoolInstances(instances)

roles := []bool{false, true, true}
connOpts := Opts{
Timeout: 500 * time.Millisecond,
User: "test",
Pass: "test",
}
err = test_helpers.SetClusterRO(serversPool, connOpts, roles)

if err != nil {
log.Fatalf("Failed to set roles in tarantool pool: %s", err)
}
return m.Run()
}

Expand Down

0 comments on commit d0314fe

Please sign in to comment.