-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement SearchValue #7
Conversation
0126083
to
b85d598
Compare
Tell me when you want a review. |
b85d598
to
e59ef5e
Compare
cb9dce6
to
2822c1c
Compare
pubsub.go
Outdated
lk sync.RWMutex | ||
closing chan struct{} | ||
|
||
listeners map[int64]chan<- []byte |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can probably just make this a map[chan<- []byte]struct{}
. Then we don't need to give each listener an ID.
pubsub.go
Outdated
defer func() { | ||
wg.lk.Lock() | ||
delete(wg.listeners, n) | ||
//TODO: watchgroup GC? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might as well. Just if len(wg.listeners) == 0 { /* lock and remove watch group */ }
.
pubsub.go
Outdated
go func() { | ||
defer close(outCh) | ||
defer func() { | ||
wg.lk.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a deadlock here if notifyWatchers
is trying to write to our channel (it can write twice in a row so the buffer doesn't help). Unfortunately, the watcher <- data
write may also have to select on the context (or at least the context cancellation channel). The map will probably need to be map[chan <-[]byte]context.Context
.
pubsub.go
Outdated
|
||
sg.lk.RLock() | ||
for _, watcher := range sg.listeners { | ||
watcher <- data |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This also (in addition to the context) needs to select on sg.closing
as we're not holding the watchLk. Currently, sg.closing
can be closed, making all the listeners exit, and we'll hang here forever.
Wow... this is a lot tricker than I thought it would be. |
pubsub.go
Outdated
|
||
go func() { | ||
ctx, cancel := context.WithCancel(ctx) | ||
wg.listeners[outCh] = ctx |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this needs to happen outside the goroutine (while we're holding the lock).
I've replaced one of the locks with a channel+managing goroutine, opinions? |
pubsub.go
Outdated
closing chan struct{} | ||
|
||
register chan watcher | ||
unregister chan []chan<- []byte |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd rather use a struct for these, even if it's an anonymous one (to help newcomers to this code).
(assuming we need multiple channels)
pubsub.go
Outdated
ctx, cancel := context.WithCancel(ctx) | ||
wg.register <- watcher{ctx: ctx, out: proxy} | ||
|
||
// we wait for ack from watchgroup manager, essentially transferring our lock |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We really shouldn't do this. Implicitly transferring a lock is guaranteed to trip someone up down the road.
At this point, we effectively have one big lock anyways (as far as I can tell), we might as well just use it and drop the watch manager.
291417b
to
fe784ca
Compare
Done |
fe784ca
to
3b3c7ee
Compare
p.watchLk.Lock() | ||
if _, wok := p.watching[name]; wok { | ||
p.watchLk.Unlock() | ||
return false, errors.New("key has active subscriptions") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO, "cancel" should just cancel the passive subscription (keeping the active ones running, not returning an error). However, we can change that behavior later.
pubsub.go
Outdated
proxy := make(chan []byte, 1) | ||
|
||
ctx, cancel := context.WithCancel(ctx) | ||
wg.listeners[proxy] = ctx |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Now that we're doing the take/replace dance with the proxy channel, we don't really need to store the context anywhere (we also don't need to wrap with a cancel). We only needed it to avoid hanging while writing to the proxy channel.
However, it doesn't hurt to do this so it's up to you.
9b4561f
to
675d272
Compare
Part of libp2p/go-libp2p-routing#25
Replaces #3