Skip to content

Commit

Permalink
Merge pull request #128 from DongbinCheng/ISSUE-127
Browse files Browse the repository at this point in the history
ISSUE-127: add error handling for request forwarding
  • Loading branch information
DongbinCheng authored Mar 21, 2024
2 parents 33930ca + 66896f9 commit 7d1c44c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 14 deletions.
9 changes: 6 additions & 3 deletions src/app_demo/should_be_generated/domain/IncreaseCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,12 @@ void IncreaseCommand::onPersistFailed(
call->mMeta = std::make_shared<gringofts::forward::ForwardMetaBase>(reserved.value(), callData->getContext());
auto request = std::make_shared<protos::IncreaseRequest>(mRequest);

gringofts::Singleton<ExecuteForwardCore>::getInstance().forwardRequest(
request, &protos::DemoService::Stub::PrepareAsyncExecute, call);
return;
if (gringofts::Singleton<ExecuteForwardCore>::getInstance().forwardRequest(
request, &protos::DemoService::Stub::PrepareAsyncExecute, call)) {
return;
} else {
delete call;
}
}
callData->fillResultAndReply(code, errorMessage, reserved);
}
Expand Down
7 changes: 6 additions & 1 deletion src/infra/forward/ForwardClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ class ForwardClientBase {
}

template<typename RequestType, typename RpcFuncType, typename CallType>
void forwardRequest(std::shared_ptr<RequestType> request, RpcFuncType rpcFunc, CallType *call) {
bool forwardRequest(std::shared_ptr<RequestType> request, RpcFuncType rpcFunc, CallType *call) {
if (mStub == nullptr) {
SPDLOG_WARN("ForwardClient for {} is nullptr", mPeerId);
return false;
}
call->mForwardRquestTime = TimeUtil::currentTimeInNanos();
if (call->mMeta->mServerContext != nullptr) {
call->mContext.set_deadline(call->mMeta->mServerContext->deadline());
Expand All @@ -84,6 +88,7 @@ class ForwardClientBase {
call->mResponseReader->Finish(&call->mResponse,
&call->mStatus,
reinterpret_cast<void *>(call));
return true;
}

private:
Expand Down
30 changes: 20 additions & 10 deletions src/infra/forward/ForwardCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,31 @@ class ForwardCore {
dnsResolver = std::make_shared<DNSResolver>();
}
initClients(reader, dnsResolver);
mInitialized = true;
}

bool isInitialized() const {
return mInitialized;
}

template<typename RequestType, typename RpcFuncType, typename CallType>
void forwardRequest(std::shared_ptr<RequestType> request, RpcFuncType rpcFunc, CallType *call) {
bool forwardRequest(std::shared_ptr<RequestType> request, RpcFuncType rpcFunc, CallType *call) {
if (call == nullptr || call->mMeta == nullptr) {
SPDLOG_ERROR("call or call->mMeta is nullptr");
return false;
}
if (rpcFunc == nullptr || request == nullptr) {
SPDLOG_ERROR("rpcFunc or request is nullptr");
return false;
}
if (mPeers.find(call->mMeta->mLeaderId) == mPeers.end()) {
SPDLOG_ERROR("mLeaderID not legal", call->mMeta->mLeaderId);
return false;
}
auto &peer = mPeers[call->mMeta->mLeaderId];
auto index = peer.mPointer.fetch_add(1) % mConcurrency;
auto &client = mClients[peer.mId * mConcurrency + index];
client->forwardRequest(request, rpcFunc, call);
auto clientIndex = (peer.mId * mConcurrency) + (peer.mPointer.fetch_add(1) % mConcurrency);
auto client = mClients.find(clientIndex);
if (client == mClients.end() || client->second == nullptr) {
SPDLOG_ERROR("ForwardClient for peer {} not found", peer.mId);
return false;
}
SPDLOG_DEBUG("Forward Request to {}", peer.mId);
return client->second->forwardRequest(request, rpcFunc, call);
}

private:
Expand Down Expand Up @@ -95,7 +106,6 @@ class ForwardCore {
const uint64_t mConcurrency = 3;
std::map<uint64_t, Peer> mPeers;
uint64_t mSelfId;
bool mInitialized = false;
};

} /// namespace forward
Expand Down

0 comments on commit 7d1c44c

Please sign in to comment.