Skip to content

Commit

Permalink
Simplify the implementation related to PythonAsyncProcessorFactory::g…
Browse files Browse the repository at this point in the history
…enericProcessor

Summary:
The current implementation of `PythonAsyncProcessorFactory::genericProcessor` is a function template. Based on the three different RpcKind values that it serves, there is no reason for it to be a function template, which adds to complexity and affects maintainability.

To improve maintainabliity, eliminate the function template and the code that depends on it.

Reviewed By: praihan

Differential Revision: D61351067

fbshipit-source-id: 812d5b6301a5343436fc40c311773aaf1b7a7801
  • Loading branch information
Satish Kumar authored and facebook-github-bot committed Aug 16, 2024
1 parent eadeca1 commit 65946a2
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 239 deletions.
208 changes: 190 additions & 18 deletions thrift/lib/python/server/PythonAsyncProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,34 @@ PythonAsyncProcessor::handlePythonServerCallbackOneway(
return std::move(future).defer([callback = std::move(callback)](auto&&) {});
}

void PythonAsyncProcessor::executeReadEventCallbacks(
apache::thrift::Cpp2RequestContext* ctx,
apache::thrift::ContextStack* ctxStack,
apache::thrift::SerializedRequest& serializedRequest,
apache::thrift::protocol::PROTOCOL_TYPES protocol) {
if (ctxStack) {
ctxStack->preRead();

apache::thrift::SerializedMessage smsg;
smsg.protocolType = protocol;
smsg.buffer = serializedRequest.buffer.get();
smsg.methodName = ctx->getMethodName();
ctxStack->onReadData(smsg);

ctxStack->postRead(
nullptr,
serializedRequest.buffer
->computeChainDataLength()); // TODO move this call to inside
// the python code
}
}

void PythonAsyncProcessor::processSerializedCompressedRequestWithMetadata(
apache::thrift::ResponseChannelRequest::UniquePtr req,
apache::thrift::SerializedCompressedRequest&& serializedRequest,
apache::thrift::SerializedCompressedRequest&& serializedCompressedRequest,
const apache::thrift::AsyncProcessorFactory::MethodMetadata&
untypedMethodMetadata,
apache::thrift::protocol::PROTOCOL_TYPES protType,
apache::thrift::protocol::PROTOCOL_TYPES protocol,
apache::thrift::Cpp2RequestContext* context,
folly::EventBase* eb,
apache::thrift::concurrency::ThreadManager* tm) {
Expand All @@ -125,6 +147,7 @@ void PythonAsyncProcessor::processSerializedCompressedRequestWithMetadata(
// from the actual python server
std::string interactionName;
bool interactionFactoryMethod = false;
auto serializedRequest = std::move(serializedCompressedRequest).uncompress();
if (context->getInteractionId()) {
std::string_view serviceName{context->getMethodName()};
serviceName = serviceName.substr(0, serviceName.find("."));
Expand Down Expand Up @@ -154,27 +177,176 @@ void PythonAsyncProcessor::processSerializedCompressedRequestWithMetadata(
interactionFactoryMethod)) {
return;
}
ProcessFunc pfn;
switch (protType) {
case apache::thrift::protocol::T_BINARY_PROTOCOL: {
pfn = methodMetadata.processFuncs.binary;
break;
}
case apache::thrift::protocol::T_COMPACT_PROTOCOL: {
pfn = methodMetadata.processFuncs.compact;
break;
}
default:
LOG(ERROR) << "invalid protType: " << folly::to_underlying(protType);
return;
}

auto executor = tm
? tm->getKeepAlive(
context->getRequestExecutionScope(),
apache::thrift::concurrency::ThreadManager::Source::INTERNAL)
: nullptr;
(this->*pfn)(
std::move(req), std::move(serializedRequest), context, eb, executor);

auto kind = methodMetadata.rpcKind.value();
const char* serviceName = serviceName_.c_str();

auto ctxStack = apache::thrift::ContextStack::create(
this->getEventHandlersSharedPtr(),
serviceName,
functionFullNameMap_.at(context->getMethodName()).c_str(),
context);

executeReadEventCallbacks(
context, ctxStack.get(), serializedRequest, protocol);

// While this folly::makeSemiFuture().deferValue() may seem
// unnecessary, without this deferValue, the call to
// do_import(), defined at the top of this file,
// which happens via the call to dispatchRequest() below
// will crash with a null pointer derefence.
// The hypothesis is that python is not yet initialized
// and we chose not to go down that rabbit hole because
// the current implementation matches what was already present.
folly::makeSemiFuture()
.deferValue([this,
protocol,
context,
eb,
executor,
serviceName,
kind,
req = std::move(req),
ctxStack = std::move(ctxStack),
serializedRequest = std::move(serializedRequest)](
auto&& /* unused */) mutable {
if (!req) {
return folly::makeSemiFuture();
}

if (!req->getShouldStartProcessing()) {
// Ensure request is moved into HandlerCallback, so that request
// is always destroyed on its EventBase thread
if (eb) {
apache::thrift::HandlerCallbackBase::releaseRequest(
std::move(req), eb);
}
return folly::makeSemiFuture();
}

return dispatchRequest(
protocol,
context,
eb,
executor,
apache::thrift::ServerRequestData{},
std::move(req),
std::move(ctxStack),
serviceName,
std::move(serializedRequest),
kind);
})
.via(executor_);
}

folly::SemiFuture<folly::Unit> PythonAsyncProcessor::dispatchRequest(
apache::thrift::protocol::PROTOCOL_TYPES protocol,
apache::thrift::Cpp2RequestContext* ctx,
folly::EventBase* eb,
folly::Executor::KeepAlive<> executor,
apache::thrift::ServerRequestData requestData,
apache::thrift::ResponseChannelRequest::UniquePtr req,
apache::thrift::ContextStack::UniquePtr ctxStack,
const char* serviceName,
apache::thrift::SerializedRequest serializedRequest,
apache::thrift::RpcKind kind) {
const char* methodName = ctx->getMethodName().c_str();
auto throw_wrapped =
protocol == apache::thrift::protocol::PROTOCOL_TYPES::T_BINARY_PROTOCOL
? &detail::throw_wrapped<
apache::thrift::BinaryProtocolReader,
apache::thrift::BinaryProtocolWriter>
: &detail::throw_wrapped<
apache::thrift::CompactProtocolReader,
apache::thrift::CompactProtocolWriter>;

if (kind == apache::thrift::RpcKind::SINGLE_REQUEST_NO_RESPONSE) {
return handlePythonServerCallbackOneway(
protocol,
ctx,
std::move(serializedRequest),
kind,
std::make_unique<apache::thrift::HandlerCallbackBase>(
std::move(req),
std::move(ctxStack),
serviceName,
methodName,
nullptr,
eb,
executor,
ctx,
nullptr,
nullptr,
requestData));
} else if (
kind == apache::thrift::RpcKind::SINGLE_REQUEST_STREAMING_RESPONSE) {
auto return_streaming =
protocol == apache::thrift::protocol::PROTOCOL_TYPES::T_BINARY_PROTOCOL
? &detail::return_streaming<
apache::thrift::BinaryProtocolReader,
apache::thrift::BinaryProtocolWriter>
: &detail::return_streaming<
apache::thrift::CompactProtocolReader,
apache::thrift::CompactProtocolWriter>;
return handlePythonServerCallbackStreaming(
protocol,
ctx,
std::move(serializedRequest),
kind,
std::make_unique<apache::thrift::HandlerCallback<
::apache::thrift::ResponseAndServerStream<
std::unique_ptr<::folly::IOBuf>,
std::unique_ptr<::folly::IOBuf>>>>(
std::move(req),
std::move(ctxStack),
serviceName,
methodName,
return_streaming,
throw_wrapped,
ctx->getProtoSeqId(),
eb,
executor,
ctx,
nullptr,
nullptr,
requestData));
} else {
auto return_serialized =
protocol == apache::thrift::protocol::PROTOCOL_TYPES::T_BINARY_PROTOCOL
? &detail::return_serialized<
apache::thrift::BinaryProtocolReader,
apache::thrift::BinaryProtocolWriter>
: &detail::return_serialized<
apache::thrift::CompactProtocolReader,
apache::thrift::CompactProtocolWriter>;

return handlePythonServerCallback(
protocol,
ctx,
std::move(serializedRequest),
kind,
std::make_unique<
apache::thrift::HandlerCallback<std::unique_ptr<::folly::IOBuf>>>(
std::move(req),
std::move(ctxStack),
serviceName,
methodName,
return_serialized,
throw_wrapped,
ctx->getProtoSeqId(),
eb,
executor,
ctx,
nullptr,
nullptr,
requestData));
}
}

} // namespace python
Expand Down
Loading

0 comments on commit 65946a2

Please sign in to comment.