diff --git a/src/common/clients/storage/GraphStorageClient.cpp b/src/common/clients/storage/GraphStorageClient.cpp index 21f336764..907bfd38a 100644 --- a/src/common/clients/storage/GraphStorageClient.cpp +++ b/src/common/clients/storage/GraphStorageClient.cpp @@ -154,7 +154,6 @@ GraphStorageClient::addEdges(GraphSpaceID space, req.set_parts(std::move(c.second)); req.set_prop_names(std::move(propNames)); } - return collectResponse( evb, std::move(requests), diff --git a/src/common/clients/storage/InternalStorageClient.cpp b/src/common/clients/storage/InternalStorageClient.cpp index a8971ae28..becfe8a7f 100644 --- a/src/common/clients/storage/InternalStorageClient.cpp +++ b/src/common/clients/storage/InternalStorageClient.cpp @@ -10,8 +10,6 @@ namespace nebula { namespace storage { -constexpr int32_t kInternalPortOffset = -2; - template cpp2::ErrorCode extractErrorCode(T& tryResp) { if (!tryResp.hasValue()) { @@ -31,8 +29,9 @@ cpp2::ErrorCode extractErrorCode(T& tryResp) { return cpp2::ErrorCode::E_UNKNOWN; } - for (auto& partResult : stResp.value().get_result().get_failed_parts()) { - return partResult.code; + auto& failedPart = stResp.value().get_result().get_failed_parts(); + for (auto& p : failedPart) { + return p.code; } return cpp2::ErrorCode::SUCCEEDED; } @@ -46,13 +45,13 @@ StatusOr InternalStorageClient::getFuzzyLeader(GraphSpaceID spaceId, return getLeader(stPartHosts.value()); } -folly::SemiFuture> InternalStorageClient::forwardTransaction( +folly::SemiFuture InternalStorageClient::forwardTransaction( int64_t txnId, GraphSpaceID spaceId, PartitionID partId, std::string&& data, folly::EventBase* evb) { - auto c = folly::makePromiseContract>(); + auto c = folly::makePromiseContract(); forwardTransactionImpl(txnId, spaceId, partId, std::move(data), std::move(c.first), evb); return std::move(c.second); @@ -62,11 +61,12 @@ void InternalStorageClient::forwardTransactionImpl(int64_t txnId, GraphSpaceID spaceId, PartitionID partId, std::string&& data, - folly::Promise> p, + folly::Promise p, folly::EventBase* evb) { + VLOG(1) << "forwardTransactionImpl txnId=" << txnId; auto statusOrLeader = getFuzzyLeader(spaceId, partId); if (!statusOrLeader.ok()) { - p.setValue(statusOrLeader.status()); + p.setValue(cpp2::ErrorCode::E_SPACE_NOT_FOUND); return; } HostAddr& dest = statusOrLeader.value(); @@ -84,28 +84,17 @@ void InternalStorageClient::forwardTransactionImpl(int64_t txnId, std::make_pair(dest, interReq), [](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::InternalTxnRequest& r) { return client->future_forwardTransaction(r); - }) - .thenValue([=, p = std::move(p)](auto&& stResp) mutable { - if (stResp.ok()) { - auto hasLeaderChange = false; - auto& failedParts = stResp.value().get_result().get_failed_parts(); - for (auto& part : failedParts) { - if (part.__isset.leader) { - hasLeaderChange = true; - static HostAddr emptyHost; - if (part.leader != emptyHost) { - updateLeader(spaceId, partId, part.leader); - } else { - LOG(ERROR) << "processor report leader chanage, but not set leader"; - } - } - } - if (hasLeaderChange) { - return forwardTransactionImpl( - txnId, spaceId, partId, std::move(data), std::move(p), evb); - } + }, + kInternalPortOffset) + .thenTry([=, p = std::move(p)](auto&& t) mutable { + auto code = extractErrorCode(t); + if (code == cpp2::ErrorCode::E_LEADER_CHANGED) { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + return forwardTransactionImpl( + txnId, spaceId, partId, std::move(data), std::move(p), evb); + } else { + p.setValue(code); } - p.setValue(stResp); }); } @@ -162,7 +151,7 @@ void InternalStorageClient::getValueImpl(GraphSpaceID spaceId, } }; - getResponse(evb, std::move(req), remote).thenTry(std::move(cb)); + getResponse(evb, std::move(req), remote, kInternalPortOffset).thenTry(std::move(cb)); } } // namespace storage diff --git a/src/common/clients/storage/InternalStorageClient.h b/src/common/clients/storage/InternalStorageClient.h index 23f585674..8a51d2bf7 100644 --- a/src/common/clients/storage/InternalStorageClient.h +++ b/src/common/clients/storage/InternalStorageClient.h @@ -30,14 +30,13 @@ class InternalStorageClient : public StorageClientBase ioThreadPool, meta::MetaClient* metaClient) : Parent(ioThreadPool, metaClient) {} - virtual ~InternalStorageClient() {} + virtual ~InternalStorageClient() = default; - folly::SemiFuture> forwardTransaction( - int64_t txnId, - GraphSpaceID spaceId, - PartitionID partId, - std::string&& data, - folly::EventBase* evb = nullptr); + folly::SemiFuture forwardTransaction(int64_t txnId, + GraphSpaceID spaceId, + PartitionID partId, + std::string&& data, + folly::EventBase* evb = nullptr); folly::SemiFuture getValue(size_t vIdLen, GraphSpaceID spaceId, @@ -51,7 +50,7 @@ class InternalStorageClient : public StorageClientBase> p, + folly::Promise p, folly::EventBase* evb); void getValueImpl(GraphSpaceID spaceId, diff --git a/src/common/clients/storage/StorageClientBase.h b/src/common/clients/storage/StorageClientBase.h index afa1daec9..8972f8862 100644 --- a/src/common/clients/storage/StorageClientBase.h +++ b/src/common/clients/storage/StorageClientBase.h @@ -19,6 +19,7 @@ DECLARE_int32(storage_client_timeout_ms); DECLARE_uint32(storage_client_retry_interval_ms); +constexpr int32_t kInternalPortOffset = -2; namespace nebula { namespace storage { @@ -122,6 +123,7 @@ class StorageClientBase { folly::EventBase* evb, std::unordered_map requests, RemoteFunc&& remoteFunc, + int32_t portOffsetIfRetry = 0, std::size_t retry = 0, std::size_t retryLimit = 3); @@ -136,6 +138,7 @@ class StorageClientBase { folly::EventBase* evb, std::pair&& request, RemoteFunc&& remoteFunc, + int32_t leaderPortOffset = 0, folly::Promise> pro = folly::Promise>(), std::size_t retry = 0, std::size_t retryLimit = 3); @@ -151,6 +154,7 @@ class StorageClientBase { folly::EventBase* evb, std::pair request, RemoteFunc remoteFunc, + int32_t leaderPortOffset, folly::Promise> pro, std::size_t retry, std::size_t retryLimit); diff --git a/src/common/clients/storage/StorageClientBase.inl b/src/common/clients/storage/StorageClientBase.inl index 99940d439..611b73147 100644 --- a/src/common/clients/storage/StorageClientBase.inl +++ b/src/common/clients/storage/StorageClientBase.inl @@ -176,6 +176,7 @@ StorageClientBase::collectResponse( folly::EventBase* evb, std::unordered_map requests, RemoteFunc&& remoteFunc, + int32_t portOffsetIfRetry, std::size_t retry, std::size_t retryLimit) { auto context = std::make_shared>( @@ -200,7 +201,8 @@ StorageClientBase::collectResponse( res, remoteFunc = std::move(remoteFunc), retry, - retryLimit] () mutable { + retryLimit, + portOffsetIfRetry] () mutable { auto client = clientsMan_->client(host, evb, false, @@ -219,7 +221,8 @@ StorageClientBase::collectResponse( evb, remoteFunc = std::move(remoteFunc), retry, - retryLimit] (folly::Try&& val) { + retryLimit, + portOffsetIfRetry] (folly::Try&& val) { auto& r = context->findRequest(host); if (val.hasException()) { LOG(ERROR) << "Request to " << host @@ -246,11 +249,13 @@ StorageClientBase::collectResponse( if (retry < retryLimit && isValidHostPtr(leader)) { evb->runAfterDelay([this, evb, leader = *leader, r = std::move(r), remoteFunc = std::move(remoteFunc), context, - start, retry, retryLimit] () { + start, retry, retryLimit, + portOffsetIfRetry] (){ getResponse(evb, std::pair(leader, std::move(r)), std::move(remoteFunc), + portOffsetIfRetry, folly::Promise>(), retry + 1, retryLimit) @@ -325,6 +330,7 @@ folly::Future> StorageClientBase::getResponse( folly::EventBase* evb, std::pair&& request, RemoteFunc&& remoteFunc, + int32_t leaderPortOffset, folly::Promise> pro, std::size_t retry, std::size_t retryLimit) { @@ -332,19 +338,20 @@ folly::Future> StorageClientBase::getResponse( getResponseImpl(evb, std::forward(request), std::forward(remoteFunc), + leaderPortOffset, std::move(pro), retry, retryLimit); return f; } - template template void StorageClientBase::getResponseImpl( folly::EventBase* evb, std::pair request, RemoteFunc remoteFunc, + int32_t leaderPortOffset, folly::Promise> pro, std::size_t retry, std::size_t retryLimit) { @@ -353,7 +360,7 @@ void StorageClientBase::getResponseImpl( evb = ioThreadPool_->getEventBase(); } folly::via(evb, [evb, request = std::move(request), remoteFunc = std::move(remoteFunc), - pro = std::move(pro), retry, retryLimit, this] () mutable { + leaderPortOffset, pro = std::move(pro), retry, retryLimit, this] () mutable { auto host = request.first; auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms); auto spaceId = request.second.get_space_id(); @@ -368,6 +375,7 @@ void StorageClientBase::getResponseImpl( evb, retry, retryLimit, + leaderPortOffset, this] (folly::Try&& t) mutable { // exception occurred during RPC if (t.hasException()) { @@ -395,11 +403,14 @@ void StorageClientBase::getResponseImpl( evb->runAfterDelay([this, evb, leader = *leader, req = std::move(request.second), remoteFunc = std::move(remoteFunc), p = std::move(p), + leaderPortOffset, retry, retryLimit] () mutable { + leader.port += leaderPortOffset; getResponseImpl(evb, std::pair(std::move(leader), - std::move(req)), + std::move(req)), std::move(remoteFunc), + leaderPortOffset, std::move(p), retry + 1, retryLimit);