Skip to content

Commit

Permalink
[#1032] Fix master bootstrap after snapshot operations
Browse files Browse the repository at this point in the history
Summary:
This diff fixes several issues related to replying snapshot related operations during master bootstrap:
1) On boostrap failure we are trying to dump current replay state, but it could cause crash, since replayed entry could already be moved from pending_replicates and other data structures. Fixed by handling nullptr there.
2) schema_with_ids_ in SysCatalog is initialized after bootstrap, that prevents snapshot related operations from being replayed. Fixed by initializing schema_with_ids_ in ctor.
3) SnapshotOperationState does not have hybrid time, while being replayed. Fixed by setting hybrid time from replicate message.
4) tablet_ in TabletPeer is not yet initialized while applying snapshot operation state during bootstrap. Fixed by using tablet from operation state.
5) Snapshot could be inserted twice during bootstrap. First because of logs replay and second during sys catalog load.

Also added restart w/o flush before BackupTxnTest tear down to test replaying snapshot operations during bootstrap.

Test Plan: ybd --gtest_filter BackupTxnTest.*

Reviewers: mikhail, oleg, bogdan

Reviewed By: oleg

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D8277
  • Loading branch information
spolitov committed Apr 13, 2020
1 parent c952ee3 commit 06471cc
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 48 deletions.
4 changes: 0 additions & 4 deletions ent/src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,6 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon

Result<ColumnId> MetadataColumnId() override;

CHECKED_STATUS ApplyOperationState(const tablet::OperationState& operation_state,
int64_t batch_idx,
const docdb::KeyValueWriteBatchPB& write_batch) override;

void Submit(std::unique_ptr<tablet::Operation> operation) override;

void SendCreateTabletSnapshotRequest(const scoped_refptr<TabletInfo>& tablet,
Expand Down
6 changes: 0 additions & 6 deletions ent/src/yb/master/catalog_manager_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1099,12 +1099,6 @@ Result<ColumnId> CatalogManager::MetadataColumnId() {
return sys_catalog()->MetadataColumnId();
}

Status CatalogManager::ApplyOperationState(
const tablet::OperationState& operation_state, int64_t batch_idx,
const docdb::KeyValueWriteBatchPB& write_batch) {
return tablet_peer()->tablet()->ApplyOperationState(operation_state, batch_idx, write_batch);
}

TabletInfos CatalogManager::GetTabletInfos(const std::vector<TabletId>& ids) {
TabletInfos result;
result.reserve(ids.size());
Expand Down
8 changes: 8 additions & 0 deletions src/yb/client/backup-txn-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
using namespace std::literals;

DECLARE_uint64(max_clock_skew_usec);
DECLARE_bool(flush_rocksdb_on_shutdown);

namespace yb {
namespace client {
Expand All @@ -38,6 +39,13 @@ class BackupTxnTest : public TransactionTestBase {
TransactionTestBase::SetUp();
}

void DoBeforeTearDown() override {
FLAGS_flush_rocksdb_on_shutdown = false;
ASSERT_OK(cluster_->RestartSync());

TransactionTestBase::DoBeforeTearDown();
}

master::MasterBackupServiceProxy MakeBackupServiceProxy() {
return master::MasterBackupServiceProxy(
&client_->proxy_cache(), cluster_->leader_mini_master()->bound_rpc_addr());
Expand Down
4 changes: 2 additions & 2 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ Status CatalogManager::PrepareSysCatalogTable(int64_t term) {
metadata.set_namespace_id(kSystemSchemaNamespaceId);
metadata.set_name(kSysCatalogTableName);
metadata.set_table_type(TableType::YQL_TABLE_TYPE);
SchemaToPB(sys_catalog_->schema_with_ids_, metadata.mutable_schema());
SchemaToPB(sys_catalog_->schema_, metadata.mutable_schema());
metadata.set_version(0);

auto table_ids_map_checkout = table_ids_map_.CheckOut();
Expand All @@ -1036,7 +1036,7 @@ Status CatalogManager::PrepareSysCatalogTable(int64_t term) {
auto l = table->LockForRead();
PartitionSchema partition_schema;
RETURN_NOT_OK(PartitionSchema::FromPB(l->data().pb.partition_schema(),
sys_catalog_->schema_with_ids_,
sys_catalog_->schema_,
&partition_schema));
vector<Partition> partitions;
RETURN_NOT_OK(partition_schema.CreatePartitions(1, &partitions));
Expand Down
13 changes: 10 additions & 3 deletions src/yb/master/master_snapshot_coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "yb/master/master_error.h"
#include "yb/master/sys_catalog_writer.h"

#include "yb/tablet/tablet.h"
#include "yb/tablet/operations/snapshot_operation.h"
#include "yb/tablet/operations/write_operation.h"

Expand Down Expand Up @@ -451,7 +452,7 @@ class MasterSnapshotCoordinator::Impl {
}
}

RETURN_NOT_OK(context_.ApplyOperationState(state, /* batch_idx= */ -1, write_batch));
RETURN_NOT_OK(state.tablet()->ApplyOperationState(state, /* batch_idx= */ -1, write_batch));

if (!tablet_infos.empty()) {
auto snapshot_id_str = id.AsSlice().ToBuffer();
Expand All @@ -472,7 +473,12 @@ class MasterSnapshotCoordinator::Impl {
std::lock_guard<std::mutex> lock(mutex_);
auto emplace_result = snapshots_.emplace(snapshot_id, std::move(snapshot));
if (!emplace_result.second) {
return STATUS_FORMAT(IllegalState, "Duplicate snapshot id: $0", snapshot_id);
// During sys catalog bootstrap we replay WAL, that could contain "create snapshot" operation.
// In this case we add snapshot to snapshots_ and write it data into RocksDB.
// Bootstrap sys catalog tries to load all entries from RocksDB, so could find recently
// added entry and try to Load it.
// So we could just ignore it in this case.
return Status::OK();
}

return Status::OK();
Expand Down Expand Up @@ -529,7 +535,8 @@ class MasterSnapshotCoordinator::Impl {
tablet_infos = snapshot.TabletInfosInState(SysSnapshotEntryPB::DELETING);
}
}
RETURN_NOT_OK(context_.ApplyOperationState(state, /* batch_idx= */ -1, write_batch));

RETURN_NOT_OK(state.tablet()->ApplyOperationState(state, /* batch_idx= */ -1, write_batch));

if (!tablet_infos.empty()) {
auto snapshot_id_str = snapshot_id.AsSlice().ToBuffer();
Expand Down
4 changes: 0 additions & 4 deletions src/yb/master/master_snapshot_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ class SnapshotCoordinatorContext {

virtual Result<ColumnId> MetadataColumnId() = 0;

virtual CHECKED_STATUS ApplyOperationState(
const tablet::OperationState& operation_state, int64_t batch_idx,
const docdb::KeyValueWriteBatchPB& write_batch) = 0;

virtual void Submit(std::unique_ptr<tablet::Operation> operation) = 0;

virtual ~SnapshotCoordinatorContext() = default;
Expand Down
2 changes: 1 addition & 1 deletion src/yb/master/sys_catalog-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ CHECKED_STATUS SysCatalogTable::MutateItems(
}

std::unique_ptr<SysCatalogWriter> SysCatalogTable::NewWriter(int64_t leader_term) {
return std::make_unique<SysCatalogWriter>(kSysCatalogTabletId, schema_with_ids_, leader_term);
return std::make_unique<SysCatalogWriter>(kSysCatalogTabletId, schema_, leader_term);
}

} // namespace master
Expand Down
32 changes: 16 additions & 16 deletions src/yb/master/sys_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ std::string SysCatalogTable::schema_column_metadata() { return kSysCatalogTableC

SysCatalogTable::SysCatalogTable(Master* master, MetricRegistry* metrics,
ElectedLeaderCallback leader_cb)
: metric_registry_(metrics),
: schema_(BuildTableSchema()),
metric_registry_(metrics),
metric_entity_(METRIC_ENTITY_server.Instantiate(metric_registry_, "yb.master")),
master_(master),
leader_cb_(std::move(leader_cb)) {
Expand Down Expand Up @@ -207,7 +208,7 @@ Status SysCatalogTable::Load(FsManager* fs_manager) {
RETURN_NOT_OK(tablet::RaftGroupMetadata::Load(fs_manager, kSysCatalogTabletId, &metadata));

// Verify that the schema is the current one
if (!metadata->schema().Equals(BuildTableSchema())) {
if (!metadata->schema().Equals(schema_)) {
// TODO: In this case we probably should execute the migration step.
return(STATUS(Corruption, "Unexpected schema", metadata->schema().ToString()));
}
Expand Down Expand Up @@ -551,9 +552,9 @@ Status SysCatalogTable::OpenTablet(const scoped_refptr<tablet::RaftGroupMetadata

tablet_peer()->RegisterMaintenanceOps(master_->maintenance_manager());

const Schema* schema = tablet->schema();
schema_ = SchemaBuilder(*schema).BuildWithoutIds();
schema_with_ids_ = SchemaBuilder(*schema).Build();
if (!tablet->schema()->Equals(schema_)) {
return STATUS(Corruption, "Unexpected schema", tablet->schema()->ToString());
}
return Status::OK();
}

Expand Down Expand Up @@ -683,30 +684,29 @@ Status SysCatalogTable::Visit(VisitorBase* visitor) {
if (!tablet) {
return STATUS(ShutdownInProgress, "SysConfig is shutting down.");
}
auto iter = tablet->NewRowIterator(schema_, boost::none);
RETURN_NOT_OK(iter);
auto iter = VERIFY_RESULT(tablet->NewRowIterator(schema_.CopyWithoutColumnIds(), boost::none));

auto doc_iter = dynamic_cast<yb::docdb::DocRowwiseIterator*>(iter->get());
auto doc_iter = dynamic_cast<yb::docdb::DocRowwiseIterator*>(iter.get());
CHECK(doc_iter != nullptr);
QLConditionPB cond;
cond.set_op(QL_OP_AND);
QLAddInt8Condition(&cond, schema_with_ids_.column_id(type_col_idx), QL_OP_EQUAL, tables_entry);
QLAddInt8Condition(&cond, schema_.column_id(type_col_idx), QL_OP_EQUAL, tables_entry);
yb::docdb::DocQLScanSpec spec(
schema_with_ids_, boost::none /* hash_code */, boost::none /* max_hash_code */,
schema_, boost::none /* hash_code */, boost::none /* max_hash_code */,
{} /* hashed_components */, &cond, nullptr /* if_req */, rocksdb::kDefaultQueryId);
RETURN_NOT_OK(doc_iter->Init(spec));

QLTableRow value_map;
QLValue entry_type, entry_id, metadata;
uint64_t count = 0;
auto start = CoarseMonoClock::Now();
while (VERIFY_RESULT((**iter).HasNext())) {
while (VERIFY_RESULT(iter->HasNext())) {
++count;
RETURN_NOT_OK((**iter).NextRow(&value_map));
RETURN_NOT_OK(value_map.GetValue(schema_with_ids_.column_id(type_col_idx), &entry_type));
RETURN_NOT_OK(iter->NextRow(&value_map));
RETURN_NOT_OK(value_map.GetValue(schema_.column_id(type_col_idx), &entry_type));
CHECK_EQ(entry_type.int8_value(), tables_entry);
RETURN_NOT_OK(value_map.GetValue(schema_with_ids_.column_id(entry_id_col_idx), &entry_id));
RETURN_NOT_OK(value_map.GetValue(schema_with_ids_.column_id(metadata_col_idx), &metadata));
RETURN_NOT_OK(value_map.GetValue(schema_.column_id(entry_id_col_idx), &entry_id));
RETURN_NOT_OK(value_map.GetValue(schema_.column_id(metadata_col_idx), &metadata));
RETURN_NOT_OK(visitor->Visit(entry_id.binary_value(), metadata.binary_value()));
}
auto duration = CoarseMonoClock::Now() - start;
Expand Down Expand Up @@ -804,7 +804,7 @@ Status SysCatalogTable::DeleteYsqlSystemTable(const string& table_id) {
}

Result<ColumnId> SysCatalogTable::MetadataColumnId() {
return schema_with_ids_.ColumnIdByName(kSysCatalogTableColMetadata);
return schema_.ColumnIdByName(kSysCatalogTableColMetadata);
}

} // namespace master
Expand Down
5 changes: 1 addition & 4 deletions src/yb/master/sys_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,8 @@ class SysCatalogTable {
// Crashes due to an invariant check if the rpc server is not running.
void InitLocalRaftPeerPB();

// Table schema, without IDs, used to send messages to the TabletPeer
Schema schema_;

// Table schema, with IDs, used for the YQL write path.
Schema schema_with_ids_;
Schema schema_;

MetricRegistry* metric_registry_;

Expand Down
15 changes: 7 additions & 8 deletions src/yb/tablet/tablet_bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ static string DebugInfo(const string& tablet_id,
int segment_seqno,
int entry_idx,
const string& segment_path,
const LogEntryPB& entry) {
const LogEntryPB* entry) {
// Truncate the debug string to a reasonable length for logging. Otherwise, glog will truncate
// for us and we may miss important information which came after this long string.
string debug_str = entry.ShortDebugString();
string debug_str = entry ? entry->ShortDebugString() : "<NULL>"s;
if (debug_str.size() > 500) {
debug_str.resize(500);
debug_str.append("...");
Expand Down Expand Up @@ -178,8 +178,7 @@ struct ReplayState {
CHECKED_STATUS ApplyCommittedPendingReplicates(const Handler& handler) {
auto iter = pending_replicates.begin();
while (iter != pending_replicates.end() && CanApply(iter->second.entry.get())) {
std::unique_ptr<log::LogEntryPB> entry = std::move(iter->second.entry);
RETURN_NOT_OK(handler(entry.get(), iter->second.entry_time));
RETURN_NOT_OK(handler(iter->second.entry.get(), iter->second.entry_time));
iter = pending_replicates.erase(iter); // erase and advance the iterator (C++11)
++num_entries_applied_to_rocksdb;
}
Expand Down Expand Up @@ -315,8 +314,7 @@ Status ReplayState::UpdateSplitOpId(const ReplicateMsg& msg, const TabletId& tab
void ReplayState::AddEntriesToStrings(const OpIndexToEntryMap& entries,
std::vector<std::string>* strings) const {
for (const OpIndexToEntryMap::value_type& map_entry : entries) {
LogEntryPB* entry = DCHECK_NOTNULL(map_entry.second.entry.get());
strings->push_back(Substitute(" [$0] $1", map_entry.first, entry->ShortDebugString()));
strings->push_back(Format(" [$0] $1", map_entry.first, map_entry.second.entry.get()));
}
}

Expand Down Expand Up @@ -775,6 +773,7 @@ Status TabletBootstrap::PlayTabletSnapshotOpRequest(ReplicateMsg* replicate_msg)
TabletSnapshotOpRequestPB* const snapshot = replicate_msg->mutable_snapshot_request();

SnapshotOperationState tx_state(tablet_.get(), snapshot);
tx_state.set_hybrid_time(HybridTime(replicate_msg->hybrid_time()));

return tx_state.Apply(/* leader_term= */ yb::OpId::kUnknownTerm);
}
Expand Down Expand Up @@ -970,12 +969,12 @@ Status TabletBootstrap::PlaySegments(ConsensusBootstrapInfo* consensus_info) {
Status s = HandleEntry(
read_result.entry_metadata[entry_idx], &read_result.entries[entry_idx]);
if (!s.ok()) {
LOG(INFO) << "Dumping replay state to log";
LOG(INFO) << "Dumping replay state to log: " << s;
DumpReplayStateToLog();
RETURN_NOT_OK_PREPEND(s, DebugInfo(tablet_->tablet_id(),
segment->header().sequence_number(),
entry_idx, segment->path(),
*read_result.entries[entry_idx]));
read_result.entries[entry_idx].get()));
}
}
if (!read_result.entry_metadata.empty()) {
Expand Down

0 comments on commit 06471cc

Please sign in to comment.