Skip to content

Commit

Permalink
http: add HCM functionality required for rate limiting (#6242)
Browse files Browse the repository at this point in the history
This PR adds new decode/encodeData() callbacks which allow
allow filters direct control over sending data to subsequent
filters, circumventing any HCM buffering. This is the simplest
and lease invasive change I could come up with to support this
functionality (or others like it).

Fixes #6140
Part of #5942

Signed-off-by: Matt Klein <mklein@lyft.com>
  • Loading branch information
mattklein123 authored Mar 12, 2019
1 parent eff73b4 commit 56b0309
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 1 deletion.
56 changes: 55 additions & 1 deletion include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,38 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
*
* It is an error to call this method in any other case.
*
* See also injectDecodedDataToFilterChain() for a different way of passing data to further
* filters and also how the two methods are different.
*
* @param data Buffer::Instance supplies the data to be decoded.
* @param streaming_filter boolean supplies if this filter streams data or buffers the full body.
*/
virtual void addDecodedData(Buffer::Instance& data, bool streaming_filter) PURE;

/**
* Decode data directly to subsequent filters in the filter chain. This method is used in
* advanced cases in which a filter needs full control over how subsequent filters view data,
* and does not want to make use of HTTP connection manager buffering. Using this method allows
* a filter to buffer data (or not) and then periodically inject data to subsequent filters,
* indicating end_stream at an appropriate time. This can be used to implement rate limiting,
* periodic data emission, etc.
*
* This method should only be called outside of callback context. I.e., do not call this method
* from within a filter's decodeData() call.
*
* When using this callback, filters should generally only return
* FilterDataStatus::StopIterationNoBuffer from their decodeData() call, since use of this method
* indicates that a filter does not wish to participate in standard HTTP connection manager
* buffering and continuation and will perform any necessary buffering and continuation on its
* own.
*
* This callback is different from addDecodedData() in that the specified data and end_stream
* status will be propagated directly to further filters in the filter chain. This is different
* from addDecodedData() where data is added to the HTTP connection manager's buffered data with
* the assumption that standard HTTP connection manager buffering and continuation are being used.
*/
virtual void injectDecodedDataToFilterChain(Buffer::Instance& data, bool end_stream) PURE;

/**
* Adds decoded trailers. May only be called in decodeData when end_stream is set to true.
* If called in any other context, an assertion will be triggered.
Expand Down Expand Up @@ -469,11 +496,38 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
*
* It is an error to call this method in any other case.
*
* See also injectEncodedDataToFilterChain() for a different way of passing data to further
* filters and also how the two methods are different.
*
* @param data Buffer::Instance supplies the data to be encoded.
* @param streaming_filter boolean supplies if this filter streams data or buffers the full body.
*/
virtual void addEncodedData(Buffer::Instance& data, bool streaming_filter) PURE;

/**
* Encode data directly to subsequent filters in the filter chain. This method is used in
* advanced cases in which a filter needs full control over how subsequent filters view data,
* and does not want to make use of HTTP connection manager buffering. Using this method allows
* a filter to buffer data (or not) and then periodically inject data to subsequent filters,
* indicating end_stream at an appropriate time. This can be used to implement rate limiting,
* periodic data emission, etc.
*
* This method should only be called outside of callback context. I.e., do not call this method
* from within a filter's encodeData() call.
*
* When using this callback, filters should generally only return
* FilterDataStatus::StopIterationNoBuffer from their encodeData() call, since use of this method
* indicates that a filter does not wish to participate in standard HTTP connection manager
* buffering and continuation and will perform any necessary buffering and continuation on its
* own.
*
* This callback is different from addEncodedData() in that the specified data and end_stream
* status will be propagated directly to further filters in the filter chain. This is different
* from addEncodedData() where data is added to the HTTP connection manager's buffered data with
* the assumption that standard HTTP connection manager buffering and continuation are being used.
*/
virtual void injectEncodedDataToFilterChain(Buffer::Instance& data, bool end_stream) PURE;

/**
* Adds encoded trailers. May only be called in encodeData when end_stream is set to true.
* If called in any other context, an assertion will be triggered.
Expand Down Expand Up @@ -517,7 +571,7 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
*/
class StreamEncoderFilter : public StreamFilterBase {
public:
/*
/**
* Called with 100-continue headers.
*
* This is not folded into encodeHeaders because most Envoy users and filters
Expand Down
3 changes: 3 additions & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ class AsyncStreamImpl : public AsyncClient::Stream,
// filter which uses this function for buffering.
ASSERT(buffered_body_ != nullptr);
}
void injectDecodedDataToFilterChain(Buffer::Instance&, bool) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
const Buffer::Instance* decodingBuffer() override { return buffered_body_.get(); }
void modifyDecodingBuffer(std::function<void(Buffer::Instance&)>) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
Expand Down
10 changes: 10 additions & 0 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1734,6 +1734,11 @@ void ConnectionManagerImpl::ActiveStreamDecoderFilter::addDecodedData(Buffer::In
parent_.addDecodedData(*this, data, streaming);
}

void ConnectionManagerImpl::ActiveStreamDecoderFilter::injectDecodedDataToFilterChain(
Buffer::Instance& data, bool end_stream) {
parent_.decodeData(this, data, end_stream);
}

void ConnectionManagerImpl::ActiveStreamDecoderFilter::continueDecoding() { commonContinue(); }

void ConnectionManagerImpl::ActiveStreamDecoderFilter::encode100ContinueHeaders(
Expand Down Expand Up @@ -1850,6 +1855,11 @@ void ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedData(Buffer::In
return parent_.addEncodedData(*this, data, streaming);
}

void ConnectionManagerImpl::ActiveStreamEncoderFilter::injectEncodedDataToFilterChain(
Buffer::Instance& data, bool end_stream) {
parent_.encodeData(this, data, end_stream);
}

HeaderMap& ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedTrailers() {
return parent_.addEncodedTrailers();
}
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

// Http::StreamDecoderFilterCallbacks
void addDecodedData(Buffer::Instance& data, bool streaming) override;
void injectDecodedDataToFilterChain(Buffer::Instance& data, bool end_stream) override;
HeaderMap& addDecodedTrailers() override;
void continueDecoding() override;
const Buffer::Instance* decodingBuffer() override {
Expand Down Expand Up @@ -253,6 +254,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

// Http::StreamEncoderFilterCallbacks
void addEncodedData(Buffer::Instance& data, bool streaming) override;
void injectEncodedDataToFilterChain(Buffer::Instance& data, bool end_stream) override;
HeaderMap& addEncodedTrailers() override;
void onEncoderFilterAboveWriteBufferHighWatermark() override;
void onEncoderFilterBelowWriteBufferLowWatermark() override;
Expand Down
174 changes: 174 additions & 0 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3422,6 +3422,180 @@ TEST_F(HttpConnectionManagerImplTest, AddDataWithStopAndContinue) {
encoder_filters_[2]->callbacks_->continueEncoding();
}

// Use filter direct decode/encodeData() calls without trailers.
TEST_F(HttpConnectionManagerImplTest, FilterDirectDecodeEncodeDataNoTrailers) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_);
HeaderMapPtr headers{
new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
decoder->decodeHeaders(std::move(headers), false);

Buffer::OwnedImpl fake_data("hello");
decoder->decodeData(fake_data, true);
}));

EXPECT_CALL(*route_config_provider_.route_config_, route(_, _));
setupFilterChain(2, 2);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));

Buffer::OwnedImpl decode_buffer;
EXPECT_CALL(*decoder_filters_[0], decodeData(_, true))
.WillOnce(Invoke([&](Buffer::Instance& data, bool) {
decode_buffer.move(data);
return FilterDataStatus::StopIterationNoBuffer;
}));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

Buffer::OwnedImpl decoded_data_to_forward;
decoded_data_to_forward.move(decode_buffer, 2);
EXPECT_CALL(*decoder_filters_[1], decodeData(BufferStringEqual("he"), false))
.WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
decoder_filters_[0]->callbacks_->injectDecodedDataToFilterChain(decoded_data_to_forward, false);

EXPECT_CALL(*decoder_filters_[1], decodeData(BufferStringEqual("llo"), true))
.WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());
decoder_filters_[0]->callbacks_->injectDecodedDataToFilterChain(decode_buffer, true);

// Response path.
EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(response_encoder_, encodeHeaders(_, false));

Buffer::OwnedImpl encoder_buffer;
EXPECT_CALL(*encoder_filters_[1], encodeData(_, true))
.WillOnce(Invoke([&](Buffer::Instance& data, bool) {
encoder_buffer.move(data);
return FilterDataStatus::StopIterationNoBuffer;
}));
EXPECT_CALL(*encoder_filters_[1], encodeComplete());

decoder_filters_[1]->callbacks_->encodeHeaders(
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, false);
Buffer::OwnedImpl response_body("response");
decoder_filters_[1]->callbacks_->encodeData(response_body, true);

Buffer::OwnedImpl encoded_data_to_forward;
encoded_data_to_forward.move(encoder_buffer, 3);
EXPECT_CALL(*encoder_filters_[0], encodeData(BufferStringEqual("res"), false));
EXPECT_CALL(response_encoder_, encodeData(_, false));
encoder_filters_[1]->callbacks_->injectEncodedDataToFilterChain(encoded_data_to_forward, false);

EXPECT_CALL(*encoder_filters_[0], encodeData(BufferStringEqual("ponse"), true));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
EXPECT_CALL(response_encoder_, encodeData(_, true));
expectOnDestroy();
encoder_filters_[1]->callbacks_->injectEncodedDataToFilterChain(encoder_buffer, true);
}

// Use filter direct decode/encodeData() calls with trailers.
TEST_F(HttpConnectionManagerImplTest, FilterDirectDecodeEncodeDataTrailers) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_);
HeaderMapPtr headers{
new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}};
decoder->decodeHeaders(std::move(headers), false);

Buffer::OwnedImpl fake_data("hello");
decoder->decodeData(fake_data, false);

HeaderMapPtr trailers{new TestHeaderMapImpl{{"foo", "bar"}}};
decoder->decodeTrailers(std::move(trailers));
}));

EXPECT_CALL(*route_config_provider_.route_config_, route(_, _));
setupFilterChain(2, 2);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::StopIteration));

Buffer::OwnedImpl decode_buffer;
EXPECT_CALL(*decoder_filters_[0], decodeData(_, false))
.WillOnce(Invoke([&](Buffer::Instance& data, bool) {
decode_buffer.move(data);
return FilterDataStatus::StopIterationNoBuffer;
}));
EXPECT_CALL(*decoder_filters_[0], decodeTrailers(_))
.WillOnce(Return(FilterTrailersStatus::StopIteration));
EXPECT_CALL(*decoder_filters_[0], decodeComplete());

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

Buffer::OwnedImpl decoded_data_to_forward;
decoded_data_to_forward.move(decode_buffer, 2);
EXPECT_CALL(*decoder_filters_[1], decodeData(BufferStringEqual("he"), false))
.WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
decoder_filters_[0]->callbacks_->injectDecodedDataToFilterChain(decoded_data_to_forward, false);

EXPECT_CALL(*decoder_filters_[1], decodeData(BufferStringEqual("llo"), false))
.WillOnce(Return(FilterDataStatus::StopIterationNoBuffer));
decoder_filters_[0]->callbacks_->injectDecodedDataToFilterChain(decode_buffer, false);

EXPECT_CALL(*decoder_filters_[1], decodeTrailers(_));
EXPECT_CALL(*decoder_filters_[1], decodeComplete());
decoder_filters_[0]->callbacks_->continueDecoding();

// Response path.
EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(response_encoder_, encodeHeaders(_, false));

Buffer::OwnedImpl encoder_buffer;
EXPECT_CALL(*encoder_filters_[1], encodeData(_, false))
.WillOnce(Invoke([&](Buffer::Instance& data, bool) {
encoder_buffer.move(data);
return FilterDataStatus::StopIterationNoBuffer;
}));
EXPECT_CALL(*encoder_filters_[1], encodeTrailers(_))
.WillOnce(Return(FilterTrailersStatus::StopIteration));
EXPECT_CALL(*encoder_filters_[1], encodeComplete());

decoder_filters_[1]->callbacks_->encodeHeaders(
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, false);
Buffer::OwnedImpl response_body("response");
decoder_filters_[1]->callbacks_->encodeData(response_body, false);
decoder_filters_[1]->callbacks_->encodeTrailers(
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}});

Buffer::OwnedImpl encoded_data_to_forward;
encoded_data_to_forward.move(encoder_buffer, 3);
EXPECT_CALL(*encoder_filters_[0], encodeData(BufferStringEqual("res"), false));
EXPECT_CALL(response_encoder_, encodeData(_, false));
encoder_filters_[1]->callbacks_->injectEncodedDataToFilterChain(encoded_data_to_forward, false);

EXPECT_CALL(*encoder_filters_[0], encodeData(BufferStringEqual("ponse"), false));
EXPECT_CALL(response_encoder_, encodeData(_, false));
encoder_filters_[1]->callbacks_->injectEncodedDataToFilterChain(encoder_buffer, false);

EXPECT_CALL(*encoder_filters_[0], encodeTrailers(_));
EXPECT_CALL(*encoder_filters_[0], encodeComplete());
EXPECT_CALL(response_encoder_, encodeTrailers(_));
expectOnDestroy();
encoder_filters_[1]->callbacks_->continueEncoding();
}

TEST_F(HttpConnectionManagerImplTest, MultipleFilters) {
InSequence s;
setup(false, "");
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/http/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks,

MOCK_METHOD0(continueDecoding, void());
MOCK_METHOD2(addDecodedData, void(Buffer::Instance& data, bool streaming));
MOCK_METHOD2(injectDecodedDataToFilterChain, void(Buffer::Instance& data, bool end_stream));
MOCK_METHOD0(addDecodedTrailers, HeaderMap&());
MOCK_METHOD0(decodingBuffer, const Buffer::Instance*());
MOCK_METHOD1(modifyDecodingBuffer, void(std::function<void(Buffer::Instance&)>));
Expand Down Expand Up @@ -219,6 +220,7 @@ class MockStreamEncoderFilterCallbacks : public StreamEncoderFilterCallbacks,

// Http::StreamEncoderFilterCallbacks
MOCK_METHOD2(addEncodedData, void(Buffer::Instance& data, bool streaming));
MOCK_METHOD2(injectEncodedDataToFilterChain, void(Buffer::Instance& data, bool end_stream));
MOCK_METHOD0(addEncodedTrailers, HeaderMap&());
MOCK_METHOD0(continueEncoding, void());
MOCK_METHOD0(encodingBuffer, const Buffer::Instance*());
Expand Down

0 comments on commit 56b0309

Please sign in to comment.