From f8f583d5369438ebeba036c7c97032a027a5b0ed Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 20 Jul 2022 17:15:09 +0800 Subject: [PATCH] Flush segment cache when doing the compaction (#5284) (#5296) close pingcap/tiflash#5179 --- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 7 +++ .../tests/gtest_dm_delta_merge_store.cpp | 49 +++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index a74404f3dbb..01747e8e1fe 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1054,6 +1054,13 @@ std::optional DeltaMergeStore::mergeDeltaBySegment(const Contex segment = segment_it->second; } + if (!segment->flushCache(*dm_context)) + { + // If the flush failed, it means there are parallel updates to the segment in the background. + // In this case, we try again. + continue; + } + const auto new_segment = segmentMergeDelta(*dm_context, segment, run_thread); if (new_segment) { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 1d0e00a5b58..caa18661a37 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -3573,6 +3573,55 @@ try CATCH +// Verify that unflushed data will also be compacted. +TEST_P(DeltaMergeStoreMergeDeltaBySegmentTest, Flush) +try +{ + { + // Write data to first 3 segments and flush. + auto newly_written_rows = helper->rows_by_segments[0] + helper->rows_by_segments[1] + helper->rows_by_segments[2]; + Block block = DMTestEnv::prepareSimpleWriteBlock(0, newly_written_rows, false, pk_type, 5 /* new tso */); + store->write(*db_context, db_context->getSettingsRef(), block); + store->flushCache(dm_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + + helper->expected_delta_rows[0] += helper->rows_by_segments[0]; + helper->expected_delta_rows[1] += helper->rows_by_segments[1]; + helper->expected_delta_rows[2] += helper->rows_by_segments[2]; + helper->verifyExpectedRowsForAllSegments(); + + auto segment1 = std::next(store->segments.begin())->second; + ASSERT_EQ(segment1->getDelta()->getUnsavedRows(), 0); + } + { + // Write new data to segment[1] without flush. + auto newly_written_rows = helper->rows_by_segments[1]; + Block block = DMTestEnv::prepareSimpleWriteBlock(helper->rows_by_segments[0], helper->rows_by_segments[0] + newly_written_rows, false, pk_type, 10 /* new tso */); + store->write(*db_context, db_context->getSettingsRef(), block); + + helper->expected_delta_rows[1] += helper->rows_by_segments[1]; + helper->verifyExpectedRowsForAllSegments(); + + auto segment1 = std::next(store->segments.begin())->second; + ASSERT_GT(segment1->getDelta()->getUnsavedRows(), 0); + } + { + auto segment1 = std::next(store->segments.begin())->second; + auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start, DeltaMergeStore::TaskRunThread::Foreground); + ASSERT_NE(result, std::nullopt); + + segment1 = std::next(store->segments.begin())->second; + ASSERT_EQ(*result, segment1->getRowKeyRange()); + + helper->expected_stable_rows[1] += helper->expected_delta_rows[1]; + helper->expected_delta_rows[1] = 0; + helper->verifyExpectedRowsForAllSegments(); + + ASSERT_EQ(segment1->getDelta()->getUnsavedRows(), 0); + } +} +CATCH + + } // namespace tests } // namespace DM } // namespace DB