Skip to content

Commit

Permalink
queue: add an example with connection_pool
Browse files Browse the repository at this point in the history
The example demonstrates how to use the queue package with
the connection_pool package.

Closes #176
  • Loading branch information
oleg-jukovec committed Sep 20, 2022
1 parent 840e3ed commit ef7fbd3
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
ConnectionPool (#178)
- Execute, ExecuteTyped and ExecuteAsync methods to ConnectionPool (#176)
- ConnectorAdapter type to use ConnectionPool as Connector interface (#176)
- An example how to use queue and connection_pool subpackages together (#176)

### Changed

Expand Down
14 changes: 5 additions & 9 deletions queue/config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ box.cfg{
work_dir = os.getenv("TEST_TNT_WORK_DIR"),
}

box.once("init", function()
box.once("init", function()
box.schema.user.create('test', {password = 'test'})
box.schema.func.create('queue.tube.test_queue:touch')
box.schema.func.create('queue.tube.test_queue:ack')
Expand All @@ -23,21 +23,15 @@ box.cfg{
box.schema.func.create('queue.identify')
box.schema.func.create('queue.state')
box.schema.func.create('queue.statistics')
box.schema.user.grant('test', 'create', 'space')
box.schema.user.grant('test', 'write', 'space', '_schema')
box.schema.user.grant('test', 'write', 'space', '_space')
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
box.schema.user.grant('test', 'write', 'space', '_index')
box.schema.user.grant('test', 'create,read,write,drop', 'space')
box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids')
box.schema.user.grant('test', 'execute', 'universe')
box.schema.user.grant('test', 'read,write', 'space', '_queue')
box.schema.user.grant('test', 'read,write', 'space', '_schema')
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
box.schema.user.grant('test', 'read,write', 'space', '_space')
box.schema.user.grant('test', 'read,write', 'space', '_index')
box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers')
box.schema.user.grant('test', 'read,write', 'space', '_priv')
box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2')
box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions')
if box.space._trigger ~= nil then
box.schema.user.grant('test', 'read', 'space', '_trigger')
end
Expand All @@ -56,3 +50,5 @@ end)
box.cfg{
listen = os.getenv("TEST_TNT_LISTEN"),
}

require('console').start()
68 changes: 68 additions & 0 deletions queue/example_connection_pool.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
local queue = require('queue')
rawset(_G, 'queue', queue)

local allowed = {"localhost:3016", "localhost:3017"}
local listen = os.getenv("TEST_TNT_LISTEN")

local founded = false
local replication = {}
for i, l in ipairs(allowed) do
if l == listen then
founded = true
end
replication[i] = "test:test@" .. l
end

if not founded then
print("Invalid TEST_TNT_LISTEN")
os.exit()
end

-- Set listen only when every other thing is configured.
box.cfg{
listen = listen,
replication = replication,
read_only = listen == allowed[1],
}

box.once("init", function()
box.schema.user.create('test', {password = 'test'})
box.schema.func.create('queue.tube.test_queue:touch')
box.schema.func.create('queue.tube.test_queue:ack')
box.schema.func.create('queue.tube.test_queue:put')
box.schema.func.create('queue.tube.test_queue:drop')
box.schema.func.create('queue.tube.test_queue:peek')
box.schema.func.create('queue.tube.test_queue:kick')
box.schema.func.create('queue.tube.test_queue:take')
box.schema.func.create('queue.tube.test_queue:delete')
box.schema.func.create('queue.tube.test_queue:release')
box.schema.func.create('queue.tube.test_queue:release_all')
box.schema.func.create('queue.tube.test_queue:bury')
box.schema.func.create('queue.identify')
box.schema.func.create('queue.state')
box.schema.func.create('queue.statistics')
box.schema.user.grant('test', 'replication')
box.schema.user.grant('test', 'create,read,write,drop', 'space')
box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids')
box.schema.user.grant('test', 'execute', 'universe')
box.schema.user.grant('test', 'read,write', 'space', '_queue')
box.schema.user.grant('test', 'read,write', 'space', '_schema')
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
box.schema.user.grant('test', 'read,write', 'space', '_space')
box.schema.user.grant('test', 'read,write', 'space', '_index')
box.schema.user.grant('test', 'read,write', 'space', '_priv')
if box.space._trigger ~= nil then
box.schema.user.grant('test', 'read', 'space', '_trigger')
end
if box.space._fk_constraint ~= nil then
box.schema.user.grant('test', 'read', 'space', '_fk_constraint')
end
if box.space._ck_constraint ~= nil then
box.schema.user.grant('test', 'read', 'space', '_ck_constraint')
end
if box.space._func_index ~= nil then
box.schema.user.grant('test', 'read', 'space', '_func_index')
end
end)

require('console').start()
171 changes: 171 additions & 0 deletions queue/example_connection_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package queue_test

import (
"fmt"
"log"
"sync"
"time"

"github.com/google/uuid"
"github.com/tarantool/go-tarantool"
"github.com/tarantool/go-tarantool/connection_pool"
"github.com/tarantool/go-tarantool/queue"
)

type QueueConnectionHandler struct {
name string
cfg queue.Cfg

uuid uuid.UUID
registered bool
err error
registerMutex sync.Mutex
done chan struct{}
}

var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{}

func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler {
return &QueueConnectionHandler{
name: name,
cfg: cfg,
done: make(chan struct{}),
}
}

func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
role connection_pool.Role) error {
// The queue only works with a master instance.
if role != connection_pool.MasterRole {
return nil
}

// We need to register a shared session only once.
h.registerMutex.Lock()

if h.err != nil {
h.registerMutex.Unlock()
return h.err
}

q := queue.New(conn, h.name)
opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second}
if !h.registered {
if h.err = q.Cfg(opts); h.err == nil {
if h.uuid, h.err = q.Identify(nil); h.err == nil {
h.err = q.Create(h.cfg)
}
}
h.registered = h.err == nil
close(h.done)
h.registerMutex.Unlock()
return h.err
} else {
// We already registered. We don't need the lock any more.
h.registerMutex.Unlock()
if err := q.Cfg(opts); err == nil {
return err
}
if _, err := q.Identify(&h.uuid); err != nil {
return err
}
if err := q.Create(h.cfg); err != nil {
return err
}
}
return nil
}

func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection,
role connection_pool.Role) error {
return nil
}

// Example demonstrates how to use the queue package with the connection_pool
// package. First of all, you need to create a connection handler for the
// a ConnectionPool object to process new connections from RW-instances.
//
// You need to register a shared session UUID at a first master connection.
// It needs to be used to re-identify as the shared session on new
// RW-instances. See QueueConnectionHandler.Discovered() implementation.
//
// After that, you need to create a ConnectorAdapter object with RW mode for
// the ConnectionPool to send requests into RW-instances. This adapter can
// be used to create a ready-to-work queue object.
func Example_connectionPool() {
// You could to use example_connection_pool.lua to play with the code
// manually. You need to start a master and a replica instance first.
//
// 1 terminal:
// $ mkdir master && cd master
// $ TEST_TNT_LISTEN="localhost:3017" tarantool ../example_connection_pool.lua
//
// 2 terminal:
// $ mkdir replica && cd replica
// $ TEST_TNT_LISTEN="localhost:3017" tarantool ../example_connection_pool.lua
//
// Then you need to update connection pool servers to:
//
// servers := []string{
// "127.0.0.1:3016",
// "127.0.0.1:3017",
// }
//
// After it you could run the sample:
//
// $ go test . -v -run Example_connectionPool
servers := []string{
"127.0.0.1:3014",
"127.0.0.1:3015",
}
connOpts := tarantool.Opts{
Timeout: 500 * time.Millisecond,
User: "test",
Pass: "test",
}

cfg := queue.Cfg{
Temporary: false,
IfNotExists: true,
Kind: queue.FIFO,
Opts: queue.Opts{
Ttl: 10 * time.Second,
},
}
h := NewQueueConnectionHandler("test_queue", cfg)
poolOpts := connection_pool.OptsPool{
CheckTimeout: 1 * time.Second,
ConnectionHandler: h,
}
connPool, err := connection_pool.ConnectWithOpts(servers, connOpts, poolOpts)
if err != nil {
log.Fatalf("unable to connect to the pool: %s", err)
}

defer connPool.Close()

<-h.done

if h.err != nil {
log.Fatalf("unable to identify in the pool: %s", h.err)
}

rw := connection_pool.NewConnectorAdapter(connPool, connection_pool.RW)
q := queue.New(rw, "test_queue")

fmt.Println("A Queue object is ready to work.")

if _, err = q.Put("test_data"); err != nil {
log.Fatalf("unable to put data into the queue: %s", err)
}

if task, err := q.Take(); err != nil {
log.Fatalf("unable to take data from the queue: %s", err)
} else {
task.Ack()
fmt.Println("data:", task.Data())
}
// Output:
// A Queue object is ready to work.
// data: test_data
}
38 changes: 37 additions & 1 deletion queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ import (
)

var server = "127.0.0.1:3013"
var serversPool = []string{
"127.0.0.1:3014",
"127.0.0.1:3015",
}

var instances []test_helpers.TarantoolInstance

var opts = Opts{
Timeout: 2500 * time.Millisecond,
User: "test",
Expand Down Expand Up @@ -899,12 +906,41 @@ func runTestMain(m *testing.M) int {
ConnectRetry: 3,
RetryTimeout: 500 * time.Millisecond,
})
defer test_helpers.StopTarantoolWithCleanup(inst)

if err != nil {
log.Fatalf("Failed to prepare test tarantool: %s", err)
}

defer test_helpers.StopTarantoolWithCleanup(inst)

workDirs := []string{"work_dir1", "work_dir2"}
poolOpts := test_helpers.StartOpts{
InitScript: "config.lua",
User: opts.User,
Pass: opts.Pass,
WaitStart: 100 * time.Millisecond,
ConnectRetry: 3,
RetryTimeout: 500 * time.Millisecond,
}
instances, err = test_helpers.StartTarantoolInstances(serversPool, workDirs, poolOpts)

if err != nil {
log.Fatalf("Failed to prepare test tarantool pool: %s", err)
}

defer test_helpers.StopTarantoolInstances(instances)

roles := []bool{false, true}
connOpts := Opts{
Timeout: 500 * time.Millisecond,
User: "test",
Pass: "test",
}
err = test_helpers.SetClusterRO(serversPool, connOpts, roles)

if err != nil {
log.Fatalf("Failed to set roles in tarantool pool: %s", err)
}
return m.Run()
}

Expand Down

0 comments on commit ef7fbd3

Please sign in to comment.