-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Low prio] Add Watch() method to watch pubsub updates #3
base: master
Are you sure you want to change the base?
Conversation
This method returns a channel on which all pubsub updates for a key are sent. Channels are closed when Cancel() is called on a key. Updates are sent to all channels which have registered for a key (one per call to Watch()).
watchChannels.mux.RLock() | ||
p.watchMux.RUnlock() | ||
|
||
defer watchChannels.mux.RUnlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could be wrong but there is no difference between deferring the RUnlock here as opposed to right under the call to RLock? And it would be clearer if the defer was right after the lock. For example:
watchChannels.mux.RLock()
defer watchChannels.mux.RUnlock()
p.watchMux.RUnlock()
@@ -138,6 +147,23 @@ func (p *PubsubValueStore) Subscribe(key string) error { | |||
return nil | |||
} | |||
|
|||
func (p *PubsubValueStore) Watch(key string) <-chan []byte { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
godoc?
This looks related to: That's my attempt at a value-store independent version of this. I started implementing it for the DHT but decided to punt as we had more pressing issues.
|
defer watchChannels.mux.RUnlock() | ||
for _, ch := range watchChannels.channels { | ||
select { | ||
case ch <- data: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd consider:
- Adding a buffer size of 1.
- Replacing the buffered value in this case.
That is,
select {
case ch <- data:
case <-ch:
ch <- data
}
That way, the user always gets the latest value.
Well yeah, this is basically an implementation of that interface. However it looks weird that with that interface it's not possible to control subscriptions so a Also seems options are unused, but we'd have to think what happens when All that said, the interface makes sense for what it is (a KeyValueStore user should not have to worry how things are working below, whether subscribing or something else). And we can always adopt the interface signature for |
Playing here a bit.. I would like to have a tool that I can use to watch IPNS-pubsub updates without having to run ipfs, just by running a libp2p node connected to the swarm. It seemed that this would help.
This method returns a channel on which all pubsub updates for a key are sent.
Channels are closed when Cancel() is called on a key.
Updates are sent to all channels which have registered for a key (one per call
to Watch()).