Skip to content

Commit

Permalink
io: enable closed event for listener filter (#21585)
Browse files Browse the repository at this point in the history
Previously the PR #18265 removed the `Closed` event for listener filter, since the Windows doesn't work well.

There is still have a corner case that can't be handled without the `Closed` event #18265 (comment)

This PR tries to introduce the `Closed` event back to the listener filter. As the comments on Windows code said, both `Read` and `Closed` can't be registered at same time due to the `Connection` doesn't support that yet. That means it shouldn't be a problem for the listener filter. Also currently, there is no Envoy code using both `Read` and `Closed` event, so it should be ok to remove those workaround code to make the both `Read` and `Closed` events registered to work with listener filter.

The full analysis is here #18265 (comment)

Signed-off-by: He Jie Xu <hejie.xu@intel.com>
  • Loading branch information
soulxu authored Jun 29, 2022
1 parent 04e04c9 commit fb4567f
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 37 deletions.
17 changes: 1 addition & 16 deletions source/common/event/file_event_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ void FileEventImpl::activate(uint32_t events) {
void FileEventImpl::assignEvents(uint32_t events, event_base* base) {
ASSERT(dispatcher_.isThreadSafe());
ASSERT(base != nullptr);
// TODO(antoniovicente) remove this once ConnectionImpl can
// handle Read and Close events delivered together.
ASSERT(!((events & FileReadyType::Read) && (events & FileReadyType::Closed)));

enabled_events_ = events;
event_assign(
&raw_event_, base, fd_,
Expand Down Expand Up @@ -137,10 +135,6 @@ void FileEventImpl::registerEventIfEmulatedEdge(uint32_t event) {
ASSERT((event & (FileReadyType::Read | FileReadyType::Write)) == event);
if (trigger_ == FileTriggerType::EmulatedEdge) {
auto new_event_mask = enabled_events_ | event;
if (event & FileReadyType::Read && (enabled_events_ & FileReadyType::Closed)) {
// We never ask for both early close and read at the same time.
new_event_mask = new_event_mask & ~FileReadyType::Read;
}
updateEvents(new_event_mask);
}
}
Expand All @@ -149,15 +143,6 @@ void FileEventImpl::registerEventIfEmulatedEdge(uint32_t event) {
void FileEventImpl::mergeInjectedEventsAndRunCb(uint32_t events) {
ASSERT(dispatcher_.isThreadSafe());
if (injected_activation_events_ != 0) {
// TODO(antoniovicente) remove this adjustment to activation events once ConnectionImpl can
// handle Read and Close events delivered together.
if constexpr (PlatformDefaultTriggerType == FileTriggerType::EmulatedEdge) {
if (events & FileReadyType::Closed && injected_activation_events_ & FileReadyType::Read) {
// We never ask for both early close and read at the same time. If close is requested
// keep that instead.
injected_activation_events_ = injected_activation_events_ & ~FileReadyType::Read;
}
}
events |= injected_activation_events_;
injected_activation_events_ = 0;
activation_cb_->cancel();
Expand Down
9 changes: 8 additions & 1 deletion source/common/network/listener_filter_buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ ListenerFilterBufferImpl::ListenerFilterBufferImpl(IoHandle& io_handle,

io_handle_.initializeFileEvent(
dispatcher_, [this](uint32_t events) { onFileEvent(events); },
Event::PlatformDefaultTriggerType, Event::FileReadyType::Read);
Event::PlatformDefaultTriggerType, Event::FileReadyType::Read | Event::FileReadyType::Closed);
}

const Buffer::ConstRawSlice ListenerFilterBufferImpl::rawSlice() const {
Expand Down Expand Up @@ -94,6 +94,13 @@ void ListenerFilterBufferImpl::activateFileEvent(uint32_t events) {
void ListenerFilterBufferImpl::onFileEvent(uint32_t events) {
ENVOY_LOG(trace, "onFileEvent: {}", events);

if (events & Event::FileReadyType::Closed) {
on_close_cb_(false);
return;
}

ASSERT(events == Event::FileReadyType::Read);

auto state = peekFromSocket();
if (state == PeekState::Done) {
on_data_cb_(*this);
Expand Down
5 changes: 3 additions & 2 deletions test/common/network/listener_filter_buffer_fuzz_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ class ListenerFilterBufferFuzzer {
return;
}

EXPECT_CALL(io_handle_, createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read))
EXPECT_CALL(io_handle_,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(SaveArg<1>(&file_event_callback_));

// Use the on_data callback to verify the data.
Expand Down
5 changes: 3 additions & 2 deletions test/common/network/listener_filter_buffer_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ namespace {
class ListenerFilterBufferImplTest : public testing::Test {
public:
void initialize() {
EXPECT_CALL(io_handle_, createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read))
EXPECT_CALL(io_handle_,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(SaveArg<1>(&file_event_callback_));

listener_buffer_ = std::make_unique<ListenerFilterBufferImpl>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ class HttpInspectorTest : public testing::Test {
EXPECT_CALL(cb_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(testing::Const(socket_), ioHandle()).WillRepeatedly(ReturnRef(*io_handle_));
EXPECT_CALL(socket_, ioHandle()).WillRepeatedly(ReturnRef(*io_handle_));
EXPECT_CALL(dispatcher_, createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read))
EXPECT_CALL(dispatcher_,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(
DoAll(SaveArg<1>(&file_event_callback_), ReturnNew<NiceMock<Event::MockFileEvent>>()));
buffer_ = std::make_unique<Network::ListenerFilterBufferImpl>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ class TlsInspectorTest : public testing::TestWithParam<std::tuple<uint16_t, uint

EXPECT_CALL(cb_, socket()).WillRepeatedly(ReturnRef(socket_));
EXPECT_CALL(socket_, ioHandle()).WillRepeatedly(ReturnRef(*io_handle_));
EXPECT_CALL(dispatcher_, createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read))
EXPECT_CALL(dispatcher_,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(
DoAll(SaveArg<1>(&file_event_callback_), ReturnNew<NiceMock<Event::MockFileEvent>>()));
buffer_ = std::make_unique<Network::ListenerFilterBufferImpl>(
Expand Down Expand Up @@ -242,7 +243,8 @@ TEST_P(TlsInspectorTest, ClientHelloTooBig) {
EXPECT_CALL(cb_, socket()).WillRepeatedly(ReturnRef(socket_));
EXPECT_CALL(socket_, ioHandle()).WillRepeatedly(ReturnRef(*io_handle_));
EXPECT_CALL(dispatcher_,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read))
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(
DoAll(SaveArg<1>(&file_event_callback_), ReturnNew<NiceMock<Event::MockFileEvent>>()));
buffer_ = std::make_unique<Network::ListenerFilterBufferImpl>(
Expand Down
37 changes: 37 additions & 0 deletions test/integration/listener_lds_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,43 @@ TEST_P(ListenerFilterIntegrationTest, MixNoInspectDataFilterAndInspectDataFilter
tcp_client->close();
}

TEST_P(ListenerFilterIntegrationTest, InspectDataFiltersClientCloseConnectionWithFewData) {
// This is required `EV_FEATURE_EARLY_CLOSE` feature for libevent, and this feature is
// only supported with `epoll`. But `MacOS` uses the `kqueue`.
// https://libevent.org/doc/event_8h.html#a98f643f9c9063a4cbf410f519eb61e55
#if !defined(__APPLE__)
config_helper_.addListenerFilter(R"EOF(
name: inspect_data1
typed_config:
"@type": type.googleapis.com/test.integration.filters.InspectDataListenerFilterConfig
max_read_bytes: 10
close_connection: false
)EOF");

config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
bootstrap.mutable_static_resources()
->mutable_listeners(0)
->set_continue_on_listener_filters_timeout(false);
bootstrap.mutable_static_resources()
->mutable_listeners(0)
->mutable_listener_filters_timeout()
->MergeFrom(ProtobufUtil::TimeUtil::MillisecondsToDuration(1000000));
bootstrap.mutable_static_resources()->mutable_listeners(0)->set_stat_prefix("listener_0");
});

std::string data = "hello";
initialize();
enableHalfClose(true);
IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0"));
auto result = tcp_client->write(data, true);
// The connection could be closed when writing or after write.
if (result == true) {
tcp_client->waitForDisconnect();
}
test_server_->waitForCounterEq("listener.listener_0.downstream_listener_filter_remote_close", 1);
#endif
}

// Only update the order of listener filters, ensure the listener filters
// was update.
TEST_P(ListenerFilterIntegrationTest, UpdateListenerFilterOrder) {
Expand Down
21 changes: 14 additions & 7 deletions test/server/active_tcp_listener_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ TEST_F(ActiveTcpListenerTest, ListenerFilterWithInspectData) {
Event::FileReadyCb file_event_callback;
// ensure the listener filter buffer will register the file event.
EXPECT_CALL(io_handle_,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read))
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(SaveArg<1>(&file_event_callback));
EXPECT_CALL(io_handle_, activateFileEvents(Event::FileReadyType::Read));
generic_active_listener_->onAcceptWorker(std::move(generic_accepted_socket_), false, true);
Expand Down Expand Up @@ -170,7 +171,8 @@ TEST_F(ActiveTcpListenerTest, ListenerFilterWithInspectDataFailedWithPeek) {
Event::FileReadyCb file_event_callback;
// ensure the listener filter buffer will register the file event.
EXPECT_CALL(io_handle_,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read))
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(SaveArg<1>(&file_event_callback));
EXPECT_CALL(io_handle_, activateFileEvents(Event::FileReadyType::Read));
// calling the onAcceptWorker() to create the ActiveTcpSocket.
Expand Down Expand Up @@ -240,7 +242,8 @@ TEST_F(ActiveTcpListenerTest, ListenerFilterWithInspectDataMultipleFilters) {

Event::FileReadyCb file_event_callback;
EXPECT_CALL(io_handle_,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read))
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(SaveArg<1>(&file_event_callback));
EXPECT_CALL(io_handle_, activateFileEvents(Event::FileReadyType::Read));

Expand Down Expand Up @@ -329,7 +332,8 @@ TEST_F(ActiveTcpListenerTest, ListenerFilterWithInspectDataMultipleFilters2) {
Event::FileReadyCb file_event_callback;

EXPECT_CALL(io_handle_,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read))
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(SaveArg<1>(&file_event_callback));
EXPECT_CALL(io_handle_, recv)
.WillOnce([&](void*, size_t size, int) {
Expand Down Expand Up @@ -389,7 +393,8 @@ TEST_F(ActiveTcpListenerTest, ListenerFilterWithClose) {
Event::FileReadyCb file_event_callback;
// ensure the listener filter buffer will register the file event.
EXPECT_CALL(io_handle_,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read))
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(SaveArg<1>(&file_event_callback));
EXPECT_CALL(io_handle_, activateFileEvents(Event::FileReadyType::Read));
generic_active_listener_->onAcceptWorker(std::move(generic_accepted_socket_), false, true);
Expand Down Expand Up @@ -424,7 +429,8 @@ TEST_F(ActiveTcpListenerTest, ListenerFilterCloseSockets) {
Event::FileReadyCb file_event_callback;
// ensure the listener filter buffer will register the file event.
EXPECT_CALL(io_handle_,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read))
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(SaveArg<1>(&file_event_callback));
EXPECT_CALL(io_handle_, activateFileEvents(Event::FileReadyType::Read));
EXPECT_CALL(io_handle_, recv)
Expand Down Expand Up @@ -452,7 +458,8 @@ TEST_F(ActiveTcpListenerTest, PopulateSNIWhenActiveTcpSocketTimeout) {
Event::FileReadyCb file_event_callback;
// ensure the listener filter buffer will register the file event.
EXPECT_CALL(io_handle_,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType, Event::FileReadyType::Read))
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(SaveArg<1>(&file_event_callback));
EXPECT_CALL(io_handle_, activateFileEvents(Event::FileReadyType::Read));

Expand Down
10 changes: 6 additions & 4 deletions test/server/connection_handler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1963,8 +1963,9 @@ TEST_F(ConnectionHandlerTest, ListenerFilterTimeout) {
EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)).RetiresOnSaturation();
EXPECT_CALL(io_handle, isOpen()).WillOnce(Return(true));
EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)).RetiresOnSaturation();
EXPECT_CALL(io_handle, createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read));
EXPECT_CALL(io_handle,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed));
EXPECT_CALL(io_handle, activateFileEvents(Event::FileReadyType::Read));
Event::MockTimer* timeout = new Event::MockTimer(&dispatcher_);
EXPECT_CALL(*timeout, enableTimer(std::chrono::milliseconds(15000), _));
Expand Down Expand Up @@ -2126,8 +2127,9 @@ TEST_F(ConnectionHandlerTest, ListenerFilterDisabledTimeout) {
EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)).RetiresOnSaturation();
EXPECT_CALL(io_handle, isOpen()).WillOnce(Return(true));
EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)).RetiresOnSaturation();
EXPECT_CALL(io_handle, createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read));
EXPECT_CALL(io_handle,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed));
EXPECT_CALL(io_handle, activateFileEvents(Event::FileReadyType::Read));
EXPECT_CALL(dispatcher_, createTimer_(_)).Times(0);
EXPECT_CALL(*test_filter, destroy_());
Expand Down

0 comments on commit fb4567f

Please sign in to comment.