diff --git a/src/app_demo/should_be_generated/domain/IncreaseCommand.cpp b/src/app_demo/should_be_generated/domain/IncreaseCommand.cpp index a1a848c..81c1a7c 100644 --- a/src/app_demo/should_be_generated/domain/IncreaseCommand.cpp +++ b/src/app_demo/should_be_generated/domain/IncreaseCommand.cpp @@ -51,9 +51,12 @@ void IncreaseCommand::onPersistFailed( call->mMeta = std::make_shared(reserved.value(), callData->getContext()); auto request = std::make_shared(mRequest); - gringofts::Singleton::getInstance().forwardRequest( - request, &protos::DemoService::Stub::PrepareAsyncExecute, call); - return; + if (gringofts::Singleton::getInstance().forwardRequest( + request, &protos::DemoService::Stub::PrepareAsyncExecute, call)) { + return; + } else { + delete call; + } } callData->fillResultAndReply(code, errorMessage, reserved); } diff --git a/src/infra/forward/ForwardClient.h b/src/infra/forward/ForwardClient.h index c5c4ea2..68abf6e 100644 --- a/src/infra/forward/ForwardClient.h +++ b/src/infra/forward/ForwardClient.h @@ -66,7 +66,11 @@ class ForwardClientBase { } template - void forwardRequest(std::shared_ptr request, RpcFuncType rpcFunc, CallType *call) { + bool forwardRequest(std::shared_ptr request, RpcFuncType rpcFunc, CallType *call) { + if (mStub == nullptr) { + SPDLOG_WARN("ForwardClient for {} is nullptr", mPeerId); + return false; + } call->mForwardRquestTime = TimeUtil::currentTimeInNanos(); if (call->mMeta->mServerContext != nullptr) { call->mContext.set_deadline(call->mMeta->mServerContext->deadline()); @@ -84,6 +88,7 @@ class ForwardClientBase { call->mResponseReader->Finish(&call->mResponse, &call->mStatus, reinterpret_cast(call)); + return true; } private: diff --git a/src/infra/forward/ForwardCore.h b/src/infra/forward/ForwardCore.h index e8a3526..93ba187 100644 --- a/src/infra/forward/ForwardCore.h +++ b/src/infra/forward/ForwardCore.h @@ -44,20 +44,31 @@ class ForwardCore { dnsResolver = std::make_shared(); } initClients(reader, dnsResolver); - mInitialized = true; - } - - bool isInitialized() const { - return mInitialized; } template - void forwardRequest(std::shared_ptr request, RpcFuncType rpcFunc, CallType *call) { + bool forwardRequest(std::shared_ptr request, RpcFuncType rpcFunc, CallType *call) { + if (call == nullptr || call->mMeta == nullptr) { + SPDLOG_ERROR("call or call->mMeta is nullptr"); + return false; + } + if (rpcFunc == nullptr || request == nullptr) { + SPDLOG_ERROR("rpcFunc or request is nullptr"); + return false; + } + if (mPeers.find(call->mMeta->mLeaderId) == mPeers.end()) { + SPDLOG_ERROR("mLeaderID not legal", call->mMeta->mLeaderId); + return false; + } auto &peer = mPeers[call->mMeta->mLeaderId]; - auto index = peer.mPointer.fetch_add(1) % mConcurrency; - auto &client = mClients[peer.mId * mConcurrency + index]; - client->forwardRequest(request, rpcFunc, call); + auto clientIndex = (peer.mId * mConcurrency) + (peer.mPointer.fetch_add(1) % mConcurrency); + auto client = mClients.find(clientIndex); + if (client == mClients.end() || client->second == nullptr) { + SPDLOG_ERROR("ForwardClient for peer {} not found", peer.mId); + return false; + } SPDLOG_DEBUG("Forward Request to {}", peer.mId); + return client->second->forwardRequest(request, rpcFunc, call); } private: @@ -95,7 +106,6 @@ class ForwardCore { const uint64_t mConcurrency = 3; std::map mPeers; uint64_t mSelfId; - bool mInitialized = false; }; } /// namespace forward