async_tcp_client: remove callbacks if connection was not closed #35410

Merged (8 commits) on Aug 1, 2024
8 changes: 8 additions & 0 deletions source/common/tcp/async_tcp_client_impl.cc
@@ -24,6 +24,14 @@ AsyncTcpClientImpl::AsyncTcpClientImpl(Event::Dispatcher& dispatcher,
connect_timer_(dispatcher.createTimer([this]() { onConnectTimeout(); })),
enable_half_close_(enable_half_close) {}

AsyncTcpClientImpl::~AsyncTcpClientImpl() {
if (connection_) {
connection_->removeConnectionCallbacks(*this);
}

close(Network::ConnectionCloseType::NoFlush);
Member:

Could you elaborate on why the close() is added here? I don't see it in your initial fix, and a user may close the connection with a different close type.

Contributor (author):

The reason is that if we don't close it, tests fail because of the ASSERT in:

ConnectionImpl::~ConnectionImpl() {
ASSERT(!socket_->isOpen() && delayed_close_timer_ == nullptr,
"ConnectionImpl was unexpectedly torn down without being closed.");
// In general we assume that owning code has called close() previously to the destructor being
// run. This generally must be done so that callbacks run in the correct context (vs. deferred
// deletion). Hence the assert above. However, call close() here just to be completely sure that
// the fd is closed and make it more likely that we crash from a bad close callback.
close(ConnectionCloseType::NoFlush);
}

The comment in that code section explains what is expected of the owner of the ConnectionImpl:
"In general we assume that owning code has called close() previously to the destructor being run."

botengyao (Member), Jul 29, 2024:

I feel this is to mitigate a wrong usage of this client in the test filter that simulates the callback issue, and ideally this should not happen; it needs to be fixed on the usage side. FWIW, the original version of the fix LGTM, and we can either change how we test it or remove the newly added tests; either works for me.

Contributor (author):

The changes differ only in the close() call. Is there a reason we shouldn't include it, regardless of the testing?
Removing the newly added test would mean no integration test coverage for this case.

botengyao (Member), Jul 30, 2024:

I think the reason you see the non-close error is that we force-release the async client before calling any close, which is the wrong usage, and that is also what the assertion catches. If we really want to keep the close() here to cover this should-not-happen case, adding a boolean guard for the close may be less risky. I am more worried that if a caller calls close(FlushWriteAndDelay) and then follows with a close(NoFlush) while the connection is still in the deferred deletion list, the previous close will be overridden.

}

bool AsyncTcpClientImpl::connect() {
if (connection_) {
return false;
1 change: 1 addition & 0 deletions source/common/tcp/async_tcp_client_impl.h
@@ -28,6 +28,7 @@ class AsyncTcpClientImpl : public AsyncTcpClient,
AsyncTcpClientImpl(Event::Dispatcher& dispatcher,
Upstream::ThreadLocalCluster& thread_local_cluster,
Upstream::LoadBalancerContext* context, bool enable_half_close);
~AsyncTcpClientImpl();

void close(Network::ConnectionCloseType type) override;

13 changes: 11 additions & 2 deletions test/common/tcp/async_tcp_client_impl_test.cc
@@ -18,6 +18,15 @@ using testing::Return;
namespace Envoy {
namespace Tcp {

class CustomMockClientConnection : public Network::MockClientConnection {
public:
~CustomMockClientConnection() {
if (state_ != Connection::State::Closed) {
raiseEvent(Network::ConnectionEvent::LocalClose);
}
};
};

class AsyncTcpClientImplTest : public Event::TestUsingSimulatedTime, public testing::Test {
public:
AsyncTcpClientImplTest() = default;
@@ -32,7 +41,7 @@ class AsyncTcpClientImplTest : public Event::TestUsingSimulatedTime, public test
}

void expectCreateConnection(bool trigger_connected = true) {
connection_ = new NiceMock<Network::MockClientConnection>();
connection_ = new NiceMock<CustomMockClientConnection>();
Upstream::MockHost::MockCreateConnectionData conn_info;
connection_->streamInfo().setAttemptCount(1);
conn_info.connection_ = connection_;
@@ -59,7 +68,7 @@ class AsyncTcpClientImplTest : public Event::TestUsingSimulatedTime, public test
NiceMock<Event::MockTimer>* connect_timer_;
NiceMock<Event::MockDispatcher> dispatcher_;
NiceMock<Upstream::MockClusterManager> cluster_manager_;
Network::MockClientConnection* connection_{};
CustomMockClientConnection* connection_{};

NiceMock<Tcp::AsyncClient::MockAsyncTcpClientCallbacks> callbacks_;
};
9 changes: 8 additions & 1 deletion test/integration/filters/test_network_async_tcp_filter.cc
@@ -41,7 +41,8 @@ class TestNetworkAsyncTcpFilter : public Network::ReadFilter {
const test::integration::filters::TestNetworkAsyncTcpFilterConfig& config,
Stats::Scope& scope, Upstream::ClusterManager& cluster_manager)
: stats_(generateStats("test_network_async_tcp_filter", scope)),
cluster_name_(config.cluster_name()), cluster_manager_(cluster_manager) {
cluster_name_(config.cluster_name()), kill_after_on_data_(config.kill_after_on_data()),
cluster_manager_(cluster_manager) {
const auto thread_local_cluster = cluster_manager_.getThreadLocalCluster(cluster_name_);
options_ = std::make_shared<Tcp::AsyncTcpClientOptions>(true);
if (thread_local_cluster != nullptr) {
@@ -60,6 +61,11 @@ class TestNetworkAsyncTcpFilter : public Network::ReadFilter {
data.length());
client_->write(data, end_stream);

if (kill_after_on_data_) {
Tcp::AsyncTcpClient* c1 = client_.release();
delete c1;
}

return Network::FilterStatus::StopIteration;
}

@@ -166,6 +172,7 @@ class TestNetworkAsyncTcpFilter : public Network::ReadFilter {
TestNetworkAsyncTcpFilterStats stats_;
Tcp::AsyncTcpClientPtr client_;
absl::string_view cluster_name_;
bool kill_after_on_data_;
std::unique_ptr<RequestAsyncCallbacks> request_callbacks_;
std::unique_ptr<DownstreamCallbacks> downstream_callbacks_;
Upstream::ClusterManager& cluster_manager_;
@@ -4,4 +4,5 @@ package test.integration.filters;

message TestNetworkAsyncTcpFilterConfig {
string cluster_name = 1;
bool kill_after_on_data = 2;
}
61 changes: 46 additions & 15 deletions test/integration/tcp_async_client_integration_test.cc
@@ -1,3 +1,4 @@
#include "test/integration/filters/test_network_async_tcp_filter.pb.h"
#include "test/integration/integration.h"

#include "gtest/gtest.h"
@@ -16,15 +17,37 @@ class TcpAsyncClientIntegrationTest : public testing::TestWithParam<Network::Add
typed_config:
"@type": type.googleapis.com/test.integration.filters.TestNetworkAsyncTcpFilterConfig
cluster_name: cluster_0
)EOF")) {}
)EOF")) {
enableHalfClose(true);
}

void init(bool kill_after_on_data = false) {
const std::string yaml = fmt::format(R"EOF(
cluster_name: cluster_0
kill_after_on_data: {}
)EOF",
kill_after_on_data ? "true" : "false");

config_helper_.addConfigModifier(
[&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void {
test::integration::filters::TestNetworkAsyncTcpFilterConfig proto_config;
TestUtility::loadFromYaml(yaml, proto_config);

auto* listener = bootstrap.mutable_static_resources()->mutable_listeners(0);
auto* filter_chain = listener->mutable_filter_chains(0);
auto* filter = filter_chain->mutable_filters(0);
filter->mutable_typed_config()->PackFrom(proto_config);
});

BaseIntegrationTest::initialize();
}
};

INSTANTIATE_TEST_SUITE_P(IpVersions, TcpAsyncClientIntegrationTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()));

TEST_P(TcpAsyncClientIntegrationTest, SingleRequest) {
enableHalfClose(true);
initialize();
init();

std::string request("request");
std::string response("response");
@@ -51,8 +74,7 @@ TEST_P(TcpAsyncClientIntegrationTest, SingleRequest) {
}

TEST_P(TcpAsyncClientIntegrationTest, MultipleRequestFrames) {
enableHalfClose(true);
initialize();
init();

std::string data_frame_1("data_frame_1");
std::string data_frame_2("data_frame_2");
@@ -85,8 +107,7 @@ }
}

TEST_P(TcpAsyncClientIntegrationTest, MultipleResponseFrames) {
enableHalfClose(true);
initialize();
init();

std::string data_frame_1("data_frame_1");
std::string response_1("response_1");
@@ -116,8 +137,7 @@ TEST_P(TcpAsyncClientIntegrationTest, Reconnect) {
return;
}

enableHalfClose(true);
initialize();
init();

IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0"));
ASSERT_TRUE(tcp_client->write("hello1", false));
@@ -143,11 +163,24 @@ TEST_P(TcpAsyncClientIntegrationTest, Reconnect) {
test_server_->waitForGaugeEq("cluster.cluster_0.upstream_cx_active", 0);
}

TEST_P(TcpAsyncClientIntegrationTest, ClientTearDown) {
init(true);

std::string request("request");

IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0"));
ASSERT_TRUE(tcp_client->write(request, true));
FakeRawConnectionPtr fake_upstream_connection;
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
ASSERT_TRUE(fake_upstream_connection->waitForData(request.size()));

tcp_client->close();
}

#if ENVOY_PLATFORM_ENABLE_SEND_RST
// Test if RST close can be detected from downstream and upstream is closed by RST.
TEST_P(TcpAsyncClientIntegrationTest, TestClientCloseRST) {
enableHalfClose(true);
initialize();
init();

std::string request("request");
std::string response("response");
@@ -178,8 +211,7 @@

// Test if RST close can be detected from upstream.
TEST_P(TcpAsyncClientIntegrationTest, TestUpstreamCloseRST) {
enableHalfClose(true);
initialize();
init();

std::string request("request");
std::string response("response");
@@ -212,8 +244,7 @@ TEST_P(TcpAsyncClientIntegrationTest, TestUpstreamCloseRST) {
// the client. The behavior is different for windows, since RST support is literally supported for
// unix like system, disabled the test for windows.
TEST_P(TcpAsyncClientIntegrationTest, TestDownstremHalfClosedThenRST) {
enableHalfClose(true);
initialize();
init();

std::string request("request");
std::string response("response");