From 38adaf6f18c901f8789d7751f72a5794e6d8f96d Mon Sep 17 00:00:00 2001 From: Vasil Pashov Date: Mon, 15 Apr 2024 21:39:13 +0300 Subject: [PATCH] Feature flag empty index --- .../python/python_to_tensor_frame.cpp | 15 ++++++------ .../arcticdb/version_store/_normalization.py | 23 ++++++++++--------- .../version_store/test_empty_column_type.py | 2 ++ 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/cpp/arcticdb/python/python_to_tensor_frame.cpp b/cpp/arcticdb/python/python_to_tensor_frame.cpp index 8017f3b9b48..c7f2a0381d7 100644 --- a/cpp/arcticdb/python/python_to_tensor_frame.cpp +++ b/cpp/arcticdb/python/python_to_tensor_frame.cpp @@ -264,17 +264,16 @@ std::shared_ptr py_ndf_to_frame( // idx_names are passed by the python layer. They are empty in case row count index is used see: // https://github.com/man-group/ArcticDB/blob/4184a467d9eee90600ddcbf34d896c763e76f78f/python/arcticdb/version_store/_normalization.py#L291 - // Currently the python layers assign RowRange index to both empty dataframes and dataframes wich do not specify + // Currently the python layers assign RowRange index to both empty dataframes and dataframes which do not specify // index explicitly. Thus we handle this case after all columns are read so that we know how many rows are there. if (idx_names.empty()) { - if (!empty_types || res->num_rows > 0) { - res->index = stream::RowCountIndex(); - res->desc.set_index_type(IndexDescriptor::ROWCOUNT); - } else { - res->index = stream::EmptyIndex(); - res->desc.set_index_type(IndexDescriptor::EMPTY); - } + res->index = stream::RowCountIndex(); + res->desc.set_index_type(IndexDescriptor::ROWCOUNT); + } + if (empty_types && res->num_rows == 0) { + res->index = stream::EmptyIndex(); + res->desc.set_index_type(IndexDescriptor::EMPTY); } ARCTICDB_DEBUG(log::version(), "Received frame with descriptor {}", res->desc); diff --git a/python/arcticdb/version_store/_normalization.py b/python/arcticdb/version_store/_normalization.py index dbfe3618e4c..dbea68503a5 100644 --- a/python/arcticdb/version_store/_normalization.py +++ b/python/arcticdb/version_store/_normalization.py @@ -289,10 +289,13 @@ def _from_tz_timestamp(ts, tz): _range_index_props_are_public = hasattr(RangeIndex, "start") -def _normalize_single_index(index, index_names, index_norm, dynamic_strings=None, string_max_len=None): +def _normalize_single_index(index, index_names, index_norm, dynamic_strings=None, string_max_len=None, empty_types=False): # index: pd.Index or np.ndarray -> np.ndarray index_tz = None - if isinstance(index, RangeIndex): + is_empty = len(index) == 0 + if empty_types and is_empty and not index_norm.is_physically_stored: + return [], [] + elif isinstance(index, RangeIndex): if index.name: if not isinstance(index.name, int) and not isinstance(index.name, str): raise NormalizationException( @@ -301,10 +304,8 @@ def _normalize_single_index(index, index_names, index_norm, dynamic_strings=None if isinstance(index.name, int): index_norm.is_int = True index_norm.name = str(index.name) - if isinstance(index, RangeIndex): - # skip index since we can reconstruct it, so no need to actually store it - index_norm.start = index.start if _range_index_props_are_public else index._start - index_norm.step = index.step if _range_index_props_are_public else index._step + index_norm.start = index.start if _range_index_props_are_public else index._start + index_norm.step = index.step if _range_index_props_are_public else index._step return [], [] else: coerce_type = DTN64_DTYPE if len(index) == 0 else None @@ -370,7 +371,7 @@ def _denormalize_single_index(item, norm_meta): name = norm_meta.index.name if norm_meta.index.name else None return RangeIndex(start=norm_meta.index.start, stop=stop, step=norm_meta.index.step, name=name) else: - return None + return DatetimeIndex([]) else: return RangeIndex(start=0, stop=0, step=1) # this means that the index is not a datetime index and it's been represented as a regular field in the stream @@ -553,9 +554,9 @@ def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len, empty_ else: index_norm = pd_norm.index index_norm.is_physically_stored = not isinstance(index, RangeIndex) and not empty_df - index = DatetimeIndex([]) if empty_df else index + index = DatetimeIndex([]) if IS_PANDAS_TWO and empty_df else index - return _normalize_single_index(index, list(index.names), index_norm, dynamic_strings, string_max_len) + return _normalize_single_index(index, list(index.names), index_norm, dynamic_strings, string_max_len, empty_types=empty_types) def _index_from_records(self, item, norm_meta): # type: (NormalizationMetadata.Pandas, _SUPPORTED_NATIVE_RETURN_TYPES, Bool)->Union[Index, DatetimeIndex, MultiIndex] @@ -1094,13 +1095,13 @@ def write(obj): class TimeFrameNormalizer(Normalizer): - def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, **kwargs): + def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs): norm_meta = NormalizationMetadata() norm_meta.ts.mark = True index_norm = norm_meta.ts.common.index index_norm.is_physically_stored = len(item.times) > 0 and not isinstance(item.times, RangeIndex) index_names, ix_vals = _normalize_single_index( - item.times, ["times"], index_norm, dynamic_strings, string_max_len + item.times, ["times"], index_norm, dynamic_strings, string_max_len, empty_types=empty_types ) columns_names, columns_vals = _normalize_columns( item.columns_names, diff --git a/python/tests/unit/arcticdb/version_store/test_empty_column_type.py b/python/tests/unit/arcticdb/version_store/test_empty_column_type.py index b0311cdbdda..f999e461a5d 100644 --- a/python/tests/unit/arcticdb/version_store/test_empty_column_type.py +++ b/python/tests/unit/arcticdb/version_store/test_empty_column_type.py @@ -584,6 +584,7 @@ def append_index(self, request): @pytest.fixture(autouse=True) def create_empty_column(self, lmdb_version_store_static_and_dynamic, dtype, empty_index): lmdb_version_store_static_and_dynamic.write("sym", pd.DataFrame({"col": []}, dtype=dtype, index=empty_index)) + assert lmdb_version_store_static_and_dynamic.read("sym").data.index.equals(pd.DatetimeIndex([])) yield def test_integer(self, lmdb_version_store_static_and_dynamic, int_dtype, dtype, append_index): @@ -744,6 +745,7 @@ class TestCanUpdateEmptyColumn: @pytest.fixture(autouse=True) def create_empty_column(self, lmdb_version_store_static_and_dynamic, dtype, empty_index): lmdb_version_store_static_and_dynamic.write("sym", pd.DataFrame({"col": []}, dtype=dtype, index=empty_index)) + assert lmdb_version_store_static_and_dynamic.read("sym").data.index.equals(pd.DatetimeIndex([])) yield def update_index(self):