Skip to content

Commit

Permalink
wasm: fix order of callbacks for paused requests. (#13840)
Browse files Browse the repository at this point in the history
Fixes proxy-wasm/proxy-wasm-rust-sdk#43.

Signed-off-by: Piotr Sikora <piotrsikora@google.com>
  • Loading branch information
PiotrSikora authored Nov 13, 2020
1 parent 6d3691e commit b15b155
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 19 deletions.
5 changes: 5 additions & 0 deletions bazel/external/cargo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ name = "http_metadata_rust"
path = "../../../test/extensions/filters/http/wasm/test_data/metadata_rust.rs"
crate-type = ["cdylib"]

[[example]]
name = "http_resume_call_rust"
path = "../../../test/extensions/filters/http/wasm/test_data/resume_call_rust.rs"
crate-type = ["cdylib"]

[[example]]
name = "http_shared_data_rust"
path = "../../../test/extensions/filters/http/wasm/test_data/shared_data_rust.rs"
Expand Down
6 changes: 3 additions & 3 deletions bazel/repository_locations.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -870,8 +870,8 @@ REPOSITORY_LOCATIONS_SPEC = dict(
project_name = "WebAssembly for Proxies (C++ host implementation)",
project_desc = "WebAssembly for Proxies (C++ host implementation)",
project_url = "https://github.com/proxy-wasm/proxy-wasm-cpp-host",
version = "eceb02d5b7772ec1cd78a4d35356e57d2e6d59bb",
sha256 = "ae9d9b87d21d95647ebda197d130b37bddc5c6ee3e6630909a231fd55fcc9069",
version = "15827110ac35fdac9abdb6b05d04ee7ee2044dae",
sha256 = "77a2671205eb0973bee375a1bee4099edef991350433981f6e3508780318117d",
strip_prefix = "proxy-wasm-cpp-host-{version}",
urls = ["https://github.com/proxy-wasm/proxy-wasm-cpp-host/archive/{version}.tar.gz"],
use_category = ["dataplane_ext"],
Expand All @@ -882,7 +882,7 @@ REPOSITORY_LOCATIONS_SPEC = dict(
"envoy.filters.network.wasm",
"envoy.stat_sinks.wasm",
],
release_date = "2020-11-10",
release_date = "2020-11-12",
cpe = "N/A",
),
emscripten_toolchain = dict(
Expand Down
6 changes: 4 additions & 2 deletions source/extensions/common/wasm/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1493,12 +1493,14 @@ WasmResult Context::continueStream(WasmStreamType stream_type) {
switch (stream_type) {
case WasmStreamType::Request:
if (decoder_callbacks_) {
decoder_callbacks_->continueDecoding();
// We are in a reentrant call, so defer.
wasm()->addAfterVmCallAction([this] { decoder_callbacks_->continueDecoding(); });
}
break;
case WasmStreamType::Response:
if (encoder_callbacks_) {
encoder_callbacks_->continueEncoding();
// We are in a reentrant call, so defer.
wasm()->addAfterVmCallAction([this] { encoder_callbacks_->continueEncoding(); });
}
break;
default:
Expand Down
1 change: 1 addition & 0 deletions test/extensions/filters/http/wasm/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ envoy_extension_cc_test(
"//test/extensions/filters/http/wasm/test_data:body_rust.wasm",
"//test/extensions/filters/http/wasm/test_data:headers_rust.wasm",
"//test/extensions/filters/http/wasm/test_data:metadata_rust.wasm",
"//test/extensions/filters/http/wasm/test_data:resume_call_rust.wasm",
"//test/extensions/filters/http/wasm/test_data:shared_data_rust.wasm",
"//test/extensions/filters/http/wasm/test_data:shared_queue_rust.wasm",
"//test/extensions/filters/http/wasm/test_data:test_cpp.wasm",
Expand Down
11 changes: 11 additions & 0 deletions test/extensions/filters/http/wasm/test_data/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ wasm_rust_binary(
],
)

wasm_rust_binary(
name = "resume_call_rust.wasm",
srcs = ["resume_call_rust.rs"],
deps = [
"//bazel/external/cargo:log",
"//bazel/external/cargo:proxy_wasm",
],
)

wasm_rust_binary(
name = "shared_data_rust.wasm",
srcs = ["shared_data_rust.rs"],
Expand Down Expand Up @@ -72,6 +81,7 @@ envoy_cc_library(
"test_cpp_null_plugin.cc",
"test_grpc_call_cpp.cc",
"test_grpc_stream_cpp.cc",
"test_resume_call_cpp.cc",
"test_shared_data_cpp.cc",
"test_shared_queue_cpp.cc",
],
Expand All @@ -97,6 +107,7 @@ envoy_wasm_cc_binary(
"test_cpp.cc",
"test_grpc_call_cpp.cc",
"test_grpc_stream_cpp.cc",
"test_resume_call_cpp.cc",
"test_shared_data_cpp.cc",
"test_shared_queue_cpp.cc",
],
Expand Down
39 changes: 39 additions & 0 deletions test/extensions/filters/http/wasm/test_data/resume_call_rust.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use log::info;
use proxy_wasm::traits::{Context, HttpContext};
use proxy_wasm::types::*;
use std::time::Duration;

#[no_mangle]
pub fn _start() {
proxy_wasm::set_log_level(LogLevel::Trace);
proxy_wasm::set_http_context(|_, _| -> Box<dyn HttpContext> { Box::new(TestStream) });
}

struct TestStream;

impl HttpContext for TestStream {
fn on_http_request_headers(&mut self, _: usize) -> Action {
self.dispatch_http_call(
"cluster",
vec![(":method", "POST"), (":path", "/"), (":authority", "foo")],
Some(b"resume"),
vec![],
Duration::from_secs(1),
)
.unwrap();
info!("onRequestHeaders");
Action::Pause
}

fn on_http_request_body(&mut self, _: usize, _: bool) -> Action {
info!("onRequestBody");
Action::Continue
}
}

impl Context for TestStream {
fn on_http_call_response(&mut self, _: u32, _: usize, _: usize, _: usize) {
info!("continueRequest");
self.resume_http_request();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// NOLINT(namespace-envoy)
#include <memory>
#include <string>
#include <unordered_map>

#ifndef NULL_PLUGIN
#include "proxy_wasm_intrinsics_lite.h"
#else
#include "extensions/common/wasm/ext/envoy_null_plugin.h"
#endif

START_WASM_PLUGIN(HttpWasmTestCpp)

class ResumeCallContext : public Context {
public:
explicit ResumeCallContext(uint32_t id, RootContext* root) : Context(id, root) {}

FilterHeadersStatus onRequestHeaders(uint32_t, bool) override;
FilterDataStatus onRequestBody(size_t, bool) override;
};

class ResumeCallRootContext : public RootContext {
public:
explicit ResumeCallRootContext(uint32_t id, std::string_view root_id)
: RootContext(id, root_id) {}
};

static RegisterContextFactory register_ResumeCallContext(CONTEXT_FACTORY(ResumeCallContext),
ROOT_FACTORY(ResumeCallRootContext),
"resume_call");

FilterHeadersStatus ResumeCallContext::onRequestHeaders(uint32_t, bool) {
auto context_id = id();
auto resume_callback = [context_id](uint32_t, size_t, uint32_t) {
getContext(context_id)->setEffectiveContext();
logInfo("continueRequest");
continueRequest();
};
if (root()->httpCall("cluster", {{":method", "POST"}, {":path", "/"}, {":authority", "foo"}},
"resume", {}, 1000, resume_callback) != WasmResult::Ok) {
logError("unexpected failure");
return FilterHeadersStatus::StopIteration;
}
logInfo("onRequestHeaders");
return FilterHeadersStatus::StopIteration;
}

FilterDataStatus ResumeCallContext::onRequestBody(size_t, bool) {
logInfo("onRequestBody");
return FilterDataStatus::Continue;
}

END_WASM_PLUGIN
78 changes: 64 additions & 14 deletions test/extensions/filters/http/wasm/wasm_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "test/test_common/wasm_base.h"

using testing::Eq;
using testing::InSequence;
using testing::Invoke;
using testing::Return;
using testing::ReturnRef;
Expand Down Expand Up @@ -214,7 +215,7 @@ TEST_P(WasmHttpFilterTest, HeadersStopAndContinue) {
EXPECT_CALL(filter(), log_(spdlog::level::info, Eq(absl::string_view("header path /"))));
EXPECT_CALL(filter(), log_(spdlog::level::warn, Eq(absl::string_view("onDone 2"))));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}, {"server", "envoy-wasm-pause"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, true));
root_context_->onTick(0);
filter().clearRouteCache();
Expand Down Expand Up @@ -615,7 +616,56 @@ TEST_P(WasmHttpFilterTest, AsyncCall) {
callbacks->onSuccess(request, std::move(response_message));
return proxy_wasm::WasmResult::Ok;
}));
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

EXPECT_NE(callbacks, nullptr);
}

TEST_P(WasmHttpFilterTest, StopAndResumeViaAsyncCall) {
setupTest("resume_call");
setupFilter();

InSequence s;

Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
Http::MockAsyncClientRequest request(&cluster_manager_.async_client_);
Http::AsyncClient::Callbacks* callbacks = nullptr;
EXPECT_CALL(cluster_manager_, get(Eq("cluster")));
EXPECT_CALL(cluster_manager_, httpAsyncClientForCluster("cluster"));
EXPECT_CALL(cluster_manager_.async_client_, send_(_, _, _))
.WillOnce(
Invoke([&](Http::RequestMessagePtr& message, Http::AsyncClient::Callbacks& cb,
const Http::AsyncClient::RequestOptions&) -> Http::AsyncClient::Request* {
EXPECT_EQ((Http::TestRequestHeaderMapImpl{{":method", "POST"},
{":path", "/"},
{":authority", "foo"},
{"content-length", "6"}}),
message->headers());
callbacks = &cb;
return &request;
}));

EXPECT_CALL(filter(), log_(spdlog::level::info, Eq("onRequestHeaders")))
.WillOnce(Invoke([&](uint32_t, absl::string_view) -> proxy_wasm::WasmResult {
Http::ResponseMessagePtr response_message(new Http::ResponseMessageImpl(
Http::ResponseHeaderMapPtr{new Http::TestResponseHeaderMapImpl{{":status", "200"}}}));
NiceMock<Tracing::MockSpan> span;
Http::TestResponseHeaderMapImpl response_header{{":status", "200"}};
callbacks->onBeforeFinalizeUpstreamSpan(span, &response_header);
callbacks->onSuccess(request, std::move(response_message));
return proxy_wasm::WasmResult::Ok;
}));
EXPECT_CALL(filter(), log_(spdlog::level::info, Eq("continueRequest")));

Http::MockStreamDecoderFilterCallbacks decoder_callbacks;
filter().setDecoderFilterCallbacks(decoder_callbacks);
EXPECT_CALL(decoder_callbacks, continueDecoding()).WillOnce(Invoke([&]() {
// Verify that we're not resuming processing from within Wasm callback.
EXPECT_EQ(proxy_wasm::current_context_, nullptr);
}));

EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

EXPECT_NE(callbacks, nullptr);
Expand Down Expand Up @@ -680,7 +730,7 @@ TEST_P(WasmHttpFilterTest, AsyncCallFailure) {
} else {
EXPECT_CALL(rootContext(), log_(spdlog::level::info, Eq("async_call failed")));
}
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

EXPECT_NE(callbacks, nullptr);
Expand Down Expand Up @@ -711,7 +761,7 @@ TEST_P(WasmHttpFilterTest, AsyncCallAfterDestroyed) {
}));

EXPECT_CALL(filter(), log_(spdlog::level::info, Eq("onRequestHeaders")));
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

EXPECT_CALL(request, cancel()).WillOnce([&]() { callbacks = nullptr; });
Expand Down Expand Up @@ -772,7 +822,7 @@ TEST_P(WasmHttpFilterTest, GrpcCall) {
}));
EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("response")));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

ProtobufWkt::Value value;
Expand Down Expand Up @@ -856,7 +906,7 @@ TEST_P(WasmHttpFilterTest, GrpcCallFailure) {
}));
EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("failure bad")));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

// Test some additional error paths.
Expand Down Expand Up @@ -917,7 +967,7 @@ TEST_P(WasmHttpFilterTest, GrpcCallCancel) {
return std::move(client_factory);
}));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

rootContext().onQueueReady(0);
Expand Down Expand Up @@ -961,7 +1011,7 @@ TEST_P(WasmHttpFilterTest, GrpcCallClose) {
return std::move(client_factory);
}));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

rootContext().onQueueReady(1);
Expand Down Expand Up @@ -1006,7 +1056,7 @@ TEST_P(WasmHttpFilterTest, GrpcCallAfterDestroyed) {
}));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};

EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

EXPECT_CALL(request, cancel()).WillOnce([&]() { callbacks = nullptr; });
Expand Down Expand Up @@ -1071,7 +1121,7 @@ TEST_P(WasmHttpFilterTest, GrpcStream) {
EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("response response")));
EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("close done")));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

ProtobufWkt::Value value;
Expand Down Expand Up @@ -1102,7 +1152,7 @@ TEST_P(WasmHttpFilterTest, GrpcStreamCloseLocal) {
EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("response close")));
EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("close ok")));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

ProtobufWkt::Value value;
Expand Down Expand Up @@ -1132,7 +1182,7 @@ TEST_P(WasmHttpFilterTest, GrpcStreamCloseRemote) {
EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("response response")));
EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("close close")));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

ProtobufWkt::Value value;
Expand All @@ -1159,7 +1209,7 @@ TEST_P(WasmHttpFilterTest, GrpcStreamCancel) {
setupGrpcStreamTest(callbacks);

Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

ProtobufWkt::Value value;
Expand Down Expand Up @@ -1187,7 +1237,7 @@ TEST_P(WasmHttpFilterTest, GrpcStreamOpenAtShutdown) {

EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("response response")));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

ProtobufWkt::Value value;
Expand Down

0 comments on commit b15b155

Please sign in to comment.