diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 997db601d1e..27de092c26a 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1971,6 +1971,29 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le right->info(), dm_context.min_version); + /// This segment may contain some rows that not belong to this segment range which is left by previous split operation. + /// And only saved data in this segment will be filtered by the segment range in the merge process, + /// unsaved data will be directly copied to the new segment. + /// So we flush here to make sure that all potential data left by previous split operation is saved. + while (!left->flushCache(dm_context)) + { + // keep flush until success if not abandoned + if (left->hasAbandoned()) + { + LOG_FMT_DEBUG(log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId()); + return; + } + } + while (!right->flushCache(dm_context)) + { + // keep flush until success if not abandoned + if (right->hasAbandoned()) + { + LOG_FMT_DEBUG(log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId()); + return; + } + } + SegmentSnapshotPtr left_snap; SegmentSnapshotPtr right_snap; ColumnDefinesPtr schema_snap; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 9e195b2b25d..8398fdcee40 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -306,7 +306,7 @@ bool Segment::writeToCache(DMContext & dm_context, const Block & block, size_t o return delta->appendToCache(dm_context, block, offset, limit); } -bool Segment::write(DMContext & dm_context, const Block & block) +bool Segment::write(DMContext & dm_context, const Block & block, bool flush_cache) { LOG_FMT_TRACE(log, "Segment [{}] write to disk rows: {}", segment_id, block.rows()); WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter()); @@ -316,7 +316,14 @@ bool Segment::write(DMContext & dm_context, const Block & block) if (delta->appendColumnFile(dm_context, column_file)) { - flushCache(dm_context); + if (flush_cache) + { + while (!flushCache(dm_context)) + { + if (hasAbandoned()) + return false; + } + } return true; } else @@ -1129,6 +1136,29 @@ SegmentPair Segment::applySplit(DMContext & dm_context, // SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const SegmentPtr & left, const SegmentPtr & right) { WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter()); + /// This segment may contain some rows that not belong to this segment range which is left by previous split operation. + /// And only saved data in this segment will be filtered by the segment range in the merge process, + /// unsaved data will be directly copied to the new segment. + /// So we flush here to make sure that all potential data left by previous split operation is saved. + while (!left->flushCache(dm_context)) + { + // keep flush until success if not abandoned + if (left->hasAbandoned()) + { + LOG_FMT_DEBUG(left->log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId()); + return {}; + } + } + while (!right->flushCache(dm_context)) + { + // keep flush until success if not abandoned + if (right->hasAbandoned()) + { + LOG_FMT_DEBUG(right->log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId()); + return {}; + } + } + auto left_snap = left->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge); auto right_snap = right->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge); @@ -1149,6 +1179,10 @@ SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schem return merged; } +/// Segments may contain some rows that not belong to its range which is left by previous split operation. +/// And only saved data in the segment will be filtered by the segment range in the merge process, +/// unsaved data will be directly copied to the new segment. +/// So remember to do a flush for the segments before merge. StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, // const ColumnDefinesPtr & schema_snap, const SegmentPtr & left, diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 3ad29ee14a5..cccfc5091b9 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -144,7 +144,8 @@ class Segment : private boost::noncopyable bool writeToCache(DMContext & dm_context, const Block & block, size_t offset, size_t limit); /// For test only. - bool write(DMContext & dm_context, const Block & block); + bool write(DMContext & dm_context, const Block & block, bool flush_cache = true); + bool write(DMContext & dm_context, const RowKeyRange & delete_range); bool ingestColumnFiles(DMContext & dm_context, const RowKeyRange & range, const ColumnFiles & column_files, bool clear_data_in_range); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 6bf33465366..5726cfa132d 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -949,11 +949,17 @@ CATCH TEST_F(Segment_test, Split) try { - const size_t num_rows_write = 100; + const size_t num_rows_write_per_batch = 100; + const size_t num_rows_write = num_rows_write_per_batch * 2; { - // write to segment - Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); - segment->write(dmContext(), std::move(block)); + // write to segment and flush + Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write_per_batch, false); + segment->write(dmContext(), std::move(block), true); + } + { + // write to segment and don't flush + Block block = DMTestEnv::prepareSimpleWriteBlock(num_rows_write_per_batch, 2 * num_rows_write_per_batch, false); + segment->write(dmContext(), std::move(block), false); } { @@ -989,7 +995,7 @@ try size_t num_rows_seg2 = 0; { { - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStream(dmContext(), *tableColumns(), {segment->getRowKeyRange()}); in->readPrefix(); while (Block block = in->read()) { @@ -998,7 +1004,7 @@ try in->readSuffix(); } { - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = new_segment->getInputStream(dmContext(), *tableColumns(), {new_segment->getRowKeyRange()}); in->readPrefix(); while (Block block = in->read()) { @@ -1009,9 +1015,13 @@ try ASSERT_EQ(num_rows_seg1 + num_rows_seg2, num_rows_write); } + // delete rows in the right segment + { + new_segment->write(dmContext(), /*delete_range*/ new_segment->getRowKeyRange()); + new_segment->flushCache(dmContext()); + } + // merge segments - // TODO: enable merge test! - if (false) { segment = Segment::merge(dmContext(), tableColumns(), segment, new_segment); { @@ -1030,7 +1040,7 @@ try num_rows_read += block.rows(); } in->readSuffix(); - EXPECT_EQ(num_rows_read, num_rows_write); + EXPECT_EQ(num_rows_read, num_rows_seg1); } } }