Skip to content

Commit

Permalink
Implement SearchValue
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Aug 31, 2018
1 parent 6514c3b commit 0126083
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 2 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@
"version": "4.1.6"
},
{
"hash": "QmS4niovD1U6pRjUBXivr1zvvLBqiTKbERjFo994JU7oQS",
"hash": "QmNMiBu1AaojCGVqPp1pibwhHq4zTuvgrHXEi2dH2AgEuo",
"name": "go-libp2p-routing",
"version": "2.4.9"
"version": "2.5.0"
},
{
"author": "Stebalien",
Expand Down
100 changes: 100 additions & 0 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ import (

var log = logging.Logger("pubsub-valuestore")

type watchGroup struct {
lk sync.RWMutex
closing chan struct{}

listeners map[int64]chan<- []byte
n int64
}

type PubsubValueStore struct {
ctx context.Context
ds ds.Datastore
Expand All @@ -36,6 +44,9 @@ type PubsubValueStore struct {
mx sync.Mutex
subs map[string]*floodsub.Subscription

watchLk sync.RWMutex
watching map[string]*watchGroup

Validator record.Validator
}

Expand Down Expand Up @@ -163,6 +174,37 @@ func (p *PubsubValueStore) GetValue(ctx context.Context, key string, opts ...rop
return p.getLocal(key)
}

func (p *PubsubValueStore) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
if err := p.Subscribe(key); err != nil {
return nil, err
}

p.watchLk.Lock()
wg, ok := p.watching[key]
if !ok {
wg = &watchGroup{
closing: make(chan struct{}),
listeners: map[int64]chan<- []byte{},
}
}
// Lock searchgroup before checking local storage so we don't miss updates
wg.lk.Lock()
p.watchLk.Unlock()

out := make(chan []byte, 1)
lv, err := p.getLocal(key)
if err == nil {
out <- lv
}

p.watchLk.Lock()
wg.add(ctx, out)
p.watchLk.Lock()
wg.lk.Unlock()

return out, nil
}

// GetSubscriptions retrieves a list of active topic subscriptions
func (p *PubsubValueStore) GetSubscriptions() []string {
p.mx.Lock()
Expand All @@ -188,6 +230,13 @@ func (p *PubsubValueStore) Cancel(name string) bool {
delete(p.subs, name)
}

p.watchLk.Lock()
if wg, ok := p.watching[name]; ok {
close(wg.closing)
delete(p.watching, name)
}
p.watchLk.Unlock()

return ok
}

Expand All @@ -208,10 +257,26 @@ func (p *PubsubValueStore) handleSubscription(sub *floodsub.Subscription, key st
if err != nil {
log.Warningf("PubsubResolve: error writing update for %s: %s", key, err)
}
p.notifyWatchers(key, msg.GetData())
}
}
}

func (p *PubsubValueStore) notifyWatchers(key string, data []byte) {
p.watchLk.RLock()
sg, ok := p.watching[key]
p.watchLk.RUnlock()
if !ok {
return
}

sg.lk.RLock()
for _, watcher := range sg.listeners {
watcher <- data
}
sg.lk.RUnlock()
}

// rendezvous with peers in the name topic through provider records
// Note: rendezvous/boostrap should really be handled by the pubsub implementation itself!
func bootstrapPubsub(ctx context.Context, cr routing.ContentRouting, host p2phost.Host, name string) {
Expand Down Expand Up @@ -268,3 +333,38 @@ func bootstrapPubsub(ctx context.Context, cr routing.ContentRouting, host p2phos

wg.Wait()
}

func (sg *watchGroup) add(ctx context.Context, outCh chan []byte) {
proxy := make(chan []byte, 1)
n := sg.n
sg.listeners[n] = proxy
sg.n++

go func() {
defer close(outCh)
defer func() {
sg.lk.Lock()
delete(sg.listeners, n)
//TODO: searchgroup GC?
sg.lk.Unlock()
}()

select {
case val, ok := <-proxy:
if !ok {
return
}

// outCh is buffered, so we just put the value or swap it for the newer one
select {
case outCh <- val:
case <-outCh:
outCh <- val
}
case <-sg.closing:
return
case <-ctx.Done():
return
}
}()
}

0 comments on commit 0126083

Please sign in to comment.