Skip to content

Commit

Permalink
flush cache before segment merge (#4955) (#4970)
Browse files Browse the repository at this point in the history
* flush cache before segment merge

* keep flush until success

* check whether segment is valid if flush failed

* Update dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp

Co-authored-by: JaySon <tshent@qq.com>

* add more fix

* check flush result in segment::write

Co-authored-by: lidezhu <lidezhu@pingcap.com>
Co-authored-by: lidezhu <47731263+lidezhu@users.noreply.github.com>
Co-authored-by: JaySon <tshent@qq.com>
  • Loading branch information
4 people authored May 23, 2022
1 parent 71613fd commit 5dc640b
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 12 deletions.
23 changes: 23 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,29 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le
right->info(),
dm_context.min_version);

/// This segment may contain some rows that not belong to this segment range which is left by previous split operation.
/// And only saved data in this segment will be filtered by the segment range in the merge process,
/// unsaved data will be directly copied to the new segment.
/// So we flush here to make sure that all potential data left by previous split operation is saved.
while (!left->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (left->hasAbandoned())
{
LOG_FMT_DEBUG(log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
return;
}
}
while (!right->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (right->hasAbandoned())
{
LOG_FMT_DEBUG(log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
return;
}
}

SegmentSnapshotPtr left_snap;
SegmentSnapshotPtr right_snap;
ColumnDefinesPtr schema_snap;
Expand Down
38 changes: 36 additions & 2 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ bool Segment::writeToCache(DMContext & dm_context, const Block & block, size_t o
return delta->appendToCache(dm_context, block, offset, limit);
}

bool Segment::write(DMContext & dm_context, const Block & block)
bool Segment::write(DMContext & dm_context, const Block & block, bool flush_cache)
{
LOG_FMT_TRACE(log, "Segment [{}] write to disk rows: {}", segment_id, block.rows());
WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter());
Expand All @@ -316,7 +316,14 @@ bool Segment::write(DMContext & dm_context, const Block & block)

if (delta->appendColumnFile(dm_context, column_file))
{
flushCache(dm_context);
if (flush_cache)
{
while (!flushCache(dm_context))
{
if (hasAbandoned())
return false;
}
}
return true;
}
else
Expand Down Expand Up @@ -1129,6 +1136,29 @@ SegmentPair Segment::applySplit(DMContext & dm_context, //
SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const SegmentPtr & left, const SegmentPtr & right)
{
WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter());
/// This segment may contain some rows that not belong to this segment range which is left by previous split operation.
/// And only saved data in this segment will be filtered by the segment range in the merge process,
/// unsaved data will be directly copied to the new segment.
/// So we flush here to make sure that all potential data left by previous split operation is saved.
while (!left->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (left->hasAbandoned())
{
LOG_FMT_DEBUG(left->log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
return {};
}
}
while (!right->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (right->hasAbandoned())
{
LOG_FMT_DEBUG(right->log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
return {};
}
}


auto left_snap = left->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge);
auto right_snap = right->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge);
Expand All @@ -1149,6 +1179,10 @@ SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schem
return merged;
}

/// Segments may contain some rows that not belong to its range which is left by previous split operation.
/// And only saved data in the segment will be filtered by the segment range in the merge process,
/// unsaved data will be directly copied to the new segment.
/// So remember to do a flush for the segments before merge.
StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, //
const ColumnDefinesPtr & schema_snap,
const SegmentPtr & left,
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ class Segment : private boost::noncopyable
bool writeToCache(DMContext & dm_context, const Block & block, size_t offset, size_t limit);

/// For test only.
bool write(DMContext & dm_context, const Block & block);
bool write(DMContext & dm_context, const Block & block, bool flush_cache = true);

bool write(DMContext & dm_context, const RowKeyRange & delete_range);
bool ingestColumnFiles(DMContext & dm_context, const RowKeyRange & range, const ColumnFiles & column_files, bool clear_data_in_range);

Expand Down
28 changes: 19 additions & 9 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -949,11 +949,17 @@ CATCH
TEST_F(Segment_test, Split)
try
{
const size_t num_rows_write = 100;
const size_t num_rows_write_per_batch = 100;
const size_t num_rows_write = num_rows_write_per_batch * 2;
{
// write to segment
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
segment->write(dmContext(), std::move(block));
// write to segment and flush
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write_per_batch, false);
segment->write(dmContext(), std::move(block), true);
}
{
// write to segment and don't flush
Block block = DMTestEnv::prepareSimpleWriteBlock(num_rows_write_per_batch, 2 * num_rows_write_per_batch, false);
segment->write(dmContext(), std::move(block), false);
}

{
Expand Down Expand Up @@ -989,7 +995,7 @@ try
size_t num_rows_seg2 = 0;
{
{
auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)});
auto in = segment->getInputStream(dmContext(), *tableColumns(), {segment->getRowKeyRange()});
in->readPrefix();
while (Block block = in->read())
{
Expand All @@ -998,7 +1004,7 @@ try
in->readSuffix();
}
{
auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)});
auto in = new_segment->getInputStream(dmContext(), *tableColumns(), {new_segment->getRowKeyRange()});
in->readPrefix();
while (Block block = in->read())
{
Expand All @@ -1009,9 +1015,13 @@ try
ASSERT_EQ(num_rows_seg1 + num_rows_seg2, num_rows_write);
}

// delete rows in the right segment
{
new_segment->write(dmContext(), /*delete_range*/ new_segment->getRowKeyRange());
new_segment->flushCache(dmContext());
}

// merge segments
// TODO: enable merge test!
if (false)
{
segment = Segment::merge(dmContext(), tableColumns(), segment, new_segment);
{
Expand All @@ -1030,7 +1040,7 @@ try
num_rows_read += block.rows();
}
in->readSuffix();
EXPECT_EQ(num_rows_read, num_rows_write);
EXPECT_EQ(num_rows_read, num_rows_seg1);
}
}
}
Expand Down

0 comments on commit 5dc640b

Please sign in to comment.