From 917b6adb1152e8aeb2a20b960b7339b1e769fb1c Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Thu, 25 Aug 2022 13:49:13 +0300
Subject: [PATCH] queue: add an example with connection_pool

The example demonstrates how to use the queue package with the
connection_pool package.

Closes #176
---
 CHANGELOG.md                            |   1 +
 connection_pool/connection_pool_test.go |   2 +-
 multi/multi_test.go                     |   2 +-
 queue/example_connection_pool_test.go   | 201 ++++++++++++++++++++++++
 queue/queue_test.go                     |  39 ++++-
 queue/{ => testdata}/config.lua         |  14 +-
 queue/testdata/pool.lua                 |  56 +++++++
 test_helpers/main.go                    |   7 +-
 8 files changed, 306 insertions(+), 16 deletions(-)
 create mode 100644 queue/example_connection_pool_test.go
 rename queue/{ => testdata}/config.lua (81%)
 create mode 100644 queue/testdata/pool.lua

diff --git a/CHANGELOG.md b/CHANGELOG.md
index fe991b84b..f69e97128 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
   ConnectionPool (#178)
 - Execute, ExecuteTyped and ExecuteAsync methods to ConnectionPool (#176)
 - ConnectorAdapter type to use ConnectionPool as Connector interface (#176)
+- An example of how to use queue and connection_pool subpackages together (#176)
 
 ### Changed
 
diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go
index b3aee615d..091d5216c 100644
--- a/connection_pool/connection_pool_test.go
+++ b/connection_pool/connection_pool_test.go
@@ -2056,7 +2056,7 @@ func TestStream_TxnIsolationLevel(t *testing.T) {
 func runTestMain(m *testing.M) int {
 	initScript := "config.lua"
 	waitStart := 100 * time.Millisecond
-	var connectRetry uint = 3
+	connectRetry := 3
 	retryTimeout := 500 * time.Millisecond
 	workDirs := []string{
 		"work_dir1", "work_dir2",
diff --git a/multi/multi_test.go b/multi/multi_test.go
index 643ea186d..2d43bb179 100644
--- a/multi/multi_test.go
+++ b/multi/multi_test.go
@@ -556,7 +556,7 @@ func TestStream_Rollback(t *testing.T) {
 func runTestMain(m *testing.M) int {
 	initScript := "config.lua"
 	waitStart := 100 * time.Millisecond
-	var connectRetry uint = 3
+	connectRetry := 3
 	retryTimeout := 500 * time.Millisecond
 
 	// Tarantool supports streams and interactive transactions since version 2.10.0
diff --git a/queue/example_connection_pool_test.go b/queue/example_connection_pool_test.go
new file mode 100644
index 000000000..8d6052fa0
--- /dev/null
+++ b/queue/example_connection_pool_test.go
@@ -0,0 +1,201 @@
+package queue_test
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/tarantool/go-tarantool"
+	"github.com/tarantool/go-tarantool/connection_pool"
+	"github.com/tarantool/go-tarantool/queue"
+	"github.com/tarantool/go-tarantool/test_helpers"
+)
+
+// QueueConnectionHandler handles new connections in a ConnectionPool.
+type QueueConnectionHandler struct {
+	name string
+	cfg  queue.Cfg
+
+	uuid          uuid.UUID
+	registered    bool
+	err           error
+	mutex         sync.Mutex
+	masterUpdated chan struct{}
+}
+
+// QueueConnectionHandler implements the ConnectionHandler interface.
+var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{}
+
+// NewQueueConnectionHandler creates a QueueConnectionHandler object.
+func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler {
+	return &QueueConnectionHandler{
+		name:          name,
+		cfg:           cfg,
+		masterUpdated: make(chan struct{}, 10),
+	}
+}
+
+// Discovered configures a queue for an instance and identifies a shared queue
+// session on master instances.
+//
+// NOTE: the Queue supports only a master-replica cluster configuration. It
+// does not support a master-master configuration.
+func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
+	role connection_pool.Role) error {
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+
+	if h.err != nil {
+		return h.err
+	}
+
+	master := role == connection_pool.MasterRole
+	if master {
+		defer func() {
+			h.masterUpdated <- struct{}{}
+		}()
+	}
+
+	// Set up a queue module configuration for an instance.
+	q := queue.New(conn, h.name)
+	opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second}
+
+	if h.err = q.Cfg(opts); h.err != nil {
+		return fmt.Errorf("unable to configure queue: %w", h.err)
+	}
+
+	// The queue only works with a master instance.
+	if !master {
+		return nil
+	}
+
+	if h.err = q.Create(h.cfg); h.err != nil {
+		return h.err
+	}
+
+	if !h.registered {
+		// We register a shared session the first time.
+		if h.uuid, h.err = q.Identify(nil); h.err != nil {
+			return h.err
+		}
+		h.registered = true
+	} else {
+		// We re-identify as the shared session.
+		if _, h.err = q.Identify(&h.uuid); h.err != nil {
+			return h.err
+		}
+	}
+
+	fmt.Printf("Master %s is ready to work!\n", conn.Addr())
+
+	return nil
+}
+
+// Deactivated doesn't do anything useful for the example.
+func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection,
+	role connection_pool.Role) error {
+	return nil
+}
+
+// Close closes a QueueConnectionHandler object.
+func (h *QueueConnectionHandler) Close() {
+	close(h.masterUpdated)
+}
+
+// Example demonstrates how to use the queue package with the connection_pool
+// package. First of all, you need to create a ConnectionHandler implementation
+// for a ConnectionPool object to process new connections from RW-instances.
+//
+// You need to register a shared session UUID at the first master connection.
+// It is used to re-identify as the shared session on new RW-instances. See
+// the QueueConnectionHandler.Discovered() implementation.
+//
+// After that, you need to create a ConnectorAdapter object with RW mode for
+// the ConnectionPool to send requests to RW-instances. This adapter can
+// be used to create a ready-to-work queue object.
+func Example_connectionPool() {
+	// Create a ConnectionHandler object.
+	cfg := queue.Cfg{
+		Temporary:   false,
+		IfNotExists: true,
+		Kind:        queue.FIFO,
+		Opts: queue.Opts{
+			Ttl: 10 * time.Second,
+		},
+	}
+	h := NewQueueConnectionHandler("test_queue", cfg)
+	defer h.Close()
+
+	// Create a ConnectionPool object.
+	servers := []string{
+		"127.0.0.1:3014",
+		"127.0.0.1:3015",
+	}
+	connOpts := tarantool.Opts{
+		Timeout: 1 * time.Second,
+		User:    "test",
+		Pass:    "test",
+	}
+	poolOpts := connection_pool.OptsPool{
+		CheckTimeout:      1 * time.Second,
+		ConnectionHandler: h,
+	}
+	connPool, err := connection_pool.ConnectWithOpts(servers, connOpts, poolOpts)
+	if err != nil {
+		fmt.Printf("Unable to connect to the pool: %s", err)
+		return
+	}
+	defer connPool.Close()
+
+	// Wait for the master instance to be identified in the queue.
+	<-h.masterUpdated
+	if h.err != nil {
+		fmt.Printf("Unable to identify in the pool: %s", h.err)
+		return
+	}
+
+	// Create a Queue object from the ConnectionPool object via
+	// a ConnectorAdapter.
+	rw := connection_pool.NewConnectorAdapter(connPool, connection_pool.RW)
+	q := queue.New(rw, "test_queue")
+	fmt.Println("A Queue object is ready to work.")
+
+	testData := "test_data"
+	fmt.Println("Send data:", testData)
+	if _, err = q.Put(testData); err != nil {
+		fmt.Printf("Unable to put data into the queue: %s", err)
+		return
+	}
+
+	// Switch the master instance in the pool.
+	roles := []bool{true, false}
+	err = test_helpers.SetClusterRO(servers, connOpts, roles)
+	if err != nil {
+		fmt.Printf("Unable to set cluster roles: %s", err)
+		return
+	}
+
+	// Wait for the new master instance to be re-identified.
+	<-h.masterUpdated
+	if h.err != nil {
+		fmt.Printf("Unable to re-identify in the pool: %s", h.err)
+		return
+	}
+
+	// Take data from the new master instance.
+	if task, err := q.TakeTimeout(1 * time.Second); err == nil && task.Data() != nil {
+		task.Ack()
+		fmt.Println("Got data:", task.Data())
+	} else {
+		fmt.Println("Unable to get data:", err)
+	}
+
+	// Output:
+	// Master 127.0.0.1:3014 is ready to work!
+	// A Queue object is ready to work.
+	// Send data: test_data
+	// Master 127.0.0.1:3015 is ready to work!
+	// Got data: test_data
+}
diff --git a/queue/queue_test.go b/queue/queue_test.go
index 7b19c0dd3..85276e4a8 100644
--- a/queue/queue_test.go
+++ b/queue/queue_test.go
@@ -14,6 +14,13 @@ import (
 )
 
 var server = "127.0.0.1:3013"
+var serversPool = []string{
+	"127.0.0.1:3014",
+	"127.0.0.1:3015",
+}
+
+var instances []test_helpers.TarantoolInstance
+
 var opts = Opts{
 	Timeout: 2500 * time.Millisecond,
 	User:    "test",
@@ -890,7 +897,7 @@ func TestTask_Touch(t *testing.T) {
 // https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls
 func runTestMain(m *testing.M) int {
 	inst, err := test_helpers.StartTarantool(test_helpers.StartOpts{
-		InitScript:   "config.lua",
+		InitScript:   "testdata/config.lua",
 		Listen:       server,
 		WorkDir:      "work_dir",
 		User:         opts.User,
@@ -899,12 +906,40 @@ func runTestMain(m *testing.M) int {
 		ConnectRetry: 3,
 		RetryTimeout: 500 * time.Millisecond,
 	})
-	defer test_helpers.StopTarantoolWithCleanup(inst)
 	if err != nil {
 		log.Fatalf("Failed to prepare test tarantool: %s", err)
 	}
+	defer test_helpers.StopTarantoolWithCleanup(inst)
+
+	workDirs := []string{"work_dir1", "work_dir2"}
+	poolOpts := test_helpers.StartOpts{
+		InitScript:   "testdata/pool.lua",
+		User:         opts.User,
+		Pass:         opts.Pass,
+		WaitStart:    3 * time.Second, // replication_timeout * 3
+		ConnectRetry: -1,
+	}
+	instances, err = test_helpers.StartTarantoolInstances(serversPool, workDirs, poolOpts)
+
+	if err != nil {
+		log.Fatalf("Failed to prepare test tarantool pool: %s", err)
+	}
+
+	defer test_helpers.StopTarantoolInstances(instances)
+
+	roles := []bool{false, true}
+	connOpts := Opts{
+		Timeout: 500 * time.Millisecond,
+		User:    "test",
+		Pass:    "test",
+	}
+	err = test_helpers.SetClusterRO(serversPool, connOpts, roles)
+
+	if err != nil {
+		log.Fatalf("Failed to set roles in tarantool pool: %s", err)
+	}
 
 	return m.Run()
 }
diff --git a/queue/config.lua b/queue/testdata/config.lua
similarity index 81%
rename from queue/config.lua
rename to queue/testdata/config.lua
index df28496a3..eccb19a68 100644
--- a/queue/config.lua
+++ b/queue/testdata/config.lua
@@ -7,7 +7,7 @@ box.cfg{
     work_dir = os.getenv("TEST_TNT_WORK_DIR"),
 }
 
-    box.once("init", function()
+box.once("init", function()
     box.schema.user.create('test', {password = 'test'})
     box.schema.func.create('queue.tube.test_queue:touch')
     box.schema.func.create('queue.tube.test_queue:ack')
@@ -23,21 +23,15 @@ box.cfg{
     box.schema.func.create('queue.identify')
     box.schema.func.create('queue.state')
     box.schema.func.create('queue.statistics')
-    box.schema.user.grant('test', 'create', 'space')
-    box.schema.user.grant('test', 'write', 'space', '_schema')
-    box.schema.user.grant('test', 'write', 'space', '_space')
-    box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
-    box.schema.user.grant('test', 'write', 'space', '_index')
+    box.schema.user.grant('test', 'create,read,write,drop', 'space')
     box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids')
     box.schema.user.grant('test', 'execute', 'universe')
     box.schema.user.grant('test', 'read,write', 'space', '_queue')
     box.schema.user.grant('test', 'read,write', 'space', '_schema')
+    box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
     box.schema.user.grant('test', 'read,write', 'space', '_space')
     box.schema.user.grant('test', 'read,write', 'space', '_index')
-    box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers')
     box.schema.user.grant('test', 'read,write', 'space', '_priv')
-    box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2')
-    box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions')
     if box.space._trigger ~= nil then
         box.schema.user.grant('test', 'read', 'space', '_trigger')
     end
@@ -56,3 +50,5 @@ end)
 box.cfg{
     listen = os.getenv("TEST_TNT_LISTEN"),
 }
+
+require('console').start()
diff --git a/queue/testdata/pool.lua b/queue/testdata/pool.lua
new file mode 100644
index 000000000..7c63aa787
--- /dev/null
+++ b/queue/testdata/pool.lua
@@ -0,0 +1,56 @@
+local queue = require('queue')
+rawset(_G, 'queue', queue)
+
+local listen = os.getenv("TEST_TNT_LISTEN")
+box.cfg{
+    work_dir = os.getenv("TEST_TNT_WORK_DIR"),
+    listen = listen,
+    replication = {
+        "test:test@127.0.0.1:3014",
+        "test:test@127.0.0.1:3015",
+    },
+    read_only = listen == "127.0.0.1:3015"
+}
+
+box.once("schema", function()
+    box.schema.user.create('test', {password = 'test'})
+    box.schema.user.grant('test', 'replication')
+
+    box.schema.func.create('queue.tube.test_queue:touch')
+    box.schema.func.create('queue.tube.test_queue:ack')
+    box.schema.func.create('queue.tube.test_queue:put')
+    box.schema.func.create('queue.tube.test_queue:drop')
+    box.schema.func.create('queue.tube.test_queue:peek')
+    box.schema.func.create('queue.tube.test_queue:kick')
+    box.schema.func.create('queue.tube.test_queue:take')
+    box.schema.func.create('queue.tube.test_queue:delete')
+    box.schema.func.create('queue.tube.test_queue:release')
+    box.schema.func.create('queue.tube.test_queue:release_all')
+    box.schema.func.create('queue.tube.test_queue:bury')
+    box.schema.func.create('queue.identify')
+    box.schema.func.create('queue.state')
+    box.schema.func.create('queue.statistics')
+    box.schema.user.grant('test', 'create,read,write,drop', 'space')
+    box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids')
+    box.schema.user.grant('test', 'execute', 'universe')
+    box.schema.user.grant('test', 'read,write', 'space', '_queue')
+    box.schema.user.grant('test', 'read,write', 'space', '_schema')
+    box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
+    box.schema.user.grant('test', 'read,write', 'space', '_space')
+    box.schema.user.grant('test', 'read,write', 'space', '_index')
+    box.schema.user.grant('test', 'read,write', 'space', '_priv')
+    if box.space._trigger ~= nil then
+        box.schema.user.grant('test', 'read', 'space', '_trigger')
+    end
+    if box.space._fk_constraint ~= nil then
+        box.schema.user.grant('test', 'read', 'space', '_fk_constraint')
+    end
+    if box.space._ck_constraint ~= nil then
+        box.schema.user.grant('test', 'read', 'space', '_ck_constraint')
+    end
+    if box.space._func_index ~= nil then
+        box.schema.user.grant('test', 'read', 'space', '_func_index')
+    end
+end)
+
+require('console').start()
diff --git a/test_helpers/main.go b/test_helpers/main.go
index cdc3c343d..77e22c535 100644
--- a/test_helpers/main.go
+++ b/test_helpers/main.go
@@ -67,8 +67,9 @@ type StartOpts struct {
 	// WaitStart is a time to wait before starting to ping tarantool.
 	WaitStart time.Duration
 
-	// ConnectRetry is a count of attempts to ping tarantool.
-	ConnectRetry uint
+	// ConnectRetry is a count of retry attempts to ping tarantool. If the
+	// value is negative, tarantool is not pinged at all.
+	ConnectRetry int
 
 	// RetryTimeout is a time between tarantool ping retries.
 	RetryTimeout time.Duration
@@ -240,7 +241,7 @@ func StartTarantool(startOpts StartOpts) (TarantoolInstance, error) {
 		Ssl:       startOpts.ClientSsl,
 	}
 
-	var i uint
+	var i int
 	var server string
 	if startOpts.ClientServer != "" {
 		server = startOpts.ClientServer