diff --git a/ent/src/yb/master/catalog_manager.h b/ent/src/yb/master/catalog_manager.h index 680cc9438895..d6df0980d33b 100644 --- a/ent/src/yb/master/catalog_manager.h +++ b/ent/src/yb/master/catalog_manager.h @@ -228,10 +228,6 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon Result MetadataColumnId() override; - CHECKED_STATUS ApplyOperationState(const tablet::OperationState& operation_state, - int64_t batch_idx, - const docdb::KeyValueWriteBatchPB& write_batch) override; - void Submit(std::unique_ptr operation) override; void SendCreateTabletSnapshotRequest(const scoped_refptr& tablet, diff --git a/ent/src/yb/master/catalog_manager_ent.cc b/ent/src/yb/master/catalog_manager_ent.cc index b3dd7150cf2c..bff2e2e710d3 100644 --- a/ent/src/yb/master/catalog_manager_ent.cc +++ b/ent/src/yb/master/catalog_manager_ent.cc @@ -1099,12 +1099,6 @@ Result CatalogManager::MetadataColumnId() { return sys_catalog()->MetadataColumnId(); } -Status CatalogManager::ApplyOperationState( - const tablet::OperationState& operation_state, int64_t batch_idx, - const docdb::KeyValueWriteBatchPB& write_batch) { - return tablet_peer()->tablet()->ApplyOperationState(operation_state, batch_idx, write_batch); -} - TabletInfos CatalogManager::GetTabletInfos(const std::vector& ids) { TabletInfos result; result.reserve(ids.size()); diff --git a/src/yb/client/backup-txn-test.cc b/src/yb/client/backup-txn-test.cc index f76643080835..8322bf4fdadc 100644 --- a/src/yb/client/backup-txn-test.cc +++ b/src/yb/client/backup-txn-test.cc @@ -23,6 +23,7 @@ using namespace std::literals; DECLARE_uint64(max_clock_skew_usec); +DECLARE_bool(flush_rocksdb_on_shutdown); namespace yb { namespace client { @@ -38,6 +39,13 @@ class BackupTxnTest : public TransactionTestBase { TransactionTestBase::SetUp(); } + void DoBeforeTearDown() override { + FLAGS_flush_rocksdb_on_shutdown = false; + ASSERT_OK(cluster_->RestartSync()); + + TransactionTestBase::DoBeforeTearDown(); + } + master::MasterBackupServiceProxy MakeBackupServiceProxy() { return master::MasterBackupServiceProxy( &client_->proxy_cache(), cluster_->leader_mini_master()->bound_rpc_addr()); diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 0ec650faa191..741336553057 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -1014,7 +1014,7 @@ Status CatalogManager::PrepareSysCatalogTable(int64_t term) { metadata.set_namespace_id(kSystemSchemaNamespaceId); metadata.set_name(kSysCatalogTableName); metadata.set_table_type(TableType::YQL_TABLE_TYPE); - SchemaToPB(sys_catalog_->schema_with_ids_, metadata.mutable_schema()); + SchemaToPB(sys_catalog_->schema_, metadata.mutable_schema()); metadata.set_version(0); auto table_ids_map_checkout = table_ids_map_.CheckOut(); @@ -1036,7 +1036,7 @@ Status CatalogManager::PrepareSysCatalogTable(int64_t term) { auto l = table->LockForRead(); PartitionSchema partition_schema; RETURN_NOT_OK(PartitionSchema::FromPB(l->data().pb.partition_schema(), - sys_catalog_->schema_with_ids_, + sys_catalog_->schema_, &partition_schema)); vector partitions; RETURN_NOT_OK(partition_schema.CreatePartitions(1, &partitions)); diff --git a/src/yb/master/master_snapshot_coordinator.cc b/src/yb/master/master_snapshot_coordinator.cc index 540cc9acba62..2569a5c76d20 100644 --- a/src/yb/master/master_snapshot_coordinator.cc +++ b/src/yb/master/master_snapshot_coordinator.cc @@ -23,6 +23,7 @@ #include "yb/master/master_error.h" #include "yb/master/sys_catalog_writer.h" +#include "yb/tablet/tablet.h" #include "yb/tablet/operations/snapshot_operation.h" #include "yb/tablet/operations/write_operation.h" @@ -451,7 +452,7 @@ class MasterSnapshotCoordinator::Impl { } } - RETURN_NOT_OK(context_.ApplyOperationState(state, /* batch_idx= */ -1, write_batch)); + RETURN_NOT_OK(state.tablet()->ApplyOperationState(state, /* batch_idx= */ -1, write_batch)); if (!tablet_infos.empty()) { auto snapshot_id_str = id.AsSlice().ToBuffer(); @@ -472,7 +473,12 @@ class MasterSnapshotCoordinator::Impl { std::lock_guard lock(mutex_); auto emplace_result = snapshots_.emplace(snapshot_id, std::move(snapshot)); if (!emplace_result.second) { - return STATUS_FORMAT(IllegalState, "Duplicate snapshot id: $0", snapshot_id); + // During sys catalog bootstrap we replay WAL, that could contain "create snapshot" operation. + // In this case we add snapshot to snapshots_ and write it data into RocksDB. + // Bootstrap sys catalog tries to load all entries from RocksDB, so could find recently + // added entry and try to Load it. + // So we could just ignore it in this case. + return Status::OK(); } return Status::OK(); @@ -529,7 +535,8 @@ class MasterSnapshotCoordinator::Impl { tablet_infos = snapshot.TabletInfosInState(SysSnapshotEntryPB::DELETING); } } - RETURN_NOT_OK(context_.ApplyOperationState(state, /* batch_idx= */ -1, write_batch)); + + RETURN_NOT_OK(state.tablet()->ApplyOperationState(state, /* batch_idx= */ -1, write_batch)); if (!tablet_infos.empty()) { auto snapshot_id_str = snapshot_id.AsSlice().ToBuffer(); diff --git a/src/yb/master/master_snapshot_coordinator.h b/src/yb/master/master_snapshot_coordinator.h index e412006ff338..dac5c7693b7d 100644 --- a/src/yb/master/master_snapshot_coordinator.h +++ b/src/yb/master/master_snapshot_coordinator.h @@ -56,10 +56,6 @@ class SnapshotCoordinatorContext { virtual Result MetadataColumnId() = 0; - virtual CHECKED_STATUS ApplyOperationState( - const tablet::OperationState& operation_state, int64_t batch_idx, - const docdb::KeyValueWriteBatchPB& write_batch) = 0; - virtual void Submit(std::unique_ptr operation) = 0; virtual ~SnapshotCoordinatorContext() = default; diff --git a/src/yb/master/sys_catalog-internal.h b/src/yb/master/sys_catalog-internal.h index 28af556f051f..9ad642b2a363 100644 --- a/src/yb/master/sys_catalog-internal.h +++ b/src/yb/master/sys_catalog-internal.h @@ -127,7 +127,7 @@ CHECKED_STATUS SysCatalogTable::MutateItems( } std::unique_ptr SysCatalogTable::NewWriter(int64_t leader_term) { - return std::make_unique(kSysCatalogTabletId, schema_with_ids_, leader_term); + return std::make_unique(kSysCatalogTabletId, schema_, leader_term); } } // namespace master diff --git a/src/yb/master/sys_catalog.cc b/src/yb/master/sys_catalog.cc index 5cce8e9ef2a9..db09e2bb8bda 100644 --- a/src/yb/master/sys_catalog.cc +++ b/src/yb/master/sys_catalog.cc @@ -127,7 +127,8 @@ std::string SysCatalogTable::schema_column_metadata() { return kSysCatalogTableC SysCatalogTable::SysCatalogTable(Master* master, MetricRegistry* metrics, ElectedLeaderCallback leader_cb) - : metric_registry_(metrics), + : schema_(BuildTableSchema()), + metric_registry_(metrics), metric_entity_(METRIC_ENTITY_server.Instantiate(metric_registry_, "yb.master")), master_(master), leader_cb_(std::move(leader_cb)) { @@ -207,7 +208,7 @@ Status SysCatalogTable::Load(FsManager* fs_manager) { RETURN_NOT_OK(tablet::RaftGroupMetadata::Load(fs_manager, kSysCatalogTabletId, &metadata)); // Verify that the schema is the current one - if (!metadata->schema().Equals(BuildTableSchema())) { + if (!metadata->schema().Equals(schema_)) { // TODO: In this case we probably should execute the migration step. return(STATUS(Corruption, "Unexpected schema", metadata->schema().ToString())); } @@ -551,9 +552,9 @@ Status SysCatalogTable::OpenTablet(const scoped_refptrRegisterMaintenanceOps(master_->maintenance_manager()); - const Schema* schema = tablet->schema(); - schema_ = SchemaBuilder(*schema).BuildWithoutIds(); - schema_with_ids_ = SchemaBuilder(*schema).Build(); + if (!tablet->schema()->Equals(schema_)) { + return STATUS(Corruption, "Unexpected schema", tablet->schema()->ToString()); + } return Status::OK(); } @@ -683,16 +684,15 @@ Status SysCatalogTable::Visit(VisitorBase* visitor) { if (!tablet) { return STATUS(ShutdownInProgress, "SysConfig is shutting down."); } - auto iter = tablet->NewRowIterator(schema_, boost::none); - RETURN_NOT_OK(iter); + auto iter = VERIFY_RESULT(tablet->NewRowIterator(schema_.CopyWithoutColumnIds(), boost::none)); - auto doc_iter = dynamic_cast(iter->get()); + auto doc_iter = dynamic_cast(iter.get()); CHECK(doc_iter != nullptr); QLConditionPB cond; cond.set_op(QL_OP_AND); - QLAddInt8Condition(&cond, schema_with_ids_.column_id(type_col_idx), QL_OP_EQUAL, tables_entry); + QLAddInt8Condition(&cond, schema_.column_id(type_col_idx), QL_OP_EQUAL, tables_entry); yb::docdb::DocQLScanSpec spec( - schema_with_ids_, boost::none /* hash_code */, boost::none /* max_hash_code */, + schema_, boost::none /* hash_code */, boost::none /* max_hash_code */, {} /* hashed_components */, &cond, nullptr /* if_req */, rocksdb::kDefaultQueryId); RETURN_NOT_OK(doc_iter->Init(spec)); @@ -700,13 +700,13 @@ Status SysCatalogTable::Visit(VisitorBase* visitor) { QLValue entry_type, entry_id, metadata; uint64_t count = 0; auto start = CoarseMonoClock::Now(); - while (VERIFY_RESULT((**iter).HasNext())) { + while (VERIFY_RESULT(iter->HasNext())) { ++count; - RETURN_NOT_OK((**iter).NextRow(&value_map)); - RETURN_NOT_OK(value_map.GetValue(schema_with_ids_.column_id(type_col_idx), &entry_type)); + RETURN_NOT_OK(iter->NextRow(&value_map)); + RETURN_NOT_OK(value_map.GetValue(schema_.column_id(type_col_idx), &entry_type)); CHECK_EQ(entry_type.int8_value(), tables_entry); - RETURN_NOT_OK(value_map.GetValue(schema_with_ids_.column_id(entry_id_col_idx), &entry_id)); - RETURN_NOT_OK(value_map.GetValue(schema_with_ids_.column_id(metadata_col_idx), &metadata)); + RETURN_NOT_OK(value_map.GetValue(schema_.column_id(entry_id_col_idx), &entry_id)); + RETURN_NOT_OK(value_map.GetValue(schema_.column_id(metadata_col_idx), &metadata)); RETURN_NOT_OK(visitor->Visit(entry_id.binary_value(), metadata.binary_value())); } auto duration = CoarseMonoClock::Now() - start; @@ -804,7 +804,7 @@ Status SysCatalogTable::DeleteYsqlSystemTable(const string& table_id) { } Result SysCatalogTable::MetadataColumnId() { - return schema_with_ids_.ColumnIdByName(kSysCatalogTableColMetadata); + return schema_.ColumnIdByName(kSysCatalogTableColMetadata); } } // namespace master diff --git a/src/yb/master/sys_catalog.h b/src/yb/master/sys_catalog.h index 275b1206ebad..4ad0417ca553 100644 --- a/src/yb/master/sys_catalog.h +++ b/src/yb/master/sys_catalog.h @@ -217,11 +217,8 @@ class SysCatalogTable { // Crashes due to an invariant check if the rpc server is not running. void InitLocalRaftPeerPB(); - // Table schema, without IDs, used to send messages to the TabletPeer - Schema schema_; - // Table schema, with IDs, used for the YQL write path. - Schema schema_with_ids_; + Schema schema_; MetricRegistry* metric_registry_; diff --git a/src/yb/tablet/tablet_bootstrap.cc b/src/yb/tablet/tablet_bootstrap.cc index 8bcae83975a3..688d5ac05d6e 100644 --- a/src/yb/tablet/tablet_bootstrap.cc +++ b/src/yb/tablet/tablet_bootstrap.cc @@ -123,10 +123,10 @@ static string DebugInfo(const string& tablet_id, int segment_seqno, int entry_idx, const string& segment_path, - const LogEntryPB& entry) { + const LogEntryPB* entry) { // Truncate the debug string to a reasonable length for logging. Otherwise, glog will truncate // for us and we may miss important information which came after this long string. - string debug_str = entry.ShortDebugString(); + string debug_str = entry ? entry->ShortDebugString() : ""s; if (debug_str.size() > 500) { debug_str.resize(500); debug_str.append("..."); @@ -178,8 +178,7 @@ struct ReplayState { CHECKED_STATUS ApplyCommittedPendingReplicates(const Handler& handler) { auto iter = pending_replicates.begin(); while (iter != pending_replicates.end() && CanApply(iter->second.entry.get())) { - std::unique_ptr entry = std::move(iter->second.entry); - RETURN_NOT_OK(handler(entry.get(), iter->second.entry_time)); + RETURN_NOT_OK(handler(iter->second.entry.get(), iter->second.entry_time)); iter = pending_replicates.erase(iter); // erase and advance the iterator (C++11) ++num_entries_applied_to_rocksdb; } @@ -315,8 +314,7 @@ Status ReplayState::UpdateSplitOpId(const ReplicateMsg& msg, const TabletId& tab void ReplayState::AddEntriesToStrings(const OpIndexToEntryMap& entries, std::vector* strings) const { for (const OpIndexToEntryMap::value_type& map_entry : entries) { - LogEntryPB* entry = DCHECK_NOTNULL(map_entry.second.entry.get()); - strings->push_back(Substitute(" [$0] $1", map_entry.first, entry->ShortDebugString())); + strings->push_back(Format(" [$0] $1", map_entry.first, map_entry.second.entry.get())); } } @@ -775,6 +773,7 @@ Status TabletBootstrap::PlayTabletSnapshotOpRequest(ReplicateMsg* replicate_msg) TabletSnapshotOpRequestPB* const snapshot = replicate_msg->mutable_snapshot_request(); SnapshotOperationState tx_state(tablet_.get(), snapshot); + tx_state.set_hybrid_time(HybridTime(replicate_msg->hybrid_time())); return tx_state.Apply(/* leader_term= */ yb::OpId::kUnknownTerm); } @@ -970,12 +969,12 @@ Status TabletBootstrap::PlaySegments(ConsensusBootstrapInfo* consensus_info) { Status s = HandleEntry( read_result.entry_metadata[entry_idx], &read_result.entries[entry_idx]); if (!s.ok()) { - LOG(INFO) << "Dumping replay state to log"; + LOG(INFO) << "Dumping replay state to log: " << s; DumpReplayStateToLog(); RETURN_NOT_OK_PREPEND(s, DebugInfo(tablet_->tablet_id(), segment->header().sequence_number(), entry_idx, segment->path(), - *read_result.entries[entry_idx])); + read_result.entries[entry_idx].get())); } } if (!read_result.entry_metadata.empty()) {