Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
internal storage client needs to retry using a different port (#352)
Browse files Browse the repository at this point in the history
* internal has to retry using another port

* remove some debug log

* revert vscode format

* revert vscode format

* change LOG to VLOG

* add edges no need to offset leader port
  • Loading branch information
lionel.liu@vesoft.com authored Dec 17, 2020
1 parent bd95bd0 commit 51c0e32
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 45 deletions.
1 change: 0 additions & 1 deletion src/common/clients/storage/GraphStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ GraphStorageClient::addEdges(GraphSpaceID space,
req.set_parts(std::move(c.second));
req.set_prop_names(std::move(propNames));
}

return collectResponse(
evb,
std::move(requests),
Expand Down
49 changes: 19 additions & 30 deletions src/common/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
namespace nebula {
namespace storage {

constexpr int32_t kInternalPortOffset = -2;

template <typename T>
cpp2::ErrorCode extractErrorCode(T& tryResp) {
if (!tryResp.hasValue()) {
Expand All @@ -31,8 +29,9 @@ cpp2::ErrorCode extractErrorCode(T& tryResp) {
return cpp2::ErrorCode::E_UNKNOWN;
}

for (auto& partResult : stResp.value().get_result().get_failed_parts()) {
return partResult.code;
auto& failedPart = stResp.value().get_result().get_failed_parts();
for (auto& p : failedPart) {
return p.code;
}
return cpp2::ErrorCode::SUCCEEDED;
}
Expand All @@ -46,13 +45,13 @@ StatusOr<HostAddr> InternalStorageClient::getFuzzyLeader(GraphSpaceID spaceId,
return getLeader(stPartHosts.value());
}

folly::SemiFuture<StatusOr<cpp2::ExecResponse>> InternalStorageClient::forwardTransaction(
folly::SemiFuture<cpp2::ErrorCode> InternalStorageClient::forwardTransaction(
int64_t txnId,
GraphSpaceID spaceId,
PartitionID partId,
std::string&& data,
folly::EventBase* evb) {
auto c = folly::makePromiseContract<StatusOr<cpp2::ExecResponse>>();
auto c = folly::makePromiseContract<cpp2::ErrorCode>();
forwardTransactionImpl(txnId,
spaceId, partId, std::move(data), std::move(c.first), evb);
return std::move(c.second);
Expand All @@ -62,11 +61,12 @@ void InternalStorageClient::forwardTransactionImpl(int64_t txnId,
GraphSpaceID spaceId,
PartitionID partId,
std::string&& data,
folly::Promise<StatusOr<cpp2::ExecResponse>> p,
folly::Promise<cpp2::ErrorCode> p,
folly::EventBase* evb) {
VLOG(1) << "forwardTransactionImpl txnId=" << txnId;
auto statusOrLeader = getFuzzyLeader(spaceId, partId);
if (!statusOrLeader.ok()) {
p.setValue(statusOrLeader.status());
p.setValue(cpp2::ErrorCode::E_SPACE_NOT_FOUND);
return;
}
HostAddr& dest = statusOrLeader.value();
Expand All @@ -84,28 +84,17 @@ void InternalStorageClient::forwardTransactionImpl(int64_t txnId,
std::make_pair(dest, interReq),
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::InternalTxnRequest& r) {
return client->future_forwardTransaction(r);
})
.thenValue([=, p = std::move(p)](auto&& stResp) mutable {
if (stResp.ok()) {
auto hasLeaderChange = false;
auto& failedParts = stResp.value().get_result().get_failed_parts();
for (auto& part : failedParts) {
if (part.__isset.leader) {
hasLeaderChange = true;
static HostAddr emptyHost;
if (part.leader != emptyHost) {
updateLeader(spaceId, partId, part.leader);
} else {
LOG(ERROR) << "processor report leader chanage, but not set leader";
}
}
}
if (hasLeaderChange) {
return forwardTransactionImpl(
txnId, spaceId, partId, std::move(data), std::move(p), evb);
}
},
kInternalPortOffset)
.thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = extractErrorCode(t);
if (code == cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
return forwardTransactionImpl(
txnId, spaceId, partId, std::move(data), std::move(p), evb);
} else {
p.setValue(code);
}
p.setValue(stResp);
});
}

Expand Down Expand Up @@ -162,7 +151,7 @@ void InternalStorageClient::getValueImpl(GraphSpaceID spaceId,
}
};

getResponse(evb, std::move(req), remote).thenTry(std::move(cb));
getResponse(evb, std::move(req), remote, kInternalPortOffset).thenTry(std::move(cb));
}

} // namespace storage
Expand Down
15 changes: 7 additions & 8 deletions src/common/clients/storage/InternalStorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,13 @@ class InternalStorageClient : public StorageClientBase<cpp2::InternalStorageServ
InternalStorageClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
meta::MetaClient* metaClient)
: Parent(ioThreadPool, metaClient) {}
virtual ~InternalStorageClient() {}
virtual ~InternalStorageClient() = default;

folly::SemiFuture<StatusOr<cpp2::ExecResponse>> forwardTransaction(
int64_t txnId,
GraphSpaceID spaceId,
PartitionID partId,
std::string&& data,
folly::EventBase* evb = nullptr);
folly::SemiFuture<cpp2::ErrorCode> forwardTransaction(int64_t txnId,
GraphSpaceID spaceId,
PartitionID partId,
std::string&& data,
folly::EventBase* evb = nullptr);

folly::SemiFuture<ErrOrVal> getValue(size_t vIdLen,
GraphSpaceID spaceId,
Expand All @@ -51,7 +50,7 @@ class InternalStorageClient : public StorageClientBase<cpp2::InternalStorageServ
GraphSpaceID spaceId,
PartitionID partId,
std::string&& data,
folly::Promise<StatusOr<cpp2::ExecResponse>> p,
folly::Promise<cpp2::ErrorCode> p,
folly::EventBase* evb);

void getValueImpl(GraphSpaceID spaceId,
Expand Down
4 changes: 4 additions & 0 deletions src/common/clients/storage/StorageClientBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
DECLARE_int32(storage_client_timeout_ms);
DECLARE_uint32(storage_client_retry_interval_ms);

constexpr int32_t kInternalPortOffset = -2;

namespace nebula {
namespace storage {
Expand Down Expand Up @@ -122,6 +123,7 @@ class StorageClientBase {
folly::EventBase* evb,
std::unordered_map<HostAddr, Request> requests,
RemoteFunc&& remoteFunc,
int32_t portOffsetIfRetry = 0,
std::size_t retry = 0,
std::size_t retryLimit = 3);

Expand All @@ -136,6 +138,7 @@ class StorageClientBase {
folly::EventBase* evb,
std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc,
int32_t leaderPortOffset = 0,
folly::Promise<StatusOr<Response>> pro = folly::Promise<StatusOr<Response>>(),
std::size_t retry = 0,
std::size_t retryLimit = 3);
Expand All @@ -151,6 +154,7 @@ class StorageClientBase {
folly::EventBase* evb,
std::pair<HostAddr, Request> request,
RemoteFunc remoteFunc,
int32_t leaderPortOffset,
folly::Promise<StatusOr<Response>> pro,
std::size_t retry,
std::size_t retryLimit);
Expand Down
23 changes: 17 additions & 6 deletions src/common/clients/storage/StorageClientBase.inl
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ StorageClientBase<ClientType>::collectResponse(
folly::EventBase* evb,
std::unordered_map<HostAddr, Request> requests,
RemoteFunc&& remoteFunc,
int32_t portOffsetIfRetry,
std::size_t retry,
std::size_t retryLimit) {
auto context = std::make_shared<ResponseContext<Request, RemoteFunc, Response>>(
Expand All @@ -200,7 +201,8 @@ StorageClientBase<ClientType>::collectResponse(
res,
remoteFunc = std::move(remoteFunc),
retry,
retryLimit] () mutable {
retryLimit,
portOffsetIfRetry] () mutable {
auto client = clientsMan_->client(host,
evb,
false,
Expand All @@ -219,7 +221,8 @@ StorageClientBase<ClientType>::collectResponse(
evb,
remoteFunc = std::move(remoteFunc),
retry,
retryLimit] (folly::Try<Response>&& val) {
retryLimit,
portOffsetIfRetry] (folly::Try<Response>&& val) {
auto& r = context->findRequest(host);
if (val.hasException()) {
LOG(ERROR) << "Request to " << host
Expand All @@ -246,11 +249,13 @@ StorageClientBase<ClientType>::collectResponse(
if (retry < retryLimit && isValidHostPtr(leader)) {
evb->runAfterDelay([this, evb, leader = *leader, r = std::move(r),
remoteFunc = std::move(remoteFunc), context,
start, retry, retryLimit] () {
start, retry, retryLimit,
portOffsetIfRetry] (){
getResponse(evb,
std::pair<HostAddr, Request>(leader,
std::move(r)),
std::move(remoteFunc),
portOffsetIfRetry,
folly::Promise<StatusOr<Response>>(),
retry + 1,
retryLimit)
Expand Down Expand Up @@ -325,26 +330,28 @@ folly::Future<StatusOr<Response>> StorageClientBase<ClientType>::getResponse(
folly::EventBase* evb,
std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc,
int32_t leaderPortOffset,
folly::Promise<StatusOr<Response>> pro,
std::size_t retry,
std::size_t retryLimit) {
auto f = pro.getFuture();
getResponseImpl(evb,
std::forward<decltype(request)>(request),
std::forward<RemoteFunc>(remoteFunc),
leaderPortOffset,
std::move(pro),
retry,
retryLimit);
return f;
}


template<typename ClientType>
template<class Request, class RemoteFunc, class Response>
void StorageClientBase<ClientType>::getResponseImpl(
folly::EventBase* evb,
std::pair<HostAddr, Request> request,
RemoteFunc remoteFunc,
int32_t leaderPortOffset,
folly::Promise<StatusOr<Response>> pro,
std::size_t retry,
std::size_t retryLimit) {
Expand All @@ -353,7 +360,7 @@ void StorageClientBase<ClientType>::getResponseImpl(
evb = ioThreadPool_->getEventBase();
}
folly::via(evb, [evb, request = std::move(request), remoteFunc = std::move(remoteFunc),
pro = std::move(pro), retry, retryLimit, this] () mutable {
leaderPortOffset, pro = std::move(pro), retry, retryLimit, this] () mutable {
auto host = request.first;
auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
auto spaceId = request.second.get_space_id();
Expand All @@ -368,6 +375,7 @@ void StorageClientBase<ClientType>::getResponseImpl(
evb,
retry,
retryLimit,
leaderPortOffset,
this] (folly::Try<Response>&& t) mutable {
// exception occurred during RPC
if (t.hasException()) {
Expand Down Expand Up @@ -395,11 +403,14 @@ void StorageClientBase<ClientType>::getResponseImpl(
evb->runAfterDelay([this, evb, leader = *leader,
req = std::move(request.second),
remoteFunc = std::move(remoteFunc), p = std::move(p),
leaderPortOffset,
retry, retryLimit] () mutable {
leader.port += leaderPortOffset;
getResponseImpl(evb,
std::pair<HostAddr, Request>(std::move(leader),
std::move(req)),
std::move(req)),
std::move(remoteFunc),
leaderPortOffset,
std::move(p),
retry + 1,
retryLimit);
Expand Down

0 comments on commit 51c0e32

Please sign in to comment.