Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove in-memory data parts, part 3. #61127

Merged
merged 7 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions docs/en/operations/system-tables/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -513,10 +513,6 @@ Part was moved to another disk and should be deleted in own destructor.

Inactive data part with identity refcounter; it is being deleted right now by a cleaner.

### PartsInMemory

In-memory parts.

### PartsOutdated

Inactive data part that can still be used only by currently running SELECTs; it can be deleted after those SELECTs finish.
Expand Down
1 change: 0 additions & 1 deletion src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@
M(PartsDeleteOnDestroy, "Part was moved to another disk and should be deleted in own destructor.") \
M(PartsWide, "Wide parts.") \
M(PartsCompact, "Compact parts.") \
M(PartsInMemory, "In-memory parts.") \
M(MMappedFiles, "Total number of mmapped files.") \
M(MMappedFileBytes, "Sum size of mmapped file regions.") \
M(AsynchronousReadWait, "Number of threads waiting for asynchronous read.") \
Expand Down
3 changes: 0 additions & 3 deletions src/Storages/MergeTree/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +0,0 @@
if(ENABLE_EXAMPLES)
add_subdirectory(examples)
endif()
3 changes: 1 addition & 2 deletions src/Storages/MergeTree/ColumnSizeEstimator.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>


namespace DB
Expand All @@ -10,7 +9,7 @@ namespace DB
/* Allow to compute more accurate progress statistics */
class ColumnSizeEstimator
{
using ColumnToSize = MergeTreeDataPartInMemory::ColumnToSize;
using ColumnToSize = std::map<String, UInt64>;
ColumnToSize map;
public:

Expand Down
120 changes: 2 additions & 118 deletions src/Storages/MergeTree/DataPartsExchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include <IO/S3Common.h>
#include <Server/HTTP/HTMLForm.h>
#include <Server/HTTP/HTTPServerResponse.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Storages/StorageReplicatedMergeTree.h>
Expand Down Expand Up @@ -44,10 +43,8 @@ namespace ErrorCodes
extern const int CANNOT_WRITE_TO_OSTREAM;
extern const int CHECKSUM_DOESNT_MATCH;
extern const int INSECURE_PATH;
extern const int CORRUPTED_DATA;
extern const int LOGICAL_ERROR;
extern const int S3_ERROR;
extern const int INCORRECT_PART_TYPE;
extern const int ZERO_COPY_REPLICATION_ERROR;
}

Expand Down Expand Up @@ -191,8 +188,6 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
}

if (data_settings->allow_remote_fs_zero_copy_replication &&
/// In memory data part does not have metadata yet.
!isInMemoryPart(part) &&
client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY)
{
auto disk_type = part->getDataPartStorage().getDiskType();
Expand All @@ -205,11 +200,7 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
}
}

if (isInMemoryPart(part))
sendPartFromMemory(part, out, send_projections);
else
sendPartFromDisk(part, out, client_protocol_version, false, send_projections);

sendPartFromDisk(part, out, client_protocol_version, false, send_projections);
data.addLastSentPart(part->info);
}
catch (const NetException &)
Expand All @@ -231,36 +222,6 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
}
}

/// Serializes a data part that is held in memory into `out`.
///
/// If `send_projections` is set, every projection part of `part` is written first as:
/// its name (binary string), its checksums, then its block via NativeWriter.
/// After that the part's own checksums and block are written, and the byte size of the
/// block is added to the throttler returned by data.getSendsThrottler().
///
/// Throws LOGICAL_ERROR when asInMemoryPart() yields a null result for the part
/// or for any of its projections.
void Service::sendPartFromMemory(
const MergeTreeData::DataPartPtr & part, WriteBuffer & out, bool send_projections)
{
auto metadata_snapshot = data.getInMemoryMetadataPtr();
if (send_projections)
{
for (const auto & [name, projection] : part->getProjectionParts())
{
/// The sample block for the projection is looked up by name in the table metadata.
auto projection_sample_block = metadata_snapshot->projections.get(name).sample_block;
auto part_in_memory = asInMemoryPart(projection);
if (!part_in_memory)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Projection {} of part {} is not stored in memory", name, part->name);

/// Per-projection order in the stream: name, checksums, block.
writeStringBinary(name, out);
projection->checksums.write(out);
NativeWriter block_out(out, 0, projection_sample_block);
block_out.write(part_in_memory->block);
}
}

auto part_in_memory = asInMemoryPart(part);
if (!part_in_memory)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} is not stored in memory", part->name);

/// The writer is constructed before the checksums are written,
/// but the block itself is written after them.
NativeWriter block_out(out, 0, metadata_snapshot->getSampleBlock());
part->checksums.write(out);
block_out.write(part_in_memory->block);

/// Only the main part's block size is added to the throttler here.
data.getSendsThrottler()->add(part_in_memory->block.bytes());
}

MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
const MergeTreeData::DataPartPtr & part,
Expand Down Expand Up @@ -642,8 +603,6 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> Fetcher::fetchSelected
remote_fs_metadata, fmt::join(capability, ", "));
if (server_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got 'remote_fs_metadata' cookie with old protocol version {}", server_protocol_version);
if (part_type == PartType::InMemory)
throw Exception(ErrorCodes::INCORRECT_PART_TYPE, "Got 'remote_fs_metadata' cookie for in-memory part");

try
{
Expand Down Expand Up @@ -702,30 +661,14 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> Fetcher::fetchSelected
}

auto storage_id = data.getStorageID();
String new_part_path = part_type == PartType::InMemory ? "memory" : fs::path(data.getFullPathOnDisk(disk)) / part_name / "";
String new_part_path = fs::path(data.getFullPathOnDisk(disk)) / part_name / "";
auto entry = data.getContext()->getReplicatedFetchList().insert(
storage_id.getDatabaseName(), storage_id.getTableName(),
part_info.partition_id, part_name, new_part_path,
replica_path, uri, to_detached, sum_files_size);

in->setNextCallback(ReplicatedFetchReadCallback(*entry));

if (part_type == PartType::InMemory)
{
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);

auto data_part_storage = std::make_shared<DataPartStorageOnDiskFull>(
volume,
data.getRelativeDataPath(),
part_name);

return std::make_pair(downloadPartToMemory(
data_part_storage, part_name,
MergeTreePartInfo::fromPartName(part_name, data.format_version),
part_uuid, metadata_snapshot, context, *in,
projections, false, throttler), std::move(temporary_directory_lock));
}

auto output_buffer_getter = [](IDataPartStorage & part_storage, const String & file_name, size_t file_size)
{
return part_storage.writeFile(file_name, std::min<UInt64>(file_size, DBMS_DEFAULT_BUFFER_SIZE), {});
Expand All @@ -737,65 +680,6 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> Fetcher::fetchSelected
projections, throttler, sync),std::move(temporary_directory_lock));
}

/// Reads one part from `in` and builds a MergeTreeDataPartInMemory from it.
///
/// Stream layout consumed here (the same order Service::sendPartFromMemory writes):
///   - `projections` times: projection name (binary string), followed by that projection's
///     checksums and block, read by a recursive call;
///   - the part's checksums;
///   - a single block read via NativeReader.
///
/// `is_projection` is true for the recursive calls; for such parts the creation TID, uuid,
/// temporary flag, min-max index and partition are not filled in.
///
/// Throws CORRUPTED_DATA when the checksums cannot be read from `in`.
/// Returns the newly created part.
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
MutableDataPartStoragePtr data_part_storage,
const String & part_name,
const MergeTreePartInfo & part_info,
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
ReadWriteBufferFromHTTP & in,
size_t projections,
bool is_projection,
ThrottlerPtr throttler)
{
auto new_data_part = std::make_shared<MergeTreeDataPartInMemory>(data, part_name, part_info, data_part_storage);

/// Projections come first in the stream; each one is downloaded recursively
/// and attached to the part being built.
for (size_t i = 0; i < projections; ++i)
{
String projection_name;
readStringBinary(projection_name, in);

/// Every projection part is created with the same fixed part info.
MergeTreePartInfo new_part_info("all", 0, 0, 0);
/// Projection storage is addressed by the projection name with a ".proj" suffix.
auto projection_part_storage = data_part_storage->getProjection(projection_name + ".proj");

/// Recursive call: a projection carries no nested projections (0) and is flagged as a projection.
auto new_projection_part = downloadPartToMemory(
projection_part_storage, projection_name,
new_part_info, part_uuid, metadata_snapshot,
context, in, 0, true, throttler);

new_data_part->addProjectionPart(projection_name, std::move(new_projection_part));
}

MergeTreeData::DataPart::Checksums checksums;
if (!checksums.read(in))
throw Exception(ErrorCodes::CORRUPTED_DATA, "Cannot deserialize checksums");

/// The whole part is read as one block.
NativeReader block_in(in, 0);
auto block = block_in.read();
/// NOTE(review): `throttler` is dereferenced without a null check — confirm callers never pass null.
throttler->add(block.bytes());

/// Columns of the part are taken from the received block itself.
new_data_part->setColumns(block.getNamesAndTypesList(), {}, metadata_snapshot->getMetadataVersion());

/// These fields are set only for the top-level part, not for projection parts.
if (!is_projection)
{
new_data_part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
new_data_part->uuid = part_uuid;
new_data_part->is_temp = true;
new_data_part->minmax_idx->update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
new_data_part->partition.create(metadata_snapshot, block, 0, context);
}

/// The block is written into the new part through MergedBlockOutputStream using the "NONE" codec.
MergedBlockOutputStream part_out(
new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, {},
CompressionCodecFactory::instance().get("NONE", {}), NO_TRANSACTION_PTR);

part_out.write(block);
part_out.finalizePart(new_data_part, false);
/// Compare the new part's checksums with the ones read from `in`.
/// NOTE(review): presumably checkEqual throws on mismatch — confirm.
new_data_part->checksums.checkEqual(checksums, /* have_uncompressed = */ true);

return new_data_part;
}

void Fetcher::downloadBaseOrProjectionPartToDisk(
const String & replica_path,
Expand Down
16 changes: 0 additions & 16 deletions src/Storages/MergeTree/DataPartsExchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ class Service final : public InterserverIOEndpoint

private:
MergeTreeData::DataPartPtr findPart(const String & name);
void sendPartFromMemory(
const MergeTreeData::DataPartPtr & part,
WriteBuffer & out,
bool send_projections);

MergeTreeData::DataPart::Checksums sendPartFromDisk(
const MergeTreeData::DataPartPtr & part,
Expand Down Expand Up @@ -113,18 +109,6 @@ class Fetcher final : private boost::noncopyable
ThrottlerPtr throttler,
bool sync);

MergeTreeData::MutableDataPartPtr downloadPartToMemory(
MutableDataPartStoragePtr data_part_storage,
const String & part_name,
const MergeTreePartInfo & part_info,
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
ReadWriteBufferFromHTTP & in,
size_t projections,
bool is_projection,
ThrottlerPtr throttler);

MergeTreeData::MutableDataPartPtr downloadPartToDiskRemoteMeta(
const String & part_name,
const String & replica_path,
Expand Down
12 changes: 0 additions & 12 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ namespace CurrentMetrics

extern const Metric PartsWide;
extern const Metric PartsCompact;
extern const Metric PartsInMemory;
}

namespace DB
Expand Down Expand Up @@ -278,9 +277,6 @@ static void incrementTypeMetric(MergeTreeDataPartType type)
case MergeTreeDataPartType::Compact:
CurrentMetrics::add(CurrentMetrics::PartsCompact);
return;
case MergeTreeDataPartType::InMemory:
CurrentMetrics::add(CurrentMetrics::PartsInMemory);
return;
case MergeTreeDataPartType::Unknown:
return;
}
Expand All @@ -296,9 +292,6 @@ static void decrementTypeMetric(MergeTreeDataPartType type)
case MergeTreeDataPartType::Compact:
CurrentMetrics::sub(CurrentMetrics::PartsCompact);
return;
case MergeTreeDataPartType::InMemory:
CurrentMetrics::sub(CurrentMetrics::PartsInMemory);
return;
case MergeTreeDataPartType::Unknown:
return;
}
Expand Down Expand Up @@ -2207,11 +2200,6 @@ bool isWidePart(const MergeTreeDataPartPtr & data_part)
return (data_part && data_part->getType() == MergeTreeDataPartType::Wide);
}

/// Returns true only when `data_part` is non-null and its type is MergeTreeDataPartType::InMemory.
bool isInMemoryPart(const MergeTreeDataPartPtr & data_part)
{
    /// A null pointer is never an in-memory part.
    if (!data_part)
        return false;

    return data_part->getType() == MergeTreeDataPartType::InMemory;
}

std::optional<std::string> getIndexExtensionFromFilesystem(const IDataPartStorage & data_part_storage)
{
if (data_part_storage.exists())
Expand Down
1 change: 0 additions & 1 deletion src/Storages/MergeTree/IMergeTreeDataPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,6 @@ using MergeTreeMutableDataPartPtr = std::shared_ptr<IMergeTreeDataPart>;

bool isCompactPart(const MergeTreeDataPartPtr & data_part);
bool isWidePart(const MergeTreeDataPartPtr & data_part);
bool isInMemoryPart(const MergeTreeDataPartPtr & data_part);

inline String getIndexExtension(bool is_compressed_primary_key) { return is_compressed_primary_key ? ".cidx" : ".idx"; }
std::optional<String> getIndexExtensionFromFilesystem(const IDataPartStorage & data_part_storage);
Expand Down
2 changes: 0 additions & 2 deletions src/Storages/MergeTree/IMergeTreeDataPartInfoForReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ class IMergeTreeDataPartInfoForReader : public WithContext

virtual bool isWidePart() const = 0;

virtual bool isInMemoryPart() const = 0;

virtual bool isProjectionPart() const = 0;

virtual DataPartStoragePtr getDataPartStorage() const = 0;
Expand Down
2 changes: 0 additions & 2 deletions src/Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ class LoadedMergeTreeDataPartInfoForReader final : public IMergeTreeDataPartInfo

bool isWidePart() const override { return DB::isWidePart(data_part); }

bool isInMemoryPart() const override { return DB::isInMemoryPart(data_part); }

bool isProjectionPart() const override { return data_part->isProjectionPart(); }

DataPartStoragePtr getDataPartStorage() const override { return data_part->getDataPartStoragePtr(); }
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->createRawStream();
ctx->rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*ctx->rows_sources_uncompressed_write_buf);

MergeTreeDataPartInMemory::ColumnToSize local_merged_column_to_size;
std::map<String, UInt64> local_merged_column_to_size;
for (const MergeTreeData::DataPartPtr & part : global_ctx->future_part->parts)
part->accumulateColumnSizes(local_merged_column_to_size);

Expand Down
Loading
Loading