Skip to content

Commit

Permalink
event: assert the case of both read and closed event registered (#18265)
Browse files Browse the repository at this point in the history
On Windows, the Read event may be removed when both the read and closed event are registered. That will lead the thread is waiting for next read event forever. So change to assertion to prevent someone registered both read and closed events.

Signed-off-by: He Jie Xu <hejie.xu@intel.com>
  • Loading branch information
soulxu committed Oct 27, 2021
1 parent b8aaa41 commit 5d320af
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 69 deletions.
5 changes: 3 additions & 2 deletions source/common/event/file_event_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ void FileEventImpl::activate(uint32_t events) {
void FileEventImpl::assignEvents(uint32_t events, event_base* base) {
ASSERT(dispatcher_.isThreadSafe());
ASSERT(base != nullptr);
// TODO(antoniovicente) remove this once ConnectionImpl can
// handle Read and Close events delivered together.
ASSERT(!((events & FileReadyType::Read) && (events & FileReadyType::Closed)));
enabled_events_ = events;
event_assign(
&raw_event_, base, fd_,
Expand Down Expand Up @@ -120,7 +123,6 @@ void FileEventImpl::unregisterEventIfEmulatedEdge(uint32_t event) {
ASSERT(dispatcher_.isThreadSafe());
// This constexpr if allows the compiler to optimize away the function on POSIX
if constexpr (PlatformDefaultTriggerType == FileTriggerType::EmulatedEdge) {
ASSERT((event & (FileReadyType::Read | FileReadyType::Write)) == event);
if (trigger_ == FileTriggerType::EmulatedEdge) {
auto new_event_mask = enabled_events_ & ~event;
updateEvents(new_event_mask);
Expand Down Expand Up @@ -156,7 +158,6 @@ void FileEventImpl::mergeInjectedEventsAndRunCb(uint32_t events) {
injected_activation_events_ = injected_activation_events_ & ~FileReadyType::Read;
}
}

events |= injected_activation_events_;
injected_activation_events_ = 0;
activation_cb_->cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ Network::FilterStatus Filter::onAccept(Network::ListenerFilterCallbacks& cb) {
cb.dispatcher(),
[this](uint32_t events) {
ENVOY_LOG(trace, "http inspector event: {}", events);
// inspector is always peeking and can never determine EOF.
// Use this event type to avoid listener timeout on the OS supporting
// FileReadyType::Closed.
bool end_stream = events & Event::FileReadyType::Closed;

const ParseState parse_state = onRead();
switch (parse_state) {
Expand All @@ -78,19 +74,11 @@ Network::FilterStatus Filter::onAccept(Network::ListenerFilterCallbacks& cb) {
cb_->continueFilterChain(true);
break;
case ParseState::Continue:
if (end_stream) {
// Parser fails to determine http but the end of stream is reached. Fallback to
// non-http.
done(false);
cb_->socket().ioHandle().resetFileEvents();
cb_->continueFilterChain(true);
}
// do nothing but wait for the next event
break;
}
},
Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed);
Event::PlatformDefaultTriggerType, Event::FileReadyType::Read);
return Network::FilterStatus::StopIteration;
}
NOT_REACHED_GCOVR_EXCL_LINE;
Expand All @@ -107,6 +95,11 @@ ParseState Filter::onRead() {
return ParseState::Error;
}

// Remote closed
if (result.return_value_ == 0) {
return ParseState::Error;
}

const auto parse_state =
parseHttpHeader(absl::string_view(reinterpret_cast<const char*>(buf_), result.return_value_));
switch (parse_state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,6 @@ Network::FilterStatus Filter::onAccept(Network::ListenerFilterCallbacks& cb) {
socket.ioHandle().initializeFileEvent(
cb.dispatcher(),
[this](uint32_t events) {
if (events & Event::FileReadyType::Closed) {
config_->stats().connection_closed_.inc();
done(false);
return;
}

ASSERT(events == Event::FileReadyType::Read);
ParseState parse_state = onRead();
switch (parse_state) {
Expand All @@ -113,8 +107,7 @@ Network::FilterStatus Filter::onAccept(Network::ListenerFilterCallbacks& cb) {
break;
}
},
Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed);
Event::PlatformDefaultTriggerType, Event::FileReadyType::Read);
return Network::FilterStatus::StopIteration;
}
NOT_REACHED_GCOVR_EXCL_LINE;
Expand Down Expand Up @@ -176,6 +169,11 @@ ParseState Filter::onRead() {
return ParseState::Error;
}

if (result.return_value_ == 0) {
config_->stats().connection_closed_.inc();
return ParseState::Error;
}

// Because we're doing a MSG_PEEK, data we've seen before gets returned every time, so
// skip over what we've already processed.
if (static_cast<uint64_t>(result.return_value_) > read_) {
Expand Down
37 changes: 8 additions & 29 deletions test/common/event/file_event_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ TEST_P(FileEventImplActivateTest, Activate) {
EXPECT_CALL(read_event, ready());
ReadyWatcher write_event;
EXPECT_CALL(write_event, ready());
ReadyWatcher closed_event;
EXPECT_CALL(closed_event, ready());

const FileTriggerType trigger = Event::PlatformDefaultTriggerType;

Expand All @@ -103,14 +101,10 @@ TEST_P(FileEventImplActivateTest, Activate) {
if (events & FileReadyType::Write) {
write_event.ready();
}

if (events & FileReadyType::Closed) {
closed_event.ready();
}
},
trigger, FileReadyType::Read | FileReadyType::Write | FileReadyType::Closed);
trigger, FileReadyType::Read | FileReadyType::Write);

file_event->activate(FileReadyType::Read | FileReadyType::Write | FileReadyType::Closed);
file_event->activate(FileReadyType::Read | FileReadyType::Write);
dispatcher->run(Event::Dispatcher::RunType::NonBlock);

os_sys_calls_.close(fd);
Expand All @@ -125,7 +119,6 @@ TEST_P(FileEventImplActivateTest, ActivateChaining) {
ReadyWatcher fd_event;
ReadyWatcher read_event;
ReadyWatcher write_event;
ReadyWatcher closed_event;

ReadyWatcher prepare_watcher;
evwatch_prepare_new(&static_cast<DispatcherImpl*>(dispatcher.get())->base(), onWatcherReady,
Expand All @@ -140,19 +133,13 @@ TEST_P(FileEventImplActivateTest, ActivateChaining) {
if (events & FileReadyType::Read) {
read_event.ready();
file_event->activate(FileReadyType::Write);
file_event->activate(FileReadyType::Closed);
}

if (events & FileReadyType::Write) {
write_event.ready();
file_event->activate(FileReadyType::Closed);
}

if (events & FileReadyType::Closed) {
closed_event.ready();
}
},
trigger, FileReadyType::Read | FileReadyType::Write | FileReadyType::Closed);
trigger, FileReadyType::Read | FileReadyType::Write);

testing::InSequence s;
// First loop iteration: handle scheduled read event and the real write event produced by poll.
Expand All @@ -166,13 +153,10 @@ TEST_P(FileEventImplActivateTest, ActivateChaining) {
EXPECT_CALL(prepare_watcher, ready());
EXPECT_CALL(fd_event, ready());
EXPECT_CALL(write_event, ready());
EXPECT_CALL(closed_event, ready());
// Third loop iteration: handle close event scheduled while handling write.
EXPECT_CALL(prepare_watcher, ready());
EXPECT_CALL(fd_event, ready());
EXPECT_CALL(closed_event, ready());
// Fourth loop iteration: poll returned no new real events.
EXPECT_CALL(prepare_watcher, ready());
if constexpr (Event::PlatformDefaultTriggerType != Event::FileTriggerType::EmulatedEdge) {
// Third loop iteration: poll returned no new real events.
EXPECT_CALL(prepare_watcher, ready());
}

file_event->activate(FileReadyType::Read);
dispatcher->run(Event::Dispatcher::RunType::NonBlock);
Expand All @@ -189,7 +173,6 @@ TEST_P(FileEventImplActivateTest, SetEnableCancelsActivate) {
ReadyWatcher fd_event;
ReadyWatcher read_event;
ReadyWatcher write_event;
ReadyWatcher closed_event;

ReadyWatcher prepare_watcher;
evwatch_prepare_new(&static_cast<DispatcherImpl*>(dispatcher.get())->base(), onWatcherReady,
Expand All @@ -210,12 +193,8 @@ TEST_P(FileEventImplActivateTest, SetEnableCancelsActivate) {
if (events & FileReadyType::Write) {
write_event.ready();
}

if (events & FileReadyType::Closed) {
closed_event.ready();
}
},
trigger, FileReadyType::Read | FileReadyType::Write | FileReadyType::Closed);
trigger, FileReadyType::Read | FileReadyType::Write);

testing::InSequence s;
// First loop iteration: handle scheduled read event and the real write event produced by poll.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@ class HttpInspectorTest : public testing::Test {

if (include_inline_recv) {
EXPECT_CALL(os_sys_calls_, recv(42, _, _, MSG_PEEK))
.WillOnce(Return(Api::SysCallSizeResult{static_cast<ssize_t>(0), 0}));
.WillOnce(Return(Api::SysCallSizeResult{ssize_t(-1), SOCKET_ERROR_AGAIN}));

EXPECT_CALL(dispatcher_,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
EXPECT_CALL(dispatcher_, createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read))
.WillOnce(DoAll(SaveArg<1>(&file_event_callback_),
ReturnNew<NiceMock<Event::MockFileEvent>>()));

Expand Down Expand Up @@ -334,11 +333,10 @@ TEST_F(HttpInspectorTest, InspectHttp2) {
TEST_F(HttpInspectorTest, ReadClosed) {
init();

EXPECT_CALL(os_sys_calls_, recv(42, _, _, MSG_PEEK));
EXPECT_CALL(socket_, close());
EXPECT_CALL(cb_, continueFilterChain(true));
socket_.close();
file_event_callback_(Event::FileReadyType::Closed);
EXPECT_CALL(os_sys_calls_, recv(42, _, _, MSG_PEEK))
.WillOnce(Return(Api::SysCallSizeResult{0, 0}));
EXPECT_CALL(cb_, continueFilterChain(false));
file_event_callback_(Event::FileReadyType::Read);
EXPECT_EQ(0, cfg_->stats().http2_found_.value());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ using testing::InSequence;
using testing::Invoke;
using testing::InvokeWithoutArgs;
using testing::NiceMock;
using testing::Return;
using testing::ReturnNew;
using testing::ReturnRef;
using testing::SaveArg;
Expand Down Expand Up @@ -46,11 +47,10 @@ class TlsInspectorTest : public testing::TestWithParam<std::tuple<uint16_t, uint
.WillOnce(
Invoke([](os_fd_t fd, void* buffer, size_t length, int flag) -> Api::SysCallSizeResult {
ENVOY_LOG_MISC(error, "In mock syscall recv {} {} {} {}", fd, buffer, length, flag);
return Api::SysCallSizeResult{static_cast<ssize_t>(0), 0};
return Api::SysCallSizeResult{ssize_t(-1), SOCKET_ERROR_AGAIN};
}));
EXPECT_CALL(dispatcher_,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
EXPECT_CALL(dispatcher_, createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read))
.WillOnce(
DoAll(SaveArg<1>(&file_event_callback_), ReturnNew<NiceMock<Event::MockFileEvent>>()));
filter_->onAccept(cb_);
Expand Down Expand Up @@ -85,8 +85,10 @@ TEST_P(TlsInspectorTest, MaxClientHelloSize) {
// Test that the filter detects Closed events and terminates.
TEST_P(TlsInspectorTest, ConnectionClosed) {
init();
EXPECT_CALL(os_sys_calls_, recv(42, _, _, MSG_PEEK))
.WillOnce(Return(Api::SysCallSizeResult{0, 0}));
EXPECT_CALL(cb_, continueFilterChain(false));
file_event_callback_(Event::FileReadyType::Closed);
file_event_callback_(Event::FileReadyType::Read);
EXPECT_EQ(1, cfg_->stats().connection_closed_.value());
}

Expand Down
6 changes: 3 additions & 3 deletions test/per_file_coverage.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ declare -a KNOWN_LOW_COVERAGE=(
"source/extensions/filters/http/kill_request:95.3" # Death tests don't report LCOV
"source/extensions/filters/http/lua:96.4"
"source/extensions/filters/http/wasm:95.8"
"source/extensions/filters/listener:96.2"
"source/extensions/filters/listener/http_inspector:95.9"
"source/extensions/filters/listener:95.9"
"source/extensions/filters/listener/http_inspector:95.8"
"source/extensions/filters/listener/original_dst:93.3"
"source/extensions/filters/listener/tls_inspector:93.5"
"source/extensions/filters/listener/tls_inspector:92.3"
"source/extensions/filters/network/common:96.0"
"source/extensions/filters/network/common/redis:96.2"
"source/extensions/filters/network/mongo_proxy:95.5"
Expand Down

0 comments on commit 5d320af

Please sign in to comment.