Skip to content

Commit

Permalink
queue: add an example with connection_pool
Browse files Browse the repository at this point in the history
The example demonstrates how to use the queue package with
the connection_pool package.

Closes #176
  • Loading branch information
oleg-jukovec committed Sep 23, 2022
1 parent 12bc5e2 commit 0132b18
Show file tree
Hide file tree
Showing 8 changed files with 313 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
ConnectionPool (#178)
- Execute, ExecuteTyped and ExecuteAsync methods to ConnectionPool (#176)
- ConnectorAdapter type to use ConnectionPool as Connector interface (#176)
- An example how to use queue and connection_pool subpackages together (#176)

### Changed

Expand Down
2 changes: 1 addition & 1 deletion connection_pool/connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2056,7 +2056,7 @@ func TestStream_TxnIsolationLevel(t *testing.T) {
func runTestMain(m *testing.M) int {
initScript := "config.lua"
waitStart := 100 * time.Millisecond
var connectRetry uint = 3
connectRetry := 3
retryTimeout := 500 * time.Millisecond
workDirs := []string{
"work_dir1", "work_dir2",
Expand Down
2 changes: 1 addition & 1 deletion multi/multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func TestStream_Rollback(t *testing.T) {
func runTestMain(m *testing.M) int {
initScript := "config.lua"
waitStart := 100 * time.Millisecond
var connectRetry uint = 3
connectRetry := 3
retryTimeout := 500 * time.Millisecond

// Tarantool supports streams and interactive transactions since version 2.10.0
Expand Down
208 changes: 208 additions & 0 deletions queue/example_connection_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package queue_test

import (
"fmt"
"sync"
"time"

"github.com/google/uuid"
"github.com/tarantool/go-tarantool"
"github.com/tarantool/go-tarantool/connection_pool"
"github.com/tarantool/go-tarantool/queue"
"github.com/tarantool/go-tarantool/test_helpers"
)

// QueueConnectionHandler handles new connections in a ConnectionPool.
type QueueConnectionHandler struct {
name string
cfg queue.Cfg

uuid uuid.UUID
registered bool
err error
mutex sync.Mutex
masterUpdated chan struct{}
}

// QueueConnectionHandler implements the ConnectionHandler interface.
var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{}

// NewQueueConnectionHandler creates a QueueConnectionHandler object.
func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler {
return &QueueConnectionHandler{
name: name,
cfg: cfg,
masterUpdated: make(chan struct{}, 10),
}
}

// Discovered configures a queue for an instance and identifies a shared queue
// session on master instances.
//
// NOTE: the Queue supports only a master-replica cluster configuration. It
// does not support a master-master configuration.
func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
role connection_pool.Role) error {
h.mutex.Lock()
defer h.mutex.Unlock()

if h.err != nil {
return h.err
}

master := role == connection_pool.MasterRole
if master {
defer func() {
h.masterUpdated <- struct{}{}
}()
}

// Set up a queue module configuration for an instance.
q := queue.New(conn, h.name)
opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second}

if h.err = q.Cfg(opts); h.err != nil {
return fmt.Errorf("unable to configure queue: %w", h.err)
}

// The queue only works with a master instance.
if !master {
return nil
}

if h.err = q.Create(h.cfg); h.err != nil {
return h.err
}

if !h.registered {
// We register a shared session at the first time.
if h.uuid, h.err = q.Identify(nil); h.err != nil {
return h.err
}
h.registered = true
} else {
// We re-identify as the shared session.
if _, h.err = q.Identify(&h.uuid); h.err != nil {
return h.err
}
}

fmt.Printf("Master %s is ready to work!\n", conn.Addr())

return nil
}

// Deactivated doesn't do anything useful for the example.
func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection,
role connection_pool.Role) error {
return nil
}

// Closes closes a QueueConnectionHandler object.
func (h *QueueConnectionHandler) Close() {
close(h.masterUpdated)
}

// Example demonstrates how to use the queue package with the connection_pool
// package. First of all, you need to create a ConnectionHandler implementation
// for the a ConnectionPool object to process new connections from
// RW-instances.
//
// You need to register a shared session UUID at a first master connection.
// It needs to be used to re-identify as the shared session on new
// RW-instances. See QueueConnectionHandler.Discovered() implementation.
//
// After that, you need to create a ConnectorAdapter object with RW mode for
// the ConnectionPool to send requests into RW-instances. This adapter can
// be used to create a ready-to-work queue object.
func Example_connectionPool() {
// Create a ConnectionHandler object.
cfg := queue.Cfg{
Temporary: false,
IfNotExists: true,
Kind: queue.FIFO,
Opts: queue.Opts{
Ttl: 10 * time.Second,
},
}
h := NewQueueConnectionHandler("test_queue", cfg)
defer h.Close()

// Create a ConnectionPool object.
servers := []string{
"127.0.0.1:3014",
"127.0.0.1:3015",
}
connOpts := tarantool.Opts{
Timeout: 1 * time.Second,
User: "test",
Pass: "test",
}
poolOpts := connection_pool.OptsPool{
CheckTimeout: 1 * time.Second,
ConnectionHandler: h,
}
connPool, err := connection_pool.ConnectWithOpts(servers, connOpts, poolOpts)
if err != nil {
fmt.Printf("Unable to connect to the pool: %s", err)
return
}
defer connPool.Close()

// Wait for a master instance identification in the queue.
<-h.masterUpdated
if h.err != nil {
fmt.Printf("Unable to identify in the pool: %s", h.err)
return
}

// Create a Queue object from the ConnectionPool object via
// a ConnectorAdapter.
rw := connection_pool.NewConnectorAdapter(connPool, connection_pool.RW)
q := queue.New(rw, "test_queue")
fmt.Println("A Queue object is ready to work.")

testData := "test_data"
fmt.Println("Send data:", testData)
if _, err = q.Put(testData); err != nil {
fmt.Printf("Unable to put data into the queue: %s", err)
return
}

// Switch a master instance in the pool.
roles := []bool{true, false}
err = test_helpers.SetClusterRO(servers, connOpts, roles)
if err != nil {
fmt.Printf("Unable to set cluster roles: %s", err)
return
}

// Wait for a new master instance re-identification.
<-h.masterUpdated
if h.err != nil {
fmt.Printf("Unable to re-identify in the pool: %s", h.err)
return
}

// Take a data from the new master instance.
task, err := q.TakeTimeout(1 * time.Second)
if err == nil && task != nil && task.Data() != nil {
task.Ack()
fmt.Println("Got data:", task.Data())
} else {
if err != nil {
fmt.Println("Unable to got task:", err)
} else if task == nil {
fmt.Println("task == nil")
} else {
fmt.Println("task.Data() == nil")
}
}

// Output:
// Master 127.0.0.1:3014 is ready to work!
// A Queue object is ready to work.
// Send data: test_data
// Master 127.0.0.1:3015 is ready to work!
// Got data: test_data
}
39 changes: 37 additions & 2 deletions queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ import (
)

var server = "127.0.0.1:3013"
var serversPool = []string{
"127.0.0.1:3014",
"127.0.0.1:3015",
}

var instances []test_helpers.TarantoolInstance

var opts = Opts{
Timeout: 2500 * time.Millisecond,
User: "test",
Expand Down Expand Up @@ -890,7 +897,7 @@ func TestTask_Touch(t *testing.T) {
// https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls
func runTestMain(m *testing.M) int {
inst, err := test_helpers.StartTarantool(test_helpers.StartOpts{
InitScript: "config.lua",
InitScript: "testdata/config.lua",
Listen: server,
WorkDir: "work_dir",
User: opts.User,
Expand All @@ -899,12 +906,40 @@ func runTestMain(m *testing.M) int {
ConnectRetry: 3,
RetryTimeout: 500 * time.Millisecond,
})
defer test_helpers.StopTarantoolWithCleanup(inst)

if err != nil {
log.Fatalf("Failed to prepare test tarantool: %s", err)
}

defer test_helpers.StopTarantoolWithCleanup(inst)

workDirs := []string{"work_dir1", "work_dir2"}
poolOpts := test_helpers.StartOpts{
InitScript: "testdata/pool.lua",
User: opts.User,
Pass: opts.Pass,
WaitStart: 3 * time.Second, // replication_timeout * 3
ConnectRetry: -1,
}
instances, err = test_helpers.StartTarantoolInstances(serversPool, workDirs, poolOpts)

if err != nil {
log.Fatalf("Failed to prepare test tarantool pool: %s", err)
}

defer test_helpers.StopTarantoolInstances(instances)

roles := []bool{false, true}
connOpts := Opts{
Timeout: 500 * time.Millisecond,
User: "test",
Pass: "test",
}
err = test_helpers.SetClusterRO(serversPool, connOpts, roles)

if err != nil {
log.Fatalf("Failed to set roles in tarantool pool: %s", err)
}
return m.Run()
}

Expand Down
14 changes: 5 additions & 9 deletions queue/config.lua → queue/testdata/config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ box.cfg{
work_dir = os.getenv("TEST_TNT_WORK_DIR"),
}

box.once("init", function()
box.once("init", function()
box.schema.user.create('test', {password = 'test'})
box.schema.func.create('queue.tube.test_queue:touch')
box.schema.func.create('queue.tube.test_queue:ack')
Expand All @@ -23,21 +23,15 @@ box.cfg{
box.schema.func.create('queue.identify')
box.schema.func.create('queue.state')
box.schema.func.create('queue.statistics')
box.schema.user.grant('test', 'create', 'space')
box.schema.user.grant('test', 'write', 'space', '_schema')
box.schema.user.grant('test', 'write', 'space', '_space')
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
box.schema.user.grant('test', 'write', 'space', '_index')
box.schema.user.grant('test', 'create,read,write,drop', 'space')
box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids')
box.schema.user.grant('test', 'execute', 'universe')
box.schema.user.grant('test', 'read,write', 'space', '_queue')
box.schema.user.grant('test', 'read,write', 'space', '_schema')
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
box.schema.user.grant('test', 'read,write', 'space', '_space')
box.schema.user.grant('test', 'read,write', 'space', '_index')
box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers')
box.schema.user.grant('test', 'read,write', 'space', '_priv')
box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2')
box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions')
if box.space._trigger ~= nil then
box.schema.user.grant('test', 'read', 'space', '_trigger')
end
Expand All @@ -56,3 +50,5 @@ end)
box.cfg{
listen = os.getenv("TEST_TNT_LISTEN"),
}

require('console').start()
56 changes: 56 additions & 0 deletions queue/testdata/pool.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
local queue = require('queue')
rawset(_G, 'queue', queue)

local listen = os.getenv("TEST_TNT_LISTEN")
box.cfg{
work_dir = os.getenv("TEST_TNT_WORK_DIR"),
listen = listen,
replication = {
"test:test@127.0.0.1:3014",
"test:test@127.0.0.1:3015",
},
read_only = listen == "127.0.0.1:3015"
}

box.once("schema", function()
box.schema.user.create('test', {password = 'test'})
box.schema.user.grant('test', 'replication')

box.schema.func.create('queue.tube.test_queue:touch')
box.schema.func.create('queue.tube.test_queue:ack')
box.schema.func.create('queue.tube.test_queue:put')
box.schema.func.create('queue.tube.test_queue:drop')
box.schema.func.create('queue.tube.test_queue:peek')
box.schema.func.create('queue.tube.test_queue:kick')
box.schema.func.create('queue.tube.test_queue:take')
box.schema.func.create('queue.tube.test_queue:delete')
box.schema.func.create('queue.tube.test_queue:release')
box.schema.func.create('queue.tube.test_queue:release_all')
box.schema.func.create('queue.tube.test_queue:bury')
box.schema.func.create('queue.identify')
box.schema.func.create('queue.state')
box.schema.func.create('queue.statistics')
box.schema.user.grant('test', 'create,read,write,drop', 'space')
box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids')
box.schema.user.grant('test', 'execute', 'universe')
box.schema.user.grant('test', 'read,write', 'space', '_queue')
box.schema.user.grant('test', 'read,write', 'space', '_schema')
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
box.schema.user.grant('test', 'read,write', 'space', '_space')
box.schema.user.grant('test', 'read,write', 'space', '_index')
box.schema.user.grant('test', 'read,write', 'space', '_priv')
if box.space._trigger ~= nil then
box.schema.user.grant('test', 'read', 'space', '_trigger')
end
if box.space._fk_constraint ~= nil then
box.schema.user.grant('test', 'read', 'space', '_fk_constraint')
end
if box.space._ck_constraint ~= nil then
box.schema.user.grant('test', 'read', 'space', '_ck_constraint')
end
if box.space._func_index ~= nil then
box.schema.user.grant('test', 'read', 'space', '_func_index')
end
end)

require('console').start()
Loading

0 comments on commit 0132b18

Please sign in to comment.