Fix for program crash when destructor is called before channel close with blocked readers/writers #8197
  return ret;
}

template <typename T>
bool Buffered<T>::Receive(T* item) {
  bool ret = false;
  // Once the channel has been closed and all data has been consumed,
  // just return false. Don't even try acquiring the mutex.
If a reader thread is allowed to acquire the mutex here, then in a hypothetical scenario with an endless stream of readers, the destructor could be blocked forever. So once the channel is closed and all of its data has been consumed, any call to Receive should return immediately without even trying to acquire the lock.
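To make the point above concrete, here is a minimal sketch of a lock-free early exit. It is not the code from this PR: `SketchChannel`, the atomic `closed_` flag, and the atomic `pending_` counter are assumptions made for illustration, and the buffer is unbounded to keep the example short.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical stand-in for the PR's Buffered<T>; unbounded for brevity.
template <typename T>
class SketchChannel {
 public:
  bool Send(const T& item) {
    std::lock_guard<std::mutex> lock(mu_);
    if (closed_.load()) return false;  // "panic" is modelled as returning false
    queue_.push_back(item);
    pending_.fetch_add(1);
    cv_.notify_one();
    return true;
  }

  bool Receive(T* item) {
    // Fast path: closed and fully drained, so never touch the mutex.
    // Safe because Send rejects items once closed_ is set, so pending_
    // can only shrink after that point.
    if (closed_.load() && pending_.load() == 0) return false;

    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return !queue_.empty() || closed_.load(); });
    if (queue_.empty()) return false;  // closed while we were waiting
    *item = queue_.front();
    queue_.pop_front();
    pending_.fetch_sub(1);
    return true;
  }

  void Close() {
    std::lock_guard<std::mutex> lock(mu_);
    closed_.store(true);
    cv_.notify_all();  // wake every blocked reader
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<T> queue_;
  std::atomic<bool> closed_{false};
  std::atomic<std::size_t> pending_{0};
};
```

With this shape, readers that arrive after the channel is closed and drained never contend for `mu_`, so they cannot delay a destructor that needs the same mutex.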
paddle/framework/channel_test.cc
Outdated
    size_t i = data;
    EXPECT_EQ(ch->Send(&i), false);  // should panic because channel is closed
  }};
  recv_thread = std::thread{[&]() {
Receive doesn't really panic when the channel is closed.
This is mostly a matter of terminology, since our "panic" behavior is just returning false instead of throwing an exception (see the description in PR #8171).
Ideally, only a send to a closed channel should panic. A receive on a closed channel should first drain whatever is left in the channel buffer and after that return a zero value without blocking (we haven't implemented that here).
So maybe we should fix the comment to make this clearer for readers?
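For reference, the "ideal" semantics described above could look roughly like the following single-threaded sketch. It is not what this PR implements: `GoStyleQueue` and its members are hypothetical names, a failed send is modelled as an exception, and the case where a real channel would block is reported as an error because the sketch has no threads.

```cpp
#include <cassert>
#include <deque>
#include <stdexcept>
#include <utility>

// Hypothetical illustration of Go-like close semantics.
template <typename T>
class GoStyleQueue {
 public:
  void Send(const T& value) {
    // Only the send side "panics" once the channel is closed.
    if (closed_) throw std::logic_error("send on closed channel");
    items_.push_back(value);
  }

  // Returns {value, ok}. After Close(), buffered items are still delivered;
  // once the buffer is empty the result is {T{}, false} without blocking.
  std::pair<T, bool> Receive() {
    if (!items_.empty()) {
      T value = items_.front();
      items_.pop_front();
      return {value, true};
    }
    if (closed_) return {T{}, false};  // zero value, not a panic
    // A real channel would block here until a sender arrives.
    throw std::runtime_error("sketch only: receive would block");
  }

  void Close() { closed_ = true; }

 private:
  std::deque<T> items_;
  bool closed_ = false;
};
```

The `ok` flag lets a caller tell a real zero from the "closed and drained" case, which returning a bare `false` from `Receive` cannot express.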
Thank you, fixed that in the latest commit.
paddle/framework/channel_test.cc
Outdated
  }};
  recv_thread = std::thread{[&]() {
    size_t i;
    // should panic because channel is closed and queue is empty
Same as above.
Fixed in the latest commit
  }};

  send_thread.join();
  recv_thread.join();
Also, I think the functions ClosedUnBufferedChannelPanics and ClosedBufferedChannelPanics share a block of code.
I propose that we move the shared block into a separate function and call it with both a buffered and an unbuffered channel; see PR #8162 for reference.
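A rough sketch of that refactoring, kept independent of the real test file: `StubChannel` and `ClosedChannelRejectsSendAndReceive` are hypothetical names, the stub is single-threaded, and the helper returns a bool instead of using gtest macros so the snippet stands on its own.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical single-threaded stub standing in for the PR's Channel<size_t>.
// capacity == 0 plays the role of an unbuffered channel.
class StubChannel {
 public:
  explicit StubChannel(std::size_t capacity) : capacity_(capacity) {}

  bool Send(std::size_t* item) {
    if (closed_) return false;                     // send after close fails
    if (queue_.size() >= capacity_) return false;  // a real channel would block here
    queue_.push_back(*item);
    return true;
  }

  bool Receive(std::size_t* item) {
    if (queue_.empty()) return false;  // a real open channel would block here
    *item = queue_.front();
    queue_.pop_front();
    return true;
  }

  void Close() { closed_ = true; }

 private:
  std::size_t capacity_;
  std::deque<std::size_t> queue_;
  bool closed_ = false;
};

// The shared block, written once and reused for every channel flavour.
bool ClosedChannelRejectsSendAndReceive(StubChannel* ch) {
  ch->Close();
  std::size_t value = 5;
  const bool send_ok = ch->Send(&value);
  const bool recv_ok = ch->Receive(&value);
  return !send_ok && !recv_ok;
}
```

Each test body then shrinks to constructing the channel and calling the helper, for example `ClosedChannelRejectsSendAndReceive(&buffered)` and `ClosedChannelRejectsSendAndReceive(&unbuffered)`.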
I agree with @kavyasrinet.
Fixed in the latest commit. Thank you
paddle/framework/channel_test.cc
Outdated
// This tests that destroying an unbuffered channel also
// unblocks any senders waiting for receivers
TEST(Channel, UnbufferedChannelDestroyUnblocksSendersTest) {
Same comment as above: I think there is a lot of repeated code across the Unbuffered and Buffered test functions. We can write a helper function that implements the shared block and call it with the different channels.
Fixed in the latest commit. Thank you.
// The destructor must wait for all readers and writers to complete their task
// The channel has been closed, so we will not accept new readers and writers
lock.lock();
destructor_cond_var_.wait(
Thanks for catching this!
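The snippet above is cut off at the `wait(` call, so here is a minimal sketch of the general handshake it describes: the destructor marks the object closed, wakes blocked callers, and then waits on a condition variable until the last one has left. `DrainOnDestroy`, `WaitUntilClosed`, `Active`, and the `active_` counter are hypothetical names and not the PR's actual members.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical class; only the shutdown handshake is modelled.
class DrainOnDestroy {
 public:
  ~DrainOnDestroy() {
    std::unique_lock<std::mutex> lock(mu_);
    closed_ = true;         // stop accepting new waiters
    work_cv_.notify_all();  // release everyone blocked in WaitUntilClosed
    // Block until the last in-flight caller has left.
    destructor_cv_.wait(lock, [this] { return active_ == 0; });
  }

  // Blocks until the object is closed. Returns false if already closed.
  bool WaitUntilClosed() {
    std::unique_lock<std::mutex> lock(mu_);
    if (closed_) return false;
    ++active_;
    work_cv_.wait(lock, [this] { return closed_; });
    --active_;
    if (active_ == 0) destructor_cv_.notify_all();  // last one out signals
    return true;
  }

  int Active() {
    std::lock_guard<std::mutex> lock(mu_);
    return active_;
  }

 private:
  std::mutex mu_;
  std::condition_variable work_cv_;
  std::condition_variable destructor_cv_;
  bool closed_ = false;
  int active_ = 0;
};

// Demo: a thread blocked on the object is released by its destructor.
bool DestructorUnblocksWaiter() {
  bool unblocked = false;
  std::thread waiter;
  {
    DrainOnDestroy guard;
    waiter = std::thread([&] { unblocked = guard.WaitUntilClosed(); });
    while (guard.Active() == 0) std::this_thread::yield();  // waiter registered
  }  // ~DrainOnDestroy returns only after the waiter has left
  waiter.join();
  return unblocked;
}
```

The sketch assumes no thread starts a new call once destruction has begun; callers that might do so still need external coordination.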
paddle/framework/channel_test.cc
Outdated
// This tests that a channel must return false
// on send and receive performed after closing the channel.
// Receive will only return false after close when queue is empty
void ClosedChannelSendReceivePanics(Channel<size_t> *ch) {
Is SendReceiveWithACloseChannelShouldPanic a more accurate name for this test?
Fixed. Thank you.
paddle/framework/channel_test.cc
Outdated
  const size_t data = 5;
  std::thread send_thread{[&]() {
    size_t i = data;
    EXPECT_EQ(ch->Send(&i), true);  // should not block
Can we remove the definition of the variable i and call ch->Send(&data) here?
paddle/framework/channel_test.cc
Outdated
// Receive will only return false after close when queue is empty
void ClosedChannelSendReceivePanics(Channel<size_t> *ch) {
  const size_t data = 5;
  std::thread send_thread{[&]() {
I think we need a comment here:
// By creating separate threads for sending and receiving, we make this
// function able to test both buffered and unbuffered channels.
Done.
LGTM
Fix #8248