From 7e458fa7f0ce74a8cf29d4a6fe4972a1ef339b6b Mon Sep 17 00:00:00 2001 From: DongbinCheng Date: Wed, 11 Dec 2024 22:19:00 -0500 Subject: [PATCH] ISSUE-133: follower forward add cluster verification --- .../should_be_generated/app/RequestForward.h | 4 +--- src/app_util/RequestCallData.h | 24 +++++++++++++++++++ src/infra/forward/ForwardClient.h | 6 +++++ src/infra/forward/ForwardCore.h | 4 +++- src/infra/util/ClusterInfo.h | 8 +++++++ 5 files changed, 42 insertions(+), 4 deletions(-) diff --git a/src/app_demo/should_be_generated/app/RequestForward.h b/src/app_demo/should_be_generated/app/RequestForward.h index da928ca..1d47af0 100644 --- a/src/app_demo/should_be_generated/app/RequestForward.h +++ b/src/app_demo/should_be_generated/app/RequestForward.h @@ -37,9 +37,7 @@ struct ExecuteForwardCall : mResponse.set_code(301); mResponse.set_message("Not a leader any longer"); } - if (mResponse.reserved().empty()) { - mResponse.set_reserved(std::to_string(mMeta->mLeaderId)); - } + mResponse.set_reserved(std::to_string(mMeta->mLeaderId)); mClientHandle->forwardResponseReply(&mResponse); } }; diff --git a/src/app_util/RequestCallData.h b/src/app_util/RequestCallData.h index ea4eac8..2ba01b1 100644 --- a/src/app_util/RequestCallData.h +++ b/src/app_util/RequestCallData.h @@ -94,6 +94,13 @@ class RequestCallData final : public RequestHandle { // the one for this CallData. The instance will deallocate itself as // part of its FINISH state. new RequestCallData(mService, mCompletionQueue, mCommandQueue, mBlackList); + + if (!verifyClusterId()) { + mStatus = FINISH; + mResponder.Finish(mResponse, grpc::Status(grpc::INVALID_ARGUMENT, "forward to wrong cluster"), this); + return; + } + // check black list if (mBlackList != nullptr && mBlackList->inBlackList(mRequest)) { fillResultAndReply(201, "Duplicated request", std::nullopt); @@ -158,6 +165,23 @@ class RequestCallData final : public RequestHandle { delete this; } + // check grpc meta data from client before processing the request + bool verifyClusterId() { + std::multimap clientMeta = mContext.client_metadata(); + auto reqIter = clientMeta.find("req_source"); + if (reqIter != clientMeta.end() && (reqIter->second).compare("forward") == 0) { + auto clusterIter = clientMeta.find("cluster_id"); + if (clusterIter != clientMeta.end()) { + auto myClusterId = std::to_string(app::AppInfo::groupId()); + if ((clusterIter->second).compare(myClusterId.c_str()) != 0) { + SPDLOG_WARN("req clusterId does not match this clusterid {}, reject the request", myClusterId); + return false; + } + } + } + return true; + } + protected: // The means of communication with the gRPC runtime for an asynchronous // server. diff --git a/src/infra/forward/ForwardClient.h b/src/infra/forward/ForwardClient.h index 68abf6e..aa7355a 100644 --- a/src/infra/forward/ForwardClient.h +++ b/src/infra/forward/ForwardClient.h @@ -33,10 +33,12 @@ class ForwardClientBase { ForwardClientBase(const std::string &peerHostname, std::optional tlsConfOpt, std::shared_ptr dnsResolver, + ClusterId clusterId, uint64_t peerId, uint64_t clientId): mPeerAddress(peerHostname), mTLSConfOpt(tlsConfOpt), mDNSResolver(dnsResolver), + mClusterId(clusterId), mPeerId(peerId), mClientId(clientId), mGaugeReplyQueueSize(gringofts::getGauge("forward_reply_queue_size", {{"clientId", std::to_string(clientId)}})) { @@ -71,6 +73,9 @@ class ForwardClientBase { SPDLOG_WARN("ForwardClient for {} is nullptr", mPeerId); return false; } + // use lowercase letters(a-z), digits(0-9), hyphen(-) and underscores(_) in header key + call->mContext.AddMetadata("req_source", "forward"); + call->mContext.AddMetadata("cluster_id", std::to_string(mClusterId)); call->mForwardRquestTime = TimeUtil::currentTimeInNanos(); if (call->mMeta->mServerContext != nullptr) { call->mContext.set_deadline(call->mMeta->mServerContext->deadline()); @@ -168,6 +173,7 @@ class ForwardClientBase { std::string mResolvedPeerAddress; std::optional mTLSConfOpt; std::shared_ptr mDNSResolver; + ClusterId mClusterId; uint64_t mPeerId = 0; /// flag that notify thread to quit diff --git a/src/infra/forward/ForwardCore.h b/src/infra/forward/ForwardCore.h index 93ba187..058858b 100644 --- a/src/infra/forward/ForwardCore.h +++ b/src/infra/forward/ForwardCore.h @@ -73,6 +73,7 @@ class ForwardCore { private: void initClusterConf(const ClusterInfo &clusterInfo) { + mClusterId = clusterInfo.getClusterId(); auto nodes = clusterInfo.getAllNodeInfo(); for (auto &[nodeId, node] : nodes) { std::string host = node.mHostName; @@ -95,7 +96,7 @@ class ForwardCore { for (int i = 0; i < mConcurrency; ++i) { auto clientId = peerId * mConcurrency + i; mClients[clientId] = std::make_unique>( - peer.mAddress, tlsConfOpt, dnsResolver, peerId, clientId); + peer.mAddress, tlsConfOpt, dnsResolver, mClusterId, peerId, clientId); } } } @@ -106,6 +107,7 @@ class ForwardCore { const uint64_t mConcurrency = 3; std::map mPeers; uint64_t mSelfId; + ClusterId mClusterId; }; } /// namespace forward diff --git a/src/infra/util/ClusterInfo.h b/src/infra/util/ClusterInfo.h index e237305..869c31b 100644 --- a/src/infra/util/ClusterInfo.h +++ b/src/infra/util/ClusterInfo.h @@ -68,6 +68,14 @@ class ClusterInfo final { std::string to_string() const; + void setClusterId(ClusterId clusterId) { + mClusterId = clusterId; + } + + ClusterId getClusterId() const { + return mClusterId; + } + private: /// example with two three-node clusters: /// 0#1@node01.ebay.com:5245|50055|50056|5678|50065|61203,2@node02.ebay.com:5245|50055|50056|5678|50065|61203,