Skip to content

Commit

Permalink
Fix the bug for restoring data with duplicated page id (#4889)
Browse files Browse the repository at this point in the history
ref #3594
  • Loading branch information
JaySon-Huang authored May 17, 2022
1 parent 0477f96 commit 43b25fc
Show file tree
Hide file tree
Showing 29 changed files with 777 additions and 426 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ extern const int DEADLOCK_AVOIDED = 10013;
extern const int PTHREAD_ERROR = 10014;
extern const int PS_ENTRY_NOT_EXISTS = 10015;
extern const int PS_ENTRY_NO_VALID_VERSION = 10016;
extern const int PS_DIR_APPLY_INVALID_STATUS = 10017;
} // namespace ErrorCodes

} // namespace DB
5 changes: 3 additions & 2 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1650,8 +1650,9 @@ bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool)
auto lock = getLock();
if (shared->global_storage_pool)
{
// GlobalStoragePool may be initialized many times in some test cases for restore.
LOG_WARNING(shared->log, "GlobalStoragePool has already been initialized.");
// Can't init GlobalStoragePool twice.
// otherwise the pagestorage instances in `StoragePool` for each table won't be updated and cause unexpected problem.
throw Exception("GlobalStoragePool has already been initialized.", ErrorCodes::LOGICAL_ERROR);
}
CurrentMetrics::set(CurrentMetrics::GlobalStorageRunMode, static_cast<UInt8>(shared->storage_run_mode));
if (shared->storage_run_mode == PageStorageRunMode::MIX_MODE || shared->storage_run_mode == PageStorageRunMode::ONLY_V3)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
try
{
page_storage_run_mode = storage_pool->restore(); // restore from disk
if (!storage_pool->maxMetaPageId())
if (const auto first_segment_entry = storage_pool->metaReader()->getPageEntry(DELTA_MERGE_FIRST_SEGMENT_ID);
!first_segment_entry.isValid())
{
// Create the first segment.
auto segment_id = storage_pool->newMetaPageId();
Expand Down
89 changes: 37 additions & 52 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,26 +82,21 @@ PageStorage::Config extractConfig(const Settings & settings, StorageType subtype
}

GlobalStoragePool::GlobalStoragePool(const PathPool & path_pool, Context & global_ctx, const Settings & settings)
: // The iops and bandwidth in log_storage are relatively high, use multi-disks if possible
log_storage(PageStorage::create("__global__.log",
path_pool.getPSDiskDelegatorGlobalMulti("log"),
extractConfig(settings, StorageType::Log),
global_ctx.getFileProvider(),
true))
,
// The iops in data_storage is low, only use the first disk for storing data
data_storage(PageStorage::create("__global__.data",
path_pool.getPSDiskDelegatorGlobalSingle("data"),
extractConfig(settings, StorageType::Data),
global_ctx.getFileProvider(),
true))
,
// The iops in meta_storage is relatively high, use multi-disks if possible
meta_storage(PageStorage::create("__global__.meta",
path_pool.getPSDiskDelegatorGlobalMulti("meta"),
extractConfig(settings, StorageType::Meta),
global_ctx.getFileProvider(),
true))
: log_storage(PageStorage::create("__global__.log",
path_pool.getPSDiskDelegatorGlobalMulti("log"),
extractConfig(settings, StorageType::Log),
global_ctx.getFileProvider(),
true))
, data_storage(PageStorage::create("__global__.data",
path_pool.getPSDiskDelegatorGlobalMulti("data"),
extractConfig(settings, StorageType::Data),
global_ctx.getFileProvider(),
true))
, meta_storage(PageStorage::create("__global__.meta",
path_pool.getPSDiskDelegatorGlobalMulti("meta"),
extractConfig(settings, StorageType::Meta),
global_ctx.getFileProvider(),
true))
, global_context(global_ctx)
{
}
Expand All @@ -118,9 +113,9 @@ GlobalStoragePool::~GlobalStoragePool()

void GlobalStoragePool::restore()
{
log_max_ids = log_storage->restore();
data_max_ids = data_storage->restore();
meta_max_ids = meta_storage->restore();
log_storage->restore();
data_storage->restore();
meta_storage->restore();

gc_handle = global_context.getBackgroundPool().addTask(
[this] {
Expand Down Expand Up @@ -181,7 +176,7 @@ StoragePool::StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPo
extractConfig(global_context.getSettingsRef(), StorageType::Log),
global_context.getFileProvider());
data_storage_v2 = PageStorage::create(name + ".data",
storage_path_pool.getPSDiskDelegatorSingle("data"),
storage_path_pool.getPSDiskDelegatorSingle("data"), // keep for behavior not changed
extractConfig(global_context.getSettingsRef(), StorageType::Data),
global_ctx.getFileProvider());
meta_storage_v2 = PageStorage::create(name + ".meta",
Expand Down Expand Up @@ -295,41 +290,35 @@ void StoragePool::forceTransformMetaV2toV3()

PageStorageRunMode StoragePool::restore()
{
const auto & global_storage_pool = global_context.getGlobalStoragePool();

switch (run_mode)
{
case PageStorageRunMode::ONLY_V2:
{
auto log_max_ids = log_storage_v2->restore();
auto data_max_ids = data_storage_v2->restore();
auto meta_max_ids = meta_storage_v2->restore();
log_storage_v2->restore();
data_storage_v2->restore();
meta_storage_v2->restore();

assert(log_max_ids.size() == 1);
assert(data_max_ids.size() == 1);
assert(meta_max_ids.size() == 1);

max_log_page_id = log_max_ids[0];
max_data_page_id = data_max_ids[0];
max_meta_page_id = meta_max_ids[0];
max_log_page_id = log_storage_v2->getMaxId(ns_id);
max_data_page_id = data_storage_v2->getMaxId(ns_id);
max_meta_page_id = meta_storage_v2->getMaxId(ns_id);

storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV2Only};
break;
}
case PageStorageRunMode::ONLY_V3:
{
max_log_page_id = global_storage_pool->getLogMaxId(ns_id);
max_data_page_id = global_storage_pool->getDataMaxId(ns_id);
max_meta_page_id = global_storage_pool->getMetaMaxId(ns_id);
max_log_page_id = log_storage_v3->getMaxId(ns_id);
max_data_page_id = data_storage_v3->getMaxId(ns_id);
max_meta_page_id = meta_storage_v3->getMaxId(ns_id);

storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only};
break;
}
case PageStorageRunMode::MIX_MODE:
{
auto v2_log_max_ids = log_storage_v2->restore();
auto v2_data_max_ids = data_storage_v2->restore();
auto v2_meta_max_ids = meta_storage_v2->restore();
log_storage_v2->restore();
data_storage_v2->restore();
meta_storage_v2->restore();

// The pages on data and log can be rewritten to V3 and the old pages on V2 are deleted by `delta merge`.
// However, the pages on meta V2 can not be deleted. As the pages in meta are small, we perform a forceTransformMetaV2toV3 to convert pages before all.
Expand All @@ -349,10 +338,6 @@ PageStorageRunMode StoragePool::restore()
LOG_FMT_INFO(logger, "Current meta translate already done before restored.[ns_id={}] ", ns_id);
}

assert(v2_log_max_ids.size() == 1);
assert(v2_data_max_ids.size() == 1);
assert(v2_meta_max_ids.size() == 1);

// Check number of valid pages in v2
// If V2 already have no any data in disk, Then change run_mode to ONLY_V3
if (log_storage_v2->getNumberOfPages() == 0 && data_storage_v2->getNumberOfPages() == 0 && meta_storage_v2->getNumberOfPages() == 0)
Expand All @@ -375,18 +360,18 @@ PageStorageRunMode StoragePool::restore()
data_storage_writer = std::make_shared<PageWriter>(PageStorageRunMode::ONLY_V3, /*storage_v2_*/ nullptr, data_storage_v3);
meta_storage_writer = std::make_shared<PageWriter>(PageStorageRunMode::ONLY_V3, /*storage_v2_*/ nullptr, meta_storage_v3);

max_log_page_id = global_storage_pool->getLogMaxId(ns_id);
max_data_page_id = global_storage_pool->getDataMaxId(ns_id);
max_meta_page_id = global_storage_pool->getMetaMaxId(ns_id);
max_log_page_id = log_storage_v3->getMaxId(ns_id);
max_data_page_id = data_storage_v3->getMaxId(ns_id);
max_meta_page_id = meta_storage_v3->getMaxId(ns_id);

run_mode = PageStorageRunMode::ONLY_V3;
storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only};
}
else // Still running Mix Mode
{
max_log_page_id = std::max(v2_log_max_ids[0], global_storage_pool->getLogMaxId(ns_id));
max_data_page_id = std::max(v2_data_max_ids[0], global_storage_pool->getDataMaxId(ns_id));
max_meta_page_id = std::max(v2_meta_max_ids[0], global_storage_pool->getMetaMaxId(ns_id));
max_log_page_id = std::max(log_storage_v2->getMaxId(ns_id), log_storage_v3->getMaxId(ns_id));
max_data_page_id = std::max(data_storage_v2->getMaxId(ns_id), data_storage_v3->getMaxId(ns_id));
max_meta_page_id = std::max(meta_storage_v2->getMaxId(ns_id), meta_storage_v3->getMaxId(ns_id));
storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolMixMode};
}
break;
Expand Down
37 changes: 0 additions & 37 deletions dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,39 +51,6 @@ class GlobalStoragePool : private boost::noncopyable

friend class StoragePool;

PageId getLogMaxId(NamespaceId ns_id) const
{
PageId max_log_page_id = 0;
if (const auto & it = log_max_ids.find(ns_id); it != log_max_ids.end())
{
max_log_page_id = it->second;
}

return max_log_page_id;
}

PageId getDataMaxId(NamespaceId ns_id) const
{
PageId max_data_page_id = 0;
if (const auto & it = data_max_ids.find(ns_id); it != data_max_ids.end())
{
max_data_page_id = it->second;
}

return max_data_page_id;
}

PageId getMetaMaxId(NamespaceId ns_id) const
{
PageId max_meta_page_id = 0;
if (const auto & it = meta_max_ids.find(ns_id); it != meta_max_ids.end())
{
max_meta_page_id = it->second;
}

return max_meta_page_id;
}

// GC immediately
// Only used on dbgFuncMisc
bool gc();
Expand All @@ -96,10 +63,6 @@ class GlobalStoragePool : private boost::noncopyable
PageStoragePtr data_storage;
PageStoragePtr meta_storage;

std::map<NamespaceId, PageId> log_max_ids;
std::map<NamespaceId, PageId> data_max_ids;
std::map<NamespaceId, PageId> meta_max_ids;

std::atomic<Timepoint> last_try_gc_time = Clock::now();

Context & global_context;
Expand Down
10 changes: 3 additions & 7 deletions dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,16 +229,12 @@ class PageStorage : private boost::noncopyable

virtual ~PageStorage() = default;

// Return the map[ns_id, max_page_id]
// The caller should ensure that it only allocate new id that is larger than `max_page_id`. Reusing the
// same ID for different kind of write (put/ref/put_external) would make PageStorage run into unexpected error.
//
// Note that for V2, we always return a map with only one element: <ns_id=0, max_id> cause V2 have no
// idea about ns_id.
virtual std::map<NamespaceId, PageId> restore() = 0;
virtual void restore() = 0;

virtual void drop() = 0;

virtual PageId getMaxId(NamespaceId ns_id) = 0;

virtual SnapshotPtr getSnapshot(const String & tracing_id) = 0;

// Get some statistics of all living snapshots and the oldest living snapshot.
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/Page/V2/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ toConcreteSnapshot(const DB::PageStorage::SnapshotPtr & ptr)
return assert_cast<PageStorage::ConcreteSnapshotRawPtr>(ptr.get());
}

std::map<NamespaceId, PageId> PageStorage::restore()
void PageStorage::restore()
{
LOG_FMT_INFO(log, "{} begin to restore data from disk. [path={}] [num_writers={}]", storage_name, delegator->defaultPath(), write_files.size());

Expand Down Expand Up @@ -353,9 +353,12 @@ std::map<NamespaceId, PageId> PageStorage::restore()
auto snapshot = getConcreteSnapshot();
size_t num_pages = snapshot->version()->numPages();
LOG_FMT_INFO(log, "{} restore {} pages, write batch sequence: {}, {}", storage_name, num_pages, write_batch_seq, statistics.toString());
}

// Fixed namespace id 0
return {{0, snapshot->version()->maxId()}};
PageId PageStorage::getMaxId(NamespaceId /*ns_id*/)
{
std::lock_guard write_lock(write_mutex);
return versioned_page_entries.getSnapshot("")->version()->maxId();
}

PageId PageStorage::getNormalPageIdImpl(NamespaceId /*ns_id*/, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist)
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/Page/V2/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,12 @@ class PageStorage : public DB::PageStorage
const FileProviderPtr & file_provider_);
~PageStorage() = default;

std::map<NamespaceId, PageId> restore() override;
void restore() override;

void drop() override;

PageId getMaxId(NamespaceId ns_id) override;

PageId getNormalPageIdImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist) override;

DB::PageStorage::SnapshotPtr getSnapshot(const String & tracing_id) override;
Expand Down
28 changes: 0 additions & 28 deletions dbms/src/Storages/Page/V2/tests/gtest_page_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,6 @@ class PageStorage_test : public DB::base::TiFlashStorageTestBasic
return storage;
}

std::pair<std::shared_ptr<PageStorage>, std::map<NamespaceId, PageId>> reopen()
{
auto delegator = path_pool->getPSDiskDelegatorSingle("log");
auto storage = std::make_shared<PageStorage>("test.t", delegator, config, file_provider);
auto max_ids = storage->restore();
return {storage, max_ids};
}


protected:
PageStorage::Config config;
std::shared_ptr<PageStorage> storage;
Expand Down Expand Up @@ -736,25 +727,6 @@ try
}
CATCH

TEST_F(PageStorage_test, getMaxIdsFromRestore)
try
{
{
WriteBatch batch;
batch.putExternal(1, 0);
batch.putExternal(2, 0);
batch.delPage(1);
batch.delPage(2);
storage->write(std::move(batch));
}

storage = nullptr;
auto [page_storage, max_ids] = reopen();
ASSERT_EQ(max_ids.size(), 1);
ASSERT_EQ(max_ids[0], 2);
}
CATCH

TEST_F(PageStorage_test, IgnoreIncompleteWriteBatch1)
try
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/BlobStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ extern const int LOGICAL_ERROR;

namespace PS::V3
{
using PageIdAndVersionedEntries = std::vector<std::tuple<PageIdV3Internal, PageVersionType, PageEntryV3>>;
using PageIdAndVersionedEntries = std::vector<std::tuple<PageIdV3Internal, PageVersion, PageEntryV3>>;

class BlobStore : private Allocator<false>
{
Expand Down
Loading

0 comments on commit 43b25fc

Please sign in to comment.