Skip to content

Commit

Permalink
Chunks are held in RocksDB for 25 hours (#2252)
Browse files Browse the repository at this point in the history
  • Loading branch information
ErakhtinB authored Nov 4, 2024
1 parent fbbb302 commit 8a593bb
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 24 deletions.
5 changes: 4 additions & 1 deletion core/injector/application_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,14 @@ namespace {
// NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions)
options.max_open_files = soft_limit.value() / 2;

const std::unordered_map<std::string, int32_t> column_ttl = {
{"avaliability_storage", 25 * 60 * 60}}; // 25 hours
auto db_res =
storage::RocksDb::create(app_config.databasePath(chain_spec->id()),
options,
app_config.dbCacheSize(),
prevent_destruction);
prevent_destruction,
column_ttl);
if (!db_res) {
auto log = log::createLogger("Injector", "injector");
log->critical(
Expand Down
53 changes: 53 additions & 0 deletions core/parachain/availability/store/candidate_chunk_key.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <algorithm>  // std::copy_n
#include <cstddef>    // size_t
#include <cstdint>    // uint32_t
#include <optional>
#include <utility>    // std::pair

#include <boost/endian/conversion.hpp>

#include "parachain/types.hpp"
#include "primitives/common.hpp"

namespace kagome {
  /// Builds and parses the RocksDB keys used by the availability store.
  /// A full key is `candidate_hash bytes || chunk_index`, with the index
  /// stored big-endian so that all chunks of one candidate are lexically
  /// contiguous and can be iterated with a cursor seek on the hash prefix.
  struct CandidateChunkKey {
    static constexpr size_t kCandidateHashSize =
        sizeof(parachain::CandidateHash);
    static constexpr size_t kChunkIndexSize = sizeof(parachain::ChunkIndex);
    // store_big_u32/load_big_u32 below serialize exactly 4 bytes; fail the
    // build early if ChunkIndex ever changes width.
    static_assert(kChunkIndexSize == sizeof(uint32_t),
                  "parachain::ChunkIndex is expected to be a 32-bit integer");
    using Key = common::Blob<kCandidateHashSize + kChunkIndexSize>;
    using HashKey = common::Blob<kCandidateHashSize>;

    /// Encodes the full (candidate hash, chunk index) storage key.
    static Key encode(const parachain::CandidateHash &candidate_hash,
                      const parachain::ChunkIndex &chunk_index) {
      Key key;
      std::copy_n(
          encode_hash(candidate_hash).data(), kCandidateHashSize, key.data());
      boost::endian::store_big_u32(key.data() + kCandidateHashSize,
                                   chunk_index);
      return key;
    }

    /// Encodes the hash-only prefix, used to seek to the first chunk of a
    /// candidate when scanning all of its chunks.
    static HashKey encode_hash(const parachain::CandidateHash &candidate_hash) {
      HashKey key;
      std::copy_n(candidate_hash.data(), kCandidateHashSize, key.data());
      return key;
    }

    /// Decodes a full key back into (candidate hash, chunk index).
    /// Returns std::nullopt when `key` is not exactly a full-key length
    /// (e.g. a bare hash prefix).
    static std::optional<
        std::pair<parachain::CandidateHash, parachain::ChunkIndex>>
    decode(common::BufferView key) {
      if (key.size() != Key::size()) {
        return std::nullopt;
      }
      std::pair<parachain::CandidateHash, parachain::ChunkIndex> candidateChunk;
      std::copy_n(key.data(), kCandidateHashSize, candidateChunk.first.data());
      candidateChunk.second =
          boost::endian::load_big_u32(key.data() + kCandidateHashSize);
      return candidateChunk;
    }
  };
}  // namespace kagome
135 changes: 132 additions & 3 deletions core/parachain/availability/store/store_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,15 @@
*/

#include "parachain/availability/store/store_impl.hpp"
#include "candidate_chunk_key.hpp"

namespace kagome::parachain {
// Constructs the store with the spaced storage used to persist chunks in the
// kAvaliabilityStorage column (in addition to the in-memory state_); the
// storage handle must be non-null.
AvailabilityStoreImpl::AvailabilityStoreImpl(
std::shared_ptr<storage::SpacedStorage> storage)
: storage_{std::move(storage)} {
BOOST_ASSERT(storage_ != nullptr);
}

bool AvailabilityStoreImpl::hasChunk(const CandidateHash &candidate_hash,
ValidatorIndex index) const {
return state_.sharedAccess([&](const auto &state) {
Expand Down Expand Up @@ -43,7 +50,7 @@ namespace kagome::parachain {
std::optional<AvailabilityStore::ErasureChunk>
AvailabilityStoreImpl::getChunk(const CandidateHash &candidate_hash,
ValidatorIndex index) const {
return state_.sharedAccess(
auto chunk = state_.sharedAccess(
[&](const auto &state)
-> std::optional<AvailabilityStore::ErasureChunk> {
auto it = state.per_candidate_.find(candidate_hash);
Expand All @@ -56,6 +63,30 @@ namespace kagome::parachain {
}
return it2->second;
});
if (chunk) {
return chunk;
}
auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
if (not space) {
SL_ERROR(logger, "Failed to get space");
return std::nullopt;
}
auto chunk_from_db =
space->get(CandidateChunkKey::encode(candidate_hash, index));
if (not chunk_from_db) {
return std::nullopt;
}
const auto decoded_chunk =
scale::decode<ErasureChunk>(chunk_from_db.value());
if (not decoded_chunk) {
SL_ERROR(logger,
"Failed to decode chunk candidate {} index {} error {}",
candidate_hash,
index,
decoded_chunk.error());
return std::nullopt;
}
return decoded_chunk.value();
}

std::optional<AvailabilityStore::ParachainBlock>
Expand Down Expand Up @@ -90,7 +121,7 @@ namespace kagome::parachain {

std::vector<AvailabilityStore::ErasureChunk> AvailabilityStoreImpl::getChunks(
const CandidateHash &candidate_hash) const {
return state_.sharedAccess([&](const auto &state) {
auto chunks = state_.sharedAccess([&](const auto &state) {
std::vector<AvailabilityStore::ErasureChunk> chunks;
auto it = state.per_candidate_.find(candidate_hash);
if (it != state.per_candidate_.end()) {
Expand All @@ -100,6 +131,57 @@ namespace kagome::parachain {
}
return chunks;
});
if (not chunks.empty()) {
return chunks;
}
auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
if (not space) {
SL_ERROR(logger, "Failed to get space");
return chunks;
}
auto cursor = space->cursor();
if (not cursor) {
SL_ERROR(logger, "Failed to get cursor for AvaliabilityStorage");
return chunks;
}
const auto seek_key = CandidateChunkKey::encode_hash(candidate_hash);
auto seek_res = cursor->seek(seek_key);
if (not seek_res) {
SL_ERROR(logger, "Failed to seek, error: {}", seek_res.error());
return chunks;
}
if (not seek_res.value()) {
SL_DEBUG(logger, "Seek not found for candidate {}", candidate_hash);
return chunks;
}
const auto check_key = [&seek_key](const auto &key) {
if (not key) {
return false;
}
const auto &key_value = key.value();
return key_value.size() >= seek_key.size()
and std::equal(seek_key.begin(), seek_key.end(), key_value.begin());
};
while (cursor->isValid() and check_key(cursor->key())) {
const auto cursor_opt_value = cursor->value();
if (cursor_opt_value) {
auto decoded_res =
scale::decode<ErasureChunk>(cursor_opt_value.value());
if (decoded_res) {
chunks.emplace_back(std::move(decoded_res.value()));
} else {
SL_ERROR(
logger, "Failed to decode value, error: {}", decoded_res.error());
}
} else {
SL_ERROR(
logger, "Failed to get value for key {}", cursor->key()->toHex());
}
if (not cursor->next()) {
break;
}
}
return chunks;
}

void AvailabilityStoreImpl::printStoragesLoad() {
Expand All @@ -122,7 +204,30 @@ namespace kagome::parachain {
state.candidates_[relay_parent].insert(candidate_hash);
auto &candidate_data = state.per_candidate_[candidate_hash];
for (auto &&chunk : std::move(chunks)) {
auto encoded_chunk = scale::encode(chunk);
const auto chunk_index = chunk.index;
candidate_data.chunks[chunk.index] = std::move(chunk);
if (not encoded_chunk) {
SL_ERROR(logger,
"Failed to encode chunk, error: {}",
encoded_chunk.error());
continue;
}
auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
if (not space) {
SL_ERROR(logger, "Failed to get space");
continue;
}
if (auto res = space->put(
CandidateChunkKey::encode(candidate_hash, chunk_index),
std::move(encoded_chunk.value()));
not res) {
SL_ERROR(logger,
"Failed to put chunk candidate {} index {} error {}",
candidate_hash,
chunk_index,
res.error());
}
}
candidate_data.pov = pov;
candidate_data.data = data;
Expand All @@ -132,18 +237,42 @@ namespace kagome::parachain {
// Stores a single erasure chunk for `candidate_hash`: always records it in
// the in-memory state, and best-effort mirrors it into the TTL-pruned
// kAvaliabilityStorage column (failures are logged, not propagated).
void AvailabilityStoreImpl::putChunk(const network::RelayHash &relay_parent,
                                     const CandidateHash &candidate_hash,
                                     ErasureChunk &&chunk) {
  // Serialize and grab the index first: `chunk` is moved into state_ below.
  const auto index = chunk.index;
  auto encoded = scale::encode(chunk);

  state_.exclusiveAccess([&](auto &state) {
    state.candidates_[relay_parent].insert(candidate_hash);
    state.per_candidate_[candidate_hash].chunks[index] = std::move(chunk);
  });

  if (not encoded) {
    SL_ERROR(
        logger, "Failed to encode chunk, error: {}", encoded.error());
    return;
  }

  auto space = storage_->getSpace(storage::Space::kAvaliabilityStorage);
  if (not space) {
    SL_ERROR(logger, "Failed to get AvaliabilityStorage space");
    return;
  }

  auto db_key = CandidateChunkKey::encode(candidate_hash, index);
  auto put_res = space->put(db_key, std::move(encoded.value()));
  if (not put_res) {
    SL_ERROR(logger,
             "Failed to put chunk candidate {} index {} error {}",
             candidate_hash,
             index,
             put_res.error());
  }
}

void AvailabilityStoreImpl::remove(const network::RelayHash &relay_parent) {
state_.exclusiveAccess([&](auto &state) {
if (auto it = state.candidates_.find(relay_parent);
it != state.candidates_.end()) {
for (auto const &l : it->second) {
for (const auto &l : it->second) {
state.per_candidate_.erase(l);
}
state.candidates_.erase(it);
Expand Down
4 changes: 3 additions & 1 deletion core/parachain/availability/store/store_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
#include <unordered_map>
#include <unordered_set>
#include "log/logger.hpp"
#include "storage/spaced_storage.hpp"
#include "utils/safe_object.hpp"

namespace kagome::parachain {
class AvailabilityStoreImpl : public AvailabilityStore {
public:
~AvailabilityStoreImpl() override = default;
AvailabilityStoreImpl(std::shared_ptr<storage::SpacedStorage> storage);

bool hasChunk(const CandidateHash &candidate_hash,
ValidatorIndex index) const override;
Expand Down Expand Up @@ -56,5 +57,6 @@ namespace kagome::parachain {

log::Logger logger = log::createLogger("AvailabilityStore", "parachain");
SafeObject<State> state_{};
std::shared_ptr<storage::SpacedStorage> storage_;
};
} // namespace kagome::parachain
43 changes: 26 additions & 17 deletions core/storage/rocksdb/rocksdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ namespace kagome::storage {
const filesystem::path &path,
rocksdb::Options options,
uint32_t memory_budget_mib,
bool prevent_destruction) {
bool prevent_destruction,
const std::unordered_map<std::string, int32_t> &column_ttl) {
OUTCOME_TRY(mkdirs(path));

auto log = log::createLogger("RocksDB", "storage");
Expand All @@ -61,24 +62,32 @@ namespace kagome::storage {
const uint32_t other_spaces_cache_size =
(memory_budget - trie_space_cache_size) / (storage::Space::kTotal - 1);
std::vector<rocksdb::ColumnFamilyDescriptor> column_family_descriptors;
column_family_descriptors.reserve(Space::kTotal);
std::vector<int32_t> ttls;
for (auto i = 0; i < Space::kTotal; ++i) {
const auto space_name = spaceName(static_cast<Space>(i));
auto ttl = 0;
if (const auto it = column_ttl.find(space_name); it != column_ttl.end()) {
ttl = it->second;
}
column_family_descriptors.emplace_back(
spaceName(static_cast<Space>(i)),
space_name,
configureColumn(i != Space::kTrieNode ? other_spaces_cache_size
: trie_space_cache_size));
ttls.push_back(ttl);
SL_INFO(log, "Column family {} configured with TTL {}", space_name, ttl);
}

std::vector<std::string> existing_families;
auto res = rocksdb::DB::ListColumnFamilies(
options, path.native(), &existing_families);
if (!res.ok() && !res.IsPathNotFound()) {
SL_ERROR(log,
"Can't open database in {}: {}",
"Can't list column families in {}: {}",
absolute_path.native(),
res.ToString());
return status_as_error(res);
}

for (auto &family : existing_families) {
if (std::ranges::find_if(
column_family_descriptors,
Expand All @@ -93,21 +102,21 @@ namespace kagome::storage {

options.create_missing_column_families = true;
auto rocks_db = std::shared_ptr<RocksDb>(new RocksDb);
auto status = rocksdb::DB::Open(options,
path.native(),
column_family_descriptors,
&rocks_db->column_family_handles_,
&rocks_db->db_);
if (status.ok()) {
return rocks_db;
auto status = rocksdb::DBWithTTL::Open(options,
path.native(),
column_family_descriptors,
&rocks_db->column_family_handles_,
&rocks_db->db_,
ttls);
if (not status.ok()) {
SL_ERROR(log,
"Can't open database in {}: {}",
absolute_path.native(),
status.ToString());
return status_as_error(status);
}

SL_ERROR(log,
"Can't open database in {}: {}",
absolute_path.native(),
status.ToString());

return status_as_error(status);
return rocks_db;
}

std::shared_ptr<BufferStorage> RocksDb::getSpace(Space space) {
Expand Down
6 changes: 4 additions & 2 deletions core/storage/rocksdb/rocksdb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include <rocksdb/db.h>
#include <rocksdb/table.h>
#include <rocksdb/utilities/db_ttl.h>
#include <boost/container/flat_map.hpp>

#include "filesystem/common.hpp"
Expand Down Expand Up @@ -48,7 +49,8 @@ namespace kagome::storage {
const filesystem::path &path,
rocksdb::Options options = rocksdb::Options(),
uint32_t memory_budget_mib = kDefaultStateCacheSizeMiB,
bool prevent_destruction = false);
bool prevent_destruction = false,
const std::unordered_map<std::string, int32_t>& column_ttl = {});

std::shared_ptr<BufferStorage> getSpace(Space space) override;

Expand Down Expand Up @@ -77,7 +79,7 @@ namespace kagome::storage {

static rocksdb::ColumnFamilyOptions configureColumn(uint32_t memory_budget);

rocksdb::DB *db_{};
rocksdb::DBWithTTL *db_{};
std::vector<ColumnFamilyHandlePtr> column_family_handles_;
boost::container::flat_map<Space, std::shared_ptr<BufferStorage>> spaces_;
rocksdb::ReadOptions ro_;
Expand Down
Loading

0 comments on commit 8a593bb

Please sign in to comment.