RFC: Local event bus #653
Comments
I'll try to work on this |
@magik6k if we get this implemented, and add an event for AutoNAT status changes, it would unblock DHT client/server dynamic switching and lead to a more stable DHT 👍 |
@magik6k anything i can help with? i have availability and would be interested in it as well. |
I'll start with the library, which will probably take a day or two to get to something we'll be able to start integrating into libp2p, at which point help will be nice. I've also quickly looked at existing libraries, but I didn't find anything that would match what we need. Q: what order of magnitude in events/s should I be aiming to get? I'm aiming somewhere around 100k/s without significant load on the system, will that be good enough? |
@magik6k I too didn’t find a fitting 3rd party library. 100k is pretty high for most cases, but it might be in tune for nodes that are scaled vertically (e.g. relays). I’d recommend shooting for a best-effort implementation and including benchmarks, so we can then decide whether to optimise or not. Let me know if I can clarify any part of the proposal. Thanks! |
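For reference on measuring this, a self-contained baseline benchmark along the lines below (evtTest and the channel setup are invented for the sketch, not part of any proposed API) gives a floor to compare implementations against; the ns/op reported by go test -bench converts directly into events/s.

package eventbus_test

import "testing"

// BenchmarkChannelSend measures the raw cost of pushing an event value through
// a buffered channel with a single draining consumer. Any bus implementation
// adds its overhead on top of this baseline.
func BenchmarkChannelSend(b *testing.B) {
    type evtTest struct{ seq int } // dummy event payload

    ch := make(chan evtTest, 1024)
    done := make(chan struct{})
    go func() {
        for range ch { // drain so the sender rarely blocks
        }
        close(done)
    }()

    b.ReportAllocs()
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        ch <- evtTest{seq: i}
    }
    close(ch)
    <-done
}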
Did some hacking on this, which resulted in libp2p/go-eventbus#1 |
That sounds a bit painful. Why can't we do this synchronously (maybe with a context?)?
What if we let the user pass in their own channel(s)? Something like:
func Subscribe(ctx context.Context, ch interface{}, opts...) error
We currently rely on this kind of thing for disconnect/connect events. But yeah, this is hard. |
We need a two-phase startup process. Since services are optional, we don't want subsystem X subscribing to events of type E1 in a runtime where no emitter is registered to emit E1. That might break X's assumptions. X should know if that happens, so it can fail in turn or use a fallback mechanism to derive the information it needs. We can't generate this result synchronously because we don't want to impose global ordering assumptions on how systems are gonna start and/or register with the event bus. So the two phases would be:
|
The reason I didn't propose this is that the channel's lifecycle should be controlled by the eventbus, IMO. Returning a receive-only channel makes it impossible for the subscriber to close the channel and cause a system-wide panic in the eventbus. |
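A minimal sketch of that point (names are illustrative only): exposing the channel as receive-only means a misbehaving subscriber cannot close it, so only the bus controls its lifecycle.

package eventbus

type subscription struct {
    out chan interface{}
}

// Events exposes the internal channel as receive-only; only the bus can
// write to or close s.out.
func (s *subscription) Events() <-chan interface{} {
    return s.out
}

func consume(s *subscription) {
    for evt := range s.Events() {
        _ = evt // handle the event
    }
    // close(s.Events()) // would not compile: cannot close a receive-only channel
}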
Could we just provide something like |
If it returns a channel where a
I wouldn't go as far as to say that (some systems do modify their behaviour if other systems are enabled), but I agree that we should not make the uncaring ones jump through hoops. So I'm favourable to finding semantics that make this opt-in, rather than compulsory. |
We could just create special events for that and use the bus to send them:

initialized, ic, err := bus.Subscribe(event.Ready(new(TypeToWaitFor)))
go func() {
    time.Sleep(5 * time.Second)
    ef, cf, err := bus.Emitter(new(TypeToWaitFor))
    [...]
}()
ok/stat/err/whatever := <-initialized // blocks for 5s
[...]

The slightly tricky part here is that if the bus is already ready,
We can easily give this guarantee for a single emitter emitting from one goroutine (and we can implement an option to guarantee a single emitter for a given type) |
...
I believe this is a separate concern and one of the reasons we need a service manager. I'm actually wondering if a global event bus is the correct abstraction. Systems like dbus attach events to specific services/properties for this reason. I could see us having some form of standard "subscribe" interface that can be implemented by services (implemented through a reusable embedded struct). However, I'm not entirely sure how that would look.
We can also just defer a recover. My concern is that casting everywhere gets old fast. However, I agree this is funky... oh generics.
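A sketch of the "defer a recover" idea (names invented for illustration): a send into a channel that a subscriber wrongly closed panics inside the bus, and recovering turns that into a failed delivery instead of a crash.

package eventbus

// guardedSend delivers evt to ch, absorbing the panic that occurs if the
// subscriber closed the channel behind the bus's back.
func guardedSend(ch chan<- interface{}, evt interface{}) (delivered bool) {
    defer func() {
        if recover() != nil {
            delivered = false // channel was closed by the subscriber
        }
    }()
    ch <- evt
    return true
}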
SGTM (although I'm not sure if this even needs to be an option, it'll probably be true without any extra work). |
This would only be for enforcing single emitter for a given type |
I'll also see how hard / slow something like
will be to do. |
So with absolutely no optimization and just using reflection to cast types and send events to the typed channel, sends only cost around 200ns more (488ns vs 232ns), which is 2x slower, but 2M events/s is still very fast. (edit: see https://github.com/libp2p/go-eventbus/compare/feat/implement...feat/sendto?expand=1) Now I'm kind of tempted to change the interface to something like:
And change the internals to operate directly on typed channels (through reflection). Even if it will be 10x slower, it will still be plenty fast for our use case, and will be much nicer to use. Thoughts? |
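Roughly what the reflection-based delivery looks like (a sketch, not the branch's actual code): the bus holds the user's typed channel as an interface{} and uses the reflect package to send into it, which is where the extra cost measured above comes from.

package eventbus

import "reflect"

// sendTo delivers evt into ch, where ch is any channel whose element type
// matches (or is an interface satisfied by) evt's dynamic type.
func sendTo(ch interface{}, evt interface{}) {
    cv := reflect.ValueOf(ch)
    if cv.Kind() != reflect.Chan {
        panic("sendTo: not a channel")
    }
    cv.Send(reflect.ValueOf(evt)) // blocks if the channel is full
}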
This would mean, for example, spinning up a goroutine internally; I would say the complexity is not needed. My idea would be:
Re: what to do in case of the chan being full: I would create separate Emit functions depending on the kind of emit:
Names are subject to change. |
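For illustration, one way those separate emit flavours could look (placeholder names, since the real ones are still up for debate): a blocking Emit and a best-effort TryEmit that drops on full channels.

package eventbus

// emitter stands in for whatever ends up holding the subscriber channels.
type emitter struct {
    sinks []chan interface{}
}

// Emit blocks until every subscriber channel has accepted the event.
func (e *emitter) Emit(evt interface{}) {
    for _, ch := range e.sinks {
        ch <- evt
    }
}

// TryEmit never blocks; it drops the event for any subscriber whose channel
// is full and reports how many sinks accepted it.
func (e *emitter) TryEmit(evt interface{}) (accepted int) {
    for _, ch := range e.sinks {
        select {
        case ch <- evt:
            accepted++
        default: // sink full: drop for this subscriber
        }
    }
    return accepted
}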
Receiver(typedChan interface{}, opts) (CancelFunc, error)
Emitter(typ interface{}, opts) (Emitter, error) 👍 👍
What if we get a partial success? Return two ints saying how many subs there are, and how many succeeded? |
Hmm, ideally we would be able to retry the send, but the number of subs can change across emits, so the two ints won't work. Let's say the retry will only work in the case of one Receiver, and we should have an option to force one receiver (for both Receiver and Emitter sides). Then I would say we should continue sending events to other subs if one of them blocks in case of |
I'd leave |
If I’m following correctly, the proposal is to subscribe to each event type separately via a typed channel? I’m not sure I like this design and its ergonomics. While it might be faster as it removes type assertions from the picture, it also leads to unnecessary complexity if all I want is to receive a group of events in the order they are generated. I’d have to create multiple channels and select from them, losing the ordering guarantee on the receive end because receives in selects are non-deterministic. All the while, IMO the pursuit of 1M events/s throughput doesn’t stand here. Realistically I doubt we’ll go above 10k events per second, so I could trade off the performance penalty of type assertions for better semantics and ordering. Example: as a subscriber, I’ll probably want the logic for stream open/closed/reset events to live in the same loop and flow via the same channel. If it doesn’t, my select loop will lose all order guarantees and I might end up consuming all closed events before their respective opens, because the receive is non-deterministic in Go. |
I expand on the above here: libp2p/go-eventbus#1 (comment) Let’s have a variadic version of Subscribe! |
If we go with SendTo:

bus := event.NewBus()
[setup some emitters]
stringCh := make(chan fmt.Stringer, 10)
cancel, err := bus.SendTo(stringCh, event.ForceType(new(EventA)))
[...]
cancel, err := bus.SendTo(stringCh, event.ForceType(new(EventB)))
[...]

We can then wrap this into a nice helper (I'd prefer to keep the interface very minimal) (it also shouldn't really affect perf much as all type assertions can be taken care of in initial
Note that it's 1M/s under 100% CPU load with nothing else happening. At that rate, 10k/s amounts to about 1% of CPU used for passing events (assuming linear scaling, which should be the case for the most part), which seems acceptable. |
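A possible shape for the helper mentioned above, written against a function value that mirrors the sketched SendTo (so it stays self-contained; the real signature may differ): it wires several event types into one channel and hands back a single cancel function.

package eventbus

// sendToFn mirrors the sketched bus.SendTo: wire a channel to one event type,
// getting back a cancel func.
type sendToFn func(ch interface{}, evtType interface{}) (cancel func(), err error)

// sendAllTo subscribes ch to every listed event type and returns one cancel
// function that tears all the subscriptions down.
func sendAllTo(sendTo sendToFn, ch interface{}, types ...interface{}) (func(), error) {
    var cancels []func()
    for _, t := range types {
        cancel, err := sendTo(ch, t)
        if err != nil {
            for _, c := range cancels {
                c() // roll back the subscriptions made so far
            }
            return nil, err
        }
        cancels = append(cancels, cancel)
    }
    return func() {
        for _, c := range cancels {
            c()
        }
    }, nil
}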
@magik6k gotcha. Let me review this on Monday more in-depth (I’m on the road now), and I’ll provide more thoughts. I’m confident we can strike a balance between requirements, DX, and performance with a little bit more exploration. Thanks for spearheading this implementation so far! ❤️ |
Then events about streams should be encapsulated within one structure:

type StreamEventCause int

const (
    CauseOpen StreamEventCause = iota
    CauseClosed
    CauseError
)

type StreamEvent struct {
    cause      StreamEventCause
    remoteSide peer.ID
    conn       net.Conn
}

Same with connected/disconnected events. They should be one type sent over the same channel. This way you have reliable event ordering. By using two channels you lose ordering (chan select chooses a channel at random). |
Continuing on the topic: if you have two goroutines sending data through a channel without any syncing, there is no guarantee about the order. In theory, you can attach, for example, a timestamp of when the event was created, but this would introduce a significant performance penalty. The same goes for copying from two channels into one: there would not be any guarantees about what order the information is received from those channels. |
The conclusion is: if event order is required, values should be transferred over one channel (but they can be composite values). |
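To make that concrete, a consumer of the composite StreamEvent sketched above would look roughly like this (assuming same-package access to the cause field, or an exported equivalent; ch stands for whatever channel the subscription hands back):

go func() {
    for evt := range ch {
        switch evt.cause {
        case CauseOpen:
            // stream opened
        case CauseClosed, CauseError:
            // closed/errored; with a single emitting goroutine this always
            // arrives after the corresponding CauseOpen
        }
    }
}()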
As Magik6k corrected me, with a subtyped receiver and down/upcasting it is possible to subscribe with one receiving channel to multiple events and receive them in order (if they are generated from, for example, one goroutine). If you really want, one can make the channel |
Apologies if I'm duplicating insights from others here, but a few notes from a libp2p dependent.

type EventBus interface {
    // RegisterSubscriber registers a new subscriber for the listed event types.
    // If successful, it returns a Subscription. Before starting to consume events,
    // the caller must check whether the subscription was initialised successfully by
    // receiving once from the InitResult() channel.
    RegisterSubscriber(opts SubOpts, evtTypes ...interface{}) (Subscription, error)
    // RegisterEmitter registers an Emitter for the specified event types.
    RegisterEmitter(evtTypes ...interface{}) (Emitter, error)
}

Event Types
// EventType is a unique identifier for a type of event
type EventType string

The host managing event types would need to check for collisions.

const (
    EvtPeerConnected = EventType("host.PeerConnected")
    // ...
)

But we often run into the issue of cyclical imports. In the current design I'd need to be able to import a package to get access to its event type definition. By using string-based types I can get around this by creating an equal EventType value in my own package.

Vector Clock
Since this event bus is being centrally managed, it would be really nice if the host managing the event bus associated events it received with a vector clock that would allow us to reason about event ordering:

// VectorTimer is an event with a position in a vector clock. VectorTime returns its "tick"
type VectorTimer interface {
    VectorTime() (tick int)
}

The clock(s) could be global across all events, or scoped to event types. The latter will better support concurrency at the cost of not being able to compare event times across types. This would allow nice helper functions that do clock comparison:

func VectorBefore(a, b Event) bool

By accepting events, these helpers could also error if users attempt to compare across event types.

Event Interface
I think that's plenty for an Event interface:

type Event interface {
    // EventType returns a representation of the event type
    Type() EventType
    // VectorTime gives the tick of this event across the vector clock
    VectorTime() int
}

Effect on the EventBus interface:

type EventBus interface {
    // RegisterSubscriber registers a new subscriber for the listed event types.
    // If successful, it returns a Subscription. Before starting to consume events,
    // the caller must check whether the subscription was initialized successfully by
    // receiving once from the InitResult() channel.
    RegisterSubscriber(opts SubOpts, evtTypes ...EventType) (Subscription, error)
    // RegisterEmitter registers an Emitter for the specified event types.
    RegisterEmitter(events ...Event) (Emitter, error)
}

As someone who will consume this system, I'd deeply appreciate being able to rely on these event primitives 😄 |
@b5 thanks for your input!
I understand some of the benefits, but I don't see how you'd do anything useful when you receive such an event without referencing the event struct itself. Unless you're interested in using the event as a beacon, i.e. you're interested in knowing that the event occurred, but no details about it. That's admittedly an edge case though. We intend to place all libp2p events in the core package (now that we have one). All libp2p code/users depends on that package anyway, either directly or transitively.
That's a reasonable proposal to model event sequence without relying on time. For this first iteration, we're explicitly ruling out making guarantees about ordering in order to keep things simple, but we'll need to offer a strict order mode soon enough.
Yeah, we can consider an event envelope to encapsulate metadata if needed. I say if needed because we might be able to enforce the ordering guarantee without exposing the vector clock. |
Thanks so much for the quick feedback @raulk! Just for reference, I'd position all of these suggestions as food for thought, and would take no offense if none of them make the cut. I'm not as close to this work. That being said, I think it may help to establish a clear answer to this question: Is the event bus exclusively intended for communication between libp2p systems? If so, then yeah, most of what I'm suggesting doesn't provide much, because all event types are known at compile time. At first glance, this interface looks like something outside users would be able to hook into. If that's the case, defining an event interface is the only contract you can rely on. As for doing useful things without struct types, I would hope to be able to use this system dealing purely in event behaviours. Concrete example, first with some sample event types:

// EventDiscoveredPeer is fired when a peer is found through local network scanning
type EventDiscoveredPeer struct {
    p peer.Peer
}

func (e EventDiscoveredPeer) PeerFound() peer.Peer {
    return e.p
}

// EventBootstrapPeer is fired whenever the bootstrap service is asked for a peer to connect to
type EventBootstrapPeer struct {
    p peer.Peer
}

func (e EventBootstrapPeer) PeerFound() peer.Peer {
    return e.p
}

Both events exhibit the same behavior. Instead of relying on structs, we code around behaviors:

type PeerFound interface {
    PeerFound() peer.Peer
}

go func() {
    for {
        select {
        case evt := <-sub.Events:
            if pf, ok := evt.(PeerFound); ok {
                pf.PeerFound().Connect()
            }
        }
    }
}()

I'd argue this code is easier to maintain. Relying on struct type assertions means any changes to event structs will need to be updated in all locations. In this setting I can make clearer assertions about what I care about. In the above example I'm not concerned with how peers are found, I just want to subscribe to any event that found peers from the systems I'm subscribed to. This shifts the API burden over to method definition. IMHO with this approach you have more flexibility on both consumption and definition. (Some events may want to apply logic in their method definitions.) Having Type() on events also enables code like:

go func() {
    for evt := range sub.Events {
        switch evt.Type() {
        case libp2p.EventBootstrapPeer:
            if host.ConnectionCount() < 10 {
                // ...
            }
        case libp2p.EventDiscoveredPeer:
            // ...
        case libp2p.EventType("thirdparty.EventPartyStarted"):
            // from docs we know PartyStarted is a reader
            if party, ok := evt.(io.Reader); ok {
                ioutil.ReadAll(party) // ...
            }
        }
    }
}()

If the event bus allows outside stuff, I think this focus on behaviors may help keep things organized. But if this is strictly an internal system for libp2p services to coordinate, this approach would be overkill. As for the vector clock stuff, totally agreed it's not a first-pass thing. In practice it might not be needed 😄 |
libp2p local event bus
Context
A libp2p host comprises a number of components. Some are compulsory like the swarm, peerstore, transports, etc. while others are optional like the DHT, autonat, autorelay, etc.
In most cases, events happen inside those components that other components may be interested in knowing about. Examples:
We are lacking a local event bus where components can emit events, and observers can subscribe and react to those events. We currently depend on hard timing assumptions to wire things together, resulting in a brittle overall system.
An added corollary is that creating certain dynamic protocols and behaviours is impossible nowadays without forking. Conversely, with a reactive, event-driven solution, we can enable use cases that extend the base functionality of libp2p to be deployed as an add-on rather than a fork.
Technical proposal
An asynchronous event bus seems like the right fit for our requirements. The EventBus is a top-level object owned by the Host. The proposed interface and object model is:
Each event is an instance of a typed struct, passed by value for safety, with a simple payload containing only primitive data types, pointers to the originator object (e.g. Conn, Stream, etc.), and basic libp2p types like peer.ID, protocol.ID, etc. We want to avoid shared state and lean towards immutability; each subscriber will receive a copy of the event.

These event payloads will live in go-libp2p-core, namespaced by the owning abstraction. Struct names generally follow the convention: Evt[Entity (noun)][Event (verb past tense / gerund)]. The gerund form of the verb (-ing) is allowed to express that a process is in progress.

Registration methods take zero-value structs, and use reflection to keep a registry of enlisted emitters & subscribers, and the events they handle.
NOTE: we considered generic dictionary objects for event payloads along with string-based topics, but quickly discarded it because it's an unsafe and uncertain model.
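For illustration, hypothetical payloads that follow the convention (the names and fields here are examples only; the actual event set will be defined in go-libp2p-core):

package event

import "github.com/libp2p/go-libp2p-core/peer"

// Evt[Entity][Verb, past tense]: something that has happened.
type EvtPeerIdentificationCompleted struct {
    Peer peer.ID
}

// Evt[Entity][Verb, gerund]: a process that is underway.
type EvtPeerIdentificationRunning struct {
    Peer peer.ID
}

Registration would then take a zero value, along the lines of bus.Emitter(new(EvtPeerIdentificationCompleted)), so the registry can be keyed on the reflected type.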
Event delivery / concurrency model
We considered several designs for event delivery, with implications for the concurrency model and goroutine count.
We propose to adopt (3).
Ordering of events
We did consider introducing a subscription mode that strongly guarantees the ordering of events, but this is too complex for a first iteration of the event bus, and at this time the benefit is not clear.
Handling backpressure
This is an open point. What happens if the subscriber becomes slow and the event queues are backlogged? Should we drop events? Doing so may cause inconsistency. We definitely must not slow down the producers, so backpressure must be somehow absorbed.
One option is to allow subscribers to specify the backpressure policy they want via the subscription options. Possibilities are DROP, KILL, NOTIFY:

DROP: drop events silently -- used when doing so would not cause inconsistency.
KILL: kill the subscription by erroring it immediately.
NOTIFY: do not kill the subscription, but notify a user-provided callback synchronously. The subscriber can use this notification to stop gracefully and restart from a known safe state.

NOTIFY should probably be the default.
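A sketch of how the policy could surface in the subscription options (names and shapes illustrative, not a settled API):

package eventbus

// BackpressurePolicy selects what the bus does when a subscriber's queue is full.
type BackpressurePolicy int

const (
    Notify BackpressurePolicy = iota // default: call OnStall synchronously, keep the subscription
    Drop                             // drop events silently
    Kill                             // error the subscription immediately
)

// SubOpts is passed when subscribing.
type SubOpts struct {
    Policy   BackpressurePolicy
    OnStall  func() // with Notify: lets the subscriber stop gracefully and restart from a known safe state
    QueueLen int    // per-subscription buffer size
}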
Standardisation
Since the local event bus has no implications on the wire, we don't consider it material for imminent standardisation in https://github.com/libp2p/specs.
However, if we deem this abstraction valuable enough to issue a RECOMMENDATION for all libp2p implementations to adopt it, then we can consider speccing it out in a language-agnostic way.