Skip to content

Commit

Permalink
queue: add an example with connection_pool
Browse files Browse the repository at this point in the history
The example demonstrates how to use the queue package with
the connection_pool package.

Closes #176
  • Loading branch information
oleg-jukovec committed Aug 25, 2022
1 parent e183bd8 commit 3752d5e
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 9 deletions.
12 changes: 4 additions & 8 deletions queue/config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,15 @@ box.cfg{
box.schema.func.create('queue.identify')
box.schema.func.create('queue.state')
box.schema.func.create('queue.statistics')
box.schema.user.grant('test', 'create', 'space')
box.schema.user.grant('test', 'write', 'space', '_schema')
box.schema.user.grant('test', 'write', 'space', '_space')
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
box.schema.user.grant('test', 'write', 'space', '_index')
box.schema.user.grant('test', 'create,read,write,drop', 'space')
box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids')
box.schema.user.grant('test', 'execute', 'universe')
box.schema.user.grant('test', 'read,write', 'space', '_queue')
box.schema.user.grant('test', 'read,write', 'space', '_schema')
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
box.schema.user.grant('test', 'read,write', 'space', '_space')
box.schema.user.grant('test', 'read,write', 'space', '_index')
box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers')
box.schema.user.grant('test', 'read,write', 'space', '_priv')
box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2')
box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions')
if box.space._trigger ~= nil then
box.schema.user.grant('test', 'read', 'space', '_trigger')
end
Expand All @@ -56,3 +50,5 @@ end)
box.cfg{
listen = os.getenv("TEST_TNT_LISTEN"),
}

require('console').start()
141 changes: 141 additions & 0 deletions queue/example_connection_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package queue_test

import (
"fmt"
"log"
"sync"
"time"

"github.com/google/uuid"
"github.com/tarantool/go-tarantool"
"github.com/tarantool/go-tarantool/connection_pool"
"github.com/tarantool/go-tarantool/queue"
)

type QueueConnectionListener struct {
name string
cfg queue.Cfg
queue queue.Queue

uuid uuid.UUID
registered bool
err error
mutex sync.Mutex
done chan struct{}
}

func NewQueueConnectionListener(name string, cfg queue.Cfg) *QueueConnectionListener {
return &QueueConnectionListener{
name: name,
cfg: cfg,
done: make(chan struct{}),
}
}

func (l *QueueConnectionListener) Added(conn *tarantool.Connection,
role connection_pool.Role) {
if role != connection_pool.MasterRole {
return
}

l.mutex.Lock()
defer l.mutex.Unlock()

if l.err != nil {
return
}

l.queue = queue.New(conn, l.name)
if !l.registered {
opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second}
if l.err = l.queue.Cfg(opts); l.err == nil {
if l.uuid, l.err = l.queue.Identify(nil); l.err == nil {
l.err = l.queue.Create(l.cfg)
}
}
l.registered = l.err == nil
close(l.done)
} else {
l.queue.Identify(&l.uuid)
l.queue.Create(l.cfg)
}
}

func (l *QueueConnectionListener) Removed(conn *tarantool.Connection,
role connection_pool.Role) {
return
}

func (l *QueueConnectionListener) Updated(conn *tarantool.Connection,
oldRole, newRole connection_pool.Role) {
return
}

// Example demonstrates how to use the queue package with the connection_pool
// package. First of all, you need to create a connection listener for the
// a ConnectionPool object to process new connections from RW-instances.
//
// You need to registry a shared session UUID at a first master connection.
// It needs to be used to re-identify as the shared session on new
// RW-instances. See QueueConnectionListener.Added() implementation.
//
// After that, you need to create a ConnectorAdapter object with RW mode for
// the ConnectionPool to send requests into RW-instances. This adapter can
// be used to create a ready-to-work queue object.
func Example_connectionPool() {
servers := []string{
"127.0.0.1:3014",
"127.0.0.1:3015",
"127.0.0.1:3016",
}
connOpts := tarantool.Opts{
Timeout: 500 * time.Millisecond,
User: "test",
Pass: "test",
}

cfg := queue.Cfg{
Temporary: false,
IfNotExists: true,
Kind: queue.FIFO,
Opts: queue.Opts{
Ttl: 10 * time.Second,
},
}
l := NewQueueConnectionListener("test_queue", cfg)
poolOpts := connection_pool.OptsPool{
CheckTimeout: 1 * time.Second,
ConnectionListener: l,
}
connPool, err := connection_pool.ConnectWithOpts(servers, connOpts, poolOpts)
if err != nil {
log.Fatalf("unable to connect to the pool: %s", err)
}

defer connPool.Close()

<-l.done

if l.err != nil {
log.Fatalf("unable to identify in the pool: %s", l.err)
}

rw := connection_pool.NewConnectorAdapter(connPool, connection_pool.RW)
q := queue.New(rw, "test_queue")

fmt.Println("A Queue object is ready to work.")

if _, err = q.Put("test_data"); err != nil {
log.Fatalf("unable to put data into the queue: %s", err)
}

if task, err := q.Take(); err != nil {
log.Fatalf("unable to take data from the queue: %s", err)
} else {
task.Ack()
fmt.Println("data:", task.Data())
}
// Output:
// A Queue object is ready to work.
// data: test_data
}
39 changes: 38 additions & 1 deletion queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ import (
)

var server = "127.0.0.1:3013"
var serversPool = []string{
"127.0.0.1:3014",
"127.0.0.1:3015",
"127.0.0.1:3016",
}

var instances []test_helpers.TarantoolInstance

var opts = Opts{
Timeout: 2500 * time.Millisecond,
User: "test",
Expand Down Expand Up @@ -899,12 +907,41 @@ func runTestMain(m *testing.M) int {
ConnectRetry: 3,
RetryTimeout: 500 * time.Millisecond,
})
defer test_helpers.StopTarantoolWithCleanup(inst)

if err != nil {
log.Fatalf("Failed to prepare test tarantool: %s", err)
}

defer test_helpers.StopTarantoolWithCleanup(inst)

workDirs := []string{"work_dir1", "work_dir2", "work_dir3"}
poolOpts := test_helpers.StartOpts{
InitScript: "config.lua",
User: opts.User,
Pass: opts.Pass,
WaitStart: 100 * time.Millisecond,
ConnectRetry: 3,
RetryTimeout: 500 * time.Millisecond,
}
instances, err = test_helpers.StartTarantoolInstances(serversPool, workDirs, poolOpts)

if err != nil {
log.Fatalf("Failed to prepare test tarantool pool: %s", err)
}

defer test_helpers.StopTarantoolInstances(instances)

roles := []bool{true, false, false}
connOpts := Opts{
Timeout: 500 * time.Millisecond,
User: "test",
Pass: "test",
}
err = test_helpers.SetClusterRO(serversPool, connOpts, roles)

if err != nil {
log.Fatalf("Failed to set roles in tarantool pool: %s", err)
}
return m.Run()
}

Expand Down

0 comments on commit 3752d5e

Please sign in to comment.