Skip to content

Commit

Permalink
fix conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu committed Jun 3, 2022
1 parent 74536ad commit b70e851
Showing 1 changed file with 24 additions and 23 deletions.
47 changes: 24 additions & 23 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ DMFilePtr writeIntoNewDMFile(DMContext & dm_context, //
{
auto dmfile = DMFile::create(file_id, parent_path, flags.isSingleFile(), dm_context.createChecksumConfig(flags.isSingleFile()));
auto output_stream = std::make_shared<DMFileBlockOutputStream>(dm_context.db_context, dmfile, *schema_snap, flags);
auto * mvcc_stream = typeid_cast<const DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_COMPACT> *>(input_stream.get());
const auto * mvcc_stream = typeid_cast<const DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_COMPACT> *>(input_stream.get());

input_stream->readPrefix();
output_stream->writePrefix();
Expand Down Expand Up @@ -447,7 +447,7 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_co
const SegmentSnapshotPtr & segment_snap,
const RowKeyRange & data_range,
size_t expected_block_size,
bool reorgnize_block) const
bool reorganize_block) const
{
RowKeyRanges data_ranges{data_range};
auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, data_ranges);
Expand All @@ -464,7 +464,7 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_co


data_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(data_stream, data_ranges, 0);
if (reorgnize_block)
if (reorganize_block)
{
data_stream = std::make_shared<PKSquashingBlockInputStream<false>>(data_stream, EXTRA_HANDLE_COLUMN_ID, is_common_handle);
}
Expand Down Expand Up @@ -670,7 +670,7 @@ std::optional<RowKeyValue> Segment::getSplitPointFast(DMContext & dm_context, co

size_t split_row_index = stable_rows / 2;

auto & dmfiles = stable_snap->getDMFiles();
const auto & dmfiles = stable_snap->getDMFiles();

DMFilePtr read_file;
size_t file_index = 0;
Expand All @@ -680,13 +680,13 @@ std::optional<RowKeyValue> Segment::getSplitPointFast(DMContext & dm_context, co
size_t cur_rows = 0;
for (size_t index = 0; index < dmfiles.size(); index++)
{
auto & file = dmfiles[index];
const auto & file = dmfiles[index];
size_t rows_in_file = file->getRows();
cur_rows += rows_in_file;
if (cur_rows > split_row_index)
{
cur_rows -= rows_in_file;
auto & pack_stats = file->getPackStats();
const auto & pack_stats = file->getPackStats();
for (size_t pack_id = 0; pack_id < pack_stats.size(); ++pack_id)
{
cur_rows += pack_stats[pack_id].rows;
Expand Down Expand Up @@ -750,7 +750,7 @@ std::optional<RowKeyValue> Segment::getSplitPointSlow(
{
EventRecorder recorder(ProfileEvents::DMSegmentGetSplitPoint, ProfileEvents::DMSegmentGetSplitPointNS);

auto & pk_col = getExtraHandleColumnDefine(is_common_handle);
const auto & pk_col = getExtraHandleColumnDefine(is_common_handle);
auto pk_col_defs = std::make_shared<ColumnDefines>(ColumnDefines{pk_col});
// We need to create a new delta_reader here, because the one in read_info is used to read columns other than PK column.
auto delta_reader = read_info.getDeltaReader(pk_col_defs);
Expand Down Expand Up @@ -883,13 +883,15 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitLogical(DMContext & dm_co
return {};
}

GenPageId log_gen_page_id = std::bind(&StoragePool::newLogPageId, &storage_pool);
GenPageId log_gen_page_id = [&]() {
return storage_pool.newLogPageId();
};

DMFiles my_stable_files;
DMFiles other_stable_files;

auto delegate = dm_context.path_pool.getStableDiskDelegator();
for (auto & dmfile : segment_snap->stable->getDMFiles())
for (const auto & dmfile : segment_snap->stable->getDMFiles())
{
auto ori_ref_id = dmfile->refId();
auto file_id = dmfile->fileId();
Expand Down Expand Up @@ -1025,7 +1027,7 @@ std::optional<Segment::SplitInfo> Segment::prepareSplitPhysical(DMContext & dm_c
LOG_INFO(log, "prepare other_stable done");

// Remove old stable's files.
for (auto & file : stable->getDMFiles())
for (const auto & file : stable->getDMFiles())
{
// Here we should remove the ref id instead of file_id.
// Because a dmfile could be used by several segments, and only after all ref_ids are removed, then the file_id removed.
Expand Down Expand Up @@ -1157,7 +1159,7 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, //
throw Exception("The ranges of merge segments are not consecutive: first end: " + left->rowkey_range.getEnd().toDebugString()
+ ", second start: " + right->rowkey_range.getStart().toDebugString());

auto getStream = [&](const SegmentPtr & segment, const SegmentSnapshotPtr & segment_snap) {
auto get_stream = [&](const SegmentPtr & segment, const SegmentSnapshotPtr & segment_snap) {
auto read_info = segment->getReadInfo(
dm_context,
*schema_snap,
Expand Down Expand Up @@ -1185,8 +1187,8 @@ StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, //
return stream;
};

auto left_stream = getStream(left, left_snap);
auto right_stream = getStream(right, right_snap);
auto left_stream = get_stream(left, left_snap);
auto right_stream = get_stream(right, right_snap);

BlockInputStreamPtr merged_stream = std::make_shared<ConcatBlockInputStream>(BlockInputStreams{left_stream, right_stream}, nullptr);
// for the purpose to calculate StableProperty of the new segment
Expand Down Expand Up @@ -1225,16 +1227,16 @@ SegmentPtr Segment::applyMerge(DMContext & dm_context, //
/// Make sure saved packs are appended before unsaved packs.
DeltaPacks merged_packs;

auto L_first_unsaved
auto l_first_unsaved
= std::find_if(left_tail_packs.begin(), left_tail_packs.end(), [](const DeltaPackPtr & p) { return !p->isSaved(); });
auto R_first_unsaved
auto r_first_unsaved
= std::find_if(right_tail_packs.begin(), right_tail_packs.end(), [](const DeltaPackPtr & p) { return !p->isSaved(); });

merged_packs.insert(merged_packs.end(), left_tail_packs.begin(), L_first_unsaved);
merged_packs.insert(merged_packs.end(), right_tail_packs.begin(), R_first_unsaved);
merged_packs.insert(merged_packs.end(), left_tail_packs.begin(), l_first_unsaved);
merged_packs.insert(merged_packs.end(), right_tail_packs.begin(), r_first_unsaved);

merged_packs.insert(merged_packs.end(), L_first_unsaved, left_tail_packs.end());
merged_packs.insert(merged_packs.end(), R_first_unsaved, right_tail_packs.end());
merged_packs.insert(merged_packs.end(), l_first_unsaved, left_tail_packs.end());
merged_packs.insert(merged_packs.end(), r_first_unsaved, right_tail_packs.end());

auto merged_delta = std::make_shared<DeltaValueSpace>(left->delta->getId(), merged_packs);

Expand Down Expand Up @@ -1360,9 +1362,8 @@ ColumnDefinesPtr Segment::arrangeReadColumns(const ColumnDefine & handle, const
new_columns_to_read.push_back(getVersionColumnDefine());
new_columns_to_read.push_back(getTagColumnDefine());

for (size_t i = 0; i < columns_to_read.size(); ++i)
for (const auto & c : columns_to_read)
{
auto & c = columns_to_read[i];
if (c.id != handle.id && c.id != VERSION_COLUMN_ID && c.id != TAG_COLUMN_ID)
new_columns_to_read.push_back(c);
}
Expand Down Expand Up @@ -1521,7 +1522,7 @@ bool Segment::placeUpsert(const DMContext & dm_context,

IColumn::Permutation perm;

auto & handle = getExtraHandleColumnDefine(is_common_handle);
const auto & handle = getExtraHandleColumnDefine(is_common_handle);
bool do_sort = sortBlockByPk(handle, block, perm);
RowKeyValueRef first_rowkey = RowKeyColumnContainer(block.getByPosition(0).column, is_common_handle).getRowKeyValue(0);
RowKeyValueRef range_start = relevant_range.getStart();
Expand Down Expand Up @@ -1575,7 +1576,7 @@ bool Segment::placeDelete(const DMContext & dm_context,
{
EventRecorder recorder(ProfileEvents::DMPlaceDeleteRange, ProfileEvents::DMPlaceDeleteRangeNS);

auto & handle = getExtraHandleColumnDefine(is_common_handle);
const auto & handle = getExtraHandleColumnDefine(is_common_handle);

RowKeyRanges delete_ranges{delete_range};
Blocks delete_data;
Expand Down

0 comments on commit b70e851

Please sign in to comment.