Skip to content

Commit

Permalink
Feature flag empty index
Browse files Browse the repository at this point in the history
  • Loading branch information
Vasil Pashov committed Apr 15, 2024
1 parent 2d1512f commit 38adaf6
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 19 deletions.
15 changes: 7 additions & 8 deletions cpp/arcticdb/python/python_to_tensor_frame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,17 +264,16 @@ std::shared_ptr<InputTensorFrame> py_ndf_to_frame(

// idx_names are passed by the python layer. They are empty in case row count index is used see:
// https://github.com/man-group/ArcticDB/blob/4184a467d9eee90600ddcbf34d896c763e76f78f/python/arcticdb/version_store/_normalization.py#L291
// Currently the python layers assign RowRange index to both empty dataframes and dataframes wich do not specify
// Currently the python layers assign RowRange index to both empty dataframes and dataframes which do not specify
// index explicitly. Thus we handle this case after all columns are read so that we know how many rows are there.
if (idx_names.empty()) {
if (!empty_types || res->num_rows > 0) {
res->index = stream::RowCountIndex();
res->desc.set_index_type(IndexDescriptor::ROWCOUNT);
} else {
res->index = stream::EmptyIndex();
res->desc.set_index_type(IndexDescriptor::EMPTY);
}
res->index = stream::RowCountIndex();
res->desc.set_index_type(IndexDescriptor::ROWCOUNT);
}

if (empty_types && res->num_rows == 0) {
res->index = stream::EmptyIndex();
res->desc.set_index_type(IndexDescriptor::EMPTY);
}

ARCTICDB_DEBUG(log::version(), "Received frame with descriptor {}", res->desc);
Expand Down
23 changes: 12 additions & 11 deletions python/arcticdb/version_store/_normalization.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,13 @@ def _from_tz_timestamp(ts, tz):
_range_index_props_are_public = hasattr(RangeIndex, "start")


def _normalize_single_index(index, index_names, index_norm, dynamic_strings=None, string_max_len=None):
def _normalize_single_index(index, index_names, index_norm, dynamic_strings=None, string_max_len=None, empty_types=False):
# index: pd.Index or np.ndarray -> np.ndarray
index_tz = None
if isinstance(index, RangeIndex):
is_empty = len(index) == 0
if empty_types and is_empty and not index_norm.is_physically_stored:
return [], []
elif isinstance(index, RangeIndex):
if index.name:
if not isinstance(index.name, int) and not isinstance(index.name, str):
raise NormalizationException(
Expand All @@ -301,10 +304,8 @@ def _normalize_single_index(index, index_names, index_norm, dynamic_strings=None
if isinstance(index.name, int):
index_norm.is_int = True
index_norm.name = str(index.name)
if isinstance(index, RangeIndex):
# skip index since we can reconstruct it, so no need to actually store it
index_norm.start = index.start if _range_index_props_are_public else index._start
index_norm.step = index.step if _range_index_props_are_public else index._step
index_norm.start = index.start if _range_index_props_are_public else index._start
index_norm.step = index.step if _range_index_props_are_public else index._step
return [], []
else:
coerce_type = DTN64_DTYPE if len(index) == 0 else None
Expand Down Expand Up @@ -370,7 +371,7 @@ def _denormalize_single_index(item, norm_meta):
name = norm_meta.index.name if norm_meta.index.name else None
return RangeIndex(start=norm_meta.index.start, stop=stop, step=norm_meta.index.step, name=name)
else:
return None
return DatetimeIndex([])
else:
return RangeIndex(start=0, stop=0, step=1)
# this means that the index is not a datetime index and it's been represented as a regular field in the stream
Expand Down Expand Up @@ -553,9 +554,9 @@ def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len, empty_
else:
index_norm = pd_norm.index
index_norm.is_physically_stored = not isinstance(index, RangeIndex) and not empty_df
index = DatetimeIndex([]) if empty_df else index
index = DatetimeIndex([]) if IS_PANDAS_TWO and empty_df else index

return _normalize_single_index(index, list(index.names), index_norm, dynamic_strings, string_max_len)
return _normalize_single_index(index, list(index.names), index_norm, dynamic_strings, string_max_len, empty_types=empty_types)

def _index_from_records(self, item, norm_meta):
# type: (NormalizationMetadata.Pandas, _SUPPORTED_NATIVE_RETURN_TYPES, Bool)->Union[Index, DatetimeIndex, MultiIndex]
Expand Down Expand Up @@ -1094,13 +1095,13 @@ def write(obj):


class TimeFrameNormalizer(Normalizer):
def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, **kwargs):
def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs):
norm_meta = NormalizationMetadata()
norm_meta.ts.mark = True
index_norm = norm_meta.ts.common.index
index_norm.is_physically_stored = len(item.times) > 0 and not isinstance(item.times, RangeIndex)
index_names, ix_vals = _normalize_single_index(
item.times, ["times"], index_norm, dynamic_strings, string_max_len
item.times, ["times"], index_norm, dynamic_strings, string_max_len, empty_types=empty_types
)
columns_names, columns_vals = _normalize_columns(
item.columns_names,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ def append_index(self, request):
@pytest.fixture(autouse=True)
def create_empty_column(self, lmdb_version_store_static_and_dynamic, dtype, empty_index):
lmdb_version_store_static_and_dynamic.write("sym", pd.DataFrame({"col": []}, dtype=dtype, index=empty_index))
assert lmdb_version_store_static_and_dynamic.read("sym").data.index.equals(pd.DatetimeIndex([]))
yield

def test_integer(self, lmdb_version_store_static_and_dynamic, int_dtype, dtype, append_index):
Expand Down Expand Up @@ -744,6 +745,7 @@ class TestCanUpdateEmptyColumn:
@pytest.fixture(autouse=True)
def create_empty_column(self, lmdb_version_store_static_and_dynamic, dtype, empty_index):
lmdb_version_store_static_and_dynamic.write("sym", pd.DataFrame({"col": []}, dtype=dtype, index=empty_index))
assert lmdb_version_store_static_and_dynamic.read("sym").data.index.equals(pd.DatetimeIndex([]))
yield

def update_index(self):
Expand Down

0 comments on commit 38adaf6

Please sign in to comment.