Import zikade code #875
Conversation
func NewWorkQueue[E BehaviourEvent](fn WorkQueueFunc[E]) *WorkQueue[E] {
	w := &WorkQueue[E]{
		pending: make(chan pendingEvent[E], 16),
I would prefer to have a justification for buffered channels. Why is 16 different from 1, 2, or 1000? Would unbuffered work too?
Buffering channels can increase performance, but can we ensure that we're not hiding bugs with channel buffering? If not done right, these bugs manifest later in hard-to-debug situations. E.g., run all tests with unbuffered channels but allow some buffering in production environments?
It needs to be at least one to prevent a deadlock in a single-threaded situation. 1 or 16 is just a matter of performance. Is there a behavioral difference that you are referring to?
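For context, a minimal sketch (hypothetical, not the actual WorkQueue code) of the single-threaded deadlock a zero-capacity channel would cause:

package main

func main() {
	// With capacity 0 the send below would block forever in a
	// single-threaded situation, because nothing is receiving yet:
	//   ch := make(chan int)
	ch := make(chan int, 1)

	ch <- 1 // does not block: the buffer holds one event
	<-ch    // the same goroutine drains it afterwards
}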
// operation.
func (w *Waiter[E]) Close() {
	w.done.Store(true)
	close(w.pending)
this will cause a panic when the pending channel buffer is exhausted
Can you explain the panic scenario? There can be no writes to the channel after this point since done is now true. Reads will continue to drain the channel.
It seems that I'm missing context 🙈 If I call Notify a ton of times so that I fill the pending channel and then call Close -> L131 will panic.
From your comment, it seems like this cannot happen because of how it's used internally.
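For illustration, a standalone sketch of that scenario (hypothetical names, not the actual Waiter code): a goroutine blocked sending on a full channel panics when the channel is closed underneath it.

package main

import "time"

func main() {
	pending := make(chan int, 1)
	pending <- 1 // fill the buffer

	go func() {
		pending <- 2 // blocks: the buffer is full
	}()

	time.Sleep(10 * time.Millisecond) // let the goroutine block on its send
	close(pending)                    // the blocked send now panics:
	time.Sleep(10 * time.Millisecond) // "send on closed channel"
}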
This would be true except that the waiter has only one writer and the writer is responsible for closing the channel. There is only ever one thread of execution writing to the channel. In the case of a running query the coordinator does it via a state machine. In the case of a response from a node the NodeHandler does it in its own goroutine. I could improve the documentation to state that the waiter is designed to wait for events from a single writer.
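A minimal sketch of that single-writer convention (hypothetical code, not the actual Waiter): the sole sender is also the one that closes the channel, so a send can never race the close.

package main

import "fmt"

// singleWriter owns ch: it is the only goroutine that sends, and it
// closes the channel when finished, so send-after-close cannot occur.
func singleWriter(ch chan<- int) {
	for i := 0; i < 3; i++ {
		ch <- i
	}
	close(ch) // safe: no other goroutine writes to ch
}

func main() {
	ch := make(chan int, 1)
	go singleWriter(ch)
	for v := range ch { // the reader drains until the channel is closed
		fmt.Println(v)
	}
}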
I see. In this case here, the writer is the component that calls Notify. So as long as all Notify calls have returned before the writer calls Close, everything's fine.
In my mental model, the writer is the component that manages the channel - which is the Waiter. Following that reasoning, the Waiter is responsible for properly managing the channel's lifecycle.
"context" | ||
) | ||
|
||
type NullSM[E any, S any] struct{} |
I think I prefer to fully type StateMachine instead of just SM - although we're almost entering Java land with that :D
func NodeInfoToAddrInfo(info kad.NodeInfo[KadKey, ma.Multiaddr]) peer.AddrInfo {
	peerID := info.ID().(kadt.PeerID)
	return peer.AddrInfo{
		ID: peer.ID(peerID),
could use NodeIDToPeerID - can the compiler inline that?
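A sketch of the suggested refactor, assuming a NodeIDToPeerID helper with roughly this shape (its actual signature may differ); a small leaf function like this is typically eligible for inlining by the Go compiler:

// Assumed shape of the helper; small leaf functions like this are
// usually inlined by the Go compiler.
func NodeIDToPeerID(id kad.NodeID[KadKey]) peer.ID {
	return peer.ID(id.(kadt.PeerID))
}

func NodeInfoToAddrInfo(info kad.NodeInfo[KadKey, ma.Multiaddr]) peer.AddrInfo {
	return peer.AddrInfo{
		ID: NodeIDToPeerID(info.ID()),
		// remaining fields as in the original
	}
}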
for i := range infos {
	peerID := infos[i].ID().(kadt.PeerID)
	peers[i] = peer.AddrInfo{
		ID: peer.ID(peerID),
could use NodeIDToPeerID
}
if done := w.fn(cc.Ctx, cc.Event); done {
	w.done.Store(true)
	return
when returning here, anyone sending on w.pending will continue to block. The channel needs to be drained here to release the resources of callers of Enqueue.
Only the waiter sends on the channel since it owns it, and writing is guarded by the done flag. Enqueue exits when done is true. What is the blocking behavior here?
Let's imagine I call Enqueue a ton of times so that w.pending is full and the Enqueue call is blocking in L94. If fn returns true for one of the earlier events, we return here in L85. The last Enqueue calls will continue to block in L94.
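A standalone sketch of that scenario (hypothetical names, not the actual WorkQueue code), including a drain step that would release the blocked callers:

package main

import (
	"fmt"
	"sync"
)

func main() {
	pending := make(chan int, 1)
	var wg sync.WaitGroup

	// Several senders; some block once the buffer is full.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			pending <- i // blocks while the buffer is full
		}(i)
	}

	// The consumer decides it is done after the first event. Without
	// the drain loop below, the remaining senders would block forever.
	fmt.Println("got:", <-pending)
	go func() {
		for range pending { // drain to unblock the remaining senders
		}
	}()

	wg.Wait() // all senders return once drained
}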