From ef7fbd3e580b84a013fac213b283e8006a05afe9 Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Thu, 25 Aug 2022 13:49:13 +0300 Subject: [PATCH] queue: add an example with connection_pool The example demonstrates how to use the queue package with the connection_pool package. Closes #176 --- CHANGELOG.md | 1 + queue/config.lua | 14 +-- queue/example_connection_pool.lua | 68 ++++++++++ queue/example_connection_pool_test.go | 171 ++++++++++++++++++++++++++ queue/queue_test.go | 38 +++++- 5 files changed, 282 insertions(+), 10 deletions(-) create mode 100644 queue/example_connection_pool.lua create mode 100644 queue/example_connection_pool_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index d30807f0d..1d7fc8bcc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ConnectionPool (#178) - Execute, ExecuteTyped and ExecuteAsync methods to ConnectionPool (#176) - ConnectorAdapter type to use ConnectionPool as Connector interface (#176) +- An example how to use queue and connection_pool subpackages together (#176) ### Changed diff --git a/queue/config.lua b/queue/config.lua index df28496a3..eccb19a68 100644 --- a/queue/config.lua +++ b/queue/config.lua @@ -7,7 +7,7 @@ box.cfg{ work_dir = os.getenv("TEST_TNT_WORK_DIR"), } - box.once("init", function() +box.once("init", function() box.schema.user.create('test', {password = 'test'}) box.schema.func.create('queue.tube.test_queue:touch') box.schema.func.create('queue.tube.test_queue:ack') @@ -23,21 +23,15 @@ box.cfg{ box.schema.func.create('queue.identify') box.schema.func.create('queue.state') box.schema.func.create('queue.statistics') - box.schema.user.grant('test', 'create', 'space') - box.schema.user.grant('test', 'write', 'space', '_schema') - box.schema.user.grant('test', 'write', 'space', '_space') - box.schema.user.grant('test', 'read,write', 'space', '_space_sequence') - box.schema.user.grant('test', 'write', 'space', '_index') + box.schema.user.grant('test', 'create,read,write,drop', 'space') box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids') box.schema.user.grant('test', 'execute', 'universe') box.schema.user.grant('test', 'read,write', 'space', '_queue') box.schema.user.grant('test', 'read,write', 'space', '_schema') + box.schema.user.grant('test', 'read,write', 'space', '_space_sequence') box.schema.user.grant('test', 'read,write', 'space', '_space') box.schema.user.grant('test', 'read,write', 'space', '_index') - box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers') box.schema.user.grant('test', 'read,write', 'space', '_priv') - box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2') - box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions') if box.space._trigger ~= nil then box.schema.user.grant('test', 'read', 'space', '_trigger') end @@ -56,3 +50,5 @@ end) box.cfg{ listen = os.getenv("TEST_TNT_LISTEN"), } + +require('console').start() diff --git a/queue/example_connection_pool.lua b/queue/example_connection_pool.lua new file mode 100644 index 000000000..94ec19ebc --- /dev/null +++ b/queue/example_connection_pool.lua @@ -0,0 +1,68 @@ +local queue = require('queue') +rawset(_G, 'queue', queue) + +local allowed = {"localhost:3016", "localhost:3017"} +local listen = os.getenv("TEST_TNT_LISTEN") + +local founded = false +local replication = {} +for i, l in ipairs(allowed) do + if l == listen then + founded = true + end + replication[i] = "test:test@" .. l +end + +if not founded then + print("Invalid TEST_TNT_LISTEN") + os.exit() +end + +-- Set listen only when every other thing is configured. +box.cfg{ + listen = listen, + replication = replication, + read_only = listen == allowed[1], +} + +box.once("init", function() + box.schema.user.create('test', {password = 'test'}) + box.schema.func.create('queue.tube.test_queue:touch') + box.schema.func.create('queue.tube.test_queue:ack') + box.schema.func.create('queue.tube.test_queue:put') + box.schema.func.create('queue.tube.test_queue:drop') + box.schema.func.create('queue.tube.test_queue:peek') + box.schema.func.create('queue.tube.test_queue:kick') + box.schema.func.create('queue.tube.test_queue:take') + box.schema.func.create('queue.tube.test_queue:delete') + box.schema.func.create('queue.tube.test_queue:release') + box.schema.func.create('queue.tube.test_queue:release_all') + box.schema.func.create('queue.tube.test_queue:bury') + box.schema.func.create('queue.identify') + box.schema.func.create('queue.state') + box.schema.func.create('queue.statistics') + box.schema.user.grant('test', 'replication') + box.schema.user.grant('test', 'create,read,write,drop', 'space') + box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids') + box.schema.user.grant('test', 'execute', 'universe') + box.schema.user.grant('test', 'read,write', 'space', '_queue') + box.schema.user.grant('test', 'read,write', 'space', '_schema') + box.schema.user.grant('test', 'read,write', 'space', '_space_sequence') + box.schema.user.grant('test', 'read,write', 'space', '_space') + box.schema.user.grant('test', 'read,write', 'space', '_index') + box.schema.user.grant('test', 'read,write', 'space', '_priv') + if box.space._trigger ~= nil then + box.schema.user.grant('test', 'read', 'space', '_trigger') + end + if box.space._fk_constraint ~= nil then + box.schema.user.grant('test', 'read', 'space', '_fk_constraint') + end + if box.space._ck_constraint ~= nil then + box.schema.user.grant('test', 'read', 'space', '_ck_constraint') + end + if box.space._func_index ~= nil then + box.schema.user.grant('test', 'read', 'space', '_func_index') + end +end) + +require('console').start() diff --git a/queue/example_connection_pool_test.go b/queue/example_connection_pool_test.go new file mode 100644 index 000000000..df95587bf --- /dev/null +++ b/queue/example_connection_pool_test.go @@ -0,0 +1,171 @@ +package queue_test + +import ( + "fmt" + "log" + "sync" + "time" + + "github.com/google/uuid" + "github.com/tarantool/go-tarantool" + "github.com/tarantool/go-tarantool/connection_pool" + "github.com/tarantool/go-tarantool/queue" +) + +type QueueConnectionHandler struct { + name string + cfg queue.Cfg + + uuid uuid.UUID + registered bool + err error + registerMutex sync.Mutex + done chan struct{} +} + +var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{} + +func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler { + return &QueueConnectionHandler{ + name: name, + cfg: cfg, + done: make(chan struct{}), + } +} + +func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection, + role connection_pool.Role) error { + // The queue only works with a master instance. + if role != connection_pool.MasterRole { + return nil + } + + // We need to register a shared session only once. + h.registerMutex.Lock() + + if h.err != nil { + h.registerMutex.Unlock() + return h.err + } + + q := queue.New(conn, h.name) + opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second} + if !h.registered { + if h.err = q.Cfg(opts); h.err == nil { + if h.uuid, h.err = q.Identify(nil); h.err == nil { + h.err = q.Create(h.cfg) + } + } + h.registered = h.err == nil + close(h.done) + h.registerMutex.Unlock() + return h.err + } else { + // We already registered. We don't need the lock any more. + h.registerMutex.Unlock() + if err := q.Cfg(opts); err == nil { + return err + } + if _, err := q.Identify(&h.uuid); err != nil { + return err + } + if err := q.Create(h.cfg); err != nil { + return err + } + } + return nil +} + +func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection, + role connection_pool.Role) error { + return nil +} + +// Example demonstrates how to use the queue package with the connection_pool +// package. First of all, you need to create a connection handler for the +// a ConnectionPool object to process new connections from RW-instances. +// +// You need to register a shared session UUID at a first master connection. +// It needs to be used to re-identify as the shared session on new +// RW-instances. See QueueConnectionHandler.Discovered() implementation. +// +// After that, you need to create a ConnectorAdapter object with RW mode for +// the ConnectionPool to send requests into RW-instances. This adapter can +// be used to create a ready-to-work queue object. +func Example_connectionPool() { + // You could to use example_connection_pool.lua to play with the code + // manually. You need to start a master and a replica instance first. + // + // 1 terminal: + // $ mkdir master && cd master + // $ TEST_TNT_LISTEN="localhost:3017" tarantool ../example_connection_pool.lua + // + // 2 terminal: + // $ mkdir replica && cd replica + // $ TEST_TNT_LISTEN="localhost:3017" tarantool ../example_connection_pool.lua + // + // Then you need to update connection pool servers to: + // + // servers := []string{ + // "127.0.0.1:3016", + // "127.0.0.1:3017", + // } + // + // After it you could run the sample: + // + // $ go test . -v -run Example_connectionPool + servers := []string{ + "127.0.0.1:3014", + "127.0.0.1:3015", + } + connOpts := tarantool.Opts{ + Timeout: 500 * time.Millisecond, + User: "test", + Pass: "test", + } + + cfg := queue.Cfg{ + Temporary: false, + IfNotExists: true, + Kind: queue.FIFO, + Opts: queue.Opts{ + Ttl: 10 * time.Second, + }, + } + h := NewQueueConnectionHandler("test_queue", cfg) + poolOpts := connection_pool.OptsPool{ + CheckTimeout: 1 * time.Second, + ConnectionHandler: h, + } + connPool, err := connection_pool.ConnectWithOpts(servers, connOpts, poolOpts) + if err != nil { + log.Fatalf("unable to connect to the pool: %s", err) + } + + defer connPool.Close() + + <-h.done + + if h.err != nil { + log.Fatalf("unable to identify in the pool: %s", h.err) + } + + rw := connection_pool.NewConnectorAdapter(connPool, connection_pool.RW) + q := queue.New(rw, "test_queue") + + fmt.Println("A Queue object is ready to work.") + + if _, err = q.Put("test_data"); err != nil { + log.Fatalf("unable to put data into the queue: %s", err) + } + + if task, err := q.Take(); err != nil { + log.Fatalf("unable to take data from the queue: %s", err) + } else { + task.Ack() + fmt.Println("data:", task.Data()) + } + // Output: + // A Queue object is ready to work. + // data: test_data +} diff --git a/queue/queue_test.go b/queue/queue_test.go index 7b19c0dd3..2460d93fe 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -14,6 +14,13 @@ import ( ) var server = "127.0.0.1:3013" +var serversPool = []string{ + "127.0.0.1:3014", + "127.0.0.1:3015", +} + +var instances []test_helpers.TarantoolInstance + var opts = Opts{ Timeout: 2500 * time.Millisecond, User: "test", @@ -899,12 +906,41 @@ func runTestMain(m *testing.M) int { ConnectRetry: 3, RetryTimeout: 500 * time.Millisecond, }) - defer test_helpers.StopTarantoolWithCleanup(inst) if err != nil { log.Fatalf("Failed to prepare test tarantool: %s", err) } + defer test_helpers.StopTarantoolWithCleanup(inst) + + workDirs := []string{"work_dir1", "work_dir2"} + poolOpts := test_helpers.StartOpts{ + InitScript: "config.lua", + User: opts.User, + Pass: opts.Pass, + WaitStart: 100 * time.Millisecond, + ConnectRetry: 3, + RetryTimeout: 500 * time.Millisecond, + } + instances, err = test_helpers.StartTarantoolInstances(serversPool, workDirs, poolOpts) + + if err != nil { + log.Fatalf("Failed to prepare test tarantool pool: %s", err) + } + + defer test_helpers.StopTarantoolInstances(instances) + + roles := []bool{false, true} + connOpts := Opts{ + Timeout: 500 * time.Millisecond, + User: "test", + Pass: "test", + } + err = test_helpers.SetClusterRO(serversPool, connOpts, roles) + + if err != nil { + log.Fatalf("Failed to set roles in tarantool pool: %s", err) + } return m.Run() }