Skip to content

Commit

Permalink
search value: another sync approach
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Sep 23, 2018
1 parent 7365dc9 commit fe784ca
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 79 deletions.
143 changes: 64 additions & 79 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package namesys
import (
"bytes"
"context"
"errors"
"sync"
"time"

Expand All @@ -23,10 +24,9 @@ import (
var log = logging.Logger("pubsub-valuestore")

type watchGroup struct {
lk sync.RWMutex
closing chan struct{}

listeners map[chan<- []byte]context.Context
listeners map[chan []byte]context.Context
}

type PubsubValueStore struct {
Expand All @@ -43,7 +43,7 @@ type PubsubValueStore struct {
mx sync.Mutex
subs map[string]*floodsub.Subscription

watchLk sync.RWMutex
watchLk sync.Mutex
watching map[string]*watchGroup

Validator record.Validator
Expand Down Expand Up @@ -182,29 +182,63 @@ func (p *PubsubValueStore) SearchValue(ctx context.Context, key string, opts ...
return nil, err
}

out := make(chan []byte, 1)
lv, err := p.getLocal(key)
if err == nil {
out <- lv
}

p.watchLk.Lock()
wg, ok := p.watching[key]
if !ok {
wg = &watchGroup{
closing: make(chan struct{}),
listeners: map[chan<- []byte]context.Context{},
listeners: map[chan []byte]context.Context{},
}
p.watching[key] = wg
}
// Lock searchgroup before checking local storage so we don't miss updates
wg.lk.Lock()
p.watchLk.Unlock()

out := make(chan []byte, 1)
lv, err := p.getLocal(key)
if err == nil {
out <- lv
}
proxy := make(chan []byte, 1)

ctx, cancel := context.WithCancel(ctx)
wg.listeners[proxy] = ctx

go func() {
defer func() {
cancel()

p.watchLk.Lock()
delete(wg.listeners, proxy)

if _, ok := p.watching[key]; len(wg.listeners) == 0 && ok {
close(wg.closing)
delete(p.watching, key)
}
p.watchLk.Unlock()

close(out)
}()

for {
select {
case val, ok := <-proxy:
if !ok {
return
}

// outCh is buffered, so we just put the value or swap it for the newer one
select {
case out <- val:
case <-out:
out <- val
}
case <-ctx.Done():
return
}
}
}()

p.watchLk.Lock()
wg.add(ctx, p, key, out)
p.watchLk.Unlock()
wg.lk.Unlock()

return out, nil
}
Expand All @@ -224,24 +258,24 @@ func (p *PubsubValueStore) GetSubscriptions() []string {

// Cancel cancels a topic subscription; returns true if an active
// subscription was canceled
func (p *PubsubValueStore) Cancel(name string) bool {
func (p *PubsubValueStore) Cancel(name string) (bool, error) {
p.mx.Lock()
defer p.mx.Unlock()

p.watchLk.Lock()
if _, wok := p.watching[name]; wok {
p.watchLk.Unlock()
return false, errors.New("key has active subscriptions")
}
p.watchLk.Unlock()

sub, ok := p.subs[name]
if ok {
sub.Cancel()
delete(p.subs, name)
}

p.watchLk.Lock()
if wg, wok := p.watching[name]; wok {
close(wg.closing)
delete(p.watching, name)
}
p.watchLk.Unlock()

return ok
return ok, nil
}

func (p *PubsubValueStore) handleSubscription(sub *floodsub.Subscription, key string, cancel func()) {
Expand All @@ -267,23 +301,20 @@ func (p *PubsubValueStore) handleSubscription(sub *floodsub.Subscription, key st
}

func (p *PubsubValueStore) notifyWatchers(key string, data []byte) {
p.watchLk.RLock()
p.watchLk.Lock()
defer p.watchLk.Unlock()
sg, ok := p.watching[key]
p.watchLk.RUnlock()
if !ok {
return
}

sg.lk.RLock()
for watcher, ctx := range sg.listeners {
for watcher := range sg.listeners {
select {
case watcher <- data:
case <-sg.closing:
break
case <-ctx.Done():
case <-watcher:
watcher <- data
case watcher <- data:
}
}
sg.lk.RUnlock()
}

// rendezvous with peers in the name topic through provider records
Expand Down Expand Up @@ -342,49 +373,3 @@ func bootstrapPubsub(ctx context.Context, cr routing.ContentRouting, host p2phos

wg.Wait()
}

func (wg *watchGroup) add(ctx context.Context, p *PubsubValueStore, key string, outCh chan []byte) {
proxy := make(chan []byte, 1)

go func() {
ctx, cancel := context.WithCancel(ctx)
wg.listeners[outCh] = ctx

defer func() {
cancel()

wg.lk.Lock()
delete(wg.listeners, outCh)

//cleanup empty watchgroups
p.watchLk.Lock()
if _, ok := p.watching[key]; len(wg.listeners) == 0 && ok {
close(wg.closing)
delete(p.watching, key)
}
p.watchLk.Unlock()

close(outCh)
}()

for {
select {
case val, ok := <-proxy:
if !ok {
return
}

// outCh is buffered, so we just put the value or swap it for the newer one
select {
case outCh <- val:
case <-outCh:
outCh <- val
}
case <-wg.closing:
return
case <-ctx.Done():
return
}
}
}()
}
5 changes: 5 additions & 0 deletions pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ func TestWatch(t *testing.T) {
t.Fatal(err)
}

_, err = vss[1].Cancel(key)
if err.Error() != "key has active subscriptions" {
t.Fatal("cancel should have failed")
}

v = string(<-ch)
if v != "valid for key 2" {
t.Errorf("got unexpected value: %s", v)
Expand Down

0 comments on commit fe784ca

Please sign in to comment.