forked from fl00r/go-tarantool-1.6
-
Notifications
You must be signed in to change notification settings - Fork 58
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
queue: add an example with connection_pool
The example demonstrates how to use the queue package with the connection_pool package. Closes #176
- Loading branch information
1 parent
05c418f
commit 2603eda
Showing
8 changed files
with
311 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,206 @@ | ||
package queue_test | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"github.com/google/uuid" | ||
"github.com/tarantool/go-tarantool" | ||
"github.com/tarantool/go-tarantool/connection_pool" | ||
"github.com/tarantool/go-tarantool/queue" | ||
"github.com/tarantool/go-tarantool/test_helpers" | ||
) | ||
|
||
// QueueConnectionHandler handles new connections in a ConnectionPool. | ||
type QueueConnectionHandler struct { | ||
name string | ||
cfg queue.Cfg | ||
|
||
uuid uuid.UUID | ||
registered bool | ||
err error | ||
mutex sync.Mutex | ||
masterUpdated chan struct{} | ||
} | ||
|
||
// QueueConnectionHandler implements the ConnectionHandler interface. | ||
var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{} | ||
|
||
// NewQueueConnectionHandler creates a QueueConnectionHandler object. | ||
func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler { | ||
return &QueueConnectionHandler{ | ||
name: name, | ||
cfg: cfg, | ||
masterUpdated: make(chan struct{}, 10), | ||
} | ||
} | ||
|
||
// Discovered configures a queue for an instance and identifies a shared queue | ||
// session on master instances. | ||
// | ||
// NOTE: the Queue supports only a master-replica cluster configuration. It | ||
// does not support a master-master configuration. | ||
func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection, | ||
role connection_pool.Role) error { | ||
h.mutex.Lock() | ||
defer h.mutex.Unlock() | ||
|
||
if h.err != nil { | ||
return h.err | ||
} | ||
|
||
master := role == connection_pool.MasterRole | ||
if master { | ||
defer func() { | ||
h.masterUpdated <- struct{}{} | ||
}() | ||
} | ||
|
||
// Set up a queue module configuration for an instance. | ||
q := queue.New(conn, h.name) | ||
opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second} | ||
|
||
if h.err = q.Cfg(opts); h.err != nil { | ||
return fmt.Errorf("unable to configure queue: %w", h.err) | ||
} | ||
|
||
// The queue only works with a master instance. | ||
if !master { | ||
return nil | ||
} | ||
|
||
if h.err = q.Create(h.cfg); h.err != nil { | ||
return h.err | ||
} | ||
|
||
if !h.registered { | ||
// We register a shared session at the first time. | ||
if h.uuid, h.err = q.Identify(nil); h.err != nil { | ||
return h.err | ||
} | ||
h.registered = true | ||
} else { | ||
// We re-identify as the shared session. | ||
if _, h.err = q.Identify(&h.uuid); h.err != nil { | ||
return h.err | ||
} | ||
} | ||
|
||
fmt.Printf("Master %s is ready to work!\n", conn.Addr()) | ||
|
||
return nil | ||
} | ||
|
||
// Deactivated doesn't do anything useful for the example. | ||
func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection, | ||
role connection_pool.Role) error { | ||
return nil | ||
} | ||
|
||
// Closes closes a QueueConnectionHandler object. | ||
func (h *QueueConnectionHandler) Close() { | ||
close(h.masterUpdated) | ||
} | ||
|
||
// Example demonstrates how to use the queue package with the connection_pool | ||
// package. First of all, you need to create a ConnectionHandler implementation | ||
// for the a ConnectionPool object to process new connections from | ||
// RW-instances. | ||
// | ||
// You need to register a shared session UUID at a first master connection. | ||
// It needs to be used to re-identify as the shared session on new | ||
// RW-instances. See QueueConnectionHandler.Discovered() implementation. | ||
// | ||
// After that, you need to create a ConnectorAdapter object with RW mode for | ||
// the ConnectionPool to send requests into RW-instances. This adapter can | ||
// be used to create a ready-to-work queue object. | ||
func Example_connectionPool() { | ||
// Create a ConnectionHandler object. | ||
cfg := queue.Cfg{ | ||
Temporary: false, | ||
IfNotExists: true, | ||
Kind: queue.FIFO, | ||
Opts: queue.Opts{ | ||
Ttl: 10 * time.Second, | ||
}, | ||
} | ||
h := NewQueueConnectionHandler("test_queue", cfg) | ||
defer h.Close() | ||
|
||
// Create a ConnectionPool object. | ||
servers := []string{ | ||
"127.0.0.1:3014", | ||
"127.0.0.1:3015", | ||
} | ||
connOpts := tarantool.Opts{ | ||
Timeout: 1 * time.Second, | ||
User: "test", | ||
Pass: "test", | ||
} | ||
poolOpts := connection_pool.OptsPool{ | ||
CheckTimeout: 1 * time.Second, | ||
ConnectionHandler: h, | ||
} | ||
connPool, err := connection_pool.ConnectWithOpts(servers, connOpts, poolOpts) | ||
if err != nil { | ||
fmt.Printf("Unable to connect to the pool: %s", err) | ||
return | ||
} | ||
defer connPool.Close() | ||
|
||
// Wait for a master instance identification in the queue. | ||
<-h.masterUpdated | ||
if h.err != nil { | ||
fmt.Printf("Unable to identify in the pool: %s", h.err) | ||
return | ||
} | ||
|
||
// Create a Queue object from the ConnectionPool object via | ||
// a ConnectorAdapter. | ||
rw := connection_pool.NewConnectorAdapter(connPool, connection_pool.RW) | ||
q := queue.New(rw, "test_queue") | ||
fmt.Println("A Queue object is ready to work.") | ||
|
||
testData := "test_data" | ||
fmt.Println("Send data:", testData) | ||
if _, err = q.Put(testData); err != nil { | ||
fmt.Printf("Unable to put data into the queue: %s", err) | ||
return | ||
} | ||
|
||
// Switch a master instance in the pool. | ||
roles := []bool{true, false} | ||
err = test_helpers.SetClusterRO(servers, connOpts, roles) | ||
if err != nil { | ||
fmt.Printf("Unable to set cluster roles: %s", err) | ||
return | ||
} | ||
|
||
// Wait for a new master instance re-identification. | ||
<-h.masterUpdated | ||
if h.err != nil { | ||
fmt.Printf("Unable to re-identify in the pool: %s", h.err) | ||
return | ||
} | ||
|
||
// Take a data from the new master instance. | ||
task, err := q.TakeTimeout(1 * time.Second) | ||
if err != nil { | ||
fmt.Println("Unable to got task:", err) | ||
} else if task == nil { | ||
fmt.Println("task == nil") | ||
} else if task.Data() == nil { | ||
fmt.Println("task.Data() == nil") | ||
} else { | ||
task.Ack() | ||
fmt.Println("Got data:", task.Data()) | ||
} | ||
|
||
// Output: | ||
// Master 127.0.0.1:3014 is ready to work! | ||
// A Queue object is ready to work. | ||
// Send data: test_data | ||
// Master 127.0.0.1:3015 is ready to work! | ||
// Got data: test_data | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
local queue = require('queue') | ||
rawset(_G, 'queue', queue) | ||
|
||
local listen = os.getenv("TEST_TNT_LISTEN") | ||
box.cfg{ | ||
work_dir = os.getenv("TEST_TNT_WORK_DIR"), | ||
listen = listen, | ||
replication = { | ||
"test:test@127.0.0.1:3014", | ||
"test:test@127.0.0.1:3015", | ||
}, | ||
read_only = listen == "127.0.0.1:3015" | ||
} | ||
|
||
box.once("schema", function() | ||
box.schema.user.create('test', {password = 'test'}) | ||
box.schema.user.grant('test', 'replication') | ||
|
||
box.schema.func.create('queue.tube.test_queue:touch') | ||
box.schema.func.create('queue.tube.test_queue:ack') | ||
box.schema.func.create('queue.tube.test_queue:put') | ||
box.schema.func.create('queue.tube.test_queue:drop') | ||
box.schema.func.create('queue.tube.test_queue:peek') | ||
box.schema.func.create('queue.tube.test_queue:kick') | ||
box.schema.func.create('queue.tube.test_queue:take') | ||
box.schema.func.create('queue.tube.test_queue:delete') | ||
box.schema.func.create('queue.tube.test_queue:release') | ||
box.schema.func.create('queue.tube.test_queue:release_all') | ||
box.schema.func.create('queue.tube.test_queue:bury') | ||
box.schema.func.create('queue.identify') | ||
box.schema.func.create('queue.state') | ||
box.schema.func.create('queue.statistics') | ||
box.schema.user.grant('test', 'create,read,write,drop', 'space') | ||
box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids') | ||
box.schema.user.grant('test', 'execute', 'universe') | ||
box.schema.user.grant('test', 'read,write', 'space', '_queue') | ||
box.schema.user.grant('test', 'read,write', 'space', '_schema') | ||
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence') | ||
box.schema.user.grant('test', 'read,write', 'space', '_space') | ||
box.schema.user.grant('test', 'read,write', 'space', '_index') | ||
box.schema.user.grant('test', 'read,write', 'space', '_priv') | ||
if box.space._trigger ~= nil then | ||
box.schema.user.grant('test', 'read', 'space', '_trigger') | ||
end | ||
if box.space._fk_constraint ~= nil then | ||
box.schema.user.grant('test', 'read', 'space', '_fk_constraint') | ||
end | ||
if box.space._ck_constraint ~= nil then | ||
box.schema.user.grant('test', 'read', 'space', '_ck_constraint') | ||
end | ||
if box.space._func_index ~= nil then | ||
box.schema.user.grant('test', 'read', 'space', '_func_index') | ||
end | ||
end) | ||
|
||
require('console').start() |
Oops, something went wrong.