Added subscription specific configurations to PubSub #171
Conversation
Added simple configurability to gossipsub
Added a gossipsub last-writer-wins persistence configuration
Hey @aschmahmann, thanks for the contribution! 🎉 Could you link to an issue where the need and rationale behind this was discussed? Thanks!
Some very passing comments from first reading.
What's the rationale for these changes?
gossipsub.go
Outdated
for _, topic := range msg.GetTopicIDs() {
	config, ok := gs.topicConfigs[topic]
that's just backwards; can't have a config object handling the most important function of the router!
Isn't that the whole premise of every inversion of control system? We could create new routers and pubsub instances for every configuration option instead of using one pubsub instance.
However, that means
- having multiple heartbeat tickers running at the same time
- not reusing any of the gossipsub structures across implementations
- ending up with weird multi-tenancy issues unless we change PubSub more. For example, if right now you create a FloodSub and a GossipSub instance on the same node, you end up with excessive message passing because GossipSub is backwards compatible with FloodSub and will handle both messages. Either we need one router to handle multiple configurations, or we need to make PubSub able to handle backwards-compatible protocols; the former involves less protocol work.
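The "one router, multiple configurations" idea can be illustrated with a small, dependency-free sketch. It only mirrors the `gs.topicConfigs[topic]` lookup quoted in the diff above; the `topicConfig` and `router` types, their fields, and the topic names are assumptions made for illustration and are not the PR's actual types.

```go
package main

import "fmt"

// topicConfig is a hypothetical per-topic setting. The PR's real
// configuration type is not shown in this thread.
type topicConfig struct {
	name          string
	floodSubPeers bool // whether to also forward to FloodSub-only peers
}

// router is a stand-in for a single router instance (one heartbeat,
// one set of shared structures) that serves several topics.
type router struct {
	defaultConfig topicConfig
	topicConfigs  map[string]topicConfig
}

// configFor returns the configuration registered for a topic,
// falling back to the router-wide default when none is registered.
func (r *router) configFor(topic string) topicConfig {
	if cfg, ok := r.topicConfigs[topic]; ok {
		return cfg
	}
	return r.defaultConfig
}

// newDemoRouter builds a router with one specially configured topic.
func newDemoRouter() *router {
	return &router{
		defaultConfig: topicConfig{name: "classic", floodSubPeers: true},
		topicConfigs: map[string]topicConfig{
			"ipns-records": {name: "persistent", floodSubPeers: false},
		},
	}
}

func main() {
	r := newDemoRouter()
	for _, topic := range []string{"ipns-records", "chat"} {
		fmt.Printf("%s -> %s\n", topic, r.configFor(topic).name)
	}
}
```

In this sketch, topics without an explicit entry keep the default behavior, which is one way existing users of the plain constructors could remain unaffected.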
cancelCh chan<- *Subscription
err error
topic string
topicProtocol protocol.ID
I don't think this belongs here.
I disagree. PubSub has a declaration, type SubOpt func(sub *Subscription) error, that is currently almost unusable because Subscription has no fields to work with.
I assume the pattern used with both Option (for PubSub) and SubOpt (for Subscriptions) is designed to allow adding fields to the struct, and functionality as option wrappers, without breaking the struct contract/interface. Is that incorrect? If so, how is SubOpt supposed to be used, and where would you put the relevant topic/Subscription information?
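A minimal sketch of the functional-options pattern being described, assuming a simplified `Subscription` and using a plain `string` in place of `protocol.ID` so the example has no dependencies. `SubOpt` matches the declaration quoted above and `WithProtocol` is the option named later in this thread; `newSubscription` and the protocol string are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Subscription is a simplified stand-in for the PubSub type. The
// topicProtocol field mirrors the one added in the diff above, but
// uses string instead of protocol.ID to stay dependency-free.
type Subscription struct {
	topic         string
	topicProtocol string
}

// SubOpt has the same shape as the declaration quoted in the thread.
type SubOpt func(sub *Subscription) error

// WithProtocol returns an option that records which protocol a
// subscription's topic should use. The validation is an assumption.
func WithProtocol(proto string) SubOpt {
	return func(sub *Subscription) error {
		if proto == "" {
			return errors.New("empty protocol ID")
		}
		sub.topicProtocol = proto
		return nil
	}
}

// newSubscription (hypothetical helper) applies options in order and
// stops at the first error.
func newSubscription(topic string, opts ...SubOpt) (*Subscription, error) {
	sub := &Subscription{topic: topic}
	for _, opt := range opts {
		if err := opt(sub); err != nil {
			return nil, err
		}
	}
	return sub, nil
}

func main() {
	sub, err := newSubscription("T", WithProtocol("/example/lww/1.0.0"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(sub.topic, sub.topicProtocol)
}
```

The point of the pattern is that new fields can be added to the struct and exposed through new options without changing the signature callers already use.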
gossipsub.go
Outdated
for p := range tmap {
if gs.peers[p] == FloodSubID {
tosend[p] = struct{}{}
if withFloodSubPeers {
The various configurations that I've implemented so far could likely all be done in floodsub, episub, etc. Two caveats are 1) we'd need to allow the layer above the router implementations to have more access to the internals (e.g. which peers I'm syncing/sending messages to for a given topic, some control over limiting or manually triggering a broadcast) 2) The existing less performant implementations (e.g. floodsub) would do extremely poorly for some of these configuration options (e.g. persistence) anyway.
Additionally, in future implementations of say episub the configs used by gossipsub should be reusable.
Given that we're ok breaking the router implementations/interfaces a bit, do you think I should try to set up some of the configurations (e.g. persistence) to work across all router implementations, or is leaving the configurations as something to be plugged into future routers (e.g. episub) sufficient?
that's fine, although I don't know how useful it will be for floodsub.
gossipconfig.go
Outdated
	Put(msg *pb.Message)
}

type ClassicGossipSubConfiguration struct {
Up for a better name if you've got one
Just a note: you are changing very carefully thought-out code, don't go doing stuff willy-nilly!
Nothing here has been done "willy-nilly". Everything I've done so far has been targeted at making sure "user space" around PubSub and gossipsub has been minimally affected (which is true for anyone using the GossipSub, FloodSub, ... constructors), while at the same time allowing for changes that are needed by consumers of PubSub (e.g. IPNS). I see from some of your comments that we're actually ok with some breaking changes within the PubSub system, which should certainly help clean up the code. I respect that a lot of thought went into the PubSub system, which is part of why I'd like to enable users to tinker with it from user space as much as possible so they don't end up trying to copy-paste or fork pubsub. I think it'd be great to have some document discussing how we'd like to enable people to extend PubSub and also consider whether the configuration options I'm putting in are sufficient or just sufficient for the cases that I'm currently considering.
@aschmahmann could you open an RFC discussion in libp2p/specs? Please outline context, motivation, rationale, goals, etc. Then link to this PR as a sample implementation. Inferring all that from various issues is less than ideal and is subject to bias and interpretation. Also, not all libp2p implementers watch all go repos, so we elevate protocol discussions to the specs repo. libp2p is a specs-driven project so any changes introduced here would have to be preambled by changes in specs.
@raulk There are 3 potential specs I could add, and would like some advice on which ones to go with. There's: 1) PubSub patch 2) GossipSub patch 3) Something about Last-Writer-Wins persistence
@aschmahmann I would start with an issue in libp2p/specs theorising context (what is the current situation), description of change (what you want to change), motivation/rationale (why you want to change it), proposed design (how you want to change it), proposed next steps. Then the community can gauge if this is something of interest across implementations, and can advise how to best integrate it.
…d be re-emitted to (i.e. not initial message propagation, but rebroadcasting messages)
Moved configuration changes to gossipsub into relevant interfaces instead of Options (makes code much cleaner at the expense of breaking the interfaces)
@vyzo while PubSub's
what options do you want to pass to
For now likely just WithProtocol(proto protocol.ID) as I am with Subscribe
that won't work, the protocol ID is a property of the stream.
Yes, it is a property of the stream. On master, the way you would currently set up a routing system with a different configuration (e.g. adding persistence) is by creating a new PubSubRouter with a new protocol ID. The implementation here is similar, except that instead of having one PubSub instance per router I am effectively putting multiple routers into a single PubSub instance and associating each topic with a particular router. If you look at what I'm doing with
gossipsub.go
Outdated
if !ok || ok {
	gs.pushGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: mids})
}
emitPeers := config.GetEmitPeers(gs.wrapGetPeers(topic), peers)
@vyzo adding an EmittingStrategy to the configuration was one way to allow developers to choose which peers messages will get rebroadcast to.
You are killing gossip piggybacking with this most likely.
Also, can we call
I actually think I'm doing the opposite. If we had to make a new PubSub instance for every configuration, then we'd be stuck with multiple heartbeats each sending out their own piggybacked data, which means that if A and B were connected by two routers then we'd end up with two messages instead of one message with both the data from A and B piggybacked onto it.
I don't think it encompasses all the fundamental actions of the protocol (I'd argue that graft + prune is the main component of gossipsub). However, as I don't have strong feelings about the naming here, I'm fine with
}

func (gs *GossipSubRouter) AddGossipPeers(tosend map[peer.ID]struct{}, topics []string, withFloodSubPeers bool) {
	for _, topic := range topics {
		// any peers in the topic?
		tmap, ok := gs.p.topics[topic]
		if !ok {
			continue
@vyzo is it intentional that during a Publish we do not initialize gs.fanout[topic] if there are no peers in the topic (and we're not subscribed/there's no gs.mesh[topic])? By not doing this we ensure that we will not rebroadcast (via emitGossip) our published message even once we come across new peers. This is less relevant in the current implementation since the republishing window is relatively short (5 heartbeat intervals, which is 500ms); however, it might still be worth correcting this even for our existing implementation.
They won't be added to fanout unless there is a new message published.
Also, the heartbeat is 1 second, so 5 intervals are 5s.
My bad on the interval timing, was looking at heartbeat initial delay by accident. Ok, so the following sequence of events in the current implementation is intentional?
Scenario 1:
- A publishes M to GossipSub topic T, but has no peers
- B subscribes to T
- B connects to A
- Nothing happens, B never receives M (even if within the cache history period)
Scenario 2:
- A publishes M to GossipSub topic T, but has no peers
- B1...B20 subscribe to T (important that it's more than GossipSubD=6, as mentioned above)
- B1...B20 connect to A
- Some of the B's receive M and some do not
Scenario 3:
- A publishes M to GossipSub topic T, but has no peers
- Wait 6 seconds
- B1...B20 subscribe to T
- B1...B20 connect to A
- Nothing happens, none of the B's receives M
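The timing in Scenario 3 can be illustrated with a toy sliding history window. This is a sketch only, not libp2p's actual message cache: the `historyWindow` type and its methods are made up for illustration, and the window length of 5 heartbeats (at 1 second each) is taken from the figures quoted in this thread.

```go
package main

import "fmt"

// historyWindow is a toy sliding window of message IDs. Slot 0 holds
// messages seen since the last heartbeat; higher slots are older.
type historyWindow struct {
	slots [][]string
}

func newHistoryWindow(length int) *historyWindow {
	return &historyWindow{slots: make([][]string, length)}
}

// put records a message ID in the newest slot.
func (h *historyWindow) put(id string) {
	h.slots[0] = append(h.slots[0], id)
}

// shift runs once per heartbeat: every slot moves one position older,
// the oldest slot is dropped, and the newest slot starts empty.
func (h *historyWindow) shift() {
	copy(h.slots[1:], h.slots[:len(h.slots)-1])
	h.slots[0] = nil
}

// has reports whether the ID is still anywhere inside the window.
func (h *historyWindow) has(id string) bool {
	for _, slot := range h.slots {
		for _, got := range slot {
			if got == id {
				return true
			}
		}
	}
	return false
}

func main() {
	h := newHistoryWindow(5) // 5 heartbeats of history, per the thread
	h.put("M")
	for beat := 1; beat <= 6; beat++ {
		h.shift()
		fmt.Printf("after heartbeat %d: has(M)=%v\n", beat, h.has("M"))
	}
}
```

Under these assumptions M is still in the window after four heartbeats and gone once the window has shifted five or more times, which is why waiting 6 seconds in Scenario 3 leaves nothing to gossip.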
Yes, that's fine.
That pseudo-use of a history buffer seems pretty suspect to me, but that's fine, I can preserve that behavior and just modify the persistence version.
To clarify, I don't really see what's gained by having B not receive M in scenario 1.
It's not designed to be a reliability mechanism; it's designed to help the network connect when the mesh is not fully connected for one reason or the other.
Renamed configurations to strategies
Added OnGraft function to GossipStrategies
LWW strategy now requests the latest topic data during OnGraft
Closing as this is stale.
There's still work to be done here, both in documentation and some design work. It'd be great to get some feedback on the direction this is going though.
Overall the additions are: