From ad0b3b4f4d370635d607d4200407822a57cb6592 Mon Sep 17 00:00:00 2001
From: Vasil Danielov Pashov
Date: Wed, 18 Dec 2024 11:38:42 +0200
Subject: [PATCH] Storage mover port (#2039)

#### Reference Issues/PRs
Add the StorageMover class from arcticc. Some changes to the StorageMover implementation were needed because `batch_read_compressed` used to return the segments along with the keys. In the current ArcticDB version the segment is passed to a continuation instead, which requires keeping an additional vector in which the segments themselves are stored (a minimal sketch of this pattern follows the checklist below). All unit tests are passing. No changes to the API are made. Note that in arcticc the class lived in the tools module, while here it is in the toolbox module.
#### What does this implement or fix?
#### Any other comments?
#### Checklist
Checklist for code changes...
- [ ] Have you updated the relevant docstrings, documentation and copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)?
- [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?
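
The continuation-based flow described above looks roughly like this. This is a minimal, self-contained sketch of the pattern only: `Segment` and `VariantKey` here are stand-ins for the real ArcticDB types, and `std::function` stands in for the `folly::Function` continuation that `batch_read_compressed` actually takes.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Stand-in types for illustration; the real code uses storage::KeySegmentPair
// and the VariantKey from arcticdb::entity.
struct Segment { std::string key; std::string payload; };
using VariantKey = std::string;
using ReadContinuation = std::function<VariantKey(Segment&&)>;

int main() {
    std::vector<VariantKey> keys{"k0", "k1", "k2"};

    // The segments are no longer returned by the batch read, so the caller keeps
    // a side vector and each continuation stores its segment into a fixed slot.
    std::vector<Segment> segments(keys.size());
    std::vector<std::pair<VariantKey, ReadContinuation>> keys_to_copy;
    keys_to_copy.reserve(keys.size());

    for (size_t i = 0; i < keys.size(); ++i) {
        keys_to_copy.emplace_back(keys[i], [&segments, i](Segment&& seg) {
            segments[i] = std::move(seg);  // keep the segment for the later batch write
            return segments[i].key;        // hand the key back to the read pipeline
        });
    }

    // A real batch_read_compressed would fetch from storage and invoke each
    // continuation; here we just call them directly to show where the segments end up.
    for (auto& [key, continuation] : keys_to_copy)
        continuation(Segment{key, "compressed-bytes"});

    return segments.size() == keys.size() ? 0 : 1;
}
```

The copier in `storage_mover.hpp` follows the same shape: the per-key continuations fill a vector of `storage::KeySegmentPair`, which is then passed to `batch_write_compressed` on the target store.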
--- cpp/arcticdb/CMakeLists.txt | 3 + cpp/arcticdb/async/async_store.hpp | 4 +- cpp/arcticdb/async/task_scheduler.hpp | 4 +- cpp/arcticdb/entity/atom_key.hpp | 4 +- cpp/arcticdb/entity/key.cpp | 4 + cpp/arcticdb/entity/key.hpp | 34 +- cpp/arcticdb/entity/metrics.hpp | 4 +- cpp/arcticdb/storage/storage_utils.cpp | 147 ++++ cpp/arcticdb/storage/storage_utils.hpp | 41 +- cpp/arcticdb/storage/storages.hpp | 2 +- cpp/arcticdb/stream/index_aggregator.hpp | 8 + cpp/arcticdb/stream/segment_aggregator.hpp | 1 - cpp/arcticdb/stream/stream_source.hpp | 4 +- cpp/arcticdb/toolbox/library_tool.hpp | 3 +- cpp/arcticdb/toolbox/python_bindings.cpp | 29 + cpp/arcticdb/toolbox/python_bindings.hpp | 7 - cpp/arcticdb/toolbox/storage_mover.hpp | 659 ++++++++++++++++++ cpp/arcticdb/util/error_code.hpp | 3 +- .../version/local_versioned_engine.cpp | 4 +- cpp/arcticdb/version/python_bindings.cpp | 4 +- cpp/arcticdb/version/version_core.cpp | 2 +- cpp/arcticdb/version/version_store_api.cpp | 2 +- python/arcticdb/toolbox/storage.py | 3 + .../integration/toolbox/test_storage_mover.py | 330 +++++++++ 24 files changed, 1227 insertions(+), 79 deletions(-) create mode 100644 cpp/arcticdb/storage/storage_utils.cpp create mode 100644 cpp/arcticdb/toolbox/storage_mover.hpp create mode 100644 python/arcticdb/toolbox/storage.py create mode 100644 python/tests/integration/toolbox/test_storage_mover.py diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index 429b8acad4..e41495c5d4 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -308,6 +308,7 @@ set(arcticdb_srcs storage/storage.hpp storage/storage_override.hpp storage/store.hpp + storage/storage_utils.hpp stream/aggregator.hpp stream/aggregator-inl.hpp stream/append_map.hpp @@ -328,6 +329,7 @@ set(arcticdb_srcs stream/stream_utils.hpp stream/stream_writer.hpp toolbox/library_tool.hpp + toolbox/storage_mover.hpp util/allocator.hpp util/bitset.hpp util/buffer.hpp @@ -486,6 +488,7 @@ set(arcticdb_srcs storage/s3/s3_storage.cpp storage/s3/s3_storage_tool.cpp storage/storage_factory.cpp + storage/storage_utils.cpp stream/aggregator.cpp stream/append_map.cpp stream/index.cpp diff --git a/cpp/arcticdb/async/async_store.hpp b/cpp/arcticdb/async/async_store.hpp index fed575a817..3bfe961703 100644 --- a/cpp/arcticdb/async/async_store.hpp +++ b/cpp/arcticdb/async/async_store.hpp @@ -51,11 +51,11 @@ class AsyncStore : public Store { public: AsyncStore( std::shared_ptr library, - const arcticdb::proto::encoding::VariantCodec &codec, + const proto::encoding::VariantCodec &codec, EncodingVersion encoding_version ) : library_(std::move(library)), - codec_(std::make_shared(codec)), + codec_(std::make_shared(codec)), encoding_version_(encoding_version) { } diff --git a/cpp/arcticdb/async/task_scheduler.hpp b/cpp/arcticdb/async/task_scheduler.hpp index 08e4ba419c..4efd408cdd 100644 --- a/cpp/arcticdb/async/task_scheduler.hpp +++ b/cpp/arcticdb/async/task_scheduler.hpp @@ -300,13 +300,13 @@ inline auto& io_executor() { } template -inline auto submit_cpu_task(Task&& task) { +auto submit_cpu_task(Task&& task) { return TaskScheduler::instance()->submit_cpu_task(std::forward(task)); } template -inline auto submit_io_task(Task&& task) { +auto submit_io_task(Task&& task) { return TaskScheduler::instance()->submit_io_task(std::forward(task)); } diff --git a/cpp/arcticdb/entity/atom_key.hpp b/cpp/arcticdb/entity/atom_key.hpp index 5197615c3f..7240979cd0 100644 --- a/cpp/arcticdb/entity/atom_key.hpp +++ b/cpp/arcticdb/entity/atom_key.hpp @@ -91,8 +91,8 @@ 
class AtomKeyImpl { } friend bool operator<(const AtomKeyImpl &l, const AtomKeyImpl &r) { - auto lt = std::tie(l.id_, l.version_id_, l.index_start_, l.index_end_, l.creation_ts_); - auto rt = std::tie(r.id_, r.version_id_, r.index_start_, r.index_end_, r.creation_ts_); + const auto lt = std::tie(l.id_, l.version_id_, l.index_start_, l.index_end_, l.creation_ts_); + const auto rt = std::tie(r.id_, r.version_id_, r.index_start_, r.index_end_, r.creation_ts_); return lt < rt; } diff --git a/cpp/arcticdb/entity/key.cpp b/cpp/arcticdb/entity/key.cpp index 8c31aee12c..1632e3c5a2 100644 --- a/cpp/arcticdb/entity/key.cpp +++ b/cpp/arcticdb/entity/key.cpp @@ -90,6 +90,10 @@ KeyClass key_class_from_key_type(KeyType key_type) { return get_key_data(key_type).key_class_; } +const char* get_key_description(KeyType key_type) { + return get_key_data(key_type).description_; +} + bool is_string_key_type(KeyType key_type){ return variant_type_from_key_type(key_type) == VariantType::STRING_TYPE; } diff --git a/cpp/arcticdb/entity/key.hpp b/cpp/arcticdb/entity/key.hpp index 54b57c8fb9..3ecc51e27f 100644 --- a/cpp/arcticdb/entity/key.hpp +++ b/cpp/arcticdb/entity/key.hpp @@ -16,6 +16,10 @@ #include #include #include +#include +#include + +namespace rng = std::ranges; namespace arcticdb::entity { @@ -193,10 +197,10 @@ enum class KeyType : int { UNDEFINED }; -inline std::vector key_types_write_precedence() { +consteval auto key_types_write_precedence() { // TOMBSTONE[_ALL] keys are not included because they're not written to the storage, // they just exist inside version keys - return { + return std::array { KeyType::LIBRARY_CONFIG, KeyType::TABLE_DATA, KeyType::TABLE_INDEX, @@ -215,9 +219,9 @@ inline std::vector key_types_write_precedence() { }; } -inline std::vector key_types_read_precedence() { +consteval auto key_types_read_precedence() { auto output = key_types_write_precedence(); - std::reverse(std::begin(output), std::end(output)); + rng::reverse(output); return output; } @@ -247,7 +251,7 @@ enum class VariantType : char { VariantType variant_type_from_key_type(KeyType key_type); -inline bool is_index_key_type(KeyType key_type) { +constexpr bool is_index_key_type(KeyType key_type) { // TODO: Change name probably. 
return (key_type == KeyType::TABLE_INDEX) || (key_type == KeyType::MULTI_KEY); } @@ -258,30 +262,26 @@ bool is_ref_key_class(KeyType k); bool is_block_ref_key_class(KeyType k); -inline KeyType get_key_type_for_data_stream(const StreamId &) { +constexpr KeyType get_key_type_for_data_stream(const StreamId &) { return KeyType::TABLE_DATA; } -inline KeyType get_key_type_for_index_stream(const StreamId &) { +constexpr KeyType get_key_type_for_index_stream(const StreamId &) { return KeyType::TABLE_INDEX; } +const char* get_key_description(KeyType type); template -auto foreach_key_type_read_precedence(Function&& func) { - auto types = key_types_read_precedence(); - for(auto type : types) { - func(KeyType(type)); - } +constexpr auto foreach_key_type_read_precedence(Function&& func) { + rng::for_each(key_types_read_precedence(), func); } template -auto foreach_key_type_write_precedence(Function&& func) { - auto types = key_types_write_precedence(); - for(auto type : types) { - func(KeyType(type)); - } +constexpr auto foreach_key_type_write_precedence(Function&& func) { + rng::for_each(key_types_write_precedence(), func); } + inline KeyType key_type_from_int(int type_num) { util::check(type_num > 0 && type_num < int(KeyType::UNDEFINED), "Unrecognized key type number {}", type_num); return KeyType(type_num); diff --git a/cpp/arcticdb/entity/metrics.hpp b/cpp/arcticdb/entity/metrics.hpp index 4819098cbe..db56a0551c 100644 --- a/cpp/arcticdb/entity/metrics.hpp +++ b/cpp/arcticdb/entity/metrics.hpp @@ -32,8 +32,8 @@ namespace arcticdb { const std::string MONGO_INSTANCE_LABEL = "mongo_instance"; const std::string PROMETHEUS_ENV_LABEL = "env"; -const int SUMMARY_MAX_AGE = 30; -const int SUMMARY_AGE_BUCKETS = 5; +constexpr int SUMMARY_MAX_AGE = 30; +constexpr int SUMMARY_AGE_BUCKETS = 5; class MetricsConfig { public: diff --git a/cpp/arcticdb/storage/storage_utils.cpp b/cpp/arcticdb/storage/storage_utils.cpp new file mode 100644 index 0000000000..7ae6816b38 --- /dev/null +++ b/cpp/arcticdb/storage/storage_utils.cpp @@ -0,0 +1,147 @@ +#include +#include +#include +#include + +namespace arcticdb { + +std::vector filter_keys_on_existence( + const std::vector& keys, + const std::shared_ptr& store, + bool pred + ){ + auto key_existence = folly::collect(store->batch_key_exists(keys)).get(); + std::vector res; + for (size_t i = 0; i != keys.size(); i++) { + if (key_existence[i] == pred) { + res.push_back(keys[i]); + } + } + return res; +} + +void filter_keys_on_existence(std::vector& keys, const std::shared_ptr& store, bool pred) { + std::vector var_vector; + var_vector.reserve(keys.size()); + rng::copy(keys, std::back_inserter(var_vector)); + + auto key_existence = store->batch_key_exists(var_vector); + + auto keys_itr = keys.begin(); + for (size_t i = 0; i != var_vector.size(); i++) { + bool resolved = key_existence[i].wait().value(); + if (resolved == pred) { + *keys_itr = std::move(std::get(var_vector[i])); + ++keys_itr; + } + } + keys.erase(keys_itr, keys.end()); +} + +AtomKey write_table_index_tree_from_source_to_target( + const std::shared_ptr& source_store, + const std::shared_ptr& target_store, + const AtomKey& index_key, + std::optional new_version_id +) { + ARCTICDB_SAMPLE(WriteIndexSourceToTarget, 0) + // In + auto [_, index_seg] = source_store->read_sync(index_key); + index::IndexSegmentReader index_segment_reader{std::move(index_seg)}; + // Out + index::IndexWriter writer(target_store, + {index_key.id(), new_version_id.value_or(index_key.version_id())}, + 
std::move(index_segment_reader.mutable_tsd())); + std::vector> futures; + // Process + for (auto iter = index_segment_reader.begin(); iter != index_segment_reader.end(); ++iter) { + auto& sk = *iter; + auto& key = sk.key(); + std::optional key_to_write = atom_key_builder() + .version_id(new_version_id.value_or(key.version_id())) + .creation_ts(util::SysClock::nanos_since_epoch()) + .start_index(key.start_index()) + .end_index(key.end_index()) + .content_hash(key.content_hash()) + .build(key.id(), key.type()); + + writer.add(*key_to_write, sk.slice()); // Both const ref + futures.emplace_back(submit_io_task(async::CopyCompressedInterStoreTask{ + sk.key(), + std::move(key_to_write), + false, + false, + source_store, + {target_store}})); + } + const std::vector store_results = collect(futures).get(); + for (const async::CopyCompressedInterStoreTask::ProcessingResult& res: store_results) { + util::variant_match( + res, + [&](const async::CopyCompressedInterStoreTask::FailedTargets& failed) { + log::storage().error("Failed to move targets: {} from {} to {}", failed, source_store->name(), target_store->name()); + }, + [](const auto&){}); + } + // FUTURE: clean up already written keys if exception + return to_atom(writer.commit().get()); +} + +AtomKey copy_multi_key_from_source_to_target( + const std::shared_ptr& source_store, + const std::shared_ptr& target_store, + const AtomKey& index_key, + std::optional new_version_id) { + using namespace arcticdb::stream; + auto fut_index = source_store->read(index_key); + auto [_, index_seg] = std::move(fut_index).get(); + std::vector keys; + for (size_t idx = 0; idx < index_seg.row_count(); idx++) { + keys.push_back(stream::read_key_row(index_seg, static_cast(idx))); + } + // Recurse on the index keys inside MULTI_KEY + std::vector new_data_keys; + for (const auto &k: keys) { + auto new_key = copy_index_key_recursively(source_store, target_store, k, new_version_id); + new_data_keys.emplace_back(std::move(new_key)); + } + // Write new MULTI_KEY + + folly::Future multi_key_fut = folly::Future::makeEmpty(); + IndexAggregator multi_index_agg(index_key.id(), [&new_version_id, &index_key, &multi_key_fut, &target_store](auto &&segment) { + multi_key_fut = target_store->write(KeyType::MULTI_KEY, + new_version_id.value_or(index_key.version_id()), // version_id + index_key.id(), + 0, // start_index + 0, // end_index + std::forward(segment)).wait(); + }); + for (auto &key: new_data_keys) { + multi_index_agg.add_key(to_atom(key)); + } + if (index_seg.has_metadata()) { + google::protobuf::Any metadata = *index_seg.metadata(); + multi_index_agg.set_metadata(std::move(metadata)); + } + if (index_seg.has_index_descriptor()) { + multi_index_agg.set_timeseries_descriptor(index_seg.index_descriptor()); + } + multi_index_agg.commit(); + return to_atom(multi_key_fut.value()); +} + +AtomKey copy_index_key_recursively( + const std::shared_ptr& source_store, + const std::shared_ptr& target_store, + const AtomKey& index_key, + std::optional new_version_id) { + ARCTICDB_SAMPLE(RecurseIndexKey, 0) + if (index_key.type() == KeyType::TABLE_INDEX) { + return write_table_index_tree_from_source_to_target(source_store, target_store, index_key, new_version_id); + } else if (index_key.type() == KeyType::MULTI_KEY) { + return copy_multi_key_from_source_to_target(source_store, target_store, index_key, new_version_id); + } + internal::raise("Cannot copy index recursively. 
Unsupported index key type {}", index_key.type()); +} + +} \ No newline at end of file diff --git a/cpp/arcticdb/storage/storage_utils.hpp b/cpp/arcticdb/storage/storage_utils.hpp index a99e8733d8..d825d370d7 100644 --- a/cpp/arcticdb/storage/storage_utils.hpp +++ b/cpp/arcticdb/storage/storage_utils.hpp @@ -18,38 +18,13 @@ inline auto stream_id_prefix_matcher(const std::string &prefix) { std::get(id).compare(0u, prefix.size(), prefix) == 0); }; } -inline std::vector filter_keys_on_existence( - const std::vector& keys, - const std::shared_ptr& store, - bool pred - ){ - auto key_existence = folly::collect(store->batch_key_exists(keys)).get(); - std::vector res; - for (size_t i = 0; i != keys.size(); i++) { - if (key_existence[i] == pred) { - res.push_back(keys[i]); - } - } - return res; -} - -inline void filter_keys_on_existence(std::vector& keys, const std::shared_ptr& store, bool pred) { - std::vector var_vector; - var_vector.reserve(keys.size()); - std::transform(keys.begin(), keys.end(), std::back_inserter(var_vector), - [](auto&& k) { return VariantKey(std::move(k)); }); - - auto key_existence = store->batch_key_exists(var_vector); - - auto keys_itr = keys.begin(); - for (size_t i = 0; i != var_vector.size(); i++) { - bool resolved = key_existence[i].wait().value(); - if (resolved == pred) { - *keys_itr = std::move(std::get(var_vector[i])); - ++keys_itr; - } - } - keys.erase(keys_itr, keys.end()); -} +std::vector filter_keys_on_existence(const std::vector& keys, const std::shared_ptr& store, bool pred); +void filter_keys_on_existence(std::vector& keys, const std::shared_ptr& store, bool pred); + +AtomKey copy_index_key_recursively( + const std::shared_ptr& source_store, + const std::shared_ptr& target_store, + const AtomKey& index_key, + std::optional new_version_id); } //namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/storage/storages.hpp b/cpp/arcticdb/storage/storages.hpp index bc2de2c526..9b393cda4f 100644 --- a/cpp/arcticdb/storage/storages.hpp +++ b/cpp/arcticdb/storage/storages.hpp @@ -182,7 +182,7 @@ class Storages { OpenMode mode_; }; -inline std::shared_ptr create_storages(const LibraryPath& library_path, OpenMode mode, const decltype(std::declval().storage_by_id())& storage_configs, const NativeVariantStorage& native_storage_config) { +inline std::shared_ptr create_storages(const LibraryPath& library_path, OpenMode mode, decltype(std::declval().storage_by_id())& storage_configs, const NativeVariantStorage& native_storage_config) { Storages::StorageVector storages; for (const auto& [storage_id, storage_config]: storage_configs) { util::variant_match(native_storage_config.variant(), diff --git a/cpp/arcticdb/stream/index_aggregator.hpp b/cpp/arcticdb/stream/index_aggregator.hpp index 1af2294fcd..22cf2c9123 100644 --- a/cpp/arcticdb/stream/index_aggregator.hpp +++ b/cpp/arcticdb/stream/index_aggregator.hpp @@ -60,6 +60,10 @@ class FlatIndexingPolicy { segment_.set_timeseries_descriptor(timeseries_descriptor); } + void set_metadata(google::protobuf::Any&& metadata) { + segment_.set_metadata(std::move(metadata)); + } + private: Callback callback_; FixedSchema schema_; @@ -89,6 +93,10 @@ class IndexAggregator { indexing_policy_.set_timeseries_descriptor(timeseries_descriptor); } + void set_metadata(google::protobuf::Any&& metadata) { + indexing_policy_.set_metadata(std::move(metadata)); + } + private: IndexingPolicy indexing_policy_; }; diff --git a/cpp/arcticdb/stream/segment_aggregator.hpp b/cpp/arcticdb/stream/segment_aggregator.hpp index 
95660a81a5..f25b72c56f 100644 --- a/cpp/arcticdb/stream/segment_aggregator.hpp +++ b/cpp/arcticdb/stream/segment_aggregator.hpp @@ -11,7 +11,6 @@ #include #include #include -#include #include namespace arcticdb::stream { diff --git a/cpp/arcticdb/stream/stream_source.hpp b/cpp/arcticdb/stream/stream_source.hpp index 401fa8a4ac..3d7487fb9e 100644 --- a/cpp/arcticdb/stream/stream_source.hpp +++ b/cpp/arcticdb/stream/stream_source.hpp @@ -11,8 +11,6 @@ #include #include #include -#include -#include #include #include @@ -55,7 +53,7 @@ struct StreamSource { virtual bool supports_prefix_matching() const = 0; virtual bool fast_delete() = 0; - using ReadContinuation = folly::Function; + using ReadContinuation = folly::Function; virtual folly::Future> batch_read_compressed( std::vector> &&ks, diff --git a/cpp/arcticdb/toolbox/library_tool.hpp b/cpp/arcticdb/toolbox/library_tool.hpp index da5b2bec12..bfd48821f2 100644 --- a/cpp/arcticdb/toolbox/library_tool.hpp +++ b/cpp/arcticdb/toolbox/library_tool.hpp @@ -12,7 +12,6 @@ #include #include #include -#include #include #include #include @@ -53,7 +52,7 @@ class LibraryTool { void remove(VariantKey key); - std::vector find_keys(arcticdb::entity::KeyType); + std::vector find_keys(entity::KeyType); bool key_exists(const VariantKey& key); diff --git a/cpp/arcticdb/toolbox/python_bindings.cpp b/cpp/arcticdb/toolbox/python_bindings.cpp index f53b89e07a..6d70f6e425 100644 --- a/cpp/arcticdb/toolbox/python_bindings.cpp +++ b/cpp/arcticdb/toolbox/python_bindings.cpp @@ -16,6 +16,7 @@ #include #include #include +#include namespace arcticdb::toolbox::apy { @@ -86,6 +87,34 @@ void register_bindings(py::module &m, py::exception& .def("take_lock_guard", &ReliableStorageLockManager::take_lock_guard) .def("free_lock_guard", &ReliableStorageLockManager::free_lock_guard); + + py::class_(tools, "StorageMover") + .def(py::init, std::shared_ptr>()) + .def("go", + &StorageMover::go, + "start the storage mover copy", + py::arg("batch_size") = 100) + .def("get_keys_in_source_only", + &StorageMover::get_keys_in_source_only) + .def("get_all_source_keys", + &StorageMover::get_all_source_keys, + "get_all_source_keys") + .def("incremental_copy", + &StorageMover::incremental_copy, + "incrementally copy keys") + .def("write_keys_from_source_to_target", + &StorageMover::write_keys_from_source_to_target, + "write_keys_from_source_to_target") + .def("write_symbol_trees_from_source_to_target", + &StorageMover::write_symbol_trees_from_source_to_target, + "write_symbol_trees_from_source_to_target") + .def("clone_all_keys_for_symbol", + &StorageMover::clone_all_keys_for_symbol, + "Clone all the keys that have this symbol as id to the dest library.") + .def("clone_all_keys_for_symbol_for_type", + &StorageMover::clone_all_keys_for_symbol_for_type, + "Clone all the keys that have this symbol and type to the dest library."); + // S3 Storage tool using namespace arcticdb::storage::s3; py::class_>(tools, "S3Tool") diff --git a/cpp/arcticdb/toolbox/python_bindings.hpp b/cpp/arcticdb/toolbox/python_bindings.hpp index 2975663ce5..592c368714 100644 --- a/cpp/arcticdb/toolbox/python_bindings.hpp +++ b/cpp/arcticdb/toolbox/python_bindings.hpp @@ -8,13 +8,6 @@ #pragma once #include -#include - -#include -#include - -#include -#include namespace arcticdb::toolbox::apy { diff --git a/cpp/arcticdb/toolbox/storage_mover.hpp b/cpp/arcticdb/toolbox/storage_mover.hpp new file mode 100644 index 0000000000..4a33a7d96c --- /dev/null +++ b/cpp/arcticdb/toolbox/storage_mover.hpp @@ -0,0 +1,659 @@ +#pragma once + 
+#include + +#include "codec/default_codecs.hpp" +#include "column_store/column_utils.hpp" + +#include "stream/test/stream_test_common.hpp" +#include "util/variant.hpp" +#include "fmt/format.h" +#include +#include "async/async_store.hpp" +#include "version/version_map.hpp" +#include +#include + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "version/version_functions.hpp" + +namespace rng = std::ranges; + +namespace arcticdb { + +constexpr std::size_t NumThreads = 50; + +struct BatchCopier { + std::atomic count_ = 0; + std::atomic objects_moved_ = 0; + std::atomic bytes_moved_ = 0; + std::atomic skipped_ = 0; + interval_timer timers_; + std::vector keys_; + std::shared_ptr source_store_; + std::shared_ptr target_store_; + size_t batch_size_; + size_t thread_count_; + + BatchCopier(std::shared_ptr source_store, + std::shared_ptr target_store, + size_t batch_size, + size_t thread_count=32) : + source_store_(std::move(source_store)), + target_store_(std::move(target_store)), + batch_size_(batch_size), + thread_count_{thread_count}{ + timers_.start_timer(); + } + + void add_key(const VariantKey& key, bool check_target=true, bool check_source=true) { + if(check_target && !is_ref_key_class(variant_key_type(key)) && target_store_->key_exists(key).get()) { + ++skipped_; + return; + } + + if(check_source && !source_store_->key_exists(key).get()) { + log::storage().warn("Found an unreadable key {}", key); + return; + } + + keys_.push_back(key); + if(keys_.size() == batch_size_) { + copy_keys(); + keys_ = std::vector(); + + if(++count_ %10 == 0) { + timers_.stop_timer(); + auto bps = bytes_moved_ / timers_.get_timer().get_results().total; + log::storage().info("Moved {}, {} objects ({} skipped), {} per second", format_bytes(bytes_moved_), objects_moved_, skipped_, format_bytes(bps)); + timers_.start_timer(); + } + } + } + + void go(std::unordered_map>&& keys, bool perform_checks) { + size_t batch_size_per_thread = std::max(batch_size_ / thread_count_, size_t{1}); + // Log approximately every 10000 objects + uint64_t logging_frequency = 10000 / batch_size_per_thread; + folly::FutureExecutor exec{thread_count_}; + std::vector> futures; + + foreach_key_type_write_precedence([&](auto key_type) { + bool check_target = perform_checks && !is_ref_key_class(key_type); + bool check_source = perform_checks; + if (auto it = keys.find(key_type); it != keys.end()) { + while(it->second.size() > 0) { + const auto start = it->second.size() >= batch_size_per_thread ? 
it->second.end() - batch_size_per_thread : it->second.begin(); + const auto end = it->second.end(); + const size_t size = std::distance(start, end); + std::vector> keys_to_copy; + keys_to_copy.reserve(size); + auto segments_ptr = std::make_unique>(size); + std::transform( + std::make_move_iterator(start), + std::make_move_iterator(end), + std::back_inserter(keys_to_copy), + [segments = segments_ptr.get(), pos = 0](VariantKey&& key) mutable { + return std::pair{std::move(key), [segments, pos=pos++](storage::KeySegmentPair&& segment) { + segments->at(pos) = std::move(segment); + return segments->at(pos).variant_key(); + }}; + } + ); + it->second.erase(start, end); + futures.emplace_back(exec.addFuture( + [this, keys_to_copy=std::move(keys_to_copy), &logging_frequency, check_target, check_source, segments_ptr=std::move(segments_ptr)]() mutable { + for (const auto& key: keys_to_copy) { + if(check_source && !source_store_->key_exists(key.first).get()) { + log::storage().warn("Found an unreadable key {}", key.first); + } + if(check_target && target_store_->key_exists(key.first).get()) { + ++skipped_; + } + } + auto collected_kvs = source_store_->batch_read_compressed(std::move(keys_to_copy), BatchReadArgs{}).get(); + if (!collected_kvs.empty()) { + const size_t bytes_being_copied = std::accumulate(segments_ptr->begin(), segments_ptr->end(), size_t{0}, [] (size_t a, const storage::KeySegmentPair& ks) { + return a + ks.segment().size(); + }); + target_store_->batch_write_compressed(*segments_ptr.release()).get(); + bytes_moved_.fetch_add(bytes_being_copied, std::memory_order_relaxed); + objects_moved_.fetch_add(collected_kvs.size(), std::memory_order_relaxed); + } + ++count_; + if (count_.compare_exchange_strong(logging_frequency, 0)) { + timers_.stop_timer(); + auto bps = bytes_moved_.load() / timers_.get_timer().get_results().total; + log::storage().info("Moved {}, {} objects ({} skipped), {} per second", + format_bytes(bytes_moved_.load()), + objects_moved_.load(), + skipped_.load(), + format_bytes(bps)); + timers_.start_timer(); + } + // count_ could be incremented to a value greater than logging_frequency, just reset it in this case + if (count_.load() > logging_frequency) { + count_.store(0); + } + return makeFuture(folly::Unit{}); + })); + } + } + }); + collect(futures).get(); + timers_.stop_timer(); + auto bps = bytes_moved_.load() / timers_.get_timer().get_results().total; + log::storage().info("Moved {}, {} objects ({} skipped), {} per second", + format_bytes(bytes_moved_.load()), + objects_moved_.load(), + skipped_.load(), + format_bytes(bps)); + } + + void copy_keys() { + std::vector segments(keys_.size()); + std::vector> keys_to_copy; + keys_to_copy.reserve(keys_.size()); + std::transform( + std::make_move_iterator(keys_.begin()), + std::make_move_iterator(keys_.end()), + std::back_inserter(keys_to_copy), + [&segments, i=0](VariantKey&& key) mutable { + return std::pair{std::move(key), [&segments, i=i++](storage::KeySegmentPair&& ks) { + segments.at(i) = std::move(ks); + return segments.at(i).variant_key(); + }}; + } + ); + keys_.clear(); + auto collected_kvs = source_store_->batch_read_compressed(std::move(keys_to_copy), BatchReadArgs{}).get(); + if (!collected_kvs.empty()) { + bytes_moved_ += std::accumulate(segments.begin(), segments.end(), size_t{0}, [] (size_t a, const storage::KeySegmentPair& ks) { + return a + ks.segment().size(); + }); + target_store_->batch_write_compressed(std::move(segments)).get(); + } + objects_moved_ += keys_.size(); + } + + void finalize() { + 
if(!keys_.empty()) { + copy_keys(); + } + timers_.stop_timer(); + auto total = timers_.get_timer().get_results().total; + auto bps = bytes_moved_ / total; + log::storage().info("Moved {} {} objects in {} - {} bps ", format_bytes(bytes_moved_), objects_moved_, total, format_bytes(bps)); + } +}; + +struct BatchDeleter { + uint64_t count = 0; + uint64_t objects_moved = 0; + uint64_t skipped = 0; + interval_timer timers; + std::vector keys; + std::shared_ptr source_store_; + std::shared_ptr target_store_; + size_t batch_size_; + + BatchDeleter(std::shared_ptr source_store, std::shared_ptr target_store, size_t batch_size) : + source_store_(std::move(source_store)), + target_store_(std::move(target_store)), + batch_size_(batch_size){ + timers.start_timer(); + } + + void delete_keys() { + target_store_->remove_keys(keys).get(); + objects_moved += keys.size(); + } + + void add_key(const VariantKey& key, bool check_target=true) { + if(check_target && !target_store_->key_exists(key).get()) { + skipped++; + log::storage().warn("Found an unreadable key {}", key); + return; + } + keys.push_back(key); + if(keys.size() == batch_size_) { + delete_keys(); + keys = std::vector(); + + if(++count %10 == 0) { + timers.stop_timer(); + auto bps = objects_moved / timers.get_timer().get_results().total; + log::storage().info("Moved {} objects ({} skipped), {} per second", objects_moved, skipped, bps); + timers.start_timer(); + } + } + } + + void finalize() { + if(!keys.empty()) { + delete_keys(); + } + timers.stop_timer(); + auto total = timers.get_timer().get_results().total; + auto bps = objects_moved / timers.get_timer().get_results().total; + log::storage().info("Moved {} objects in {} - {} per second ", objects_moved, total, bps); + } +}; + +inline MetricsConfig::Model get_model_from_proto_config(const proto::utils::PrometheusConfig& cfg) { + switch (cfg.prometheus_model()) { + case proto::utils::PrometheusConfig_PrometheusModel_NO_INIT: return MetricsConfig::Model::NO_INIT; + case proto::utils::PrometheusConfig_PrometheusModel_PUSH: return MetricsConfig::Model::PUSH; + case proto::utils::PrometheusConfig_PrometheusModel_WEB: return MetricsConfig::Model::PULL; + default: internal::raise("Unknown Prometheus proto model {}", int{cfg.prometheus_model()}); + } +} + +class ARCTICDB_VISIBILITY_HIDDEN StorageMover { +public: + StorageMover(std::shared_ptr source_library, std::shared_ptr target_library) : + source_store_(std::make_shared>(source_library, + codec::default_lz4_codec(), + encoding_version(source_library->config()))), + target_store_(std::make_shared>(target_library, + codec::default_lz4_codec(), + encoding_version(target_library->config()))), + cfg_() { + codec::check( + encoding_version(source_library->config()) == encoding_version(target_library->config()), + "The encoding version of the source library {} is {} which is different than the encoding version {} of the target library {}", + source_library->name(), encoding_version(source_library->config()),encoding_version(target_library->config()), target_library->name()); + auto const& src_cfg = source_library->config(); + util::variant_match(src_cfg, + [](std::monostate){util::raise_rte("Invalid source library cfg");}, + [&](const proto::storage::VersionStoreConfig& conf){ + if (conf.has_prometheus_config()) { + MetricsConfig prometheus_config( + conf.prometheus_config().host(), + conf.prometheus_config().port(), + conf.prometheus_config().job_name(), + conf.prometheus_config().instance(), + conf.prometheus_config().prometheus_env(), + 
get_model_from_proto_config(conf.prometheus_config()) + ); + PrometheusInstance::instance()->configure(prometheus_config); + } + source_symbol_list_ = conf.symbol_list(); + }); + + auto const& target_cfg = target_library->config(); + util::variant_match(target_cfg, + [](std::monostate){util::raise_rte("Invalid source library cfg");}, + [&](const proto::storage::VersionStoreConfig& conf){ + target_symbol_list_ = conf.symbol_list(); + }); + } + + void go(size_t batch_size = 1000) { + BatchCopier copier{source_store_, target_store_, batch_size}; + foreach_key_type([&](KeyType key_type) { + source_store_->iterate_type(key_type, [&](const VariantKey &&key) { + copier.add_key(key); + }); + }); + copier.finalize(); + } + + py::list get_all_source_keys() { + py::list res; + size_t count = 0; + foreach_key_type([&](KeyType key_type) { + source_store_->iterate_type(key_type, [&](const VariantKey& key) { + res.append(key); + if(++count % 10000 == 0) + log::storage().info("Got {} keys", count); + }); + }); + return res; + } + + struct MissingKeysData { + std::atomic scanned_keys_; + std::atomic missing_keys_; + std::mutex mutex_; + interval_timer timer_; + + MissingKeysData() : + scanned_keys_(0), + missing_keys_(0) + { + timer_.start_timer(); + } + + void report() { + std::lock_guard lock{mutex_}; + timer_.stop_timer(); + auto keys_per_sec = scanned_keys_ / timer_.get_timer().get_results().total; + log::version().info("Scanned {} keys of all types and found {} missing : {} keys/sec", scanned_keys_.load(), missing_keys_.load(), keys_per_sec); + timer_.start_timer(); + } + }; + + struct FindMissingKeysTask : async::BaseTask { + KeyType key_type_; + std::shared_ptr source_store_; + std::shared_ptr target_store_; + std::shared_ptr global_data_; + uint64_t keys_of_type_; + uint64_t missing_keys_of_type_; + size_t batch_size_; + bool skip_target_check_ref_; + bool skip_source_check_; + + FindMissingKeysTask( + KeyType key_type, + std::shared_ptr source_store, + std::shared_ptr target_store, + std::shared_ptr global_data, + size_t batch_size=100, + bool skip_target_check_ref=false, + bool skip_source_check=false): + key_type_(key_type), + source_store_(std::move(source_store)), + target_store_(std::move(target_store)), + global_data_(std::move(global_data)), + keys_of_type_(0), + missing_keys_of_type_(0), + batch_size_(batch_size), + skip_target_check_ref_(skip_target_check_ref), + skip_source_check_(skip_source_check){ + } + + std::vector operator()() { + interval_timer timers; + timers.start_timer(); + std::vector res; + std::vector all_keys; + source_store_->iterate_type(key_type_, [&](const VariantKey &&key) { + ++keys_of_type_; + ++global_data_->scanned_keys_; + all_keys.emplace_back(key); + if (all_keys.size() == batch_size_) { + auto key_exists = folly::collect(target_store_->batch_key_exists(all_keys)).get(); + for (size_t idx = 0; idx != all_keys.size(); idx++) { + if ((skip_target_check_ref_ && is_ref_key_class(key_type_)) || !key_exists[idx]) { + if (skip_source_check_ || source_store_->key_exists(all_keys[idx]).get()) { + res.push_back(all_keys[idx]); + ++missing_keys_of_type_; + ++global_data_->missing_keys_; + } else { + log::storage().warn("Storage contains an unreadable key {}", all_keys[idx]); + } + } + } + all_keys.clear(); + } + if(keys_of_type_ % 10000 == 0) { + timers.stop_timer(); + auto keys_per_sec = keys_of_type_ / timers.get_timer().get_results().total; + log::version().info("Scanned {} {} keys and found {} missing : {} keys/sec", keys_of_type_, get_key_description(key_type_), 
missing_keys_of_type_, keys_per_sec); + global_data_->report(); + timers.start_timer(); + } + }); + + if (!all_keys.empty()) { + auto key_exists = folly::collect(target_store_->batch_key_exists(all_keys)).get(); + for (size_t idx = 0; idx != all_keys.size(); idx++) { + if ((skip_target_check_ref_ && is_ref_key_class(key_type_)) || !key_exists[idx]) { + if (skip_source_check_ || source_store_->key_exists(all_keys[idx]).get()) { + res.push_back(all_keys[idx]); + ++missing_keys_of_type_; + ++global_data_->missing_keys_; + } else { + log::storage().warn("Storage contains an unreadable key {}", all_keys[idx]); + } + } + } + } + + log::storage().info("{} missing keys of type {}, scanned {}", res.size(), get_key_description(key_type_), keys_of_type_); + return res; + } + }; + + std::unordered_map> get_missing_keys(size_t batch_size, bool reverse, bool skip_target_check_ref) { + auto shared_data = std::make_shared(); + std::unordered_map> results; + auto prim = reverse ? target_store_ : source_store_; + auto second = reverse ? source_store_ : target_store_; + foreach_key_type_read_precedence([&](KeyType key_type) { + auto task = FindMissingKeysTask{key_type, prim, second, shared_data, batch_size, skip_target_check_ref, true}; + results.emplace(key_type, task()); + }); + + log::storage().info("Finished scan, collating results"); + shared_data->report(); + return results; + } + + void incremental_copy(size_t batch_size = 1000, size_t thread_count = 32, bool delete_keys=false, bool perform_checks=true) { + auto missing_keys = get_missing_keys(batch_size * 100, false, true); + log::storage().info("Copying {} missing key types", missing_keys.size()); + BatchCopier copier{source_store_, target_store_, batch_size, thread_count}; + copier.go(std::move(missing_keys), perform_checks); + + if (delete_keys) { + auto deleting_keys = get_missing_keys(batch_size * 100, true, false); + log::storage().info("Deleting {} key types", deleting_keys.size()); + BatchDeleter deleter{source_store_, target_store_, batch_size}; + foreach_key_type_read_precedence([&](auto key_type) { + if (auto it = deleting_keys.find(key_type); it != deleting_keys.end()) { + for (auto &key : it->second) + deleter.add_key(key, perform_checks); + } + }); + deleter.finalize(); + } + } + + py::list get_keys_in_source_only() { + auto all_missing = get_missing_keys(100, false, false); + + py::list res; + for(const auto& missing_of_type : all_missing) { + for (const auto &key : missing_of_type.second) + res.append(key); + } + return res; + } + + size_t clone_all_keys_for_symbol(const StreamId &stream_id, size_t batch_size) { + std::vector vkeys; + foreach_key_type([&](KeyType key_type) { + source_store_->iterate_type(key_type, [&](const VariantKey& key) { + vkeys.push_back(key); + }, std::get(stream_id)); + }); + return write_variant_keys_from_source_to_target(std::move(vkeys), batch_size); + } + + size_t clone_all_keys_for_symbol_for_type( + const StreamId &stream_id, + size_t batch_size, + KeyType key_type) { + std::vector vkeys; + source_store_->iterate_type(key_type, [&](const VariantKey& key) { + vkeys.push_back(key); + }, std::get(stream_id)); + return write_variant_keys_from_source_to_target(std::move(vkeys), batch_size); + } + + size_t write_variant_keys_from_source_to_target(std::vector&& vkeys, size_t batch_size) { + std::vector> write_futs; + + size_t total_copied = 0; + for (size_t start = 0; start < vkeys.size(); start += batch_size) { + const size_t end = std::min(start + batch_size, vkeys.size()); + const size_t copy_max_size = end 
- start; + std::vector> keys_to_copy(copy_max_size); + std::vector segments(copy_max_size); + size_t copied = 0; + for (size_t offset = start; offset < end; ++offset) { + if (VariantKey& key = vkeys[offset]; source_store_->key_exists(key).get() && !target_store_->key_exists(key).get()) { + keys_to_copy[copied++] = std::pair{std::move(key), [copied, &segments](storage::KeySegmentPair&& ks) { + segments[copied] = std::move(ks); + return segments[copied].variant_key(); + }}; + } + } + total_copied += copied; + [[maybe_unused]] auto keys = source_store_->batch_read_compressed(std::move(keys_to_copy), BatchReadArgs{}).get(); + write_futs.push_back(target_store_->batch_write_compressed(std::move(segments))); + } + folly::collect(write_futs).get(); + return total_copied; + } + + + size_t write_keys_from_source_to_target(const std::vector& py_keys, size_t batch_size) { + std::vector vkeys; + rng::transform(py_keys, std::back_inserter(vkeys), [](const auto& py_key) -> VariantKey { + if (py::isinstance(py_key)) { + return py_key.template cast(); + } else if (py::isinstance(py_key)) { + return py_key.template cast(); + } + internal::raise("Invalid key type"); + }); + return write_variant_keys_from_source_to_target(std::move(vkeys), batch_size); + } + + py::dict write_symbol_trees_from_source_to_target(const std::vector& py_partial_keys, bool append_versions) { + std::shared_ptr source_map(std::make_shared()); + std::shared_ptr target_map(std::make_shared()); + std::optional symbol_list; + if(target_symbol_list_) + symbol_list .emplace(target_map); + // res is a dict with key sym and value a dict showing results of the versions + py::dict res; + target_map->set_log_changes(true); + for(const auto& py_pkey: py_partial_keys) { + // For each version, outputs the version_id which was written in the dest if no error otherwise error string + py::dict sym_data; + std::unordered_map> version_to_snapshot_map; + auto sym = py_pkey.attr("id").cast(); + // Can be either numeric(version id) or string(snapshot_id) + auto ids = py_pkey.attr("versions").cast>>(); + std::vector index_keys; + for(const auto& id: ids) { + util::variant_match(id, + [&](const VersionId& numeric_id) { + auto index_key = get_specific_version(source_store_, source_map, sym, numeric_id); + if (!index_key) { + sym_data[py::int_(numeric_id)] = + fmt::format("Sym:{},Version:{},Ex:{}", sym, numeric_id, "Numeric Id not found"); + } + else { + index_keys.emplace_back(index_key.value()); + } + }, + [&](const StringId& snap_name) { + auto opt_snapshot = get_snapshot(source_store_, snap_name); + if (!opt_snapshot) { + sym_data[py::str(snap_name)] = + fmt::format("Sym:{},SnapId:{},Ex:{}", sym, snap_name, "Snapshot not found in source"); + return; + } + // A snapshot will normally be in a ref key, but for old libraries it still needs to fall back to iteration of + // atom keys. 
+ auto variant_snap_key = opt_snapshot.value().first; + auto snapshot_segment = opt_snapshot.value().second; + auto opt_idx_for_stream_id = row_id_for_stream_in_snapshot_segment( + snapshot_segment, variant_key_type(variant_snap_key) == KeyType::SNAPSHOT_REF, sym); + if (opt_idx_for_stream_id) { + auto stream_idx = opt_idx_for_stream_id.value(); + auto index_key = read_key_row(snapshot_segment, stream_idx); + version_to_snapshot_map[index_key.version_id()].push_back(snap_name); + index_keys.emplace_back(std::move(index_key)); + } + else { + sym_data[py::str(snap_name)] = + fmt::format("Sym:{},SnapId:{},Ex:{}", sym, snap_name, "Symbol not found in source snapshot"); + } + } + ); + } + // Remove duplicate keys + rng::sort(index_keys, [&](const auto& k1, const auto& k2) {return k1.version_id() < k2.version_id();}); + auto to_erase = rng::unique(index_keys, std::equal_to{}, [](const auto& k){ return k.version_id();}); + index_keys.erase(to_erase.begin(), to_erase.end()); + for(const auto& index_key: index_keys) { + VersionId v_id = index_key.version_id(); + try { + std::optional new_version_id; + std::optional previous_key; + if (append_versions) { + auto [maybe_prev, _] = get_latest_version(target_store_, target_map, sym); + if (maybe_prev){ + new_version_id = std::make_optional(maybe_prev.value().version_id() + 1); + previous_key = std::move(maybe_prev); + } + } else { + if (auto target_index_key = get_specific_version(target_store_, target_map, sym, v_id)) { + throw storage::DuplicateKeyException(target_index_key.value()); + } + } + const auto new_index_key = copy_index_key_recursively(source_store_, target_store_, index_key, new_version_id); + target_map->write_version(target_store_, new_index_key, previous_key); + if(symbol_list) + symbol_list->add_symbol(target_store_, new_index_key.id(), new_version_id.value_or(0)); + + // Change the version in the result map + sym_data[py::int_(v_id)] = new_version_id ? 
new_version_id.value() : v_id; + // Give the new version id to the snapshots + if (version_to_snapshot_map.contains(v_id)) { + for(const auto& snap_name: version_to_snapshot_map[v_id]) { + sym_data[py::str(snap_name)] = sym_data[py::int_(v_id)]; + } + } + } + catch (std::exception &e) { + auto key = py::int_(v_id); + auto error = fmt::format("Sym:{},Version:{},Ex:{}", sym, v_id, e.what()); + sym_data[key] = error; + // Give the error to snapshots which also had the same version_id + if (version_to_snapshot_map.contains(v_id)) { + for(const auto& snap_name: version_to_snapshot_map[v_id]) { + sym_data[py::str(snap_name)] = error; + } + } + } + } + util::variant_match(sym, + [&sym_data, &res](const NumericId& numeric_id) { + res[py::int_(numeric_id)] = sym_data; + }, + [&sym_data, &res](const StringId& string_id) { + res[py::str(string_id)] = sym_data; + } + ); + } + return res; + } + +private: + std::shared_ptr source_store_; + std::shared_ptr target_store_; + proto::storage::VersionStoreConfig cfg_; + bool target_symbol_list_; + bool source_symbol_list_; +}; + +} diff --git a/cpp/arcticdb/util/error_code.hpp b/cpp/arcticdb/util/error_code.hpp index 5f0b6b6486..9a229e5d66 100644 --- a/cpp/arcticdb/util/error_code.hpp +++ b/cpp/arcticdb/util/error_code.hpp @@ -103,7 +103,8 @@ inline std::unordered_map get_error_category_names() ERROR_CODE(9001, E_UNKNOWN_CODEC) \ ERROR_CODE(9002, E_ZSDT_ENCODING) \ ERROR_CODE(9003, E_LZ4_ENCODING) \ - ERROR_CODE(9004, E_INPUT_TOO_LARGE) + ERROR_CODE(9004, E_INPUT_TOO_LARGE) \ + ERROR_CODE(9005, E_ENCODING_VERSION_MISMATCH) enum class ErrorCode : detail::BaseType { #define ERROR_CODE(code, Name, ...) Name = code, diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index c3edc934f5..cd2ee0e5a1 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -1654,7 +1654,7 @@ std::unordered_map LocalVersionedEngine::scan_object_size auto& sizes_info = sizes[key_type]; ++sizes_info.count; key_size_calculators.emplace_back(std::forward(k), [&sizes_info] (auto&& ks) { - auto key_seg = std::move(ks); + auto key_seg = std::forward(ks); sizes_info.compressed_size += key_seg.segment().size(); const auto& desc = key_seg.segment().descriptor(); sizes_info.uncompressed_size += desc.uncompressed_bytes(); @@ -1685,7 +1685,7 @@ std::unordered_map> LocalVer store->iterate_type(key_type, [&keys, &mutex, &sizes, key_type](const VariantKey&& k){ keys.emplace_back(std::forward(k), [key_type, &sizes, &mutex] (auto&& ks) { - auto key_seg = std::move(ks); + auto key_seg = std::forward(ks); auto variant_key = key_seg.variant_key(); auto stream_id = variant_key_id(variant_key); auto compressed_size = key_seg.segment().size(); diff --git a/cpp/arcticdb/version/python_bindings.cpp b/cpp/arcticdb/version/python_bindings.cpp index 00a5398629..bb8c8518cb 100644 --- a/cpp/arcticdb/version/python_bindings.cpp +++ b/cpp/arcticdb/version/python_bindings.cpp @@ -7,12 +7,10 @@ #include #include -#include #include #include #include #include -#include #include #include #include @@ -26,6 +24,7 @@ #include #include + namespace arcticdb::version_store { [[nodiscard]] static std::pair compute_first_last_dates( @@ -111,6 +110,7 @@ void register_bindings(py::module &version, py::exception>(version, "RefKey") diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index 7d2032a60c..b5e5be90eb 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ 
b/cpp/arcticdb/version/version_core.cpp @@ -1657,7 +1657,7 @@ VersionedItem compact_incomplete_impl( }); return util::variant_match(std::move(result), - [&slices, &pipeline_context, &store, &options, &user_meta](CompactionWrittenKeys& written_keys) -> VersionedItem { + [&slices, &pipeline_context, &store, &user_meta](CompactionWrittenKeys& written_keys) -> VersionedItem { auto vit = collate_and_write( store, pipeline_context, diff --git a/cpp/arcticdb/version/version_store_api.cpp b/cpp/arcticdb/version/version_store_api.cpp index bbb1d10b5e..ab8c369371 100644 --- a/cpp/arcticdb/version/version_store_api.cpp +++ b/cpp/arcticdb/version/version_store_api.cpp @@ -11,7 +11,6 @@ #include #include #include -#include #include #include #include @@ -1167,5 +1166,6 @@ ReadResult read_dataframe_from_file( void PythonVersionStore::force_delete_symbol(const StreamId& stream_id) { version_map()->delete_all_versions(store(), stream_id); delete_all_for_stream(store(), stream_id, true); + version_map()->flush(); } } //namespace arcticdb::version_store diff --git a/python/arcticdb/toolbox/storage.py b/python/arcticdb/toolbox/storage.py new file mode 100644 index 0000000000..21753cd416 --- /dev/null +++ b/python/arcticdb/toolbox/storage.py @@ -0,0 +1,3 @@ +from typing import NamedTuple, Union, List + +SymbolVersionsPair = NamedTuple("SymbolVersionsPair", [("id", Union[int, str]), ("versions", List[Union[int, str]])]) diff --git a/python/tests/integration/toolbox/test_storage_mover.py b/python/tests/integration/toolbox/test_storage_mover.py new file mode 100644 index 0000000000..0d7c1ae861 --- /dev/null +++ b/python/tests/integration/toolbox/test_storage_mover.py @@ -0,0 +1,330 @@ +import numpy as np +import pytest + +from hypothesis import given, strategies as st, settings +from arcticdb.config import Defaults +from arcticdb.version_store.helper import ArcticMemoryConfig, get_lib_cfg, add_lmdb_library_to_env +from arcticdb.toolbox.library_tool import KeyType +from arcticdb.toolbox.storage import SymbolVersionsPair +from arcticdb_ext.tools import StorageMover +from pandas import DataFrame +from pandas.testing import assert_frame_equal +from arcticdb.util.test import sample_dataframe +from arcticc.pb2.storage_pb2 import EnvironmentConfigsMap +import hypothesis +import sys + + +# configure_test_logger("DEBUG") + +def create_local_lmdb_cfg(lib_name=Defaults.LIB, db_dir=Defaults.DATA_DIR, description=None): + cfg = EnvironmentConfigsMap() + add_lmdb_library_to_env( + cfg, + lib_name=lib_name, + env_name=Defaults.ENV, + db_dir=db_dir, + description=description + ) + return cfg + +@pytest.fixture +def arctidb_native_local_lib_cfg_extra(tmpdir): + def create(): + return create_local_lmdb_cfg(lib_name="local.extra", db_dir=str(tmpdir)) + + return create + +@pytest.fixture +def arctidb_native_local_lib_cfg(tmpdir): + def create(lib_name): + return create_local_lmdb_cfg(lib_name=lib_name, db_dir=str(tmpdir)) + return create + +def create_default_config(): + return create_local_lmdb_cfg() + +def add_data(version_store): + version_store.write("symbol", sample_dataframe()) + version_store.write("pickled", {"a": 1}, pickle_on_failure=True) + version_store.snapshot("mysnap") + version_store.write("rec_norm", data={"a": np.arange(5), "b": np.arange(8), "c": None}, recursive_normalizers=True) + version_store.write("symbol", sample_dataframe()) + version_store.snapshot("mysnap2") + + +def compare_two_libs(lib1, lib2): + ver1 = lib1.list_versions() + ver2 = lib2.list_versions() + + print(ver1) + print(ver2) + + assert 
len(lib1.list_versions()) == len(lib2.list_versions()) + assert lib1.list_versions() == lib2.list_versions() + assert lib1.list_snapshots() == lib2.list_snapshots() + + assert_frame_equal(lib1.read("symbol", as_of=0).data, lib2.read("symbol", as_of=0).data) + assert_frame_equal(lib1.read("symbol", as_of=1).data, lib2.read("symbol", as_of=1).data) + assert_frame_equal(lib1.read("symbol", as_of="mysnap").data, lib2.read("symbol", as_of="mysnap").data) + assert_frame_equal(lib1.read("symbol", as_of="mysnap2").data, lib2.read("symbol", as_of="mysnap2").data) + + assert lib1.read("pickled").data == lib2.read("pickled").data + assert lib1.read("pickled", as_of="mysnap").data == lib2.read("pickled", as_of="mysnap").data + assert lib1.read("pickled", as_of="mysnap2").data == lib2.read("pickled", as_of="mysnap2").data + + assert lib1.read("rec_norm").data.keys() == lib2.read("rec_norm").data.keys() + assert all(lib1.read("rec_norm").data["a"] == lib2.read("rec_norm").data["a"]) + assert all(lib1.read("rec_norm").data["b"] == lib2.read("rec_norm").data["b"]) + assert lib1.read("rec_norm").data["c"] == lib2.read("rec_norm").data["c"] + assert lib1.read("rec_norm", as_of="mysnap2").data.keys() == lib2.read("rec_norm", as_of="mysnap2").data.keys() + + +def test_storage_mover_single_go(lmdb_version_store_v1, arctidb_native_local_lib_cfg_extra): + add_data(lmdb_version_store_v1) + arctic = ArcticMemoryConfig(arctidb_native_local_lib_cfg_extra(), env=Defaults.ENV) + lib_cfg = get_lib_cfg(arctic, Defaults.ENV, "local.extra") + lib_cfg.version.symbol_list = True + dst_lib = arctic["local.extra"] + + s = StorageMover(lmdb_version_store_v1._library, dst_lib._library) + s.go() + + compare_two_libs(lmdb_version_store_v1, dst_lib) + + +def test_storage_mover_key_by_key(lmdb_version_store_v1, arctidb_native_local_lib_cfg_extra): + add_data(lmdb_version_store_v1) + arctic = ArcticMemoryConfig(arctidb_native_local_lib_cfg_extra(), env=Defaults.ENV) + lib_cfg = get_lib_cfg(arctic, Defaults.ENV, "local.extra") + lib_cfg.version.symbol_list = True + dst_lib = arctic["local.extra"] + + s = StorageMover(lmdb_version_store_v1._library, dst_lib._library) + all_keys = s.get_all_source_keys() + for key in all_keys: + s.write_keys_from_source_to_target([key], 2) + + compare_two_libs(lmdb_version_store_v1, dst_lib) + +@pytest.mark.xfail(sys.platform == "win32", reason="Numpy strings are not implemented for Windows") +def test_storage_mover_symbol_tree(arctidb_native_local_lib_cfg_extra, arctidb_native_local_lib_cfg, lib_name): + col_per_group = 5 + row_per_segment = 10 + local_lib_cfg = arctidb_native_local_lib_cfg(lib_name) + lib = local_lib_cfg.env_by_id[Defaults.ENV].lib_by_path[lib_name] + lib.version.write_options.column_group_size = col_per_group + lib.version.write_options.segment_row_size = row_per_segment + lib.version.symbol_list = True + lmdb_version_store_symbol_list = ArcticMemoryConfig(local_lib_cfg, Defaults.ENV)[lib_name] + + lmdb_version_store_symbol_list.write("symbol", sample_dataframe(), metadata="yolo") + lmdb_version_store_symbol_list.write("symbol", sample_dataframe(), metadata="yolo2") + lmdb_version_store_symbol_list.write("snapshot_test", 1) + lmdb_version_store_symbol_list.snapshot("my_snap") + lmdb_version_store_symbol_list.snapshot("my_snap2") + lmdb_version_store_symbol_list.snapshot("snapshot_test", 2) + lmdb_version_store_symbol_list.delete_version("snapshot_test", 0) + lmdb_version_store_symbol_list.write("pickled", {"a": 1}, metadata="cantyolo", pickle_on_failure=True) + 
lmdb_version_store_symbol_list.write("pickled", {"b": 1}, metadata="cantyolo2", pickle_on_failure=True) + lmdb_version_store_symbol_list.write("pickled", {"c": 1}, metadata="yoloded", pickle_on_failure=True) + lmdb_version_store_symbol_list.write( + "rec_norm", + data={"a": np.arange(1000), "b": np.arange(8000), "c": None}, + metadata="realyolo", + recursive_normalizers=True, + ) + lmdb_version_store_symbol_list.write( + "rec_norm", + data={"e": np.arange(1000), "f": np.arange(8000), "g": None}, + metadata="realyolo2", + recursive_normalizers=True, + ) + + lmdb_version_store_symbol_list.write("dup_data", np.array(["YOLO"] * 10000)) + + arctic = ArcticMemoryConfig(arctidb_native_local_lib_cfg_extra(), env=Defaults.ENV) + lib_cfg = get_lib_cfg(arctic, Defaults.ENV, "local.extra") + lib_cfg.version.symbol_list = True + dst_lib = arctic["local.extra"] + + s = StorageMover(lmdb_version_store_symbol_list._library, dst_lib._library) + sv1 = SymbolVersionsPair("symbol", [1, 0]) + sv2 = SymbolVersionsPair("pickled", [2, 0]) + sv3 = SymbolVersionsPair("rec_norm", [1, 0]) + sv4 = SymbolVersionsPair("dup_data", [0]) + sv5 = SymbolVersionsPair("snapshot_test", ["my_snap", "my_snap2"]) + res = s.write_symbol_trees_from_source_to_target([sv1, sv2, sv3, sv4, sv5], False) + assert len(res) == 5 + for r in res: + for v in res[r]: + assert type(res[r][v]) == int + + assert len(dst_lib.list_versions()) == 8 + assert_frame_equal(lmdb_version_store_symbol_list.read("symbol").data, dst_lib.read("symbol").data) + assert_frame_equal(lmdb_version_store_symbol_list.read("symbol", 0).data, dst_lib.read("symbol", 0).data) + assert lmdb_version_store_symbol_list.read("symbol").metadata == dst_lib.read("symbol").metadata + assert lmdb_version_store_symbol_list.read("symbol", 0).metadata == dst_lib.read("symbol", 0).metadata + + assert lmdb_version_store_symbol_list.read("pickled").data == dst_lib.read("pickled").data + assert lmdb_version_store_symbol_list.read("pickled", 0).data == dst_lib.read("pickled", 0).data + assert lmdb_version_store_symbol_list.read("pickled").metadata == dst_lib.read("pickled").metadata + assert lmdb_version_store_symbol_list.read("pickled", 0).metadata == dst_lib.read("pickled", 0).metadata + + def comp_dict(d1, d2): + assert len(d1) == len(d2) + for k in d1: + if isinstance(d1[k], np.ndarray): + assert (d1[k] == d2[k]).all() + else: + assert d1[k] == d2[k] + + comp_dict(lmdb_version_store_symbol_list.read("rec_norm").data, dst_lib.read("rec_norm").data) + comp_dict(lmdb_version_store_symbol_list.read("rec_norm", 0).data, dst_lib.read("rec_norm", 0).data) + assert lmdb_version_store_symbol_list.read("rec_norm").metadata == dst_lib.read("rec_norm").metadata + assert lmdb_version_store_symbol_list.read("rec_norm", 0).metadata == dst_lib.read("rec_norm", 0).metadata + + np.testing.assert_equal(lmdb_version_store_symbol_list.read("dup_data").data, dst_lib.read("dup_data").data) + assert lmdb_version_store_symbol_list.read("dup_data").metadata == dst_lib.read("dup_data").metadata + + assert lmdb_version_store_symbol_list.read("snapshot_test", "my_snap").data, dst_lib.read("snapshot_test", 0).data + + lmdb_version_store_symbol_list.write("new_symbol", 1) + lmdb_version_store_symbol_list.snapshot("new_snap") + lmdb_version_store_symbol_list.write("new_symbol", 2) + lmdb_version_store_symbol_list.snapshot("new_snap2") + lmdb_version_store_symbol_list.write("new_symbol", 3) + lmdb_version_store_symbol_list.delete_version("new_symbol", 1) + sv6 = SymbolVersionsPair("new_symbol", [2, 0, "new_snap", 
"new_snap2"]) + dst_lib.write("new_symbol", 0) + + res = s.write_symbol_trees_from_source_to_target([sv6], True) + assert len(res) == 1 + assert "new_symbol" in res + assert res["new_symbol"][2] == 3 + assert res["new_symbol"][0] == 1 + assert res["new_symbol"]["new_snap"] == 1 + assert res["new_symbol"]["new_snap2"] == 2 + + assert dst_lib.read("new_symbol", 0).data == 0 + assert dst_lib.read("new_symbol", 1).data == 1 + assert dst_lib.read("new_symbol", 2).data == 2 + assert dst_lib.read("new_symbol", 3).data == 3 + + +def test_storage_mover_and_key_checker(lmdb_version_store_v1, arctidb_native_local_lib_cfg_extra): + add_data(lmdb_version_store_v1) + arctic = ArcticMemoryConfig(arctidb_native_local_lib_cfg_extra(), env=Defaults.ENV) + lib_cfg = get_lib_cfg(arctic, Defaults.ENV, "local.extra") + lib_cfg.version.symbol_list = True + dst_lib = arctic["local.extra"] + + s = StorageMover(lmdb_version_store_v1._library, dst_lib._library) + s.go() + + keys = s.get_keys_in_source_only() + assert len(keys) == 0 + + +def test_storage_mover_clone_keys_for_symbol(lmdb_version_store_v1, arctidb_native_local_lib_cfg_extra): + add_data(lmdb_version_store_v1) + lmdb_version_store_v1.write("a", 1) + lmdb_version_store_v1.write("a", 2) + lmdb_version_store_v1.write("b", 1) + arctic = ArcticMemoryConfig(arctidb_native_local_lib_cfg_extra(), env=Defaults.ENV) + lib_cfg = get_lib_cfg(arctic, Defaults.ENV, "local.extra") + lib_cfg.version.symbol_list = True + dst_lib = arctic["local.extra"] + + s = StorageMover(lmdb_version_store_v1._library, dst_lib._library) + s.clone_all_keys_for_symbol("a", 1000) + assert dst_lib.read("a").data == 2 + + +@pytest.fixture() +def lib_with_gaps_and_reused_keys(version_store_factory): + lib = version_store_factory(name="source", de_duplication=True, col_per_group=2, segment_row_size=2) + + lib.write("x", 0) + lib.write("x", 1) + lib.write("x", 2) + lib.snapshot("s2") + lib.write("x", DataFrame({"c": [0, 1]}, index=[0, 1])) + lib.write("x", DataFrame({"c": list(range(5))}, index=list(range(5))), prune_previous_version=True) # 2 slices + lib.write("x", 5) + lib.delete_version("x", 5) + lib.write("x", 6) + + return lib + + +@pytest.mark.parametrize("mode", ("check assumptions", "go", "no force")) +def test_correct_versions_in_destination(mode, lib_with_gaps_and_reused_keys, lmdb_version_store_v1): + s = StorageMover(lib_with_gaps_and_reused_keys._library, lmdb_version_store_v1._library) + if mode == "check assumptions": + check = lib_with_gaps_and_reused_keys + elif mode == "go": + s.go() + check = lmdb_version_store_v1 + else: + s.write_symbol_trees_from_source_to_target([SymbolVersionsPair("x", ["s2", 4, 6])], False) + check = lmdb_version_store_v1 + + lt = check.library_tool() + + assert {vi["version"] for vi in check.list_versions("x")} == {2, 4, 6} + assert len(lt.find_keys(KeyType.TABLE_INDEX)) == 3 + assert [k.version_id for k in lt.find_keys(KeyType.TABLE_DATA)] == [2, 3, 4, 4, 6] + + +@settings(deadline=None, suppress_health_check=(hypothesis.HealthCheck.function_scoped_fixture,)) +@given(to_copy=st.permutations(["s2", 4, 6]), existing=st.booleans()) +def test_correct_versions_in_destination_force(to_copy, existing, lib_with_gaps_and_reused_keys, version_store_factory): + try: + _tmp_test_body(to_copy, existing, lib_with_gaps_and_reused_keys, version_store_factory) + except: + import traceback + + traceback.print_exc() + raise + + +def _tmp_test_body(to_copy, existing, lib_with_gaps_and_reused_keys, version_store_factory): + # mongoose_copy_data's force mode rewrite 
version numbers in the target + source = lib_with_gaps_and_reused_keys + target = version_store_factory(name="_unique_") + + if existing: + target.write("x", 0) + + s = StorageMover(source._library, target._library) + s.write_symbol_trees_from_source_to_target([SymbolVersionsPair("x", to_copy)], True) + + actual_vers = sorted(vi["version"] for vi in target.list_versions("x")) + print(to_copy, existing, "->", actual_vers) + + lt = target.library_tool() + start = 0 if existing else 2 # mover starts at the first input version if target is empty.... + n = int(existing) + len(to_copy) + assert actual_vers == list(range(start, start + n)) + assert len(lt.find_keys(KeyType.TABLE_INDEX)) == n + + source_keys = source.library_tool().find_keys(KeyType.TABLE_DATA) + expected_target = [] + for item in to_copy: + if item == "s2": + expected_target.append(source_keys[0]) + elif item == 4: + expected_target.extend(source_keys[1:4]) + else: + expected_target.append(source_keys[-1]) + expected_target.sort() # key=lambda k: (k.version_id, k.start_index)) + + target_keys = lt.find_keys(KeyType.TABLE_DATA) + target_keys.sort() + if existing: + target_keys.pop(0) + + for a, e in zip(target_keys, expected_target): + assert a.content_hash == e.content_hash + assert a.creation_ts >= source_keys[-1].creation_ts