From b15b155ee16d77aa99c92afb5ad68ed6bff0cd6f Mon Sep 17 00:00:00 2001 From: Piotr Sikora Date: Fri, 13 Nov 2020 09:02:41 -0800 Subject: [PATCH] wasm: fix order of callbacks for paused requests. (#13840) Fixes proxy-wasm/proxy-wasm-rust-sdk#43. Signed-off-by: Piotr Sikora --- bazel/external/cargo/Cargo.toml | 5 ++ bazel/repository_locations.bzl | 6 +- source/extensions/common/wasm/context.cc | 6 +- test/extensions/filters/http/wasm/BUILD | 1 + .../filters/http/wasm/test_data/BUILD | 11 +++ .../http/wasm/test_data/resume_call_rust.rs | 39 ++++++++++ .../wasm/test_data/test_resume_call_cpp.cc | 53 +++++++++++++ .../filters/http/wasm/wasm_filter_test.cc | 78 +++++++++++++++---- 8 files changed, 180 insertions(+), 19 deletions(-) create mode 100644 test/extensions/filters/http/wasm/test_data/resume_call_rust.rs create mode 100644 test/extensions/filters/http/wasm/test_data/test_resume_call_cpp.cc diff --git a/bazel/external/cargo/Cargo.toml b/bazel/external/cargo/Cargo.toml index f56a3f47ad..610d35df4e 100644 --- a/bazel/external/cargo/Cargo.toml +++ b/bazel/external/cargo/Cargo.toml @@ -46,6 +46,11 @@ name = "http_metadata_rust" path = "../../../test/extensions/filters/http/wasm/test_data/metadata_rust.rs" crate-type = ["cdylib"] +[[example]] +name = "http_resume_call_rust" +path = "../../../test/extensions/filters/http/wasm/test_data/resume_call_rust.rs" +crate-type = ["cdylib"] + [[example]] name = "http_shared_data_rust" path = "../../../test/extensions/filters/http/wasm/test_data/shared_data_rust.rs" diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl index 015e604af0..402f03c42b 100644 --- a/bazel/repository_locations.bzl +++ b/bazel/repository_locations.bzl @@ -870,8 +870,8 @@ REPOSITORY_LOCATIONS_SPEC = dict( project_name = "WebAssembly for Proxies (C++ host implementation)", project_desc = "WebAssembly for Proxies (C++ host implementation)", project_url = "https://github.com/proxy-wasm/proxy-wasm-cpp-host", - version = "eceb02d5b7772ec1cd78a4d35356e57d2e6d59bb", - sha256 = "ae9d9b87d21d95647ebda197d130b37bddc5c6ee3e6630909a231fd55fcc9069", + version = "15827110ac35fdac9abdb6b05d04ee7ee2044dae", + sha256 = "77a2671205eb0973bee375a1bee4099edef991350433981f6e3508780318117d", strip_prefix = "proxy-wasm-cpp-host-{version}", urls = ["https://github.com/proxy-wasm/proxy-wasm-cpp-host/archive/{version}.tar.gz"], use_category = ["dataplane_ext"], @@ -882,7 +882,7 @@ REPOSITORY_LOCATIONS_SPEC = dict( "envoy.filters.network.wasm", "envoy.stat_sinks.wasm", ], - release_date = "2020-11-10", + release_date = "2020-11-12", cpe = "N/A", ), emscripten_toolchain = dict( diff --git a/source/extensions/common/wasm/context.cc b/source/extensions/common/wasm/context.cc index f38f9a2c32..006e7648c0 100644 --- a/source/extensions/common/wasm/context.cc +++ b/source/extensions/common/wasm/context.cc @@ -1493,12 +1493,14 @@ WasmResult Context::continueStream(WasmStreamType stream_type) { switch (stream_type) { case WasmStreamType::Request: if (decoder_callbacks_) { - decoder_callbacks_->continueDecoding(); + // We are in a reentrant call, so defer. + wasm()->addAfterVmCallAction([this] { decoder_callbacks_->continueDecoding(); }); } break; case WasmStreamType::Response: if (encoder_callbacks_) { - encoder_callbacks_->continueEncoding(); + // We are in a reentrant call, so defer. + wasm()->addAfterVmCallAction([this] { encoder_callbacks_->continueEncoding(); }); } break; default: diff --git a/test/extensions/filters/http/wasm/BUILD b/test/extensions/filters/http/wasm/BUILD index 579903b311..f8392be38a 100644 --- a/test/extensions/filters/http/wasm/BUILD +++ b/test/extensions/filters/http/wasm/BUILD @@ -24,6 +24,7 @@ envoy_extension_cc_test( "//test/extensions/filters/http/wasm/test_data:body_rust.wasm", "//test/extensions/filters/http/wasm/test_data:headers_rust.wasm", "//test/extensions/filters/http/wasm/test_data:metadata_rust.wasm", + "//test/extensions/filters/http/wasm/test_data:resume_call_rust.wasm", "//test/extensions/filters/http/wasm/test_data:shared_data_rust.wasm", "//test/extensions/filters/http/wasm/test_data:shared_queue_rust.wasm", "//test/extensions/filters/http/wasm/test_data:test_cpp.wasm", diff --git a/test/extensions/filters/http/wasm/test_data/BUILD b/test/extensions/filters/http/wasm/test_data/BUILD index 34fbbba8e3..f35b19e114 100644 --- a/test/extensions/filters/http/wasm/test_data/BUILD +++ b/test/extensions/filters/http/wasm/test_data/BUILD @@ -45,6 +45,15 @@ wasm_rust_binary( ], ) +wasm_rust_binary( + name = "resume_call_rust.wasm", + srcs = ["resume_call_rust.rs"], + deps = [ + "//bazel/external/cargo:log", + "//bazel/external/cargo:proxy_wasm", + ], +) + wasm_rust_binary( name = "shared_data_rust.wasm", srcs = ["shared_data_rust.rs"], @@ -72,6 +81,7 @@ envoy_cc_library( "test_cpp_null_plugin.cc", "test_grpc_call_cpp.cc", "test_grpc_stream_cpp.cc", + "test_resume_call_cpp.cc", "test_shared_data_cpp.cc", "test_shared_queue_cpp.cc", ], @@ -97,6 +107,7 @@ envoy_wasm_cc_binary( "test_cpp.cc", "test_grpc_call_cpp.cc", "test_grpc_stream_cpp.cc", + "test_resume_call_cpp.cc", "test_shared_data_cpp.cc", "test_shared_queue_cpp.cc", ], diff --git a/test/extensions/filters/http/wasm/test_data/resume_call_rust.rs b/test/extensions/filters/http/wasm/test_data/resume_call_rust.rs new file mode 100644 index 0000000000..d9eb08b1fa --- /dev/null +++ b/test/extensions/filters/http/wasm/test_data/resume_call_rust.rs @@ -0,0 +1,39 @@ +use log::info; +use proxy_wasm::traits::{Context, HttpContext}; +use proxy_wasm::types::*; +use std::time::Duration; + +#[no_mangle] +pub fn _start() { + proxy_wasm::set_log_level(LogLevel::Trace); + proxy_wasm::set_http_context(|_, _| -> Box { Box::new(TestStream) }); +} + +struct TestStream; + +impl HttpContext for TestStream { + fn on_http_request_headers(&mut self, _: usize) -> Action { + self.dispatch_http_call( + "cluster", + vec![(":method", "POST"), (":path", "/"), (":authority", "foo")], + Some(b"resume"), + vec![], + Duration::from_secs(1), + ) + .unwrap(); + info!("onRequestHeaders"); + Action::Pause + } + + fn on_http_request_body(&mut self, _: usize, _: bool) -> Action { + info!("onRequestBody"); + Action::Continue + } +} + +impl Context for TestStream { + fn on_http_call_response(&mut self, _: u32, _: usize, _: usize, _: usize) { + info!("continueRequest"); + self.resume_http_request(); + } +} diff --git a/test/extensions/filters/http/wasm/test_data/test_resume_call_cpp.cc b/test/extensions/filters/http/wasm/test_data/test_resume_call_cpp.cc new file mode 100644 index 0000000000..f557eb1438 --- /dev/null +++ b/test/extensions/filters/http/wasm/test_data/test_resume_call_cpp.cc @@ -0,0 +1,53 @@ +// NOLINT(namespace-envoy) +#include +#include +#include + +#ifndef NULL_PLUGIN +#include "proxy_wasm_intrinsics_lite.h" +#else +#include "extensions/common/wasm/ext/envoy_null_plugin.h" +#endif + +START_WASM_PLUGIN(HttpWasmTestCpp) + +class ResumeCallContext : public Context { +public: + explicit ResumeCallContext(uint32_t id, RootContext* root) : Context(id, root) {} + + FilterHeadersStatus onRequestHeaders(uint32_t, bool) override; + FilterDataStatus onRequestBody(size_t, bool) override; +}; + +class ResumeCallRootContext : public RootContext { +public: + explicit ResumeCallRootContext(uint32_t id, std::string_view root_id) + : RootContext(id, root_id) {} +}; + +static RegisterContextFactory register_ResumeCallContext(CONTEXT_FACTORY(ResumeCallContext), + ROOT_FACTORY(ResumeCallRootContext), + "resume_call"); + +FilterHeadersStatus ResumeCallContext::onRequestHeaders(uint32_t, bool) { + auto context_id = id(); + auto resume_callback = [context_id](uint32_t, size_t, uint32_t) { + getContext(context_id)->setEffectiveContext(); + logInfo("continueRequest"); + continueRequest(); + }; + if (root()->httpCall("cluster", {{":method", "POST"}, {":path", "/"}, {":authority", "foo"}}, + "resume", {}, 1000, resume_callback) != WasmResult::Ok) { + logError("unexpected failure"); + return FilterHeadersStatus::StopIteration; + } + logInfo("onRequestHeaders"); + return FilterHeadersStatus::StopIteration; +} + +FilterDataStatus ResumeCallContext::onRequestBody(size_t, bool) { + logInfo("onRequestBody"); + return FilterDataStatus::Continue; +} + +END_WASM_PLUGIN diff --git a/test/extensions/filters/http/wasm/wasm_filter_test.cc b/test/extensions/filters/http/wasm/wasm_filter_test.cc index a80dcd64a8..9999d453c7 100644 --- a/test/extensions/filters/http/wasm/wasm_filter_test.cc +++ b/test/extensions/filters/http/wasm/wasm_filter_test.cc @@ -7,6 +7,7 @@ #include "test/test_common/wasm_base.h" using testing::Eq; +using testing::InSequence; using testing::Invoke; using testing::Return; using testing::ReturnRef; @@ -214,7 +215,7 @@ TEST_P(WasmHttpFilterTest, HeadersStopAndContinue) { EXPECT_CALL(filter(), log_(spdlog::level::info, Eq(absl::string_view("header path /")))); EXPECT_CALL(filter(), log_(spdlog::level::warn, Eq(absl::string_view("onDone 2")))); Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}, {"server", "envoy-wasm-pause"}}; - EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, + EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark, filter().decodeHeaders(request_headers, true)); root_context_->onTick(0); filter().clearRouteCache(); @@ -615,7 +616,56 @@ TEST_P(WasmHttpFilterTest, AsyncCall) { callbacks->onSuccess(request, std::move(response_message)); return proxy_wasm::WasmResult::Ok; })); - EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, + EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark, + filter().decodeHeaders(request_headers, false)); + + EXPECT_NE(callbacks, nullptr); +} + +TEST_P(WasmHttpFilterTest, StopAndResumeViaAsyncCall) { + setupTest("resume_call"); + setupFilter(); + + InSequence s; + + Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}}; + Http::MockAsyncClientRequest request(&cluster_manager_.async_client_); + Http::AsyncClient::Callbacks* callbacks = nullptr; + EXPECT_CALL(cluster_manager_, get(Eq("cluster"))); + EXPECT_CALL(cluster_manager_, httpAsyncClientForCluster("cluster")); + EXPECT_CALL(cluster_manager_.async_client_, send_(_, _, _)) + .WillOnce( + Invoke([&](Http::RequestMessagePtr& message, Http::AsyncClient::Callbacks& cb, + const Http::AsyncClient::RequestOptions&) -> Http::AsyncClient::Request* { + EXPECT_EQ((Http::TestRequestHeaderMapImpl{{":method", "POST"}, + {":path", "/"}, + {":authority", "foo"}, + {"content-length", "6"}}), + message->headers()); + callbacks = &cb; + return &request; + })); + + EXPECT_CALL(filter(), log_(spdlog::level::info, Eq("onRequestHeaders"))) + .WillOnce(Invoke([&](uint32_t, absl::string_view) -> proxy_wasm::WasmResult { + Http::ResponseMessagePtr response_message(new Http::ResponseMessageImpl( + Http::ResponseHeaderMapPtr{new Http::TestResponseHeaderMapImpl{{":status", "200"}}})); + NiceMock span; + Http::TestResponseHeaderMapImpl response_header{{":status", "200"}}; + callbacks->onBeforeFinalizeUpstreamSpan(span, &response_header); + callbacks->onSuccess(request, std::move(response_message)); + return proxy_wasm::WasmResult::Ok; + })); + EXPECT_CALL(filter(), log_(spdlog::level::info, Eq("continueRequest"))); + + Http::MockStreamDecoderFilterCallbacks decoder_callbacks; + filter().setDecoderFilterCallbacks(decoder_callbacks); + EXPECT_CALL(decoder_callbacks, continueDecoding()).WillOnce(Invoke([&]() { + // Verify that we're not resuming processing from within Wasm callback. + EXPECT_EQ(proxy_wasm::current_context_, nullptr); + })); + + EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark, filter().decodeHeaders(request_headers, false)); EXPECT_NE(callbacks, nullptr); @@ -680,7 +730,7 @@ TEST_P(WasmHttpFilterTest, AsyncCallFailure) { } else { EXPECT_CALL(rootContext(), log_(spdlog::level::info, Eq("async_call failed"))); } - EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, + EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark, filter().decodeHeaders(request_headers, false)); EXPECT_NE(callbacks, nullptr); @@ -711,7 +761,7 @@ TEST_P(WasmHttpFilterTest, AsyncCallAfterDestroyed) { })); EXPECT_CALL(filter(), log_(spdlog::level::info, Eq("onRequestHeaders"))); - EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, + EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark, filter().decodeHeaders(request_headers, false)); EXPECT_CALL(request, cancel()).WillOnce([&]() { callbacks = nullptr; }); @@ -772,7 +822,7 @@ TEST_P(WasmHttpFilterTest, GrpcCall) { })); EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("response"))); Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}}; - EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, + EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark, filter().decodeHeaders(request_headers, false)); ProtobufWkt::Value value; @@ -856,7 +906,7 @@ TEST_P(WasmHttpFilterTest, GrpcCallFailure) { })); EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("failure bad"))); Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}}; - EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, + EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark, filter().decodeHeaders(request_headers, false)); // Test some additional error paths. @@ -917,7 +967,7 @@ TEST_P(WasmHttpFilterTest, GrpcCallCancel) { return std::move(client_factory); })); Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}}; - EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, + EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark, filter().decodeHeaders(request_headers, false)); rootContext().onQueueReady(0); @@ -961,7 +1011,7 @@ TEST_P(WasmHttpFilterTest, GrpcCallClose) { return std::move(client_factory); })); Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}}; - EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, + EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark, filter().decodeHeaders(request_headers, false)); rootContext().onQueueReady(1); @@ -1006,7 +1056,7 @@ TEST_P(WasmHttpFilterTest, GrpcCallAfterDestroyed) { })); Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}}; - EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, + EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark, filter().decodeHeaders(request_headers, false)); EXPECT_CALL(request, cancel()).WillOnce([&]() { callbacks = nullptr; }); @@ -1071,7 +1121,7 @@ TEST_P(WasmHttpFilterTest, GrpcStream) { EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("response response"))); EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("close done"))); Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}}; - EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, + EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark, filter().decodeHeaders(request_headers, false)); ProtobufWkt::Value value; @@ -1102,7 +1152,7 @@ TEST_P(WasmHttpFilterTest, GrpcStreamCloseLocal) { EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("response close"))); EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("close ok"))); Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}}; - EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, + EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark, filter().decodeHeaders(request_headers, false)); ProtobufWkt::Value value; @@ -1132,7 +1182,7 @@ TEST_P(WasmHttpFilterTest, GrpcStreamCloseRemote) { EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("response response"))); EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("close close"))); Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}}; - EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, + EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark, filter().decodeHeaders(request_headers, false)); ProtobufWkt::Value value; @@ -1159,7 +1209,7 @@ TEST_P(WasmHttpFilterTest, GrpcStreamCancel) { setupGrpcStreamTest(callbacks); Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}}; - EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, + EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark, filter().decodeHeaders(request_headers, false)); ProtobufWkt::Value value; @@ -1187,7 +1237,7 @@ TEST_P(WasmHttpFilterTest, GrpcStreamOpenAtShutdown) { EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("response response"))); Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}}; - EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, + EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark, filter().decodeHeaders(request_headers, false)); ProtobufWkt::Value value;