Skip to content

Commit

Permalink
api: add events subscription support
Browse files Browse the repository at this point in the history
A user can create watcher by the Connection.NewWatcher() call:

    watcher = conn.NewWatcker("key", func(event WatchEvent) {
        // The callback code.
    })

After that, the watcher callback is invoked for the first time. In
this case, the callback is triggered whether or not the key has
already been broadcast. All subsequent invocations are triggered with
box.broadcast() called on the remote host. If a watcher is subscribed
for a key that has not been broadcast yet, the callback is triggered
only once, after the registration of the watcher.

If the key is updated while the watcher callback is running, the
callback will be invoked again with the latest value as soon as it
returns.

Multiple watchers can be created for one key.

If you don’t need the watcher anymore, you can unregister it using
the Unregister method:

    watcher.Unregister()

The api is similar to net.box implementation [1].

It also adds a BroadcastRequest to make it easier to send broadcast
messages.

1. https://www.tarantool.io/en/doc/latest/reference/reference_lua/net_box/#conn-watch

Closes #119
  • Loading branch information
oleg-jukovec committed Nov 30, 2022
1 parent 1d69898 commit e88cf42
Show file tree
Hide file tree
Showing 21 changed files with 1,569 additions and 70 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
### Added

- Support iproto feature discovery (#120).
- Event subscription support (#119)

### Changed

Expand Down
294 changes: 291 additions & 3 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ const (
// LogUnexpectedResultId is logged when response with unknown id was received.
// Most probably it is due to request timeout.
LogUnexpectedResultId
// LogReadWatchEventFailed is logged when failed to read a watch event.
LogReadWatchEventFailed
)

// ConnEvent is sent throw Notify channel specified in Opts.
Expand All @@ -63,6 +65,12 @@ type ConnEvent struct {
When time.Time
}

// A raw watch event.
type connWatchEvent struct {
key string
value interface{}
}

var epoch = time.Now()

// Logger is logger type expected to be passed in options.
Expand All @@ -84,6 +92,9 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
case LogUnexpectedResultId:
resp := v[0].(*Response)
log.Printf("tarantool: connection %s got unexpected resultId (%d) in response", conn.addr, resp.RequestId)
case LogReadWatchEventFailed:
err := v[0].(error)
log.Printf("tarantool: unable to parse watch event: %s", err)
default:
args := append([]interface{}{"tarantool: unexpected event ", event, conn}, v...)
log.Print(args...)
Expand Down Expand Up @@ -149,6 +160,8 @@ type Connection struct {
lastStreamId uint64

serverProtocolInfo ProtocolInfo
// watchMap is a map of key -> watchSharedData.
watchMap sync.Map
}

var _ = Connector(&Connection{}) // Check compatibility with connector interface.
Expand Down Expand Up @@ -531,7 +544,7 @@ func (conn *Connection) dial() (err error) {
return fmt.Errorf("identify: %w", err)
}

// Auth
// Auth.
if opts.User != "" {
scr, err := scramble(conn.Greeting.auth, opts.Pass)
if err != nil {
Expand All @@ -549,7 +562,34 @@ func (conn *Connection) dial() (err error) {
}
}

// Only if connected and authenticated.
// Watchers.
watchersChecked := false
conn.watchMap.Range(func(key, value interface{}) bool {
if !watchersChecked {

watchersChecked = true
}

st := value.(chan watchState)
state := <-st
if state.cnt > 0 {
req := newWatchRequest(key.(string))
if err = conn.writeRequest(w, req); err != nil {
st <- state
return false
}
state.init = true
state.ack = true
}
st <- state
return true
})

if err != nil {
return fmt.Errorf("unable to register watch: %w", err)
}

// Only if connected and fully initialized.
conn.lockShards()
conn.c = connection
atomic.StoreUint32(&conn.state, connConnected)
Expand Down Expand Up @@ -843,7 +883,50 @@ func (conn *Connection) writer(w *bufio.Writer, c net.Conn) {
}
}

func readWatchEvent(reader io.Reader) (connWatchEvent, error) {
keyExist := false
event := connWatchEvent{}
d := newDecoder(reader)

if l, err := d.DecodeMapLen(); err == nil {
for ; l > 0; l-- {
if cd, err := d.DecodeInt(); err == nil {
switch cd {
case KeyEvent:
if event.key, err = d.DecodeString(); err != nil {
return event, err
}
keyExist = true
case KeyEventData:
if event.value, err = d.DecodeInterface(); err != nil {
return event, err
}
default:
if err = d.Skip(); err != nil {
return event, err
}
}
} else {
return event, err
}
}
} else {
return event, err
}

if !keyExist {
return event, errors.New("watch event does not have a key")
}

return event, nil
}

func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
events := make(chan connWatchEvent, 1024)
defer close(events)

go conn.eventer(events)

for atomic.LoadUint32(&conn.state) != connClosed {
respBytes, err := conn.read(r)
if err != nil {
Expand All @@ -858,7 +941,14 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
}

var fut *Future = nil
if resp.Code == PushCode {
if resp.Code == EventCode {
if event, err := readWatchEvent(&resp.buf); err == nil {
events <- event
} else {
conn.opts.Logger.Report(LogReadWatchEventFailed, conn, err)
}
continue
} else if resp.Code == PushCode {
if fut = conn.peekFuture(resp.RequestId); fut != nil {
fut.AppendPush(resp)
}
Expand All @@ -868,12 +958,37 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
conn.markDone(fut)
}
}

if fut == nil {
conn.opts.Logger.Report(LogUnexpectedResultId, conn, resp)
}
}
}

// eventer goroutine gets watch events and updates values for watchers.
func (conn *Connection) eventer(events <-chan connWatchEvent) {
for {
event, ok := <-events
if !ok {
// The channel is closed.
break
}

if value, ok := conn.watchMap.Load(event.key); ok {
st := value.(chan watchState)
state := <-st
state.value = event.value
state.init = false
state.ack = false
if state.changed != nil {
close(state.changed)
state.changed = nil
}
st <- state
}
}
}

func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
fut = NewFuture()
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
Expand Down Expand Up @@ -1029,6 +1144,18 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
return
}
shard.bufmut.Unlock()

if req.Async() {
if fut = conn.fetchFuture(reqid); fut != nil {
resp := &Response{
RequestId: reqid,
Code: OkCode,
}
fut.SetResponse(resp)
conn.markDone(fut)
}
}

if firstWritten {
conn.dirtyShard <- shardn
}
Expand Down Expand Up @@ -1233,6 +1360,167 @@ func (conn *Connection) NewStream() (*Stream, error) {
}, nil
}

// watchState is the current state of the watcher. See the idea at p. 70, 105:
// https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view
type watchState struct {
// value is a current value.
value interface{}
// init is true if it is an initial state (no events received).
init bool
// ack true if the acknowledge is already sended.
ack bool
// cnt is a count of active watchers for the key.
cnt int
// changed is a channel for broadcast the value changes.
changed chan struct{}
}

// connWatcher is an internal implementation of the Watcher interface.
type connWatcher struct {
unregister sync.Once
done chan struct{}
finished chan struct{}
}

// Unregister unregisters the connection watcher.
func (w *connWatcher) Unregister() {
w.unregister.Do(func() {
close(w.done)
})
<-w.finished
}

// NewWatcher creates a new Watcher object for the connection.
//
// After watcher creation, the watcher callback is invoked for the first time.
// In this case, the callback is triggered whether or not the key has already
// been broadcast. All subsequent invocations are triggered with
// box.broadcast() called on the remote host. If a watcher is subscribed for a
// key that has not been broadcast yet, the callback is triggered only once,
// after the registration of the watcher.
//
// The watcher callbacks are always invoked in a separate goroutine. A watcher
// callback is never executed in parallel with itself, but they can be executed
// in parallel to other watchers.
//
// If the key is updated while the watcher callback is running, the callback
// will be invoked again with the latest value as soon as it returns.
//
// Watchers survive reconnection. All registered watchers are automatically
// resubscribed when the connection is reestablished.
//
// Keep in mind that garbage collection of a watcher handle doesn’t lead to the
// watcher’s destruction. In this case, the watcher remains registered. You
// need to call Unregister() directly.
//
// Unregister() guarantees that there will be no the watcher's callback calls
// after it, but Unregister() call from the callback leads to a deadlock.
//
// See:
// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_events/#box-watchers
//
// Since 1.10.0
func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, error) {
watchersSupported := false
for _, feature := range conn.ServerProtocolInfo().Features {
if feature == WatchersFeature {
watchersSupported = true
break
}
}
if !watchersSupported {
return nil, errors.New("watchers does not supported by a server")
}

var st chan watchState
// Get or create a shared data for the key.
if val, ok := conn.watchMap.Load(key); !ok {
st = make(chan watchState, 1)
st <- watchState{
value: nil,
init: true,
ack: false,
cnt: 0,
changed: nil,
}

if val, ok := conn.watchMap.LoadOrStore(key, st); ok {
close(st)
st = val.(chan watchState)
}
} else {
st = val.(chan watchState)
}

state := <-st
// Send an initial watch request if needed.
if state.cnt == 0 {
if _, err := conn.Do(newWatchRequest(key)).Get(); err != nil {
st <- state
return nil, err
}
state.init = true
state.ack = true
}
state.cnt += 1
st <- state

// Start the watcher goroutine.
done := make(chan struct{})
finished := make(chan struct{})

go func() {
for {
state := <-st
if state.changed == nil {
state.changed = make(chan struct{})
}
st <- state

if !state.init {
callback(WatchEvent{
Conn: conn,
Key: key,
Value: state.value,
})

// Acknowledge the notification.
state = <-st
ack := state.ack
state.ack = true
st <- state

if !ack {
conn.Do(newWatchRequest(key)).Get()
// We expect a reconnect and re-subscribe if it fails to
// send the watch request. So it looks ok do not check a
// result.
}
}

select {
case <-done:
state := <-st
state.cnt -= 1
if state.cnt == 0 {
// The last one sends IPROTO_UNWATCH.
conn.Do(newUnwatchRequest(key)).Get()
}
st <- state

close(finished)
return
case <-state.changed:
}
}
}()

return &connWatcher{
done: done,
finished: finished,
}, nil
}

// checkProtocolInfo checks that expected protocol version is
// and protocol features are supported.
func checkProtocolInfo(expected ProtocolInfo, actual ProtocolInfo) error {
Expand Down
Loading

0 comments on commit e88cf42

Please sign in to comment.