forked from fl00r/go-tarantool-1.6
-
Notifications
You must be signed in to change notification settings - Fork 58
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
queue: add an example with connection_pool
The example demonstrates how to use the queue package with the connection_pool package. Closes #176
- Loading branch information
1 parent
12bc5e2
commit 1c6c78a
Showing
8 changed files
with
288 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
package queue_test | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"github.com/google/uuid" | ||
"github.com/tarantool/go-tarantool" | ||
"github.com/tarantool/go-tarantool/connection_pool" | ||
"github.com/tarantool/go-tarantool/queue" | ||
"github.com/tarantool/go-tarantool/test_helpers" | ||
) | ||
|
||
type QueueConnectionHandler struct { | ||
name string | ||
cfg queue.Cfg | ||
|
||
uuid uuid.UUID | ||
registered bool | ||
err error | ||
mutex sync.Mutex | ||
masterUpdated chan struct{} | ||
} | ||
|
||
var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{} | ||
|
||
func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler { | ||
return &QueueConnectionHandler{ | ||
name: name, | ||
cfg: cfg, | ||
masterUpdated: make(chan struct{}, 10), | ||
} | ||
} | ||
|
||
func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection, | ||
role connection_pool.Role) error { | ||
h.mutex.Lock() | ||
defer h.mutex.Unlock() | ||
|
||
if h.err != nil { | ||
return h.err | ||
} | ||
|
||
master := role == connection_pool.MasterRole | ||
if master { | ||
defer func() { | ||
h.masterUpdated <- struct{}{} | ||
}() | ||
} | ||
|
||
// Set up a queue module configuration for an instance. | ||
q := queue.New(conn, h.name) | ||
opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second} | ||
|
||
if h.err = q.Cfg(opts); h.err != nil { | ||
return fmt.Errorf("unable to configure queue: %w", h.err) | ||
} | ||
|
||
// The queue only works with a master instance. | ||
if !master { | ||
return nil | ||
} | ||
|
||
if h.err = q.Create(h.cfg); h.err != nil { | ||
return h.err | ||
} | ||
|
||
if !h.registered { | ||
// We register a shared session at the first time. | ||
if h.uuid, h.err = q.Identify(nil); h.err != nil { | ||
return h.err | ||
} | ||
h.registered = h.err == nil | ||
} else { | ||
// We re-identify as the shared session. | ||
if _, h.err = q.Identify(&h.uuid); h.err != nil { | ||
return h.err | ||
} | ||
} | ||
|
||
fmt.Printf("Master %s is ready to work!\n", conn.Addr()) | ||
|
||
return nil | ||
} | ||
|
||
func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection, | ||
role connection_pool.Role) error { | ||
return nil | ||
} | ||
|
||
// Example demonstrates how to use the queue package with the connection_pool | ||
// package. First of all, you need to create a connection handler for the | ||
// a ConnectionPool object to process new connections from RW-instances. | ||
// | ||
// You need to register a shared session UUID at a first master connection. | ||
// It needs to be used to re-identify as the shared session on new | ||
// RW-instances. See QueueConnectionHandler.Discovered() implementation. | ||
// | ||
// After that, you need to create a ConnectorAdapter object with RW mode for | ||
// the ConnectionPool to send requests into RW-instances. This adapter can | ||
// be used to create a ready-to-work queue object. | ||
func Example_connectionPool() { | ||
testData := "test_data" | ||
servers := []string{ | ||
"127.0.0.1:3014", | ||
"127.0.0.1:3015", | ||
} | ||
connOpts := tarantool.Opts{ | ||
Timeout: 500 * time.Millisecond, | ||
User: "test", | ||
Pass: "test", | ||
} | ||
|
||
cfg := queue.Cfg{ | ||
Temporary: false, | ||
IfNotExists: true, | ||
Kind: queue.FIFO, | ||
Opts: queue.Opts{ | ||
Ttl: 10 * time.Second, | ||
}, | ||
} | ||
h := NewQueueConnectionHandler("test_queue", cfg) | ||
defer close(h.masterUpdated) | ||
|
||
poolOpts := connection_pool.OptsPool{ | ||
CheckTimeout: 1 * time.Second, | ||
ConnectionHandler: h, | ||
} | ||
connPool, err := connection_pool.ConnectWithOpts(servers, connOpts, poolOpts) | ||
if err != nil { | ||
fmt.Printf("Unable to connect to the pool: %s", err) | ||
return | ||
} | ||
|
||
defer connPool.Close() | ||
time.Sleep(5 * time.Second) | ||
|
||
<-h.masterUpdated | ||
|
||
if h.err != nil { | ||
fmt.Printf("Unable to identify in the pool: %s", h.err) | ||
return | ||
} | ||
|
||
rw := connection_pool.NewConnectorAdapter(connPool, connection_pool.RW) | ||
|
||
q := queue.New(rw, "test_queue") | ||
fmt.Println("A Queue object is ready to work.") | ||
|
||
fmt.Println("Send data:", testData) | ||
if _, err = q.Put(testData); err != nil { | ||
fmt.Printf("Unable to put data into the queue: %s", err) | ||
return | ||
} | ||
|
||
// Switch master in the pool. | ||
roles := []bool{true, false} | ||
err = test_helpers.SetClusterRO(servers, connOpts, roles) | ||
if err != nil { | ||
fmt.Printf("Unable to set cluster roles: %s", err) | ||
return | ||
} | ||
|
||
<-h.masterUpdated | ||
if h.err != nil { | ||
fmt.Printf("Unable to re-identify in the pool: %s", h.err) | ||
return | ||
} | ||
|
||
if task, err := q.Take(); err == nil { | ||
task.Ack() | ||
fmt.Println("Got data:", task.Data()) | ||
} else { | ||
fmt.Println("Unable to got data:", err) | ||
} | ||
// Output: | ||
// Master 127.0.0.1:3014 is ready to work! | ||
// A Queue object is ready to work. | ||
// Send data: test_data | ||
// Master 127.0.0.1:3015 is ready to work! | ||
// Got data: test_data | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
local queue = require('queue') | ||
rawset(_G, 'queue', queue) | ||
|
||
local listen = os.getenv("TEST_TNT_LISTEN") | ||
box.cfg{ | ||
work_dir = os.getenv("TEST_TNT_WORK_DIR"), | ||
listen = listen, | ||
replication = { | ||
"test:test@127.0.0.1:3014", | ||
"test:test@127.0.0.1:3015", | ||
}, | ||
read_only = listen == "127.0.0.1:3015" | ||
} | ||
|
||
box.once("schema", function() | ||
box.schema.user.create('test', {password = 'test'}) | ||
box.schema.user.grant('test', 'replication') | ||
|
||
box.schema.func.create('queue.tube.test_queue:touch') | ||
box.schema.func.create('queue.tube.test_queue:ack') | ||
box.schema.func.create('queue.tube.test_queue:put') | ||
box.schema.func.create('queue.tube.test_queue:drop') | ||
box.schema.func.create('queue.tube.test_queue:peek') | ||
box.schema.func.create('queue.tube.test_queue:kick') | ||
box.schema.func.create('queue.tube.test_queue:take') | ||
box.schema.func.create('queue.tube.test_queue:delete') | ||
box.schema.func.create('queue.tube.test_queue:release') | ||
box.schema.func.create('queue.tube.test_queue:release_all') | ||
box.schema.func.create('queue.tube.test_queue:bury') | ||
box.schema.func.create('queue.identify') | ||
box.schema.func.create('queue.state') | ||
box.schema.func.create('queue.statistics') | ||
box.schema.user.grant('test', 'create,read,write,drop', 'space') | ||
box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids') | ||
box.schema.user.grant('test', 'execute', 'universe') | ||
box.schema.user.grant('test', 'read,write', 'space', '_queue') | ||
box.schema.user.grant('test', 'read,write', 'space', '_schema') | ||
box.schema.user.grant('test', 'read,write', 'space', '_space_sequence') | ||
box.schema.user.grant('test', 'read,write', 'space', '_space') | ||
box.schema.user.grant('test', 'read,write', 'space', '_index') | ||
box.schema.user.grant('test', 'read,write', 'space', '_priv') | ||
if box.space._trigger ~= nil then | ||
box.schema.user.grant('test', 'read', 'space', '_trigger') | ||
end | ||
if box.space._fk_constraint ~= nil then | ||
box.schema.user.grant('test', 'read', 'space', '_fk_constraint') | ||
end | ||
if box.space._ck_constraint ~= nil then | ||
box.schema.user.grant('test', 'read', 'space', '_ck_constraint') | ||
end | ||
if box.space._func_index ~= nil then | ||
box.schema.user.grant('test', 'read', 'space', '_func_index') | ||
end | ||
end) | ||
|
||
require('console').start() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters