Skip to content

Commit

Permalink
core: clean up streams in http dispatcher on reset (#415)
Browse files Browse the repository at this point in the history
Description: clean up streams in the http dispatcher when the underlying stream fires an onReset callback. Previously we were leaking this state.
Risk Level: med - stream memory management changes. However, we have a guarantee from the underlying async client implementation that terminal callbacks (onReset, onComplete) will only fire once.
Testing: unit tests.

Signed-off-by: Jose Nino <jnino@lyft.com>
Signed-off-by: JP Simard <jp@jpsim.com>
  • Loading branch information
junr03 authored and jpsim committed Nov 28, 2022
1 parent f7be3d9 commit 0209eb4
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 3 deletions.
9 changes: 9 additions & 0 deletions mobile/library/common/http/dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,21 @@ void Dispatcher::DirectStreamCallbacks::onTrailers(HeaderMapPtr&& trailers) {
void Dispatcher::DirectStreamCallbacks::onComplete() {
ENVOY_LOG(debug, "[S{}] complete stream", stream_handle_);
bridge_callbacks_.on_complete(bridge_callbacks_.context);
// Very important: onComplete and onReset both clean up stream state in the http dispatcher
// because the underlying async client implementation **guarantees** that only onComplete **or**
// onReset will be fired for a stream. This means it is safe to clean up the stream when either of
// the terminal callbacks fire without keeping additional state in this layer.
http_dispatcher_.cleanup(stream_handle_);
}

void Dispatcher::DirectStreamCallbacks::onReset() {
ENVOY_LOG(debug, "[S{}] remote reset stream", stream_handle_);
bridge_callbacks_.on_error({ENVOY_STREAM_RESET, envoy_nodata}, bridge_callbacks_.context);
// Very important: onComplete and onReset both clean up stream state in the http dispatcher
// because the underlying async client implementation **guarantees** that only onComplete **or**
// onReset will be fired for a stream. This means it is safe to clean up the stream when either of
// the terminal callbacks fire without keeping additional state in this layer.
http_dispatcher_.cleanup(stream_handle_);
}

Dispatcher::DirectStream::DirectStream(envoy_stream_t stream_handle,
Expand Down
94 changes: 91 additions & 3 deletions mobile/test/common/http/dispatcher_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ TEST_F(DispatcherTest, ResetStream) {
EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&post_cb));
http_dispatcher_.resetStream(stream);

EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(2).WillRepeatedly(Return(true));
post_cb();

// Ensure that the on_error on the bridge_callbacks was called.
Expand Down Expand Up @@ -448,7 +448,7 @@ TEST_F(DispatcherTest, LocalResetAfterStreamStart) {
EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&reset_post_cb));
http_dispatcher_.resetStream(stream);

EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(2).WillRepeatedly(Return(true));
reset_post_cb();

// Ensure that the on_error on the bridge_callbacks was called.
Expand Down Expand Up @@ -509,7 +509,7 @@ TEST_F(DispatcherTest, RemoteResetAfterStreamStart) {
EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&send_headers_post_cb));
http_dispatcher_.sendHeaders(stream, c_headers, false);

EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(2).WillRepeatedly(Return(true));
EXPECT_CALL(stream_encoder_, encodeHeaders(_, false));
send_headers_post_cb();

Expand Down Expand Up @@ -827,5 +827,93 @@ TEST_F(DispatcherTest, MultipleDataStream) {
ASSERT_EQ(cc.on_complete_calls, 1);
}

TEST_F(DispatcherTest, StreamResetAfterOnComplete) {
envoy_stream_t stream = 1;
// Setup bridge_callbacks to handle the response headers.
envoy_http_callbacks bridge_callbacks;
callbacks_called cc = {0, 0, 0, 0};
bridge_callbacks.context = &cc;
bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream,
void* context) -> void {
ASSERT_TRUE(end_stream);
HeaderMapPtr response_headers = Utility::toInternalHeaders(c_headers);
EXPECT_EQ(response_headers->Status()->value().getStringView(), "200");
callbacks_called* cc = static_cast<callbacks_called*>(context);
cc->on_headers_calls++;
};
bridge_callbacks.on_complete = [](void* context) -> void {
callbacks_called* cc = static_cast<callbacks_called*>(context);
cc->on_complete_calls++;
};
bridge_callbacks.on_error = [](envoy_error actual_error, void* context) -> void {
envoy_error expected_error = {ENVOY_STREAM_RESET, envoy_nodata};
ASSERT_EQ(actual_error.error_code, expected_error.error_code);
callbacks_called* cc = static_cast<callbacks_called*>(context);
cc->on_error_calls++;
};

// Grab the response decoder in order to dispatch responses on the stream.
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](StreamDecoder& decoder,
ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* {
callbacks.onPoolReady(stream_encoder_, cm_.conn_pool_.host_);
response_decoder_ = &decoder;
return nullptr;
}));

// Build a set of request headers.
TestHeaderMapImpl headers;
HttpTestUtility::addDefaultHeaders(headers);
envoy_headers c_headers = Utility::toBridgeHeaders(headers);

// Create a stream.
EXPECT_CALL(cm_, httpAsyncClientForCluster("base")).WillOnce(ReturnRef(cm_.async_client_));
EXPECT_CALL(cm_.async_client_, start(_, _))
.WillOnce(
WithArg<0>(Invoke([&](AsyncClient::StreamCallbacks& callbacks) -> AsyncClient::Stream* {
return client_.start(callbacks, AsyncClient::StreamOptions());
})));
EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks), ENVOY_SUCCESS);

// Send request headers.
Event::PostCb post_cb;
EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&post_cb));
http_dispatcher_.sendHeaders(stream, c_headers, true);

EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
EXPECT_CALL(stream_encoder_, encodeHeaders(_, true));
post_cb();

// Decode response headers. decodeHeaders with true will bubble up to onHeaders, which will in
// turn cause closeRemote. Because closeLocal has already been called, cleanup will happen; hence
// the second call to isThreadSafe.
EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
response_decoder_->decode100ContinueHeaders(
HeaderMapPtr(new TestHeaderMapImpl{{":status", "100"}}));
response_decoder_->decodeHeaders(HeaderMapPtr(new TestHeaderMapImpl{{":status", "200"}}), true);

EXPECT_EQ(
1UL,
cm_.thread_local_cluster_.cluster_.info_->stats_store_.counter("upstream_rq_200").value());
EXPECT_EQ(1UL, cm_.thread_local_cluster_.cluster_.info_->stats_store_
.counter("internal.upstream_rq_200")
.value());

// resetStream after onComplete has fired is a no-op, as the stream is cleaned from the
// dispatcher.
Event::PostCb reset_post_cb;
EXPECT_CALL(event_dispatcher_, post(_)).WillOnce(SaveArg<0>(&reset_post_cb));
http_dispatcher_.resetStream(stream);

EXPECT_CALL(event_dispatcher_, isThreadSafe()).Times(1).WillRepeatedly(Return(true));
reset_post_cb();

// Ensure that the callbacks on the bridge_callbacks were called.
ASSERT_EQ(cc.on_headers_calls, 1);
ASSERT_EQ(cc.on_complete_calls, 1);
ASSERT_EQ(cc.on_data_calls, 0);
ASSERT_EQ(cc.on_error_calls, 0);
}

} // namespace Http
} // namespace Envoy

0 comments on commit 0209eb4

Please sign in to comment.