Implement async update method. Improve the performance of update by parallelising reads. #2087

Open: wants to merge 15 commits into base: master. Showing changes from all commits.
9 changes: 7 additions & 2 deletions cpp/arcticdb/pipeline/index_segment_reader.cpp
@@ -20,8 +20,13 @@ using namespace arcticdb::proto::descriptors;
namespace arcticdb::pipelines::index {

IndexSegmentReader get_index_reader(const AtomKey &prev_index, const std::shared_ptr<Store> &store) {
-    auto [key, seg] = store->read_sync(prev_index);
-    return index::IndexSegmentReader{std::move(seg)};
+    return async_get_index_reader(prev_index, store).get();
Collaborator: I think we should leave this implementation using read_sync, so that the scheduling overhead can be avoided if necessary.

}

+folly::Future<IndexSegmentReader> async_get_index_reader(const AtomKey &prev_index, const std::shared_ptr<Store> &store) {
+    return store->read(prev_index).thenValueInline([](std::pair<VariantKey, SegmentInMemory>&& key_seg) {
+        return IndexSegmentReader{std::move(key_seg.second)};
+    });
+}

IndexSegmentReader::IndexSegmentReader(SegmentInMemory&& s) :
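For callers that can stay asynchronous, the new reader composes without blocking. Below is a minimal sketch of a hypothetical call site (only async_get_index_reader comes from this diff; the helper name and continuation body are illustrative). thenValueInline runs the continuation on the thread that fulfils the future, so no extra scheduler hop is introduced, which is the same overhead the reviewer wants read_sync to keep avoiding on the synchronous path:

    // Hypothetical caller: chain work onto the decoded index segment
    // instead of blocking a thread on .get().
    folly::Future<bool> async_index_is_nonempty(
            const AtomKey& prev_index,
            const std::shared_ptr<Store>& store) {
        return async_get_index_reader(prev_index, store)
            .thenValueInline([](index::IndexSegmentReader&& reader) {
                return reader.size() > 0;  // size() as used by filter_index below
            });
    }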
10 changes: 5 additions & 5 deletions cpp/arcticdb/pipeline/index_segment_reader.hpp
@@ -8,13 +8,9 @@
#pragma once

#include <arcticdb/column_store/memory_segment.hpp>
-#include <arcticdb/entity/protobufs.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <arcticdb/pipeline/index_fields.hpp>

-#include <boost/noncopyable.hpp>

-#include <cstdint>
+#include <folly/futures/Future.h>

namespace arcticdb {
class Store;
@@ -135,6 +131,10 @@ index::IndexSegmentReader get_index_reader(
const AtomKey &prev_index,
const std::shared_ptr<Store> &store);

+folly::Future<IndexSegmentReader> async_get_index_reader(
+    const AtomKey &prev_index,
+    const std::shared_ptr<Store> &store);

IndexRange get_index_segment_range(
const AtomKey &prev_index,
const std::shared_ptr<Store> &store);
4 changes: 4 additions & 0 deletions cpp/arcticdb/pipeline/index_utils.cpp
@@ -129,4 +129,8 @@ TimeseriesDescriptor get_merged_tsd(
);
}

+bool is_timeseries_index(const IndexDescriptorImpl& index_desc) {
+    return index_desc.type() == IndexDescriptor::Type::TIMESTAMP || index_desc.type() == IndexDescriptor::Type::EMPTY;
+}

} //namespace arcticdb::pipelines::index
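A sketch of the kind of guard this predicate presumably serves (a hypothetical call site; the tsd() and raise_rte spellings are assumptions, not part of this diff). Note that EMPTY deliberately counts as a timeseries index, presumably so a symbol written empty can still be updated once it has data:

    // Hypothetical guard before an index-range based update: row-count style
    // indexes carry no timestamps to match an update range against.
    if (!index::is_timeseries_index(index_segment_reader.tsd().index())) {
        util::raise_rte("Update is only supported for timeseries-indexed data");
    }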
2 changes: 2 additions & 0 deletions cpp/arcticdb/pipeline/index_utils.hpp
@@ -138,4 +138,6 @@ TimeseriesDescriptor get_merged_tsd(
const TimeseriesDescriptor& existing_tsd,
const std::shared_ptr<pipelines::InputTensorFrame>& new_frame);

+[[nodiscard]] bool is_timeseries_index(const IndexDescriptorImpl& index_desc);

} //namespace arcticdb::pipelines::index
3 changes: 1 addition & 2 deletions cpp/arcticdb/pipeline/read_pipeline.hpp
@@ -61,8 +61,7 @@ void foreach_active_bit(const util::BitSet &bs, C &&visitor) {
}
}

-template<typename ContainerType>
-std::vector<SliceAndKey> filter_index(const ContainerType &container, std::optional<CombinedQuery<ContainerType>> &&query) {
+inline std::vector<SliceAndKey> filter_index(const index::IndexSegmentReader& container, std::optional<CombinedQuery<index::IndexSegmentReader>> &&query) {
Collaborator: I assume the template parameter always resolved to IndexSegmentReader? Maybe rename container to match as well now?

ARCTICDB_SAMPLE_DEFAULT(FilterIndex)
std::vector<SliceAndKey> output{};
if (container.size()> 0) {
67 changes: 34 additions & 33 deletions cpp/arcticdb/pipeline/write_frame.cpp
@@ -16,20 +16,21 @@
#include <arcticdb/stream/aggregator.hpp>
#include <arcticdb/entity/protobufs.hpp>
#include <arcticdb/util/variant.hpp>
-#include <arcticdb/python/python_types.hpp>
#include <arcticdb/pipeline/frame_utils.hpp>
#include <arcticdb/pipeline/write_frame.hpp>
#include <arcticdb/stream/append_map.hpp>
#include <arcticdb/async/task_scheduler.hpp>
#include <arcticdb/util/format_date.hpp>
#include <vector>
#include <array>
+#include <ranges>


namespace arcticdb::pipelines {

using namespace arcticdb::entity;
using namespace arcticdb::stream;
+namespace ranges = std::ranges;

WriteToSegmentTask::WriteToSegmentTask(
std::shared_ptr<InputTensorFrame> frame,
@@ -252,40 +253,41 @@ static RowRange partial_rewrite_row_range(
}
}

-std::optional<SliceAndKey> rewrite_partial_segment(
+folly::Future<std::optional<SliceAndKey>> async_rewrite_partial_segment(
const SliceAndKey& existing,
const IndexRange& index_range,
VersionId version_id,
AffectedSegmentPart affected_part,
const std::shared_ptr<Store>& store) {
-    const auto& key = existing.key();
-    auto kv = store->read(key).get();
-    const SegmentInMemory& segment = kv.second;
-    const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part);
-    const int64_t num_rows = affected_row_range.end() - affected_row_range.start();
-    if (num_rows <= 0) {
-        return std::nullopt;
-    }
-    SegmentInMemory output = segment.truncate(affected_row_range.start(), affected_row_range.end(), true);
-    const IndexValue start_ts = TimeseriesIndex::start_value_for_segment(output);
-    // +1 as in the key we store one nanosecond greater than the last index value in the segment
-    const IndexValue end_ts = std::get<NumericIndex>(TimeseriesIndex::end_value_for_segment(output)) + 1;
-    FrameSlice new_slice{
-        std::make_shared<StreamDescriptor>(output.descriptor()),
-        existing.slice_.col_range,
-        RowRange{0, num_rows},
-        existing.slice_.hash_bucket(),
-        existing.slice_.num_buckets()};
-
-    auto fut_key = store->write(
-        key.type(),
-        version_id,
-        key.id(),
-        start_ts,
-        end_ts,
-        std::move(output)
-    );
-    return SliceAndKey{std::move(new_slice), std::get<AtomKey>(std::move(fut_key).get())};
+    return store->read(existing.key()).thenValue([=](std::pair<VariantKey, SegmentInMemory>&& key_segment) {
Collaborator: Capture by copy?

Collaborator: Could also be a thenValueInline.

Collaborator (author): Same as #2087 (comment): the implementation lives in the lambda and uses all of these variables, so they have to travel with the future. They can't be captured by reference, because they'll have been destroyed by the time the task is dequeued; this function returns as soon as the future is queued.

+        const auto& key = existing.key();
+        const SegmentInMemory& segment = key_segment.second;
+        const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part);
+        const int64_t num_rows = affected_row_range.end() - affected_row_range.start();
+        if (num_rows <= 0) {
+            return folly::Future<std::optional<SliceAndKey>>{std::nullopt};
Collaborator: I don't think this needs folly::Future; if you specify the return type of the lambda, it can probably just return std::nullopt.

+        }
+        SegmentInMemory output = segment.truncate(affected_row_range.start(), affected_row_range.end(), true);
+        const IndexValue start_ts = TimeseriesIndex::start_value_for_segment(output);
+        // +1 as in the key we store one nanosecond greater than the last index value in the segment
+        const IndexValue end_ts = std::get<NumericIndex>(TimeseriesIndex::end_value_for_segment(output)) + 1;
+        FrameSlice new_slice{
+            std::make_shared<StreamDescriptor>(output.descriptor()),
+            existing.slice_.col_range,
+            RowRange{0, num_rows},
+            existing.slice_.hash_bucket(),
+            existing.slice_.num_buckets()};
+        return store->write(
+            key.type(),
+            version_id,
+            key.id(),
+            start_ts,
+            end_ts,
+            std::move(output)
+        ).thenValueInline([new_slice=std::move(new_slice)](VariantKey&& k) {
+            return std::make_optional<SliceAndKey>(std::move(new_slice), std::get<AtomKey>(std::move(k)));
+        });
+    });
}
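The capture discussion above is the standard lifetime rule for future continuations: the lambda generally runs after the enclosing function has returned, often on another thread, so it must own everything it uses. A self-contained sketch (folly only, nothing from this diff):

    #include <folly/futures/Future.h>

    folly::Future<int> scaled(folly::Future<int> fut) {
        int factor = 10;                  // local: destroyed when scaled() returns
        return std::move(fut).thenValue(
            [factor](int v) {             // copied into the lambda, so the value
                return v * factor;        // outlives this stack frame
            });
        // Capturing [&factor] would dangle, because the continuation usually
        // runs after scaled() has returned; hence the [=] capture above.
    }

On the std::nullopt comment: giving the lambda an explicit return type, e.g. [=](std::pair<VariantKey, SegmentInMemory>&&) -> folly::Future<std::optional<SliceAndKey>>, should indeed let the early exit be a plain return std::nullopt, since folly::Future converts implicitly from anything its value type can be constructed from.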

std::vector<SliceAndKey> flatten_and_fix_rows(const std::array<std::vector<SliceAndKey>, 5>& groups, size_t& global_count) {
@@ -301,10 +303,9 @@ std::vector<SliceAndKey> flatten_and_fix_rows(const std::array<std::vector<SliceAndKey>, 5>& groups, size_t& global_count) {
return std::max(a, sk.slice_.row_range.second);
});

-    std::transform(std::begin(group), std::end(group), std::back_inserter(output), [&](SliceAndKey sk) {
+    ranges::transform(group, std::back_inserter(output), [&](SliceAndKey sk) {
         auto range_start = global_count + (sk.slice_.row_range.first - group_start);
-        auto new_range = RowRange{range_start, range_start + (sk.slice_.row_range.diff())};
-        sk.slice_.row_range = new_range;
+        sk.slice_.row_range = RowRange{range_start, range_start + (sk.slice_.row_range.diff())};
return sk;
});

2 changes: 1 addition & 1 deletion cpp/arcticdb/pipeline/write_frame.hpp
@@ -88,7 +88,7 @@ enum class AffectedSegmentPart {
END
};

-std::optional<SliceAndKey> rewrite_partial_segment(
+folly::Future<std::optional<SliceAndKey>> async_rewrite_partial_segment(
const SliceAndKey& existing,
const IndexRange& index_range,
VersionId version_id,
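This signature change is what delivers the parallel reads in the PR title: the rewrites of the two segments at the edges of the update range can be in flight concurrently instead of behind two sequential blocking reads. A sketch of a hypothetical call site (local names are illustrative; AffectedSegmentPart::START is assumed from context, and only async_rewrite_partial_segment comes from this diff):

    // Kick off both partial rewrites; the two storage reads now overlap.
    std::vector<folly::Future<std::optional<SliceAndKey>>> rewrites;
    rewrites.push_back(async_rewrite_partial_segment(
        first_affected, update_range, version_id, AffectedSegmentPart::START, store));
    rewrites.push_back(async_rewrite_partial_segment(
        last_affected, update_range, version_id, AffectedSegmentPart::END, store));

    // collect() completes when both do; segments the update emptied are skipped.
    for (auto& maybe_slice : folly::collect(std::move(rewrites)).get()) {
        if (maybe_slice.has_value())
            unaffected_slices.push_back(std::move(*maybe_slice));
    }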
4 changes: 1 addition & 3 deletions cpp/arcticdb/processing/test/benchmark_clause.cpp
@@ -21,9 +21,7 @@ using namespace arcticdb;

SegmentInMemory get_segment_for_merge(const StreamId &id, size_t num_rows, size_t start, size_t step){
auto segment = SegmentInMemory{
-        get_test_descriptor<stream::TimeseriesIndex>(id, {
-            scalar_field(DataType::UINT8, "column")
-        }),
+        get_test_descriptor<stream::TimeseriesIndex>(id, std::array{scalar_field(DataType::UINT8, "column")}),
num_rows
};
auto& index_col = segment.column(0);
1 change: 0 additions & 1 deletion cpp/arcticdb/stream/stream_source.hpp
@@ -8,7 +8,6 @@
#pragma once

#include <arcticdb/column_store/memory_segment.hpp>
-#include <arcticdb/codec/segment.hpp>
#include <arcticdb/storage/storage.hpp>
#include <arcticdb/storage/storage_options.hpp>
#include <arcticdb/async/batch_read_args.hpp>
25 changes: 11 additions & 14 deletions cpp/arcticdb/stream/test/stream_test_common.hpp
@@ -7,17 +7,9 @@

#pragma once

-#include <string>
#include <arcticdb/util/test/gtest.hpp>
#include <arcticdb/stream/aggregator.hpp>
-#include <arcticdb/stream/stream_reader.hpp>
-#include <arcticdb/stream/stream_writer.hpp>
#include <arcticdb/storage/store.hpp>

#include <arcticdb/entity/atom_key.hpp>
-#include <folly/gen/Base.h>
-#include <folly/futures/Future.h>
#include <arcticdb/util/variant.hpp>
#include <arcticdb/pipeline/slicing.hpp>
#include <arcticdb/pipeline/input_tensor_frame.hpp>
#include <arcticdb/storage/library.hpp>
@@ -27,7 +19,12 @@
#include <arcticdb/entity/protobufs.hpp>
#include <arcticdb/stream/piloted_clock.hpp>

+#include <folly/gen/Base.h>
+#include <folly/futures/Future.h>

#include <filesystem>
+#include <span>
+#include <string>

namespace fg = folly::gen;

@@ -173,21 +170,21 @@ NativeTensor test_string_column(ContainerType &vec, DTT, shape_t num_rows) {
return NativeTensor{bytes, 1, &strides, &shapes, dt, elsize, vec.data(), 1};
}

-inline std::vector<entity::FieldRef> get_test_timeseries_fields() {
+inline auto get_test_timeseries_fields() {
using namespace arcticdb::entity;

-    return {
+    return std::array {
scalar_field(DataType::UINT8, "smallints"),
scalar_field(DataType::INT64, "bigints"),
scalar_field(DataType::FLOAT64, "floats"),
scalar_field(DataType::ASCII_FIXED64, "strings"),
};
}

-inline std::vector<entity::FieldRef> get_test_simple_fields() {
+inline auto get_test_simple_fields() {
using namespace arcticdb::entity;

-    return {
+    return std::array {
scalar_field(DataType::UINT32, "index"),
scalar_field(DataType::FLOAT64, "floats"),
};
@@ -254,13 +251,13 @@ inline void fill_test_frame(SegmentInMemory &segment,
}

template<typename IndexType>
-StreamDescriptor get_test_descriptor(const StreamId &id, const std::vector<FieldRef> &fields) {
+StreamDescriptor get_test_descriptor(const StreamId &id, std::span<const FieldRef> fields) {
return IndexType::default_index().create_stream_descriptor(id, folly::Range(fields.begin(), fields.end()));
}

template<typename IndexType>
TestTensorFrame get_test_frame(const StreamId &id,
-    const std::vector<FieldRef> &fields,
+    std::span<const FieldRef> fields,
size_t num_rows,
size_t start_val,
size_t opt_row_offset = 0) {
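The std::span<const FieldRef> parameters are what let the helpers above return std::array (stack-allocated, no heap traffic) while existing std::vector callers keep compiling. A small sketch of the trade-off (values illustrative):

    // A span parameter accepts std::vector, std::array, or a C array without copying:
    std::array fields{scalar_field(DataType::UINT8, "col")};
    auto desc = get_test_descriptor<stream::TimeseriesIndex>(id, fields);

    // The one casualty is the braced list: std::span cannot be deduced from an
    // initializer list, so a call like
    //     get_test_descriptor<stream::TimeseriesIndex>(id, {scalar_field(...)});
    // no longer compiles, which is why the benchmark call site above now spells
    // out std::array explicitly.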
4 changes: 1 addition & 3 deletions cpp/arcticdb/version/local_versioned_engine.cpp
@@ -560,9 +560,7 @@ VersionedItem LocalVersionedEngine::update_internal(
bool prune_previous_versions) {
ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: update");
py::gil_scoped_release release_gil;
-    auto update_info = get_latest_undeleted_version_and_next_version_id(store(),
-                                                                        version_map(),
-                                                                        stream_id);
+    auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id);
if (update_info.previous_index_key_.has_value()) {
if (frame->empty()) {
ARCTICDB_DEBUG(log::version(), "Updating existing data with an empty item has no effect. \n"