Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filters: support internal buffering #1090

Merged
merged 11 commits into from
Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions library/common/buffer/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ envoy_data toBridgeData(Buffer::Instance& data) {
return bridge_data;
}

envoy_data copyToBridgeData(const Buffer::Instance& data) {
envoy_data bridge_data;
bridge_data.length = data.length();
bridge_data.bytes = static_cast<uint8_t*>(safe_malloc(sizeof(uint8_t) * bridge_data.length));
data.copyOut(0, bridge_data.length, const_cast<uint8_t*>(bridge_data.bytes));
bridge_data.release = free;
bridge_data.context = const_cast<uint8_t*>(bridge_data.bytes);
return bridge_data;
}

} // namespace Utility
} // namespace Buffer
} // namespace Envoy
11 changes: 9 additions & 2 deletions library/common/buffer/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,24 @@ namespace Utility {
/**
* Transform envoy_data to Envoy::Buffer::Instance.
* @param headers, the envoy_data to transform.
* @return Envoy::Buffer::InstancePtr, the 1:1 transformation of the envoy_data param.
* @return Envoy::Buffer::InstancePtr, the native transformation of the envoy_data param.
*/
Buffer::InstancePtr toInternalData(envoy_data data);

/**
* Transform from Buffer::Instance to envoy_data.
* @param data, the Buffer::Instance to transform.
* @return envoy_data, the 1:1 transformation of the Buffer::Instance param.
* @return envoy_data, the bridge transformation of the Buffer::Instance param.
*/
envoy_data toBridgeData(Buffer::Instance&);

/**
* Copy from Buffer::Instance to envoy_data.
* @param data, the Buffer::Instance to copy.
* @return envoy_data, the copy produced from the Buffer::Instance param.
*/
envoy_data copyToBridgeData(const Buffer::Instance&);

} // namespace Utility
} // namespace Buffer
} // namespace Envoy
74 changes: 69 additions & 5 deletions library/common/extensions/filters/http/platform_bridge/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ PlatformBridgeFilter::PlatformBridgeFilter(PlatformBridgeFilterConfigSharedPtr c
platform_filter_.instance_context = platform_filter_.init_filter(platform_filter_.static_context);
ASSERT(platform_filter_.instance_context,
fmt::format("init_filter unsuccessful for {}", filter_name_));
iteration_state_ = IterationState::Ongoing;
}

void PlatformBridgeFilter::onDestroy() {
Expand Down Expand Up @@ -81,20 +82,69 @@ Http::FilterHeadersStatus PlatformBridgeFilter::onHeaders(Http::HeaderMap& heade
}

Http::FilterDataStatus PlatformBridgeFilter::onData(Buffer::Instance& data, bool end_stream,
Buffer::Instance* internal_buffer,
envoy_filter_on_data_f on_data) {
// Allow nullptr to act as no-op.
if (on_data == nullptr) {
return Http::FilterDataStatus::Continue;
}

envoy_data in_data = Buffer::Utility::toBridgeData(data);
envoy_data in_data;

if (iteration_state_ == IterationState::Stopped && internal_buffer &&
internal_buffer->length() > 0) {
// Pre-emptively buffer data to present aggregate to platform.
internal_buffer->move(data);
in_data = Buffer::Utility::copyToBridgeData(*internal_buffer);
} else {
in_data = Buffer::Utility::copyToBridgeData(data);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this will become clear later, but why is this a copy rather than a drain like before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the buffer is unmodified, we shouldn't be draining Envoy's buffer. Per our discussed API, we now explicitly disallow the buffer to be modified in any state other than Continue or Resume.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are also other ways we could potentially handle this, but for now, this is expedient and has no real downside since transform was doing a copy anyways.

}

envoy_filter_data_status result = on_data(in_data, end_stream, platform_filter_.instance_context);
Http::FilterDataStatus status = static_cast<Http::FilterDataStatus>(result.status);
switch (status) {
case Http::FilterDataStatus::Continue:
if (iteration_state_ == IterationState::Stopped) {
// When platform filter iteration is Stopped, Resume must be used to start iterating again.
// TODO(goaway): decide on the means to surface/handle errors here. Options include:
// - crashing
// - creating an error response for this stream
// - letting Envoy handle any invalid resulting state via its own guards
}
break;
case Http::FilterDataStatus::StopIterationAndBuffer:
if (iteration_state_ == IterationState::Stopped) {
// Data will already have been buffered (above).
status = Http::FilterDataStatus::StopIterationNoBuffer;
} else {
// Data will be buffered on return.
iteration_state_ = IterationState::Stopped;
}
break;
case Http::FilterDataStatus::StopIterationNoBuffer:
// In this context all previously buffered data can/should be dropped. If no data has been
// buffered, this is a no-op. If data was previously buffered, the most likely case is
// that a filter has decided to handle generating a response itself and no longer needs it.
// We opt for making this assumption since it's otherwise ambiguous how we should handle
// buffering when switching between the two stopped states, and since data can be arbitrarily
// interleaved, it's unclear that there's any legitimate case to support any more complex
// behavior.
if (internal_buffer) {
internal_buffer->drain(internal_buffer->length());
}
iteration_state_ = IterationState::Stopped;
break;
default:
PANIC("unsupported status for platform filters");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

convention is to use NOT_REACHED_GCOVR_EXCL_LINE. You can leave a comment above why the default should not be reached.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, the default can be reached - we haven't enumerated all statuses Envoy defines, and a truly strange implementation of a filter could try to break things here. I think the assertion is maybe warranted?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@goaway, under the hood NOT_REACHED_GCOVR_EXCL_LINE panics. It doesn't mean that it cannot be reached, it means that if it reaches it is an error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ bump on this

}

// TODO(goaway): Current platform implementations expose immutable data, thus any modification
// necessitates a full copy. Add 'modified' bit to determine when we can elide the copy. See also
// https://github.com/lyft/envoy-mobile/issues/949 for potential future optimization.
data.drain(data.length());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm were we double draining before? Once in toBridge data and once here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we were double draining before. Which was benign, but not ideal.

data.addBufferFragment(*Buffer::BridgeFragment::createBridgeFragment(result.data));
if (iteration_state_ == IterationState::Ongoing) {
data.drain(data.length());
data.addBufferFragment(*Buffer::BridgeFragment::createBridgeFragment(result.data));
}

return status;
}
Expand Down Expand Up @@ -132,7 +182,14 @@ Http::FilterHeadersStatus PlatformBridgeFilter::decodeHeaders(Http::RequestHeade

Http::FilterDataStatus PlatformBridgeFilter::decodeData(Buffer::Instance& data, bool end_stream) {
// Delegate to shared implementation for request and response path.
return onData(data, end_stream, platform_filter_.on_request_data);
Buffer::Instance* internal_buffer = nullptr;
if (decoder_callbacks_->decodingBuffer()) {
decoder_callbacks_->modifyDecodingBuffer([&internal_buffer](Buffer::Instance& mutable_buffer) {
internal_buffer = &mutable_buffer;
});
}

return onData(data, end_stream, internal_buffer, platform_filter_.on_request_data);
}

Http::FilterTrailersStatus PlatformBridgeFilter::decodeTrailers(Http::RequestTrailerMap& trailers) {
Expand All @@ -148,7 +205,14 @@ Http::FilterHeadersStatus PlatformBridgeFilter::encodeHeaders(Http::ResponseHead

Http::FilterDataStatus PlatformBridgeFilter::encodeData(Buffer::Instance& data, bool end_stream) {
// Delegate to shared implementation for request and response path.
return onData(data, end_stream, platform_filter_.on_response_data);
Buffer::Instance* internal_buffer = nullptr;
if (encoder_callbacks_->encodingBuffer()) {
encoder_callbacks_->modifyEncodingBuffer([&internal_buffer](Buffer::Instance& mutable_buffer) {
internal_buffer = &mutable_buffer;
});
}

return onData(data, end_stream, internal_buffer, platform_filter_.on_response_data);
}

Http::FilterTrailersStatus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class PlatformBridgeFilterConfig {

typedef std::shared_ptr<PlatformBridgeFilterConfig> PlatformBridgeFilterConfigSharedPtr;

enum class IterationState { Ongoing, Stopped };

/**
* Harness to bridge Envoy filter invocations up to the platform layer.
*/
Expand Down Expand Up @@ -56,10 +58,11 @@ class PlatformBridgeFilter final : public Http::PassThroughFilter,
Http::FilterHeadersStatus onHeaders(Http::HeaderMap& headers, bool end_stream,
envoy_filter_on_headers_f on_headers);
Http::FilterDataStatus onData(Buffer::Instance& data, bool end_stream,
envoy_filter_on_data_f on_data);
Buffer::Instance* internal_buffer, envoy_filter_on_data_f on_data);
Http::FilterTrailersStatus onTrailers(Http::HeaderMap& trailers,
envoy_filter_on_trailers_f on_trailers);
const std::string filter_name_;
IterationState iteration_state_;
envoy_http_filter platform_filter_;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,99 @@ platform_filter_name: BasicContinueOnRequestData
EXPECT_EQ(invocations.on_request_data_calls, 1);
}

TEST_F(PlatformBridgeFilterTest, StopAndBufferOnRequestData) {
envoy_http_filter platform_filter;
filter_invocations invocations = {0, 0, 0, 0, 0, 0, 0, 0};
platform_filter.static_context = &invocations;
platform_filter.init_filter = [](const void* context) -> const void* {
filter_invocations* invocations = static_cast<filter_invocations*>(const_cast<void*>(context));
invocations->init_filter_calls++;
return context;
};
platform_filter.on_request_data = [](envoy_data c_data, bool end_stream,
const void* context) -> envoy_filter_data_status {
filter_invocations* invocations = static_cast<filter_invocations*>(const_cast<void*>(context));
std::string expected_data[3] = {"A", "AB", "ABC"};
EXPECT_EQ(to_string(c_data), expected_data[invocations->on_request_data_calls++]);
EXPECT_FALSE(end_stream);
c_data.release(c_data.context);
return {kEnvoyFilterDataStatusStopIterationAndBuffer, envoy_nodata};
};

Buffer::OwnedImpl decoding_buffer;
EXPECT_CALL(decoder_callbacks_, decodingBuffer())
.Times(3)
.WillRepeatedly(Return(&decoding_buffer));
EXPECT_CALL(decoder_callbacks_, modifyDecodingBuffer(_))
.Times(3)
.WillRepeatedly(Invoke([&](std::function<void(Buffer::Instance&)> callback) -> void {
callback(decoding_buffer);
}));

setUpFilter(R"EOF(
platform_filter_name: StopAndBufferOnRequestData
)EOF",
&platform_filter);
EXPECT_EQ(invocations.init_filter_calls, 1);

Buffer::OwnedImpl first_chunk = Buffer::OwnedImpl("A");
EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer,
filter_->decodeData(first_chunk, false));
// Since the return code can't be handled in a unit test, manually update the buffer here.
decoding_buffer.move(first_chunk);
EXPECT_EQ(invocations.on_request_data_calls, 1);

Buffer::OwnedImpl second_chunk = Buffer::OwnedImpl("B");
EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer,
filter_->decodeData(second_chunk, false));
// Manual update not required, because once iteration is stopped, data is added directly.
EXPECT_EQ(invocations.on_request_data_calls, 2);

Buffer::OwnedImpl third_chunk = Buffer::OwnedImpl("C");
EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(third_chunk, false));
// Manual update not required, because once iteration is stopped, data is added directly.
EXPECT_EQ(invocations.on_request_data_calls, 3);
}

TEST_F(PlatformBridgeFilterTest, StopNoBufferOnRequestData) {
envoy_http_filter platform_filter;
filter_invocations invocations = {0, 0, 0, 0, 0, 0, 0, 0};
platform_filter.static_context = &invocations;
platform_filter.init_filter = [](const void* context) -> const void* {
filter_invocations* invocations = static_cast<filter_invocations*>(const_cast<void*>(context));
invocations->init_filter_calls++;
return context;
};
platform_filter.on_request_data = [](envoy_data c_data, bool end_stream,
const void* context) -> envoy_filter_data_status {
filter_invocations* invocations = static_cast<filter_invocations*>(const_cast<void*>(context));
std::string expected_data[3] = {"A", "B", "C"};
EXPECT_EQ(to_string(c_data), expected_data[invocations->on_request_data_calls++]);
EXPECT_FALSE(end_stream);
c_data.release(c_data.context);
return {kEnvoyFilterDataStatusStopIterationNoBuffer, envoy_nodata};
};

setUpFilter(R"EOF(
platform_filter_name: StopNoBufferOnRequestData
)EOF",
&platform_filter);
EXPECT_EQ(invocations.init_filter_calls, 1);

Buffer::OwnedImpl first_chunk = Buffer::OwnedImpl("A");
EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(first_chunk, false));
EXPECT_EQ(invocations.on_request_data_calls, 1);

Buffer::OwnedImpl second_chunk = Buffer::OwnedImpl("B");
EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer,
filter_->decodeData(second_chunk, false));
EXPECT_EQ(invocations.on_request_data_calls, 2);

Buffer::OwnedImpl third_chunk = Buffer::OwnedImpl("C");
EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(third_chunk, false));
EXPECT_EQ(invocations.on_request_data_calls, 3);
}

TEST_F(PlatformBridgeFilterTest, BasicContinueOnRequestTrailers) {
envoy_http_filter platform_filter;
filter_invocations invocations = {0, 0, 0, 0, 0, 0, 0, 0};
Expand Down Expand Up @@ -275,6 +368,99 @@ platform_filter_name: BasicContinueOnResponseData
EXPECT_EQ(invocations.on_response_data_calls, 1);
}

TEST_F(PlatformBridgeFilterTest, StopAndBufferOnResponseData) {
envoy_http_filter platform_filter;
filter_invocations invocations = {0, 0, 0, 0, 0, 0, 0, 0};
platform_filter.static_context = &invocations;
platform_filter.init_filter = [](const void* context) -> const void* {
filter_invocations* invocations = static_cast<filter_invocations*>(const_cast<void*>(context));
invocations->init_filter_calls++;
return context;
};
platform_filter.on_response_data = [](envoy_data c_data, bool end_stream,
const void* context) -> envoy_filter_data_status {
filter_invocations* invocations = static_cast<filter_invocations*>(const_cast<void*>(context));
std::string expected_data[3] = {"A", "AB", "ABC"};
EXPECT_EQ(to_string(c_data), expected_data[invocations->on_response_data_calls++]);
EXPECT_FALSE(end_stream);
c_data.release(c_data.context);
return {kEnvoyFilterDataStatusStopIterationAndBuffer, envoy_nodata};
};

Buffer::OwnedImpl encoding_buffer;
EXPECT_CALL(encoder_callbacks_, encodingBuffer())
.Times(3)
.WillRepeatedly(Return(&encoding_buffer));
EXPECT_CALL(encoder_callbacks_, modifyEncodingBuffer(_))
.Times(3)
.WillRepeatedly(Invoke([&](std::function<void(Buffer::Instance&)> callback) -> void {
callback(encoding_buffer);
}));

setUpFilter(R"EOF(
platform_filter_name: StopAndBufferOnResponseData
)EOF",
&platform_filter);
EXPECT_EQ(invocations.init_filter_calls, 1);

Buffer::OwnedImpl first_chunk = Buffer::OwnedImpl("A");
EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer,
filter_->encodeData(first_chunk, false));
// Since the return code can't be handled in a unit test, manually update the buffer here.
encoding_buffer.move(first_chunk);
EXPECT_EQ(invocations.on_response_data_calls, 1);

Buffer::OwnedImpl second_chunk = Buffer::OwnedImpl("B");
EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer,
filter_->encodeData(second_chunk, false));
// Manual update not required, because once iteration is stopped, data is added directly.
EXPECT_EQ(invocations.on_response_data_calls, 2);

Buffer::OwnedImpl third_chunk = Buffer::OwnedImpl("C");
EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(third_chunk, false));
// Manual update not required, because once iteration is stopped, data is added directly.
EXPECT_EQ(invocations.on_response_data_calls, 3);
}

TEST_F(PlatformBridgeFilterTest, StopNoBufferOnResponseData) {
envoy_http_filter platform_filter;
filter_invocations invocations = {0, 0, 0, 0, 0, 0, 0, 0};
platform_filter.static_context = &invocations;
platform_filter.init_filter = [](const void* context) -> const void* {
filter_invocations* invocations = static_cast<filter_invocations*>(const_cast<void*>(context));
invocations->init_filter_calls++;
return context;
};
platform_filter.on_response_data = [](envoy_data c_data, bool end_stream,
const void* context) -> envoy_filter_data_status {
filter_invocations* invocations = static_cast<filter_invocations*>(const_cast<void*>(context));
std::string expected_data[3] = {"A", "B", "C"};
EXPECT_EQ(to_string(c_data), expected_data[invocations->on_response_data_calls++]);
EXPECT_FALSE(end_stream);
c_data.release(c_data.context);
return {kEnvoyFilterDataStatusStopIterationNoBuffer, envoy_nodata};
};

setUpFilter(R"EOF(
platform_filter_name: StopNoBufferOnResponseData
)EOF",
&platform_filter);
EXPECT_EQ(invocations.init_filter_calls, 1);

Buffer::OwnedImpl first_chunk = Buffer::OwnedImpl("A");
EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(first_chunk, false));
EXPECT_EQ(invocations.on_response_data_calls, 1);

Buffer::OwnedImpl second_chunk = Buffer::OwnedImpl("B");
EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer,
filter_->encodeData(second_chunk, false));
EXPECT_EQ(invocations.on_response_data_calls, 2);

Buffer::OwnedImpl third_chunk = Buffer::OwnedImpl("C");
EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(third_chunk, false));
EXPECT_EQ(invocations.on_response_data_calls, 3);
}

TEST_F(PlatformBridgeFilterTest, BasicContinueOnResponseTrailers) {
envoy_http_filter platform_filter;
filter_invocations invocations = {0, 0, 0, 0, 0, 0, 0, 0};
Expand Down