Skip to content

Commit

Permalink
Optimize memory consumption of delta index in DT engine (#716) (#787)
Browse files Browse the repository at this point in the history
* Add delta_index_size to stat

* Optimize memory usage of DeltaTree

* fix error

* update delta tree's size

* Add check for delta value

* Using merge type and count, using UInt32 is enough

* fix error

Co-authored-by: flow <flowbehappy@gmail.com>
  • Loading branch information
ti-srebot and flowbehappy authored Jun 17, 2020
1 parent 08ed56a commit 1fadcf1
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 124 deletions.
25 changes: 8 additions & 17 deletions dbms/src/Storages/DeltaMerge/DeltaMerge.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
#pragma once

#include <Common/Exception.h>

#include <DataStreams/IBlockInputStream.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>

#include <DataStreams/IBlockInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/DeltaTree.h>
#include <Storages/DeltaMerge/HandleFilter.h>
Expand Down Expand Up @@ -251,13 +249,14 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream, Alloc
}
else
{
switch (delta_index_it.getType())
if (delta_index_it.isDelete())
{
case DT_DEL:
// Delete.
writeDeleteFromDelta(delta_index_it.getCount());
break;
case DT_INS:
}
else
{
// Insert.
bool do_write = true;
if constexpr (skippable_place)
{
Expand All @@ -274,14 +273,6 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream, Alloc
use_delta_rows = delta_index_it.getCount();
writeInsertFromDelta(output_columns, output_write_limit);
}
break;
}
default:
throw Exception("Entry type " + DTTypeString(delta_index_it.getType())
+ " is not supported, is end: " + DB::toString(delta_index_it == delta_index_end)
+ ", use_stable_rows: " + DB::toString(use_stable_rows)
+ ", stable_ignore: " + DB::toString(stable_ignore) + ", stable_done: " + DB::toString(stable_done)
+ ", delta_done: " + DB::toString(delta_done) + ", delta_done: " + DB::toString(delta_done));
}
}

Expand Down Expand Up @@ -490,8 +481,8 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream, Alloc
{
UInt64 prev_sid;
{
prev_sid = delta_index_it.getSid();
if (delta_index_it.getType() == DT_DEL)
prev_sid = delta_index_it.getSid();
if (delta_index_it.isDelete())
prev_sid += delta_index_it.getCount();
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace DM
/// For example, the max capacity of leaf node would be: M * S.
static constexpr size_t DT_S = 3;
static constexpr size_t DT_M = 55;
static constexpr size_t DT_F = 13;
static constexpr size_t DT_F = 20;

using Ids = std::vector<UInt64>;

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1516,6 +1516,8 @@ DeltaMergeStoreStat DeltaMergeStore::getStat()
stat.total_delta_rows += delta->getRows();
stat.total_delta_size += delta->getBytes();

stat.delta_index_size += delta->getDeltaIndexBytes();

total_delta_cache_rows += delta->getTotalCacheRows();
total_delta_cache_size += delta->getTotalCacheBytes();
total_delta_valid_cache_rows += delta->getValidCacheRows();
Expand Down Expand Up @@ -1619,6 +1621,8 @@ SegmentStats DeltaMergeStore::getSegmentStats()
stat.delta_rate = (Float64)delta->getRows() / stat.rows;
stat.delta_cache_size = delta->getTotalCacheBytes();

stat.delta_index_size = delta->getDeltaIndexBytes();

stats.push_back(stat);
}
return stats;
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ struct SegmentStat

Float64 delta_rate = 0;
UInt64 delta_cache_size = 0;

UInt64 delta_index_size = 0;
};
using SegmentStats = std::vector<SegmentStat>;

Expand All @@ -68,6 +70,8 @@ struct DeltaMergeStoreStat
Float64 delta_cache_rate = 0;
Float64 delta_cache_wasted_rate = 0;

UInt64 delta_index_size = 0;

Float64 avg_segment_rows = 0;
Float64 avg_segment_size = 0;

Expand Down
Loading

0 comments on commit 1fadcf1

Please sign in to comment.