From a0cab8318daaf1b5884426f7097fcbe7776188be Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Thu, 14 Dec 2023 10:30:36 -0800 Subject: [PATCH] [EventEngine] Replace Executor with EE::Run in Client Reactor (#35295) Closes #35295 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35295 from drfloob:client-callback-reactor-ee de095c7f5cc39a308170848faa3220fbe35ba52f PiperOrigin-RevId: 590983428 --- BUILD | 1 + CMakeLists.txt | 4 ++++ Makefile | 3 +++ Package.swift | 1 + build_autogenerated.yaml | 4 ++++ gRPC-Core.podspec | 1 + grpc.gemspec | 1 + include/grpc/impl/call.h | 29 ++++++++++++++++++++++++ include/grpc/module.modulemap | 1 + include/grpcpp/support/client_callback.h | 28 +++++++++++------------ package.xml | 1 + src/core/lib/surface/call.cc | 16 ++++++++++++- src/cpp/client/client_callback.cc | 24 -------------------- tools/doxygen/Doxyfile.c++ | 1 + tools/doxygen/Doxyfile.c++.internal | 1 + tools/doxygen/Doxyfile.core | 1 + tools/doxygen/Doxyfile.core.internal | 1 + 17 files changed, 78 insertions(+), 40 deletions(-) create mode 100644 include/grpc/impl/call.h diff --git a/BUILD b/BUILD index 6d5130d4ebbef..d66485e998cf9 100644 --- a/BUILD +++ b/BUILD @@ -236,6 +236,7 @@ GPR_PUBLIC_HDRS = [ "include/grpc/support/sync_windows.h", "include/grpc/support/thd_id.h", "include/grpc/support/time.h", + "include/grpc/impl/call.h", "include/grpc/impl/codegen/atm.h", "include/grpc/impl/codegen/atm_gcc_atomic.h", "include/grpc/impl/codegen/atm_gcc_sync.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index bef30891364a0..af749535ef69b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1698,6 +1698,7 @@ if(_gRPC_PLATFORM_ANDROID) endif() foreach(_hdr + include/grpc/impl/call.h include/grpc/impl/codegen/atm.h include/grpc/impl/codegen/atm_gcc_atomic.h include/grpc/impl/codegen/atm_gcc_sync.h @@ -2627,6 +2628,7 @@ foreach(_hdr include/grpc/grpc_posix.h include/grpc/grpc_security.h include/grpc/grpc_security_constants.h + include/grpc/impl/call.h include/grpc/impl/channel_arg_names.h include/grpc/impl/codegen/atm.h include/grpc/impl/codegen/atm_gcc_atomic.h @@ -3303,6 +3305,7 @@ foreach(_hdr include/grpc/grpc_posix.h include/grpc/grpc_security.h include/grpc/grpc_security_constants.h + include/grpc/impl/call.h include/grpc/impl/channel_arg_names.h include/grpc/impl/codegen/atm.h include/grpc/impl/codegen/atm_gcc_atomic.h @@ -5277,6 +5280,7 @@ foreach(_hdr include/grpc/grpc_posix.h include/grpc/grpc_security.h include/grpc/grpc_security_constants.h + include/grpc/impl/call.h include/grpc/impl/channel_arg_names.h include/grpc/impl/codegen/atm.h include/grpc/impl/codegen/atm_gcc_atomic.h diff --git a/Makefile b/Makefile index 30eb03c8ef2eb..5f47717caf750 100644 --- a/Makefile +++ b/Makefile @@ -878,6 +878,7 @@ LIBGPR_SRC = \ src/core/lib/gprpp/windows/thd.cc \ PUBLIC_HEADERS_C += \ + include/grpc/impl/call.h \ include/grpc/impl/codegen/atm.h \ include/grpc/impl/codegen/atm_gcc_atomic.h \ include/grpc/impl/codegen/atm_gcc_sync.h \ @@ -1762,6 +1763,7 @@ PUBLIC_HEADERS_C += \ include/grpc/grpc_posix.h \ include/grpc/grpc_security.h \ include/grpc/grpc_security_constants.h \ + include/grpc/impl/call.h \ include/grpc/impl/channel_arg_names.h \ include/grpc/impl/codegen/atm.h \ include/grpc/impl/codegen/atm_gcc_atomic.h \ @@ -2290,6 +2292,7 @@ PUBLIC_HEADERS_C += \ include/grpc/grpc_posix.h \ include/grpc/grpc_security.h \ include/grpc/grpc_security_constants.h \ + include/grpc/impl/call.h \ include/grpc/impl/channel_arg_names.h \ include/grpc/impl/codegen/atm.h \ include/grpc/impl/codegen/atm_gcc_atomic.h \ diff --git a/Package.swift b/Package.swift index 512f70b00dfeb..fe0e8286502ba 100644 --- a/Package.swift +++ b/Package.swift @@ -61,6 +61,7 @@ let package = Package( "include/grpc/grpc_posix.h", "include/grpc/grpc_security.h", "include/grpc/grpc_security_constants.h", + "include/grpc/impl/call.h", "include/grpc/impl/channel_arg_names.h", "include/grpc/impl/codegen/atm.h", "include/grpc/impl/codegen/atm_gcc_atomic.h", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index bff13ebb1eef0..c0a2f52c64333 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -16,6 +16,7 @@ libs: build: all language: c public_headers: + - include/grpc/impl/call.h - include/grpc/impl/codegen/atm.h - include/grpc/impl/codegen/atm_gcc_atomic.h - include/grpc/impl/codegen/atm_gcc_sync.h @@ -162,6 +163,7 @@ libs: - include/grpc/grpc_posix.h - include/grpc/grpc_security.h - include/grpc/grpc_security_constants.h + - include/grpc/impl/call.h - include/grpc/impl/channel_arg_names.h - include/grpc/impl/codegen/atm.h - include/grpc/impl/codegen/atm_gcc_atomic.h @@ -2102,6 +2104,7 @@ libs: - include/grpc/grpc_posix.h - include/grpc/grpc_security.h - include/grpc/grpc_security_constants.h + - include/grpc/impl/call.h - include/grpc/impl/channel_arg_names.h - include/grpc/impl/codegen/atm.h - include/grpc/impl/codegen/atm_gcc_atomic.h @@ -4360,6 +4363,7 @@ libs: - include/grpc/grpc_posix.h - include/grpc/grpc_security.h - include/grpc/grpc_security_constants.h + - include/grpc/impl/call.h - include/grpc/impl/channel_arg_names.h - include/grpc/impl/codegen/atm.h - include/grpc/impl/codegen/atm_gcc_atomic.h diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index b33c007214aee..f0db95cb452b5 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -127,6 +127,7 @@ Pod::Spec.new do |s| 'include/grpc/grpc_posix.h', 'include/grpc/grpc_security.h', 'include/grpc/grpc_security_constants.h', + 'include/grpc/impl/call.h', 'include/grpc/impl/channel_arg_names.h', 'include/grpc/impl/codegen/atm.h', 'include/grpc/impl/codegen/atm_gcc_atomic.h', diff --git a/grpc.gemspec b/grpc.gemspec index e18172ead0150..e72ad3b33ce02 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -67,6 +67,7 @@ Gem::Specification.new do |s| s.files += %w( include/grpc/grpc_posix.h ) s.files += %w( include/grpc/grpc_security.h ) s.files += %w( include/grpc/grpc_security_constants.h ) + s.files += %w( include/grpc/impl/call.h ) s.files += %w( include/grpc/impl/channel_arg_names.h ) s.files += %w( include/grpc/impl/codegen/atm.h ) s.files += %w( include/grpc/impl/codegen/atm_gcc_atomic.h ) diff --git a/include/grpc/impl/call.h b/include/grpc/impl/call.h new file mode 100644 index 0000000000000..d1f7c2642b0de --- /dev/null +++ b/include/grpc/impl/call.h @@ -0,0 +1,29 @@ +// Copyright 2023 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_IMPL_CALL_H +#define GRPC_IMPL_CALL_H + +#include + +#include "absl/functional/any_invocable.h" + +#include + +// Run a callback in the call's EventEngine. +// Internal-only +void grpc_call_run_in_event_engine(const grpc_call* call, + absl::AnyInvocable cb); + +#endif /* GRPC_IMPL_CALL_H */ diff --git a/include/grpc/module.modulemap b/include/grpc/module.modulemap index 29c6508a5cb26..3335f7c83cbd2 100644 --- a/include/grpc/module.modulemap +++ b/include/grpc/module.modulemap @@ -13,6 +13,7 @@ header "byte_buffer.h" header "grpc_posix.h" header "grpc_security.h" header "grpc_security_constants.h" + header "impl/call.h" header "impl/channel_arg_names.h" header "impl/codegen/atm.h" header "impl/codegen/byte_buffer.h" diff --git a/include/grpcpp/support/client_callback.h b/include/grpcpp/support/client_callback.h index e44764d83875d..1c420815de417 100644 --- a/include/grpcpp/support/client_callback.h +++ b/include/grpcpp/support/client_callback.h @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -123,15 +124,6 @@ class ClientReactor { /// \param[in] s The status outcome of this RPC virtual void OnDone(const grpc::Status& /*s*/) = 0; - /// InternalScheduleOnDone is not part of the API and is not meant to be - /// overridden. It is virtual to allow successful builds for certain bazel - /// build users that only want to depend on gRPC codegen headers and not the - /// full library (although this is not a generally-supported option). Although - /// the virtual call is slower than a direct call, this function is - /// heavyweight and the cost of the virtual call is not much in comparison. - /// This function may be removed or devirtualized in the future. - virtual void InternalScheduleOnDone(grpc::Status s); - /// InternalTrailersOnly is not part of the API and is not meant to be /// overridden. It is virtual to allow successful builds for certain bazel /// build users that only want to depend on gRPC codegen headers and not the @@ -649,11 +641,13 @@ class ClientCallbackReaderWriterImpl auto* reactor = reactor_; auto* call = call_.call(); this->~ClientCallbackReaderWriterImpl(); - grpc_call_unref(call); if (GPR_LIKELY(from_reaction)) { + grpc_call_unref(call); reactor->OnDone(s); } else { - reactor->InternalScheduleOnDone(std::move(s)); + grpc_call_run_in_event_engine( + call, [reactor, s = std::move(s)]() { reactor->OnDone(s); }); + grpc_call_unref(call); } } } @@ -822,11 +816,13 @@ class ClientCallbackReaderImpl : public ClientCallbackReader { auto* reactor = reactor_; auto* call = call_.call(); this->~ClientCallbackReaderImpl(); - grpc_call_unref(call); if (GPR_LIKELY(from_reaction)) { + grpc_call_unref(call); reactor->OnDone(s); } else { - reactor->InternalScheduleOnDone(std::move(s)); + grpc_call_run_in_event_engine( + call, [reactor, s = std::move(s)]() { reactor->OnDone(s); }); + grpc_call_unref(call); } } } @@ -1040,11 +1036,13 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter { auto* reactor = reactor_; auto* call = call_.call(); this->~ClientCallbackWriterImpl(); - grpc_call_unref(call); if (GPR_LIKELY(from_reaction)) { + grpc_call_unref(call); reactor->OnDone(s); } else { - reactor->InternalScheduleOnDone(std::move(s)); + grpc_call_run_in_event_engine( + call, [reactor, s = std::move(s)]() { reactor->OnDone(s); }); + grpc_call_unref(call); } } } diff --git a/package.xml b/package.xml index f5724cbc51288..35e5ea371e775 100644 --- a/package.xml +++ b/package.xml @@ -49,6 +49,7 @@ + diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index c34d29d152f22..6d3f5c6eaa5cd 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -45,6 +45,7 @@ #include #include #include +#include #include #include #include @@ -148,6 +149,10 @@ class Call : public CppImplOf { // for that functionality be invented) virtual grpc_call_stack* call_stack() = 0; + // Return the EventEngine used for this call's async execution. + virtual grpc_event_engine::experimental::EventEngine* event_engine() + const = 0; + protected: // The maximum number of concurrent batches possible. // Based upon the maximum number of individually queueable ops in the batch @@ -529,6 +534,10 @@ class FilterStackCall final : public Call { GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(*this))); } + grpc_event_engine::experimental::EventEngine* event_engine() const override { + return channel()->event_engine(); + } + grpc_call_element* call_elem(size_t idx) { return grpc_call_stack_element(call_stack(), idx); } @@ -2049,7 +2058,7 @@ class PromiseBasedCall : public Call, return failed_before_recv_message_.load(std::memory_order_relaxed); } - grpc_event_engine::experimental::EventEngine* event_engine() const final { + grpc_event_engine::experimental::EventEngine* event_engine() const override { return channel()->event_engine(); } @@ -3795,3 +3804,8 @@ const char* grpc_call_error_to_string(grpc_call_error error) { } GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW"); } + +void grpc_call_run_in_event_engine(const grpc_call* call, + absl::AnyInvocable cb) { + grpc_core::Call::FromC(call)->event_engine()->Run(std::move(cb)); +} diff --git a/src/cpp/client/client_callback.cc b/src/cpp/client/client_callback.cc index 32ed0ff6ec286..7a4bf5ce0bd83 100644 --- a/src/cpp/client/client_callback.cc +++ b/src/cpp/client/client_callback.cc @@ -32,30 +32,6 @@ namespace grpc { namespace internal { -void ClientReactor::InternalScheduleOnDone(grpc::Status s) { - // Unlike other uses of closure, do not Ref or Unref here since the reactor - // object's lifetime is controlled by user code. - grpc_core::ExecCtx exec_ctx; - struct ClosureWithArg { - grpc_closure closure; - ClientReactor* const reactor; - const grpc::Status status; - ClosureWithArg(ClientReactor* reactor_arg, grpc::Status s) - : reactor(reactor_arg), status(std::move(s)) { - GRPC_CLOSURE_INIT( - &closure, - [](void* void_arg, grpc_error_handle) { - ClosureWithArg* arg = static_cast(void_arg); - arg->reactor->OnDone(arg->status); - delete arg; - }, - this, grpc_schedule_on_exec_ctx); - } - }; - ClosureWithArg* arg = new ClosureWithArg(this, std::move(s)); - grpc_core::Executor::Run(&arg->closure, absl::OkStatus()); -} - bool ClientReactor::InternalTrailersOnly(const grpc_call* call) const { return grpc_call_is_trailers_only(call); } diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index 42eec93cf59fe..04addb79643d3 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -896,6 +896,7 @@ include/grpc/grpc_crl_provider.h \ include/grpc/grpc_posix.h \ include/grpc/grpc_security.h \ include/grpc/grpc_security_constants.h \ +include/grpc/impl/call.h \ include/grpc/impl/channel_arg_names.h \ include/grpc/impl/codegen/atm.h \ include/grpc/impl/codegen/atm_gcc_atomic.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index a173512c3ce5c..66440c542e37d 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -896,6 +896,7 @@ include/grpc/grpc_crl_provider.h \ include/grpc/grpc_posix.h \ include/grpc/grpc_security.h \ include/grpc/grpc_security_constants.h \ +include/grpc/impl/call.h \ include/grpc/impl/channel_arg_names.h \ include/grpc/impl/codegen/atm.h \ include/grpc/impl/codegen/atm_gcc_atomic.h \ diff --git a/tools/doxygen/Doxyfile.core b/tools/doxygen/Doxyfile.core index 6dac1787b7116..ea13ee565fd44 100644 --- a/tools/doxygen/Doxyfile.core +++ b/tools/doxygen/Doxyfile.core @@ -829,6 +829,7 @@ include/grpc/grpc_crl_provider.h \ include/grpc/grpc_posix.h \ include/grpc/grpc_security.h \ include/grpc/grpc_security_constants.h \ +include/grpc/impl/call.h \ include/grpc/impl/channel_arg_names.h \ include/grpc/impl/codegen/atm.h \ include/grpc/impl/codegen/atm_gcc_atomic.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index ae89c4707ccdd..aa559d588d15a 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -829,6 +829,7 @@ include/grpc/grpc_crl_provider.h \ include/grpc/grpc_posix.h \ include/grpc/grpc_security.h \ include/grpc/grpc_security_constants.h \ +include/grpc/impl/call.h \ include/grpc/impl/channel_arg_names.h \ include/grpc/impl/codegen/atm.h \ include/grpc/impl/codegen/atm_gcc_atomic.h \