Skip to content

Commit

Permalink
[EventEngine] Replace Executor with EE::Run in Client Reactor (grpc#3…
Browse files Browse the repository at this point in the history
…5295)

Closes grpc#35295

COPYBARA_INTEGRATE_REVIEW=grpc#35295 from drfloob:client-callback-reactor-ee de095c7
PiperOrigin-RevId: 590983428
  • Loading branch information
drfloob authored and copybara-github committed Dec 14, 2023
1 parent def3288 commit a0cab83
Show file tree
Hide file tree
Showing 17 changed files with 78 additions and 40 deletions.
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ GPR_PUBLIC_HDRS = [
"include/grpc/support/sync_windows.h",
"include/grpc/support/thd_id.h",
"include/grpc/support/time.h",
"include/grpc/impl/call.h",
"include/grpc/impl/codegen/atm.h",
"include/grpc/impl/codegen/atm_gcc_atomic.h",
"include/grpc/impl/codegen/atm_gcc_sync.h",
Expand Down
4 changes: 4 additions & 0 deletions CMakeLists.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Makefile

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Package.swift

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions gRPC-Core.podspec

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions grpc.gemspec

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions include/grpc/impl/call.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GRPC_IMPL_CALL_H
#define GRPC_IMPL_CALL_H

#include <grpc/support/port_platform.h>

#include "absl/functional/any_invocable.h"

#include <grpc/grpc.h>

// Run a callback in the call's EventEngine.
// Internal-only
void grpc_call_run_in_event_engine(const grpc_call* call,
absl::AnyInvocable<void()> cb);

#endif /* GRPC_IMPL_CALL_H */
1 change: 1 addition & 0 deletions include/grpc/module.modulemap

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 13 additions & 15 deletions include/grpcpp/support/client_callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <functional>

#include <grpc/grpc.h>
#include <grpc/impl/call.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/call_op_set.h>
Expand Down Expand Up @@ -123,15 +124,6 @@ class ClientReactor {
/// \param[in] s The status outcome of this RPC
virtual void OnDone(const grpc::Status& /*s*/) = 0;

/// InternalScheduleOnDone is not part of the API and is not meant to be
/// overridden. It is virtual to allow successful builds for certain bazel
/// build users that only want to depend on gRPC codegen headers and not the
/// full library (although this is not a generally-supported option). Although
/// the virtual call is slower than a direct call, this function is
/// heavyweight and the cost of the virtual call is not much in comparison.
/// This function may be removed or devirtualized in the future.
virtual void InternalScheduleOnDone(grpc::Status s);

/// InternalTrailersOnly is not part of the API and is not meant to be
/// overridden. It is virtual to allow successful builds for certain bazel
/// build users that only want to depend on gRPC codegen headers and not the
Expand Down Expand Up @@ -649,11 +641,13 @@ class ClientCallbackReaderWriterImpl
auto* reactor = reactor_;
auto* call = call_.call();
this->~ClientCallbackReaderWriterImpl();
grpc_call_unref(call);
if (GPR_LIKELY(from_reaction)) {
grpc_call_unref(call);
reactor->OnDone(s);
} else {
reactor->InternalScheduleOnDone(std::move(s));
grpc_call_run_in_event_engine(
call, [reactor, s = std::move(s)]() { reactor->OnDone(s); });
grpc_call_unref(call);
}
}
}
Expand Down Expand Up @@ -822,11 +816,13 @@ class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
auto* reactor = reactor_;
auto* call = call_.call();
this->~ClientCallbackReaderImpl();
grpc_call_unref(call);
if (GPR_LIKELY(from_reaction)) {
grpc_call_unref(call);
reactor->OnDone(s);
} else {
reactor->InternalScheduleOnDone(std::move(s));
grpc_call_run_in_event_engine(
call, [reactor, s = std::move(s)]() { reactor->OnDone(s); });
grpc_call_unref(call);
}
}
}
Expand Down Expand Up @@ -1040,11 +1036,13 @@ class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
auto* reactor = reactor_;
auto* call = call_.call();
this->~ClientCallbackWriterImpl();
grpc_call_unref(call);
if (GPR_LIKELY(from_reaction)) {
grpc_call_unref(call);
reactor->OnDone(s);
} else {
reactor->InternalScheduleOnDone(std::move(s));
grpc_call_run_in_event_engine(
call, [reactor, s = std::move(s)]() { reactor->OnDone(s); });
grpc_call_unref(call);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions package.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion src/core/lib/surface/call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <grpc/compression.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/impl/call.h>
#include <grpc/impl/propagation_bits.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
Expand Down Expand Up @@ -148,6 +149,10 @@ class Call : public CppImplOf<Call, grpc_call> {
// for that functionality be invented)
virtual grpc_call_stack* call_stack() = 0;

// Return the EventEngine used for this call's async execution.
virtual grpc_event_engine::experimental::EventEngine* event_engine()
const = 0;

protected:
// The maximum number of concurrent batches possible.
// Based upon the maximum number of individually queueable ops in the batch
Expand Down Expand Up @@ -529,6 +534,10 @@ class FilterStackCall final : public Call {
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(*this)));
}

grpc_event_engine::experimental::EventEngine* event_engine() const override {
return channel()->event_engine();
}

grpc_call_element* call_elem(size_t idx) {
return grpc_call_stack_element(call_stack(), idx);
}
Expand Down Expand Up @@ -2049,7 +2058,7 @@ class PromiseBasedCall : public Call,
return failed_before_recv_message_.load(std::memory_order_relaxed);
}

grpc_event_engine::experimental::EventEngine* event_engine() const final {
grpc_event_engine::experimental::EventEngine* event_engine() const override {
return channel()->event_engine();
}

Expand Down Expand Up @@ -3795,3 +3804,8 @@ const char* grpc_call_error_to_string(grpc_call_error error) {
}
GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
}

void grpc_call_run_in_event_engine(const grpc_call* call,
absl::AnyInvocable<void()> cb) {
grpc_core::Call::FromC(call)->event_engine()->Run(std::move(cb));
}
24 changes: 0 additions & 24 deletions src/cpp/client/client_callback.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,6 @@
namespace grpc {
namespace internal {

void ClientReactor::InternalScheduleOnDone(grpc::Status s) {
// Unlike other uses of closure, do not Ref or Unref here since the reactor
// object's lifetime is controlled by user code.
grpc_core::ExecCtx exec_ctx;
struct ClosureWithArg {
grpc_closure closure;
ClientReactor* const reactor;
const grpc::Status status;
ClosureWithArg(ClientReactor* reactor_arg, grpc::Status s)
: reactor(reactor_arg), status(std::move(s)) {
GRPC_CLOSURE_INIT(
&closure,
[](void* void_arg, grpc_error_handle) {
ClosureWithArg* arg = static_cast<ClosureWithArg*>(void_arg);
arg->reactor->OnDone(arg->status);
delete arg;
},
this, grpc_schedule_on_exec_ctx);
}
};
ClosureWithArg* arg = new ClosureWithArg(this, std::move(s));
grpc_core::Executor::Run(&arg->closure, absl::OkStatus());
}

bool ClientReactor::InternalTrailersOnly(const grpc_call* call) const {
return grpc_call_is_trailers_only(call);
}
Expand Down
1 change: 1 addition & 0 deletions tools/doxygen/Doxyfile.c++

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tools/doxygen/Doxyfile.c++.internal

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tools/doxygen/Doxyfile.core

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tools/doxygen/Doxyfile.core.internal

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a0cab83

Please sign in to comment.