Skip to content

Commit

Permalink
Merge pull request #117 from DongbinCheng/ISSUE-116
Browse files Browse the repository at this point in the history
ISSUE-116: Gringofts support request forwarding from follower to leader
  • Loading branch information
DongbinCheng authored Mar 5, 2024
2 parents 27f060e + 675a354 commit 1d91f03
Show file tree
Hide file tree
Showing 12 changed files with 424 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/app_demo/should_be_generated/app/App.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ App::App(const char *configPath) : mIsShutdown(false) {

initCommandEventStore(reader);

initForwarder(reader);

std::string snapshotDir = reader.Get("snapshot", "dir", "UNKNOWN");
assert(snapshotDir != "UNKNOWN");

Expand Down Expand Up @@ -193,6 +195,12 @@ void App::initCommandEventStore(const INIReader &reader) {
}
}

/// Initializes the ExecuteForwardCore singleton used for request forwarding.
/// The INI reader is passed through together with this node's id and its
/// cluster info, both obtained from AppInfo.
void App::initForwarder(const INIReader &reader) {
  auto selfNodeId = gringofts::app::AppInfo::getMyNodeId();
  auto selfClusterInfo = gringofts::app::AppInfo::getMyClusterInfo();

  auto &&forwardCore = gringofts::Singleton<ExecuteForwardCore>::getInstance();
  forwardCore.init(reader, selfNodeId, selfClusterInfo);
}

/// Starts the request receiver by delegating to mRequestReceiver->start().
void App::startRequestReceiver() {
mRequestReceiver->start();
}
Expand Down
2 changes: 2 additions & 0 deletions src/app_demo/should_be_generated/app/App.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class App final {

void initCommandEventStore(const INIReader &reader);

void initForwarder(const INIReader &reader);

void startRequestReceiver();

void startNetAdminServer();
Expand Down
49 changes: 49 additions & 0 deletions src/app_demo/should_be_generated/app/RequestForward.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/************************************************************************
Copyright 2019-2020 eBay Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
**************************************************************************/
#ifndef SRC_APP_DEMO_SHOULD_BE_GENERATED_APP_REQUESTFORWARD_H_
#define SRC_APP_DEMO_SHOULD_BE_GENERATED_APP_REQUESTFORWARD_H_
#include "../../../infra/forward/ForwardCore.h"
#include "../../generated/grpc/demo.grpc.pb.h"
#include "../../../app_util/AppInfo.h"
#include <spdlog/spdlog.h>

namespace gringofts::demo {

/**
 * Async client call used when an Execute (Increase) request is forwarded
 * to another node. Once the forwarded RPC completes, the response is handed
 * back to the handle of the client request that triggered the forwarding.
 */
struct ExecuteForwardCall
    : gringofts::forward::AsyncClientCall<protos::IncreaseResponse> {
  explicit ExecuteForwardCall(gringofts::RequestHandle *clientHandle)
      : mClientHandle(clientHandle) {}

  /// Name of this call type, used in log messages.
  std::string toString() const override {
    return std::string("ExecuteForwardCall");
  }

  /// Post-processes the forwarded response and replies to the original client.
  void handleResponse() override {
    const bool forwardFailed = !mStatus.ok();
    if (forwardFailed) {
      // forward failed, follower set code to 301
      mResponse.set_code(301);
      mResponse.set_message("Not a leader any longer");
    }

    // When the response carries no "reserved" value, fill in the id of the
    // leader this call was forwarded to.
    const bool leaderHintMissing = mResponse.reserved().empty();
    if (leaderHintMissing) {
      mResponse.set_reserved(std::to_string(mMeta->mLeaderId));
    }

    mClientHandle->forwardResponseReply(&mResponse);
  }

  /// Handle of the original client request; not owned by this call.
  gringofts::RequestHandle *mClientHandle;
};

/// ForwardCore specialised for the demo service stub.
using ExecuteForwardCore = gringofts::forward::ForwardCore<protos::DemoService::Stub>;
} // namespace gringofts::demo
#endif // SRC_APP_DEMO_SHOULD_BE_GENERATED_APP_REQUESTFORWARD_H_
9 changes: 9 additions & 0 deletions src/app_demo/should_be_generated/domain/IncreaseCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ void IncreaseCommand::onPersistFailed(
SPDLOG_WARN("This command does not have request attached.");
return;
}
if (reserved && code == 301) {
auto* call = new ExecuteForwardCall(callData);
call->mMeta = std::make_shared<gringofts::forward::ForwardMetaBase>(reserved.value(), callData->getContext());
auto request = std::make_shared<protos::IncreaseRequest>(mRequest);

gringofts::Singleton<ExecuteForwardCore>::getInstance().forwardRequest(
request, &protos::DemoService::Stub::PrepareAsyncExecute, call);
return;
}
callData->fillResultAndReply(code, errorMessage, reserved);
}

Expand Down
1 change: 1 addition & 0 deletions src/app_demo/should_be_generated/domain/IncreaseCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
#include "../../../infra/es/Command.h"
#include "../../../infra/es/ProcessCommandStateMachine.h"
#include "../../generated/grpc/demo.pb.h"
#include "../app/RequestForward.h"

namespace gringofts {
namespace demo {
Expand Down
10 changes: 10 additions & 0 deletions src/app_util/RequestCallData.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ class RequestCallData final : public RequestHandle {
mResponder.Finish(mResponse, s, this);
}

void forwardResponseReply(void *response) {
mResponse = std::move(*static_cast<Response *>(response));
mStatus = FINISH;
mResponder.Finish(mResponse, grpc::Status::OK, this);
}

/// Returns a pointer to this call's own grpc::ServerContext member (mContext).
grpc::ServerContext *getContext() {
return &mContext;
}

void failOver() override {
SPDLOG_WARN("Cannot proceed as callData is no longer valid probably because client has cancelled the request.");
new RequestCallData(mService, mCompletionQueue, mCommandQueue, mBlackList);
Expand Down
56 changes: 56 additions & 0 deletions src/infra/forward/ForwardBase.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/************************************************************************
Copyright 2019-2020 eBay Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
**************************************************************************/

#pragma once

#include <string>
#include <optional>
#include <shared_mutex>
#include <thread>
#include <vector>
#include <list>

#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <spdlog/spdlog.h>
#include "../common_types.h"
#include "../util/TimeUtil.h"
#include "../grpc/RequestHandle.h"

namespace gringofts {
namespace forward {

/// Metadata that accompanies a forwarded call.
struct ForwardMetaBase {
  /// @param leaderId      id of the leader the request is forwarded to
  /// @param serverContext server context of the originating request;
  ///                      defaults to nullptr when none is available
  explicit ForwardMetaBase(uint64_t leaderId, grpc::ServerContext* serverContext = nullptr)
      : mLeaderId{leaderId}, mServerContext{serverContext} {}

  uint64_t mLeaderId;
  grpc::ServerContext* mServerContext;
};

/// Type-erased base of an in-flight forwarded call. It holds the client-side
/// gRPC state and three timestamps, and declares the hooks every concrete
/// call type has to implement.
struct AsyncClientCallBase {
  virtual ~AsyncClientCallBase() = default;

  /// Short description of the call, used in log messages.
  virtual std::string toString() const = 0;
  /// Invoked once the forwarded RPC has completed (successfully or not).
  virtual void handleResponse() = 0;

  grpc::ClientContext mContext;
  grpc::Status mStatus;
  std::shared_ptr<ForwardMetaBase> mMeta;

  /// Timestamps of the call's stages; all start at zero.
  uint64_t mForwardRquestTime{0};
  uint64_t mGotResponseTime{0};
  uint64_t mReplyResponseTime{0};
};
} /// namespace forward
} /// namespace gringofts
179 changes: 179 additions & 0 deletions src/infra/forward/ForwardClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/************************************************************************
Copyright 2019-2020 eBay Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
**************************************************************************/

#pragma once

#include "../util/DNSResolver.h"
#include "../util/TlsUtil.h"
#include "ForwardBase.h"

namespace gringofts {
namespace forward {

template<typename ResponseType>
struct AsyncClientCall : public AsyncClientCallBase {
ResponseType mResponse;
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> mResponseReader;
};

template<typename StubType>
class ForwardClientBase {
public:
ForwardClientBase(const std::string &peerHostname,
std::optional<TlsConf> tlsConfOpt,
std::shared_ptr<DNSResolver> dnsResolver,
uint64_t peerId, uint64_t clientId):
mPeerAddress(peerHostname),
mTLSConfOpt(tlsConfOpt),
mDNSResolver(dnsResolver),
mPeerId(peerId),
mClientId(clientId),
mGaugeReplyQueueSize(gringofts::getGauge("forward_reply_queue_size", {{"clientId", std::to_string(clientId)}})) {
refreshChannel();
mClientLoop = std::thread(&ForwardClientBase::clientLoopMain, this);
mReplyLoop = std::thread(&ForwardClientBase::replyLoopMain, this);
}
~ForwardClientBase() {
mRunning = false;

/// shut down CQ
mCompletionQueue.Shutdown();

/// join event loop.
if (mClientLoop.joinable()) {
mClientLoop.join();
}

if (mReplyLoop.joinable()) {
mReplyLoop.join();
}

/// drain completion queue.
void *tag;
bool ok;
while (mCompletionQueue.Next(&tag, &ok)) {}
}

template<typename RequestType, typename RpcFuncType, typename CallType>
void forwardRequest(std::shared_ptr<RequestType> request, RpcFuncType rpcFunc, CallType *call) {
call->mForwardRquestTime = TimeUtil::currentTimeInNanos();
if (call->mMeta->mServerContext != nullptr) {
call->mContext.set_deadline(call->mMeta->mServerContext->deadline());
} else {
std::chrono::time_point deadline = std::chrono::system_clock::now() + std::chrono::seconds(5);
call->mContext.set_deadline(deadline);
}

std::shared_lock<std::shared_mutex> lock(mMutex);
call->mResponseReader = (mStub.get()->*rpcFunc)(
&call->mContext,
*request.get(),
&mCompletionQueue);
call->mResponseReader->StartCall();
call->mResponseReader->Finish(&call->mResponse,
&call->mStatus,
reinterpret_cast<void *>(call));
}

private:
void refreshChannel() {
grpc::ChannelArguments chArgs;
chArgs.SetMaxReceiveMessageSize(INT_MAX);
chArgs.SetInt("grpc.testing.fixed_reconnect_backoff_ms", 100);
chArgs.SetString("key_" + std::to_string(mClientId), "value_" + std::to_string(mClientId));
auto newResolvedAddress = mDNSResolver->resolve(mPeerAddress);
if (newResolvedAddress != mResolvedPeerAddress) {
std::unique_lock<std::shared_mutex> lock(mMutex);
if (newResolvedAddress != mResolvedPeerAddress) {
SPDLOG_INFO("refreshing channel, addr {}, new resolved addr {}, old resolved addr {}",
mPeerAddress, newResolvedAddress, mResolvedPeerAddress);
auto channel = grpc::CreateCustomChannel(
newResolvedAddress, TlsUtil::buildChannelCredentials(mTLSConfOpt), chArgs);
mStub = std::make_unique<StubType>(channel);
mResolvedPeerAddress = newResolvedAddress;
}
}
}

void clientLoopMain() {
auto peerThreadName = std::string("ForwardClient") + std::to_string(mPeerId);
pthread_setname_np(pthread_self(), peerThreadName.c_str());

void *tag; /// The tag is the memory location of the call object
bool ok = false;

/// Block until the next result is available in the completion queue.
while (mCompletionQueue.Next(&tag, &ok)) {
if (!mRunning) {
SPDLOG_INFO("Client loop quit.");
return;
}

auto *call = static_cast<AsyncClientCallBase *>(tag);
GPR_ASSERT(ok);

if (!call->mStatus.ok()) {
SPDLOG_WARN("{} failed., gRpc error_code: {}, error_message: {}, error_details: {}",
call->toString(),
call->mStatus.error_code(),
call->mStatus.error_message(),
call->mStatus.error_details());

/// collect gRpc error code metrics
getCounter("forward_error_counter", {{"error_code", std::to_string(call->mStatus.error_code())}}).increase();
refreshChannel();
}
call->mGotResponseTime = TimeUtil::currentTimeInNanos();
mReplyQueue.enqueue(call);
mGaugeReplyQueueSize.set(mReplyQueue.size());
}
}

void replyLoopMain() {
auto peerThreadName = std::string("ForwardReply") + std::to_string(mPeerId);
pthread_setname_np(pthread_self(), peerThreadName.c_str());
while (mRunning) {
auto call = mReplyQueue.dequeue();
mGaugeReplyQueueSize.set(mReplyQueue.size());
call->mReplyResponseTime = TimeUtil::currentTimeInNanos();
call->handleResponse();
SPDLOG_DEBUG("ForwardLatency,{},{}",
call->mGotResponseTime - call->mForwardRquestTime,
call->mReplyResponseTime - call->mGotResponseTime);
delete call;
}
}

private:
std::unique_ptr<StubType> mStub;
std::shared_mutex mMutex; /// the lock to guarantee thread-safe access of mStub
grpc::CompletionQueue mCompletionQueue;
std::string mPeerAddress;
std::string mResolvedPeerAddress;
std::optional<TlsConf> mTLSConfOpt;
std::shared_ptr<DNSResolver> mDNSResolver;
uint64_t mPeerId = 0;

/// flag that notify thread to quit
std::atomic<bool> mRunning = true;
std::thread mClientLoop;
std::thread mReplyLoop;
BlockingQueue<AsyncClientCallBase*> mReplyQueue;
uint64_t mClientId;

mutable santiago::MetricsCenter::GaugeType mGaugeReplyQueueSize;
};

} /// namespace forward
} /// namespace gringofts
Loading

0 comments on commit 1d91f03

Please sign in to comment.