Skip to content

Commit

Permalink
api: add events subscription support
Browse files Browse the repository at this point in the history
A user can create watcher by the Connection.NewWatcher() call:

    watcher = conn.NewWatcker("key", func(event WatchEvent) {
        // The callback code.
    })

After that, the watcher callback is invoked for the first time. In
this case, the callback is triggered whether or not the key has
already been broadcast. All subsequent invocations are triggered with
box.broadcast() called on the remote host. If a watcher is subscribed
for a key that has not been broadcast yet, the callback is triggered
only once, after the registration of the watcher.

If the key is updated while the watcher callback is running, the
callback will be invoked again with the latest value as soon as it
returns.

Multiple watchers can be created for one key.

If you don’t need the watcher anymore, you can unregister it using
the Unregister method:

    watcher.Unregister()

The api is similar to net.box implementation [1].

It also adds a BroadcastRequest to make it easier to send broadcast
messages.

1. https://www.tarantool.io/ru/doc/latest/reference/reference_lua/net_box/#conn-watch

Closes #119
  • Loading branch information
oleg-jukovec committed Nov 22, 2022
1 parent 7d4b3cc commit 401de6d
Show file tree
Hide file tree
Showing 19 changed files with 1,497 additions and 57 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.

### Added

- Event subscription support (#119)

### Changed

### Fixed
Expand Down
287 changes: 279 additions & 8 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const (
// LogUnexpectedResultId is logged when response with unknown id was received.
// Most probably it is due to request timeout.
LogUnexpectedResultId
// LogReadWatchEventFailed is logged when failed to read a watch event.
LogReadWatchEventFailed
)

// ConnEvent is sent throw Notify channel specified in Opts.
Expand All @@ -62,6 +64,12 @@ type ConnEvent struct {
When time.Time
}

// A raw watch event.
type connWatchEvent struct {
key string
value interface{}
}

var epoch = time.Now()

// Logger is logger type expected to be passed in options.
Expand All @@ -83,6 +91,9 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
case LogUnexpectedResultId:
resp := v[0].(*Response)
log.Printf("tarantool: connection %s got unexpected resultId (%d) in response", conn.addr, resp.RequestId)
case LogReadWatchEventFailed:
err := v[0].(error)
log.Printf("tarantool: unable to parse watch event: %s\n", err)
default:
args := append([]interface{}{"tarantool: unexpected event ", event, conn}, v...)
log.Print(args...)
Expand Down Expand Up @@ -146,6 +157,9 @@ type Connection struct {
lenbuf [PacketLengthBytes]byte

lastStreamId uint64

// watchMap is a map of key -> watchSharedData.
watchMap sync.Map
}

var _ = Connector(&Connection{}) // Check compatibility with connector interface.
Expand Down Expand Up @@ -502,7 +516,7 @@ func (conn *Connection) dial() (err error) {
conn.Greeting.Version = bytes.NewBuffer(greeting[:64]).String()
conn.Greeting.auth = bytes.NewBuffer(greeting[64:108]).String()

// Auth
// Auth.
if opts.User != "" {
scr, err := scramble(conn.Greeting.auth, opts.Pass)
if err != nil {
Expand All @@ -520,7 +534,20 @@ func (conn *Connection) dial() (err error) {
}
}

// Only if connected and authenticated.
// Watchers.
conn.watchMap.Range(func(key, value interface{}) bool {
req := newWatchRequest(key.(string))
if err = conn.writeRequest(w, req); err != nil {
return false
}
return true
})

if err != nil {
return fmt.Errorf("unable to register watch: %w", err)
}

// Only if connected and fully initialized.
conn.lockShards()
conn.c = connection
atomic.StoreUint32(&conn.state, connConnected)
Expand Down Expand Up @@ -581,23 +608,33 @@ func pack(h *smallWBuf, enc *encoder, reqid uint32,
return
}

func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
func (conn *Connection) writeRequest(w *bufio.Writer, req Request) (err error) {
var packet smallWBuf
req := newAuthRequest(conn.opts.User, string(scramble))
err = pack(&packet, newEncoder(&packet), 0, req, ignoreStreamId, conn.Schema)

if err != nil {
return errors.New("auth: pack error " + err.Error())
return fmt.Errorf("pack error %w", err)
}
if err := write(w, packet.b); err != nil {
return errors.New("auth: write error " + err.Error())
return fmt.Errorf("write error %w", err)
}
if err = w.Flush(); err != nil {
return errors.New("auth: flush error " + err.Error())
return fmt.Errorf("flush error %w", err)
}
return
}

func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
req := newAuthRequest(conn.opts.User, string(scramble))

err = conn.writeRequest(w, req)
if err != nil {
return fmt.Errorf("auth: %w", err)
}

return nil
}

func (conn *Connection) readAuthResponse(r io.Reader) (err error) {
respBytes, err := conn.read(r)
if err != nil {
Expand Down Expand Up @@ -774,7 +811,50 @@ func (conn *Connection) writer(w *bufio.Writer, c net.Conn) {
}
}

func readWatchEvent(reader io.Reader) (connWatchEvent, error) {
keyExist := false
event := connWatchEvent{}
d := newDecoder(reader)

if l, err := d.DecodeMapLen(); err == nil {
for ; l > 0; l-- {
if cd, err := d.DecodeInt(); err == nil {
switch cd {
case KeyEvent:
if event.key, err = d.DecodeString(); err != nil {
return event, err
}
keyExist = true
case KeyEventData:
if event.value, err = d.DecodeInterface(); err != nil {
return event, err
}
default:
if err = d.Skip(); err != nil {
return event, err
}
}
} else {
return event, err
}
}
} else {
return event, err
}

if !keyExist {
return event, errors.New("watch event does not have a key")
}

return event, nil
}

func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
events := make(chan connWatchEvent, 1024)
defer close(events)

go conn.eventer(events)

for atomic.LoadUint32(&conn.state) != connClosed {
respBytes, err := conn.read(r)
if err != nil {
Expand All @@ -789,7 +869,14 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
}

var fut *Future = nil
if resp.Code == PushCode {
if resp.Code == EventCode {
if event, err := readWatchEvent(&resp.buf); err == nil {
events <- event
} else {
conn.opts.Logger.Report(LogReadWatchEventFailed, conn, err)
}
continue
} else if resp.Code == PushCode {
if fut = conn.peekFuture(resp.RequestId); fut != nil {
fut.AppendPush(resp)
}
Expand All @@ -799,12 +886,41 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
conn.markDone(fut)
}
}

if fut == nil {
conn.opts.Logger.Report(LogUnexpectedResultId, conn, resp)
}
}
}

// eventer goroutine gets watch events and updates values for watchers.
func (conn *Connection) eventer(events <-chan connWatchEvent) {
for {
event, ok := <-events
if !ok {
// The channel is closed.
break
}

if value, ok := conn.watchMap.Load(event.key); ok {
shared := value.(*watchSharedData)
state := <-shared.st
if state.changed != nil {
close(state.changed)
}
shared.st <- watchState{event.value, false, nil}

if atomic.LoadUint32(&conn.state) == connConnected {
shared.mu.Lock()
if shared.cnt > 0 {
conn.Do(newWatchRequest(event.key))
}
shared.mu.Unlock()
}
}
}
}

func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
fut = NewFuture()
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
Expand Down Expand Up @@ -960,6 +1076,18 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
return
}
shard.bufmut.Unlock()

if req.Async() {
if fut = conn.fetchFuture(reqid); fut != nil {
resp := &Response{
RequestId: reqid,
Code: OkCode,
}
fut.SetResponse(resp)
conn.markDone(fut)
}
}

if firstWritten {
conn.dirtyShard <- shardn
}
Expand Down Expand Up @@ -1163,3 +1291,146 @@ func (conn *Connection) NewStream() (*Stream, error) {
Conn: conn,
}, nil
}

type watchState struct {
value interface{}
init bool
changed chan struct{}
}

// watchSharedData is a shared between watchers of some key.
type watchSharedData struct {
// st is the current state of the watcher. See the idea at page 70, 105:
// https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view
st chan watchState

// cnt is a number of active watchers.
cnt int32
// mu helps to send IPROTO_WATCH/IPROTO_UNWATCH without duplicates
// and intersections.
mu sync.Mutex
}

// connWatcher is an internal implementation of the Watcher interface.
type connWatcher struct {
shared *watchSharedData
unregister sync.Once
done chan struct{}
finished chan struct{}
}

// Unregister unregisters the connection watcher.
func (w *connWatcher) Unregister() {
w.unregister.Do(func() {
close(w.done)
})
<-w.finished
}

// NewWatcher creates a new Watcher object for the connection.
//
// After watcher creation, the watcher callback is invoked for the first time.
// In this case, the callback is triggered whether or not the key has already
// been broadcast. All subsequent invocations are triggered with
// box.broadcast() called on the remote host. If a watcher is subscribed for a
// key that has not been broadcast yet, the callback is triggered only once,
// after the registration of the watcher.
//
// The watcher callbacks are always invoked in a separate goroutine. A watcher
// callback is never executed in parallel with itself, but they can be executed
// in parallel to other watchers.
//
// If the key is updated while the watcher callback is running, the callback
// will be invoked again with the latest value as soon as it returns.
//
// Watchers survive reconnection. All registered watchers are automatically
// resubscribed when the connection is reestablished.
//
// Keep in mind that garbage collection of a watcher handle doesn’t lead to the
// watcher’s destruction. In this case, the watcher remains registered. You
// need to call Unregister() directly.
//
// Unregister() guarantees that there will be no the watcher's callback calls
// after it, but Unregister() call from the callback leads to a deadlock.
//
// See:
// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_events/#box-watchers
//
// Since 1.10.0
func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, error) {
// TODO: check required features after:
//
// https://github.com/tarantool/go-tarantool/issues/120
var shared *watchSharedData
// Get or create a shared data for the key.
if val, ok := conn.watchMap.Load(key); !ok {
shared = &watchSharedData{
st: make(chan watchState, 1),
cnt: 0,
}
shared.st <- watchState{nil, true, nil}

if val, ok := conn.watchMap.LoadOrStore(key, shared); ok {
shared = val.(*watchSharedData)
}
} else {
shared = val.(*watchSharedData)
}

// Send an initial watch request.
shared.mu.Lock()
if shared.cnt == 0 {
<-shared.st
shared.st <- watchState{nil, true, nil}

if _, err := conn.Do(newWatchRequest(key)).Get(); err != nil {
shared.mu.Unlock()
return nil, err
}
}
shared.cnt += 1
shared.mu.Unlock()

// Start the watcher goroutine.
done := make(chan struct{})
finished := make(chan struct{})

go func() {
for {
state := <-shared.st
if state.changed == nil {
state.changed = make(chan struct{})
}
shared.st <- state

if !state.init {
callback(WatchEvent{
Conn: conn,
Key: key,
Value: state.value,
})
}

select {
case <-done:
shared.mu.Lock()
shared.cnt -= 1
if shared.cnt == 0 {
// A last one sends IPROTO_UNWATCH.
conn.Do(newUnwatchRequest(key))
}
shared.mu.Unlock()

close(finished)
return
case <-state.changed:
}
}
}()

return &connWatcher{
shared: shared,
done: done,
finished: finished,
}, nil
}
Loading

0 comments on commit 401de6d

Please sign in to comment.