Skip to content

Commit

Permalink
Merge branch 'master' into overflowInt128
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored May 19, 2022
2 parents d6cd540 + 952cb0a commit 4463059
Show file tree
Hide file tree
Showing 20 changed files with 358 additions and 103 deletions.
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ ColumnFileBig::getReader(const DMContext & context, const StorageSnapshotPtr & /

void ColumnFileBig::serializeMetadata(WriteBuffer & buf, bool /*save_schema*/) const
{
writeIntBinary(file->refId(), buf);
writeIntBinary(file->pageId(), buf);
writeIntBinary(valid_rows, buf);
writeIntBinary(valid_bytes, buf);
}
Expand All @@ -65,17 +65,17 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(DMContext & context, /
const RowKeyRange & segment_range,
ReadBuffer & buf)
{
UInt64 file_ref_id;
UInt64 file_page_id;
size_t valid_rows, valid_bytes;

readIntBinary(file_ref_id, buf);
readIntBinary(file_page_id, buf);
readIntBinary(valid_rows, buf);
readIntBinary(valid_bytes, buf);

auto file_id = context.storage_pool.dataReader()->getNormalPageId(file_ref_id);
auto file_id = context.storage_pool.dataReader()->getNormalPageId(file_page_id);
auto file_parent_path = context.path_pool.getStableDiskDelegator().getDTFilePath(file_id);

auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, file_ref_id, file_parent_path, DMFile::ReadMetaMode::all());
auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, file_page_id, file_parent_path, DMFile::ReadMetaMode::all());

auto * dp_file = new ColumnFileBig(dmfile, valid_rows, valid_bytes, segment_range);
return std::shared_ptr<ColumnFileBig>(dp_file);
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class ColumnFileBig : public ColumnFilePersisted

private:
DMFilePtr file;
size_t valid_rows;
size_t valid_bytes;
size_t valid_rows = 0;
size_t valid_bytes = 0;

RowKeyRange segment_range;

Expand Down Expand Up @@ -67,17 +67,17 @@ class ColumnFileBig : public ColumnFilePersisted

auto getFile() const { return file; }

PageId getDataPageId() { return file->refId(); }
PageId getDataPageId() { return file->pageId(); }

size_t getRows() const override { return valid_rows; }
size_t getBytes() const override { return valid_bytes; };

void removeData(WriteBatches & wbs) const override
{
// Here we remove the ref id instead of file_id.
// Because a dmfile could be used in serveral places, and only after all ref_ids are removed,
// Here we remove the data id instead of file_id.
// Because a dmfile could be used in several places, and only after all page ids are removed,
// then the file_id got removed.
wbs.removed_data.delPage(file->refId());
wbs.removed_data.delPage(file->pageId());
}

ColumnFileReaderPtr
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,14 @@ ColumnFilePersisteds ColumnFilePersistedSet::checkHeadAndCloneTail(DMContext & c
else if (auto * b_file = column_file->tryToBigFile(); b_file)
{
auto delegator = context.path_pool.getStableDiskDelegator();
auto new_ref_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto new_page_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
// Note that the file id may has already been mark as deleted. We must
// create a reference to the page id itself instead of create a reference
// to the file id.
wbs.data.putRefPage(new_page_id, b_file->getDataPageId());
auto file_id = b_file->getFile()->fileId();
wbs.data.putRefPage(new_ref_id, file_id);
auto file_parent_path = delegator.getDTFilePath(file_id);
auto new_file = DMFile::restore(context.db_context.getFileProvider(), file_id, /* ref_id= */ new_ref_id, file_parent_path, DMFile::ReadMetaMode::all());
auto new_file = DMFile::restore(context.db_context.getFileProvider(), file_id, /* page_id= */ new_page_id, file_parent_path, DMFile::ReadMetaMode::all());

auto new_big_file = b_file->cloneWith(context, new_file, target_range);
cloned_tail.push_back(new_big_file);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this<ColumnFilePer
PageId metadata_id;
ColumnFilePersistedLevels persisted_files_levels;
// TODO: check the proper memory_order when use this atomic variable
std::atomic<size_t> persisted_files_count;
std::atomic<size_t> persisted_files_level_count;
std::atomic<size_t> persisted_files_count = 0;
std::atomic<size_t> persisted_files_level_count = 0;

std::atomic<size_t> rows = 0;
std::atomic<size_t> bytes = 0;
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,14 @@ ColumnFiles MemTableSet::cloneColumnFiles(DMContext & context, const RowKeyRange
else if (auto * f = column_file->tryToBigFile(); f)
{
auto delegator = context.path_pool.getStableDiskDelegator();
auto new_ref_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
auto new_page_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
// Note that the file id may has already been mark as deleted. We must
// create a reference to the page id itself instead of create a reference
// to the file id.
wbs.data.putRefPage(new_page_id, f->getDataPageId());
auto file_id = f->getFile()->fileId();
wbs.data.putRefPage(new_ref_id, file_id);
auto file_parent_path = delegator.getDTFilePath(file_id);
auto new_file = DMFile::restore(context.db_context.getFileProvider(), file_id, /* ref_id= */ new_ref_id, file_parent_path, DMFile::ReadMetaMode::all());
auto new_file = DMFile::restore(context.db_context.getFileProvider(), file_id, /* page_id= */ new_page_id, file_parent_path, DMFile::ReadMetaMode::all());

auto new_column_file = f->cloneWith(context, new_file, target_range);
cloned_column_files.push_back(new_column_file);
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,8 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
if (valid_ids.count(id))
continue;

// Note that ref_id is useless here.
auto dmfile = DMFile::restore(global_context.getFileProvider(), id, /* ref_id= */ 0, path, DMFile::ReadMetaMode::none());
// Note that page_id is useless here.
auto dmfile = DMFile::restore(global_context.getFileProvider(), id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none());
if (dmfile->canGC())
{
delegate.removeDTFile(dmfile->fileId());
Expand Down Expand Up @@ -820,14 +820,14 @@ void DeltaMergeStore::ingestFiles(
/// Generate DMFile instance with a new ref_id pointed to the file_id.
auto file_id = file->fileId();
const auto & file_parent_path = file->parentPath();
auto ref_id = storage_pool->newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);
auto page_id = storage_pool->newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);

auto ref_file = DMFile::restore(file_provider, file_id, ref_id, file_parent_path, DMFile::ReadMetaMode::all());
auto ref_file = DMFile::restore(file_provider, file_id, page_id, file_parent_path, DMFile::ReadMetaMode::all());
auto column_file = std::make_shared<ColumnFileBig>(*dm_context, ref_file, segment_range);
if (column_file->getRows() != 0)
{
column_files.emplace_back(std::move(column_file));
wbs.data.putRefPage(ref_id, file_id);
wbs.data.putRefPage(page_id, file->pageId());
}
}

Expand Down Expand Up @@ -2343,7 +2343,7 @@ void DeltaMergeStore::restoreStableFiles()
{
for (const auto & file_id : DMFile::listAllInPath(file_provider, root_path, options))
{
auto dmfile = DMFile::restore(file_provider, file_id, /* ref_id= */ 0, root_path, DMFile::ReadMetaMode::diskSizeOnly());
auto dmfile = DMFile::restore(file_provider, file_id, /* page_id= */ 0, root_path, DMFile::ReadMetaMode::diskSizeOnly());
path_delegate.addDTFile(file_id, dmfile->getBytesOnDisk(), root_path);
}
}
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,15 @@ DMFilePtr DMFile::create(UInt64 file_id, const String & parent_path, bool single
DMFilePtr DMFile::restore(
const FileProviderPtr & file_provider,
UInt64 file_id,
UInt64 ref_id,
UInt64 page_id,
const String & parent_path,
const ReadMetaMode & read_meta_mode)
{
String path = getPathByStatus(parent_path, file_id, DMFile::Status::READABLE);
bool single_file_mode = Poco::File(path).isFile();
DMFilePtr dmfile(new DMFile(
file_id,
ref_id,
page_id,
parent_path,
single_file_mode ? Mode::SINGLE_FILE : Mode::FOLDER,
Status::READABLE,
Expand Down Expand Up @@ -556,7 +556,7 @@ void DMFile::readMetadata(const FileProviderPtr & file_provider, const ReadMetaM
DB::readIntBinary(footer.sub_file_num, buf);
// initialize sub file state
buf.seek(footer.sub_file_stat_offset, SEEK_SET);
SubFileStat sub_file_stat;
SubFileStat sub_file_stat{};
for (UInt32 i = 0; i < footer.sub_file_num; i++)
{
String name;
Expand Down
18 changes: 12 additions & 6 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ class DMFile : private boost::noncopyable

struct SubFileStat
{
SubFileStat() = default;
SubFileStat()
: SubFileStat(0, 0)
{}
SubFileStat(UInt64 offset_, UInt64 size_)
: offset{offset_}
, size{size_}
Expand Down Expand Up @@ -195,7 +197,7 @@ class DMFile : private boost::noncopyable
static DMFilePtr restore(
const FileProviderPtr & file_provider,
UInt64 file_id,
UInt64 ref_id,
UInt64 page_id,
const String & parent_path,
const ReadMetaMode & read_meta_mode);

Expand All @@ -216,8 +218,10 @@ class DMFile : private boost::noncopyable
void enableGC();
void remove(const FileProviderPtr & file_provider);

// The ID for locating DTFile on disk
UInt64 fileId() const { return file_id; }
UInt64 refId() const { return ref_id; }
// The PageID for locating this object in the StoragePool.data
UInt64 pageId() const { return page_id; }

String path() const;

Expand Down Expand Up @@ -289,14 +293,14 @@ class DMFile : private boost::noncopyable

private:
DMFile(UInt64 file_id_,
UInt64 ref_id_,
UInt64 page_id_,
String parent_path_,
Mode mode_,
Status status_,
Poco::Logger * log_,
DMConfigurationOpt configuration_ = std::nullopt)
: file_id(file_id_)
, ref_id(ref_id_)
, page_id(page_id_)
, parent_path(std::move(parent_path_))
, mode(mode_)
, status(status_)
Expand Down Expand Up @@ -395,8 +399,10 @@ class DMFile : private boost::noncopyable
void initializeIndices();

private:
// The id to construct the file path on disk.
UInt64 file_id;
UInt64 ref_id; // It is a reference to file_id, could be the same.
// It is the page_id that represent this file in the PageStorage. It could be the same as file id.
UInt64 page_id;
String parent_path;

PackStats pack_stats;
Expand Down
21 changes: 12 additions & 9 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -919,27 +919,30 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitLogical(DMContext & dm_co
auto delegate = dm_context.path_pool.getStableDiskDelegator();
for (const auto & dmfile : segment_snap->stable->getDMFiles())
{
auto ori_ref_id = dmfile->refId();
auto ori_page_id = dmfile->pageId();
auto file_id = dmfile->fileId();
auto file_parent_path = delegate.getDTFilePath(file_id);

auto my_dmfile_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);
auto other_dmfile_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);
auto my_dmfile_page_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);
auto other_dmfile_page_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);

wbs.data.putRefPage(my_dmfile_id, file_id);
wbs.data.putRefPage(other_dmfile_id, file_id);
wbs.removed_data.delPage(ori_ref_id);
// Note that the file id may has already been mark as deleted. We must
// create a reference to the page id itself instead of create a reference
// to the file id.
wbs.data.putRefPage(my_dmfile_page_id, ori_page_id);
wbs.data.putRefPage(other_dmfile_page_id, ori_page_id);
wbs.removed_data.delPage(ori_page_id);

auto my_dmfile = DMFile::restore(
dm_context.db_context.getFileProvider(),
file_id,
/* ref_id= */ my_dmfile_id,
/* page_id= */ my_dmfile_page_id,
file_parent_path,
DMFile::ReadMetaMode::all());
auto other_dmfile = DMFile::restore(
dm_context.db_context.getFileProvider(),
file_id,
/* ref_id= */ other_dmfile_id,
/* page_id= */ other_dmfile_page_id,
file_parent_path,
DMFile::ReadMetaMode::all());

Expand Down Expand Up @@ -1059,7 +1062,7 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitPhysical(DMContext & dm_c
{
// Here we should remove the ref id instead of file_id.
// Because a dmfile could be used by several segments, and only after all ref_ids are removed, then the file_id removed.
wbs.removed_data.delPage(file->refId());
wbs.removed_data.delPage(file->pageId());
}

LOG_FMT_INFO(log, "Segment [{}] prepare split physical done", segment_id);
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void StableValueSpace::saveMeta(WriteBatch & meta_wb)
writeIntBinary(valid_bytes, buf);
writeIntBinary(static_cast<UInt64>(files.size()), buf);
for (auto & f : files)
writeIntBinary(f->refId(), buf);
writeIntBinary(f->pageId(), buf);

auto data_size = buf.count(); // Must be called before tryGetReadBuffer.
meta_wb.putPage(id, 0, buf.tryGetReadBuffer(), data_size);
Expand All @@ -100,15 +100,15 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & context, PageId id)
readIntBinary(valid_rows, buf);
readIntBinary(valid_bytes, buf);
readIntBinary(size, buf);
UInt64 ref_id;
UInt64 page_id;
for (size_t i = 0; i < size; ++i)
{
readIntBinary(ref_id, buf);
readIntBinary(page_id, buf);

auto file_id = context.storage_pool.dataReader()->getNormalPageId(ref_id);
auto file_id = context.storage_pool.dataReader()->getNormalPageId(page_id);
auto file_parent_path = context.path_pool.getStableDiskDelegator().getDTFilePath(file_id);

auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, ref_id, file_parent_path, DMFile::ReadMetaMode::all());
auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, page_id, file_parent_path, DMFile::ReadMetaMode::all());
stable->files.push_back(dmfile);
}

Expand Down Expand Up @@ -168,7 +168,7 @@ void StableValueSpace::recordRemovePacksPages(WriteBatches & wbs) const
{
// Here we should remove the ref id instead of file_id.
// Because a dmfile could be used by several segments, and only after all ref_ids are removed, then the file_id removed.
wbs.removed_data.delPage(file->refId());
wbs.removed_data.delPage(file->pageId());
}
}

Expand Down
Loading

0 comments on commit 4463059

Please sign in to comment.