diff --git a/CHANGELOG.md b/CHANGELOG.md index 1954e203f..c231ca0e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ConnectionPool (#178) - Execute, ExecuteTyped and ExecuteAsync methods to ConnectionPool (#176) - ConnectorAdapter type to use ConnectionPool as Connector interface (#176) +- An example how to use queue and connection_pool subpackages together (#176) ### Changed diff --git a/queue/config.lua b/queue/config.lua index df28496a3..3a00be39b 100644 --- a/queue/config.lua +++ b/queue/config.lua @@ -23,21 +23,15 @@ box.cfg{ box.schema.func.create('queue.identify') box.schema.func.create('queue.state') box.schema.func.create('queue.statistics') - box.schema.user.grant('test', 'create', 'space') - box.schema.user.grant('test', 'write', 'space', '_schema') - box.schema.user.grant('test', 'write', 'space', '_space') - box.schema.user.grant('test', 'read,write', 'space', '_space_sequence') - box.schema.user.grant('test', 'write', 'space', '_index') + box.schema.user.grant('test', 'create,read,write,drop', 'space') box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids') box.schema.user.grant('test', 'execute', 'universe') box.schema.user.grant('test', 'read,write', 'space', '_queue') box.schema.user.grant('test', 'read,write', 'space', '_schema') + box.schema.user.grant('test', 'read,write', 'space', '_space_sequence') box.schema.user.grant('test', 'read,write', 'space', '_space') box.schema.user.grant('test', 'read,write', 'space', '_index') - box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers') box.schema.user.grant('test', 'read,write', 'space', '_priv') - box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2') - box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions') if box.space._trigger ~= nil then box.schema.user.grant('test', 'read', 'space', '_trigger') end @@ -56,3 +50,5 @@ end) box.cfg{ listen = os.getenv("TEST_TNT_LISTEN"), } + +require('console').start() diff --git a/queue/example_connection_pool_test.go b/queue/example_connection_pool_test.go new file mode 100644 index 000000000..454aafe72 --- /dev/null +++ b/queue/example_connection_pool_test.go @@ -0,0 +1,148 @@ +package queue_test + +import ( + "fmt" + "log" + "sync" + "time" + + "github.com/google/uuid" + "github.com/tarantool/go-tarantool" + "github.com/tarantool/go-tarantool/connection_pool" + "github.com/tarantool/go-tarantool/queue" +) + +type QueueConnectionHandler struct { + name string + cfg queue.Cfg + + uuid uuid.UUID + registered bool + err error + registerMutex sync.Mutex + done chan struct{} +} + +var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{} + +func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler { + return &QueueConnectionHandler{ + name: name, + cfg: cfg, + done: make(chan struct{}), + } +} + +func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection, + role connection_pool.Role) error { + // The queue only works with a master instance. + if role != connection_pool.MasterRole { + return nil + } + + // We need to register a shared session only once. + h.registerMutex.Lock() + + if h.err != nil { + h.registerMutex.Unlock() + return h.err + } + + q := queue.New(conn, h.name) + if !h.registered { + opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second} + if h.err = q.Cfg(opts); h.err == nil { + if h.uuid, h.err = q.Identify(nil); h.err == nil { + h.err = q.Create(h.cfg) + } + } + h.registered = h.err == nil + close(h.done) + h.registerMutex.Unlock() + return h.err + } else { + // We already registered. We don't need the lock any more. + h.registerMutex.Unlock() + if _, err := q.Identify(&h.uuid); err != nil { + return err + } + if err := q.Create(h.cfg); err != nil { + return err + } + } + return nil +} + +func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection, + role connection_pool.Role) error { + return nil +} + +// Example demonstrates how to use the queue package with the connection_pool +// package. First of all, you need to create a connection handler for the +// a ConnectionPool object to process new connections from RW-instances. +// +// You need to register a shared session UUID at a first master connection. +// It needs to be used to re-identify as the shared session on new +// RW-instances. See QueueConnectionHandler.Discovered() implementation. +// +// After that, you need to create a ConnectorAdapter object with RW mode for +// the ConnectionPool to send requests into RW-instances. This adapter can +// be used to create a ready-to-work queue object. +func Example_connectionPool() { + servers := []string{ + "127.0.0.1:3014", + "127.0.0.1:3015", + "127.0.0.1:3016", + } + connOpts := tarantool.Opts{ + Timeout: 500 * time.Millisecond, + User: "test", + Pass: "test", + } + + cfg := queue.Cfg{ + Temporary: false, + IfNotExists: true, + Kind: queue.FIFO, + Opts: queue.Opts{ + Ttl: 10 * time.Second, + }, + } + h := NewQueueConnectionHandler("test_queue", cfg) + poolOpts := connection_pool.OptsPool{ + CheckTimeout: 1 * time.Second, + ConnectionHandler: h, + } + connPool, err := connection_pool.ConnectWithOpts(servers, connOpts, poolOpts) + if err != nil { + log.Fatalf("unable to connect to the pool: %s", err) + } + + defer connPool.Close() + + <-h.done + + if h.err != nil { + log.Fatalf("unable to identify in the pool: %s", h.err) + } + + rw := connection_pool.NewConnectorAdapter(connPool, connection_pool.RW) + q := queue.New(rw, "test_queue") + + fmt.Println("A Queue object is ready to work.") + + if _, err = q.Put("test_data"); err != nil { + log.Fatalf("unable to put data into the queue: %s", err) + } + + if task, err := q.Take(); err != nil { + log.Fatalf("unable to take data from the queue: %s", err) + } else { + task.Ack() + fmt.Println("data:", task.Data()) + } + // Output: + // A Queue object is ready to work. + // data: test_data +} diff --git a/queue/queue_test.go b/queue/queue_test.go index ef59156da..76d74b062 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -14,6 +14,14 @@ import ( ) var server = "127.0.0.1:3013" +var serversPool = []string{ + "127.0.0.1:3014", + "127.0.0.1:3015", + "127.0.0.1:3016", +} + +var instances []test_helpers.TarantoolInstance + var opts = Opts{ Timeout: 2500 * time.Millisecond, User: "test", @@ -899,12 +907,41 @@ func runTestMain(m *testing.M) int { ConnectRetry: 3, RetryTimeout: 500 * time.Millisecond, }) - defer test_helpers.StopTarantoolWithCleanup(inst) if err != nil { log.Fatalf("Failed to prepare test tarantool: %s", err) } + defer test_helpers.StopTarantoolWithCleanup(inst) + + workDirs := []string{"work_dir1", "work_dir2", "work_dir3"} + poolOpts := test_helpers.StartOpts{ + InitScript: "config.lua", + User: opts.User, + Pass: opts.Pass, + WaitStart: 100 * time.Millisecond, + ConnectRetry: 3, + RetryTimeout: 500 * time.Millisecond, + } + instances, err = test_helpers.StartTarantoolInstances(serversPool, workDirs, poolOpts) + + if err != nil { + log.Fatalf("Failed to prepare test tarantool pool: %s", err) + } + + defer test_helpers.StopTarantoolInstances(instances) + + roles := []bool{false, true, true} + connOpts := Opts{ + Timeout: 500 * time.Millisecond, + User: "test", + Pass: "test", + } + err = test_helpers.SetClusterRO(serversPool, connOpts, roles) + + if err != nil { + log.Fatalf("Failed to set roles in tarantool pool: %s", err) + } return m.Run() }