Skip to content

Commit

Permalink
[#1032] Add functionality of resolving intents to transaction partici…
Browse files Browse the repository at this point in the history
…pant

Summary:
For transaction aware snapshot we should apply intents for all transactions
that were committed before snapshot hybrid time.

This diff adds such functionality to transaction participant.

Test Plan: ybd --gtest_filter SnapshotTxnTest.ResolveIntents

Reviewers: bogdan, mikhail, timur

Reviewed By: mikhail, timur

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D7861
  • Loading branch information
spolitov committed Mar 1, 2020
1 parent 289c488 commit 8848b00
Show file tree
Hide file tree
Showing 10 changed files with 499 additions and 104 deletions.
48 changes: 48 additions & 0 deletions src/yb/client/snapshot-txn-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include "yb/consensus/raft_consensus.h"

#include "yb/docdb/consensus_frontier.h"

#include "yb/tserver/mini_tablet_server.h"
#include "yb/tserver/tablet_server.h"

Expand Down Expand Up @@ -795,5 +797,51 @@ TEST_F_EX(SnapshotTxnTest, TruncateDuringShutdown, RemoteBootstrapOnStartBase) {
"All tablets running"));
}

TEST_F_EX(SnapshotTxnTest, ResolveIntents, SingleTabletSnapshotTxnTest) {
SetIgnoreApplyingProbability(0.5);

TransactionPool pool(transaction_manager_.get_ptr(), nullptr /* metric_entity */);
auto session = CreateSession();
auto prev_ht = clock_->Now();
for (int i = 0; i != 4; ++i) {
auto txn = ASSERT_RESULT(pool.TakeAndInit(isolation_level_));
session->SetTransaction(txn);
ASSERT_OK(WriteRow(session, i, -i));
ASSERT_OK(txn->CommitFuture().get());

auto peers = ListTabletPeers(
cluster_.get(), [](const std::shared_ptr<tablet::TabletPeer>& peer) {
if (peer->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE) {
return false;
}
return peer->consensus()->GetLeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY;
});
ASSERT_EQ(peers.size(), 1);
auto peer = peers[0];
auto tablet = peer->tablet();
ASSERT_OK(tablet->transaction_participant()->ResolveIntents(
peer->clock().Now(), CoarseTimePoint::max()));
auto current_ht = clock_->Now();
ASSERT_OK(tablet->Flush(tablet::FlushMode::kSync));
bool found = false;
auto files = tablet->TEST_db()->GetLiveFilesMetaData();
for (const auto& meta : files) {
auto min_ht = down_cast<docdb::ConsensusFrontier&>(
*meta.smallest.user_frontier).hybrid_time();
auto max_ht = down_cast<docdb::ConsensusFrontier&>(
*meta.largest.user_frontier).hybrid_time();
if (min_ht > prev_ht && max_ht < current_ht) {
found = true;
break;
}
}

ASSERT_TRUE(found) << "Cannot find SST file that fits into " << prev_ht << " - " << current_ht
<< " range, files: " << AsString(files);

prev_ht = current_ht;
}
}

} // namespace client
} // namespace yb
1 change: 1 addition & 0 deletions src/yb/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ ADD_YB_LIBRARY(pgsql_protocol_proto
NONLINK_DEPS ${PGSQL_PROTOCOL_PROTO_TGTS})

set(COMMON_SRCS
clock.cc
id_mapping.cc
entity_ids.cc
key_encoder.cc
Expand Down
39 changes: 39 additions & 0 deletions src/yb/common/clock.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/common/clock.h"

using namespace std::literals;

DEFINE_uint64(wait_hybrid_time_sleep_interval_us, 10000,
"Sleep interval in microseconds that will be used while waiting for specific "
"hybrid time.");

namespace yb {

Status WaitUntil(ClockBase* clock_base, HybridTime hybrid_time, CoarseTimePoint deadline) {
auto ht_now = clock_base->Now();
while (ht_now < hybrid_time) {
if (CoarseMonoClock::now() > deadline) {
return STATUS_FORMAT(TimedOut, "Timed out waiting for $0, now $1", deadline, ht_now);
}
auto delta_micros = hybrid_time.GetPhysicalValueMicros() - ht_now.GetPhysicalValueMicros();
std::this_thread::sleep_for(
std::max(FLAGS_wait_hybrid_time_sleep_interval_us, delta_micros) * 1us);
ht_now = clock_base->Now();
}

return Status::OK();
}

} // namespace yb
2 changes: 2 additions & 0 deletions src/yb/common/clock.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class ClockBase : public RefCountedThreadSafe<ClockBase> {
virtual ~ClockBase() {}
};

CHECKED_STATUS WaitUntil(ClockBase* clock, HybridTime hybrid_time, CoarseTimePoint deadline);

} // namespace yb

#endif // YB_COMMON_CLOCK_H
4 changes: 2 additions & 2 deletions src/yb/docdb/docdb_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ Status DocDBRocksDBUtil::PopulateRocksDBWriteBatch(
bool decode_dockey,
bool increment_write_id,
PartialRangeKeyIntents partial_range_key_intents) const {
for (const auto& entry : dwb.key_value_pairs()) {
if (decode_dockey) {
if (decode_dockey) {
for (const auto& entry : dwb.key_value_pairs()) {
SubDocKey subdoc_key;
// We don't expect any invalid encoded keys in the write batch. However, these encoded keys
// don't contain the HybridTime.
Expand Down
1 change: 1 addition & 0 deletions src/yb/tablet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ set(TABLET_SRCS
tablet_peer.cc
transaction_coordinator.cc
transaction_participant.cc
transaction_status_resolver.cc
operation_order_verifier.cc
operations/operation.cc
operations/change_metadata_operation.cc
Expand Down
Loading

0 comments on commit 8848b00

Please sign in to comment.