Skip to content

Commit

Permalink
[inproc] Promise based inproc transport (grpc#35281)
Browse files Browse the repository at this point in the history
Closes grpc#35281

COPYBARA_INTEGRATE_REVIEW=grpc#35281 from ctiller:cg-inproc 3fe1bce
PiperOrigin-RevId: 590425232
  • Loading branch information
ctiller authored and copybara-github committed Dec 13, 2023
1 parent 0340372 commit a04188b
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 13 deletions.
2 changes: 2 additions & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6157,6 +6157,7 @@ grpc_cc_library(
"slice_buffer",
"status_helper",
"time",
"try_seq",
"//:channel_arg_names",
"//:config",
"//:debug_location",
Expand All @@ -6165,6 +6166,7 @@ grpc_cc_library(
"//:grpc_base",
"//:grpc_public_hdrs",
"//:grpc_trace",
"//:promise",
"//:ref_counted_ptr",
],
)
Expand Down
185 changes: 172 additions & 13 deletions src/core/ext/transport/inproc/inproc_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,199 @@

#include "src/core/ext/transport/inproc/inproc_transport.h"

#include <atomic>

#include <grpc/grpc.h>
#include <grpc/support/log.h>

#include "src/core/ext/transport/inproc/legacy_inproc_transport.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/surface/server.h"
#include "src/core/lib/transport/transport.h"

namespace grpc_core {

namespace {
class InprocServerTransport final : public RefCounted<InprocServerTransport>,
public Transport,
public ServerTransport {
public:
void SetAcceptFunction(AcceptFunction accept_function) override {
accept_ = std::move(accept_function);
ConnectionState expect = ConnectionState::kInitial;
state_.compare_exchange_strong(expect, ConnectionState::kReady,
std::memory_order_acq_rel,
std::memory_order_acquire);
MutexLock lock(&state_tracker_mu_);
state_tracker_.SetState(GRPC_CHANNEL_READY, absl::OkStatus(),
"accept function set");
}

void Orphan() override { Unref(); }

FilterStackTransport* filter_stack_transport() override { return nullptr; }
ClientTransport* client_transport() override { return nullptr; }
ServerTransport* server_transport() override { return this; }
absl::string_view GetTransportName() const override { return "inproc"; }
void SetPollset(grpc_stream*, grpc_pollset*) override {}
void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
void PerformOp(grpc_transport_op* op) override {
gpr_log(GPR_INFO, "inproc server op: %s",
grpc_transport_op_string(op).c_str());
if (op->start_connectivity_watch != nullptr) {
MutexLock lock(&state_tracker_mu_);
state_tracker_.AddWatcher(op->start_connectivity_watch_state,
std::move(op->start_connectivity_watch));
}
if (op->stop_connectivity_watch != nullptr) {
MutexLock lock(&state_tracker_mu_);
state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
}
if (op->set_accept_stream) {
Crash("set_accept_stream not supported on inproc transport");
}
}
grpc_endpoint* GetEndpoint() override { return nullptr; }

void Disconnect(absl::Status error) {
if (disconnecting_.exchange(true, std::memory_order_relaxed)) return;
disconnect_error_ = std::move(error);
state_.store(ConnectionState::kDisconnected, std::memory_order_relaxed);
MutexLock lock(&state_tracker_mu_);
state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN, disconnect_error_,
"inproc transport disconnected");
}

absl::StatusOr<CallInitiator> AcceptCall(ClientMetadata& md) {
switch (state_.load(std::memory_order_acquire)) {
case ConnectionState::kInitial:
return absl::InternalError(
"inproc transport hasn't started accepting calls");
case ConnectionState::kDisconnected:
return absl::UnavailableError("inproc transport is disconnected");
case ConnectionState::kReady:
break;
}
return accept_(md);
}

private:
enum class ConnectionState : uint8_t { kInitial, kReady, kDisconnected };

std::atomic<ConnectionState> state_{ConnectionState::kInitial};
std::atomic<bool> disconnecting_{false};
AcceptFunction accept_;
absl::Status disconnect_error_;
Mutex state_tracker_mu_;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(state_tracker_mu_){
"inproc_server_transport", GRPC_CHANNEL_CONNECTING};
};

class InprocClientTransport final : public Transport, public ClientTransport {
public:
void StartCall(CallHandler call_handler) override {
call_handler.SpawnGuarded(
"pull_initial_metadata",
TrySeq(
call_handler.PullClientInitialMetadata(),
[server_transport = server_transport_,
call_handler](ClientMetadataHandle md) {
auto call_initiator = server_transport->AcceptCall(*md);
if (!call_initiator.ok()) return call_initiator.status();
ForwardCall(call_handler, std::move(*call_initiator),
std::move(md));
return absl::OkStatus();
},
ImmediateOkStatus()));
}

void Orphan() override { delete this; }

OrphanablePtr<Transport> GetServerTransport() {
return OrphanablePtr<Transport>(server_transport_->Ref().release());
}

FilterStackTransport* filter_stack_transport() override { return nullptr; }
ClientTransport* client_transport() override { return this; }
ServerTransport* server_transport() override { return nullptr; }
absl::string_view GetTransportName() const override { return "inproc"; }
void SetPollset(grpc_stream*, grpc_pollset*) override {}
void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
void PerformOp(grpc_transport_op*) override { Crash("unimplemented"); }
grpc_endpoint* GetEndpoint() override { return nullptr; }

private:
~InprocClientTransport() override {
server_transport_->Disconnect(
absl::UnavailableError("Client transport closed"));
}

RefCountedPtr<InprocServerTransport> server_transport_ =
MakeRefCounted<InprocServerTransport>();
};

bool UsePromiseBasedTransport() {
if (!IsPromiseBasedInprocTransportEnabled()) return false;
if (!IsPromiseBasedClientCallEnabled()) {
gpr_log(GPR_ERROR,
"Promise based inproc transport requested but promise based client "
"calls are disabled: using legacy implementation.");
return false;
}
if (!IsPromiseBasedServerCallEnabled()) {
gpr_log(GPR_ERROR,
"Promise based inproc transport requested but promise based server "
"calls are disabled: using legacy implementation.");
return false;
}
GPR_ASSERT(IsPromiseBasedClientCallEnabled());
GPR_ASSERT(IsPromiseBasedServerCallEnabled());
return true;
}

RefCountedPtr<Channel> MakeLameChannel(absl::string_view why,
absl::Status error) {
gpr_log(GPR_ERROR, "%s: %s", std::string(why).c_str(),
std::string(error.message()).c_str());
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, StatusIntProperty::kRpcStatus, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
return RefCountedPtr<Channel>(Channel::FromC(grpc_lame_client_channel_create(
nullptr, status, std::string(why).c_str())));
}

RefCountedPtr<Channel> MakeInprocChannel(Server* server,
ChannelArgs client_channel_args) {
auto client_transport = MakeOrphanable<InprocClientTransport>();
auto server_transport = client_transport->GetServerTransport();
auto error =
server->SetupTransport(server_transport.get(), nullptr,
server->channel_args()
.Remove(GRPC_ARG_MAX_CONNECTION_IDLE_MS)
.Remove(GRPC_ARG_MAX_CONNECTION_AGE_MS),
nullptr);
if (!error.ok()) {
return MakeLameChannel("Failed to create server channel", std::move(error));
}
std::ignore = server_transport.release(); // consumed by SetupTransport
auto channel = Channel::Create(
"inproc",
client_channel_args.Set(GRPC_ARG_DEFAULT_AUTHORITY, "inproc.authority"),
GRPC_CLIENT_DIRECT_CHANNEL, client_transport.release());
if (!channel.ok()) {
return MakeLameChannel("Failed to create client channel", channel.status());
}
return std::move(*channel);
}
} // namespace

} // namespace grpc_core

grpc_channel* grpc_inproc_channel_create(grpc_server* server,
const grpc_channel_args* args,
void* reserved) {
grpc_core::ApplicationCallbackExecCtx app_exec_ctx;
grpc_core::ExecCtx exec_ctx;
if (!grpc_core::UsePromiseBasedTransport()) {
return grpc_legacy_inproc_channel_create(server, args, reserved);
}
grpc_core::Crash("unimplemented");
return grpc_core::MakeInprocChannel(grpc_core::Server::FromC(server),
grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(args))
.release()
->c_ptr();
}

0 comments on commit a04188b

Please sign in to comment.