Skip to content

Commit

Permalink
Merge pull request #134 from DongbinCheng/ISSUE-133
Browse files Browse the repository at this point in the history
ISSUE-133: follower forward add cluster verification
  • Loading branch information
DongbinCheng authored Dec 23, 2024
2 parents af067b7 + 7e458fa commit af2597e
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 4 deletions.
4 changes: 1 addition & 3 deletions src/app_demo/should_be_generated/app/RequestForward.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ struct ExecuteForwardCall :
mResponse.set_code(301);
mResponse.set_message("Not a leader any longer");
}
if (mResponse.reserved().empty()) {
mResponse.set_reserved(std::to_string(mMeta->mLeaderId));
}
mResponse.set_reserved(std::to_string(mMeta->mLeaderId));
mClientHandle->forwardResponseReply(&mResponse);
}
};
Expand Down
24 changes: 24 additions & 0 deletions src/app_util/RequestCallData.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ class RequestCallData final : public RequestHandle {
// the one for this CallData. The instance will deallocate itself as
// part of its FINISH state.
new RequestCallData(mService, mCompletionQueue, mCommandQueue, mBlackList);

if (!verifyClusterId()) {
mStatus = FINISH;
mResponder.Finish(mResponse, grpc::Status(grpc::INVALID_ARGUMENT, "forward to wrong cluster"), this);
return;
}

// check black list
if (mBlackList != nullptr && mBlackList->inBlackList(mRequest)) {
fillResultAndReply(201, "Duplicated request", std::nullopt);
Expand Down Expand Up @@ -158,6 +165,23 @@ class RequestCallData final : public RequestHandle {
delete this;
}

// check grpc meta data from client before processing the request
bool verifyClusterId() {
std::multimap<grpc::string_ref, grpc::string_ref> clientMeta = mContext.client_metadata();
auto reqIter = clientMeta.find("req_source");
if (reqIter != clientMeta.end() && (reqIter->second).compare("forward") == 0) {
auto clusterIter = clientMeta.find("cluster_id");
if (clusterIter != clientMeta.end()) {
auto myClusterId = std::to_string(app::AppInfo::groupId());
if ((clusterIter->second).compare(myClusterId.c_str()) != 0) {
SPDLOG_WARN("req clusterId does not match this clusterid {}, reject the request", myClusterId);
return false;
}
}
}
return true;
}

protected:
// The means of communication with the gRPC runtime for an asynchronous
// server.
Expand Down
6 changes: 6 additions & 0 deletions src/infra/forward/ForwardClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ class ForwardClientBase {
ForwardClientBase(const std::string &peerHostname,
std::optional<TlsConf> tlsConfOpt,
std::shared_ptr<DNSResolver> dnsResolver,
ClusterId clusterId,
uint64_t peerId, uint64_t clientId):
mPeerAddress(peerHostname),
mTLSConfOpt(tlsConfOpt),
mDNSResolver(dnsResolver),
mClusterId(clusterId),
mPeerId(peerId),
mClientId(clientId),
mGaugeReplyQueueSize(gringofts::getGauge("forward_reply_queue_size", {{"clientId", std::to_string(clientId)}})) {
Expand Down Expand Up @@ -71,6 +73,9 @@ class ForwardClientBase {
SPDLOG_WARN("ForwardClient for {} is nullptr", mPeerId);
return false;
}
// use lowercase letters(a-z), digits(0-9), hyphen(-) and underscores(_) in header key
call->mContext.AddMetadata("req_source", "forward");
call->mContext.AddMetadata("cluster_id", std::to_string(mClusterId));
call->mForwardRquestTime = TimeUtil::currentTimeInNanos();
if (call->mMeta->mServerContext != nullptr) {
call->mContext.set_deadline(call->mMeta->mServerContext->deadline());
Expand Down Expand Up @@ -168,6 +173,7 @@ class ForwardClientBase {
std::string mResolvedPeerAddress;
std::optional<TlsConf> mTLSConfOpt;
std::shared_ptr<DNSResolver> mDNSResolver;
ClusterId mClusterId;
uint64_t mPeerId = 0;

/// flag that notify thread to quit
Expand Down
4 changes: 3 additions & 1 deletion src/infra/forward/ForwardCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class ForwardCore {

private:
void initClusterConf(const ClusterInfo &clusterInfo) {
mClusterId = clusterInfo.getClusterId();
auto nodes = clusterInfo.getAllNodeInfo();
for (auto &[nodeId, node] : nodes) {
std::string host = node.mHostName;
Expand All @@ -95,7 +96,7 @@ class ForwardCore {
for (int i = 0; i < mConcurrency; ++i) {
auto clientId = peerId * mConcurrency + i;
mClients[clientId] = std::make_unique<ForwardClientBase<StubType>>(
peer.mAddress, tlsConfOpt, dnsResolver, peerId, clientId);
peer.mAddress, tlsConfOpt, dnsResolver, mClusterId, peerId, clientId);
}
}
}
Expand All @@ -106,6 +107,7 @@ class ForwardCore {
const uint64_t mConcurrency = 3;
std::map<uint64_t, Peer> mPeers;
uint64_t mSelfId;
ClusterId mClusterId;
};

} /// namespace forward
Expand Down
8 changes: 8 additions & 0 deletions src/infra/util/ClusterInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ class ClusterInfo final {

std::string to_string() const;

void setClusterId(ClusterId clusterId) {
mClusterId = clusterId;
}

ClusterId getClusterId() const {
return mClusterId;
}

private:
/// example with two three-node clusters:
/// 0#1@node01.ebay.com:5245|50055|50056|5678|50065|61203,2@node02.ebay.com:5245|50055|50056|5678|50065|61203,
Expand Down

0 comments on commit af2597e

Please sign in to comment.