From 127202509639ca519d45d84c81bd3f6e4ebde893 Mon Sep 17 00:00:00 2001 From: Edward J M Easton Date: Mon, 17 Aug 2015 18:22:49 +0100 Subject: [PATCH 01/22] Added utility methods for working with multi-index Pandas dataframes. This is useful for working with bitemporal time-series data where you would like to see what a timeseries was as-of a point in time. --- arctic/multi_index.py | 122 +++++++++++++++++ tests/unit/test_multi_index.py | 230 +++++++++++++++++++++++++++++++++ 2 files changed, 352 insertions(+) create mode 100644 arctic/multi_index.py create mode 100644 tests/unit/test_multi_index.py diff --git a/arctic/multi_index.py b/arctic/multi_index.py new file mode 100644 index 0000000000000..2214d64fa8e21 --- /dev/null +++ b/arctic/multi_index.py @@ -0,0 +1,122 @@ +''' +Utility functions for multi-index dataframes. Useful for creating bi-temporal timeseries. +''' +import logging +import types +from datetime import datetime + +import numpy as np +import pandas as pd +from pandas.tseries.tools import to_datetime as dt + +logger = logging.getLogger(__name__) + + +# ----------------------- Grouping and Aggregating ---------------------------- # + +def fancy_group_by(df, grouping_level=0, aggregate_level=1, method='last', max_=None, min_=None, within=None): + """ Dataframe group-by operation that supports aggregating by different methods on the index. + + Parameters + ---------- + df: ``DataFrame`` + Pandas dataframe with a MultiIndex + grouping_level: ``int`` or ``str`` + Index level to group by. Defaults to 0. + aggregate_level: ``int`` or ``str`` + Index level to aggregate by. Defaults to 1. + method: ``str`` + Aggregation method. One of + last: Use the last (lexigraphically) value from each group + first: Use the first value from each group + within: Any type supported by the index, or ``DateOffset``/timedelta-like for ``DatetimeIndex``. + If set, will limit results to those having aggregate level values within this range of the group value + max_: + If set, will limit results to those having aggregate level values <= this value + min_: + If set, will limit results to those having aggregate level values >= this value + """ + if method not in ('first', 'last'): + raise ValueError('Invalid method') + + if isinstance(aggregate_level, types.StringType): + aggregate_level = df.index.names.index(aggregate_level) + + agg_idx = df.index.get_level_values(aggregate_level) + group_idx = df.index.get_level_values(grouping_level) + + # Trim any rows outside the aggregate value bounds + if max_ is not None or min_ is not None or within is not None: + mask = np.full(len(agg_idx), True, dtype='b1') + if max_ is not None: + mask &= (agg_idx <= max_) + if min_ is not None: + mask &= (agg_idx >= min_) + if within is not None: + if isinstance(agg_idx, pd.DatetimeIndex): + mask &= (group_idx >= agg_idx.shift(-1, freq=within)) + else: + mask &= (group_idx >= (agg_idx - within)) + df = df.loc[mask] + + # The sort order must be correct in order of grouping_level -> aggregate_level for the aggregation methods + # to work properly. We can check the sortdepth to see if this is in fact the case and resort if necessary. + # TODO: this might need tweaking if the levels are around the wrong way + if df.index.lexsort_depth < (aggregate_level + 1): + df = df.sortlevel(level=grouping_level) + + gb = df.groupby(level=grouping_level) + if method == 'last': + return gb.last() + return gb.first() + + +# --------- Common as-of-date use case -------------- # + +def groupby_asof(df, as_of=None, dt_col='sample_dt', asof_col='observed_dt'): + ''' Common use case for selecting the latest rows from a bitemporal dataframe as-of a certain date. + + Parameters + ---------- + df: ``pd.DataFrame`` + Dataframe with a MultiIndex index + as_of: ``datetime`` + Return a timeseries with values observed <= this as-of date. By default, the latest observed + values will be returned. + dt_col: ``str`` or ``int`` + Name or index of the column in the MultiIndex that is the sample date + asof_col: ``str`` or ``int`` + Name or index of the column in the MultiIndex that is the observed date + ''' + return fancy_group_by(df, + grouping_level=dt_col, + aggregate_level=asof_col, + method='last', + max_=as_of) + + +# ----------------------- Insert/Append ---------------------------- # + + +def multi_index_insert_row(df, index_row, values_row): + """ Return a new dataframe with a row inserted for a multi-index dataframe. + This will sort the rows according to the ordered multi-index levels. + """ + row_index = pd.MultiIndex(levels=[[i] for i in index_row], + labels=[[0] for i in index_row]) + row = pd.DataFrame(values_row, index=row_index, columns=df.columns) + df = pd.concat((df, row)) + if df.index.lexsort_depth == len(index_row) and df.index[-2] < df.index[-1]: + # We've just appended a row to an already-sorted dataframe + return df + # The df wasn't sorted or the row has to be put in the middle somewhere + return df.sortlevel() + + +def insert_at(df, sample_date, values): + """ Insert some values into a bi-temporal dataframe. + This is like what would happen when we get a price correction. + """ + observed_dt = dt(datetime.now()) + return multi_index_insert_row(df, [sample_date, observed_dt], values) + diff --git a/tests/unit/test_multi_index.py b/tests/unit/test_multi_index.py new file mode 100644 index 0000000000000..1b322f696adbe --- /dev/null +++ b/tests/unit/test_multi_index.py @@ -0,0 +1,230 @@ +from datetime import timedelta +import itertools + +import numpy as np +import pandas as pd +from pandas.tseries.tools import to_datetime as dt + +from arctic.multi_index import groupby_asof, fancy_group_by, insert_at + + +def get_bitemporal_test_data(): + # Create an index of 8 sample dates, 2 rows per date + sample_dates = pd.date_range('1/1/2014', periods=4, freq='D') + sample_dates = pd.DatetimeIndex(data=sorted(itertools.chain(sample_dates, sample_dates))) + + # Create a list of insert dates. These are a year later than sample date, to show + # that they don't necessarily have to be related + insert_dates = pd.date_range('1/1/2015', periods=8, freq='D') + + # Build the bitemporal index + index = pd.MultiIndex.from_arrays([sample_dates, insert_dates], names=['sample_dt', 'observed_dt']) + + # Create the dataframe with a couple of column, each value incrementing by 0.1 on the successive updates so + # we can tell them apart + prices = [[1.0, 10.0], + [1.1, 10.1], + [2.0, 20.0], + [2.1, 20.1], + [3.0, 30.0], + [3.1, 30.1], + [4.0, 40.0], + [4.1, 40.1]] + + df = pd.DataFrame(prices, index=index, columns=['OPEN', 'CLOSE']) + + # OPEN CLOSE + # sample_dt observed_dt + # 2014-01-01 2015-01-01 1.0 10.0 + # 2015-01-02 1.1 10.1 + # 2014-01-02 2015-01-03 2.0 20.0 + # 2015-01-04 2.1 20.1 + # 2014-01-03 2015-01-05 3.0 30.0 + # 2015-01-06 3.1 30.1 + # 2014-01-04 2015-01-07 4.0 40.0 + # 2015-01-08 4.1 40.1 + return df + + +def test__can_create_df_with_multiple_index(): + """ I can make a Pandas DF with an multi-index on sampled_dt and observed_dt + """ + df = get_bitemporal_test_data() + + # Our index has 2 levels + assert df.index.names == ['sample_dt', 'observed_dt'] + assert all(df.columns == ['OPEN', 'CLOSE']) + + # We should have 8 rows + assert len(df) == 8 + + # .. or 4, when we only count sample date + assert len(df.groupby(level='sample_dt').sum()) == 4 + + +def test__get_ts__asof_latest(): + """ I can get the latest known value for each sample date + """ + df = groupby_asof(get_bitemporal_test_data()) + assert len(df) == 4 + assert all(df['OPEN'] == [1.1, 2.1, 3.1, 4.1]) + assert all(df['CLOSE'] == [10.1, 20.1, 30.1, 40.1]) + + +def test__get_ts__asof_datetime(): + """ I can get a timeseries as-of a particular point in time + """ + df = groupby_asof(get_bitemporal_test_data(), as_of=dt('2015-01-05')) + assert len(df) == 3 + assert all(df['OPEN'] == [1.1, 2.1, 3.0]) + assert all(df['CLOSE'] == [10.1, 20.1, 30.0]) + + +def test__get_ts__unsorted_index(): + """ I can get a timeseries as-of a date when the index isn't sorted properly + """ + df = get_bitemporal_test_data() + # Swap the third and fourth rows around. This would break the group-by if we didn't check + # for sortedness + df = df.reindex(df.index[[0, 1, 3, 2, 4, 5, 6, 7]]) + df = groupby_asof(df) + assert len(df) == 4 + assert all(df['OPEN'] == [1.1, 2.1, 3.1, 4.1]) + assert all(df['CLOSE'] == [10.1, 20.1, 30.1, 40.1]) + + +# --------- Min/Max using numeric index ----------- # + +def get_numeric_index_test_data(): + group_idx = sorted(4 * range(4)) + agg_idx = range(16) + prices = np.arange(32).reshape(16, 2) * 10 + df = pd.DataFrame(prices, index=[group_idx, agg_idx], columns=['OPEN', 'CLOSE']) + # OPEN CLOSE + # 0 0 0 10 + # 1 20 30 + # 2 40 50 + # 3 60 70 + # 1 4 80 90 + # 5 100 110 + # 6 120 130 + # 7 140 150 + # 2 8 160 170 + # 9 180 190 + # 10 200 210 + # 11 220 230 + # 3 12 240 250 + # 13 260 270 + # 14 280 290 + # 15 300 310 + return df + + +def test__minmax_last(): + df = get_numeric_index_test_data() + df = fancy_group_by(df, min_=3, max_=10, method='last') + assert all(df['OPEN'] == [60, 140, 200]) + assert all(df['CLOSE'] == [70, 150, 210]) + + +def test__minmax_first(): + df = get_numeric_index_test_data() + df = fancy_group_by(df, min_=3, max_=10, method='first') + assert all(df['OPEN'] == [60, 80, 160]) + assert all(df['CLOSE'] == [70, 90, 170]) + + +def test__within_numeric_first(): + df = get_numeric_index_test_data() + df = fancy_group_by(df, within=5, method='first') + assert all(df['OPEN'] == [0, 80]) + assert all(df['CLOSE'] == [10, 90]) + + +def test__within_numeric_last(): + df = get_numeric_index_test_data() + df = fancy_group_by(df, within=5, method='last') + assert all(df['OPEN'] == [60, 120]) + assert all(df['CLOSE'] == [70, 130]) + + +# --------- Datetime index ----------- # + + +def get_datetime_index_test_data(): + sample_dates = pd.DatetimeIndex(4 * [dt('1/1/2014 21:30')] + + 4 * [dt('2/1/2014 21:30')] + + 4 * [dt('3/1/2014 21:30')]) + observed_dates = [dt('1/1/2014 22:00'), dt('1/1/2014 22:30'), dt('2/1/2014 00:00'), dt('1/1/2015 21:30'), + dt('2/1/2014 23:00'), dt('2/1/2014 23:30'), dt('3/1/2014 00:00'), dt('2/1/2015 21:30'), + dt('3/1/2014 21:30'), dt('3/1/2014 22:30'), dt('4/1/2014 00:00'), dt('3/1/2015 21:30'), + ] + index = pd.MultiIndex.from_arrays([sample_dates, observed_dates], names=['sample_dt', 'observed_dt']) + + prices = np.arange(24).reshape(12, 2) * 10 + df = pd.DataFrame(prices, index=index, columns=['OPEN', 'CLOSE']) + + # OPEN CLOSE + # sample_dt observed_dt + # 2014-01-01 21:30:00 2014-01-01 22:00:00 0 10 + # 2014-01-01 22:30:00 20 30 + # 2014-02-01 00:00:00 40 50 + # 2015-01-01 21:30:00 60 70 + # 2014-02-01 21:30:00 2014-02-01 23:00:00 80 90 + # 2014-02-01 23:30:00 100 110 + # 2014-03-01 00:00:00 120 130 + # 2015-02-01 21:30:00 140 150 + # 2014-03-01 21:30:00 2014-03-01 21:30:00 160 170 + # 2014-03-01 22:30:00 180 190 + # 2014-04-01 00:00:00 200 210 + # 2015-03-01 21:30:00 220 230 + return df + + +def test__first_within_datetime(): + ''' This shows the groupby function can give you a timeseries of points that were observed + within a rolling window of the sample time. + This is like saying 'give me the timeseries as it was on the day'. + It usually makes sense I think for the window to be the same as the sample period. + ''' + df = get_datetime_index_test_data() + df = fancy_group_by(df, within=timedelta(hours=1), method='first') + assert all(df['OPEN'] == [0, 160]) + assert all(df['CLOSE'] == [10, 170]) + + +def test__last_within_datetime(): + ''' Last-observed variant of the above. + ''' + df = get_datetime_index_test_data() + df = fancy_group_by(df, within=timedelta(hours=1), method='last') + assert all(df['OPEN'] == [20, 180]) + assert all(df['CLOSE'] == [30, 190]) + + +# ----------------------- Row Insertion ---------------------------- # + +def test__can_insert_row(): + """ I can insert a new row into a bitemp ts and it comes back when selecting the latest data + """ + df = get_bitemporal_test_data() + df = insert_at(df, dt('2014-01-03'), [[9, 90]]) + assert len(df) == 9 + df = groupby_asof(df) + assert len(df) == 4 + assert df.loc[dt('2014-01-03')]['OPEN'] == 9 + assert df.loc[dt('2014-01-03')]['CLOSE'] == 90 + + +def test__can_append_row(): + """ I can append a new row to a bitemp ts and it comes back when selecting the latest data + """ + df = get_bitemporal_test_data() + df = insert_at(df, dt('2014-01-05'), [[9, 90]]) + + assert len(df) == 9 + + df = groupby_asof(df) + assert len(df) == 5 + assert df.loc[dt('2014-01-05')]['OPEN'] == 9 + assert df.loc[dt('2014-01-05')]['CLOSE'] == 90 From 3b55e4ab11813cc63344936be275ad1db8bae7d4 Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Wed, 26 Aug 2015 11:23:18 +0100 Subject: [PATCH 02/22] First cut of bitemporal store --- arctic/arctic.py | 15 +++--- arctic/fixtures/arctic.py | 6 +++ arctic/store/bitemporal_store.py | 51 +++++++++++++++++++ .../store/test_bitemporal_store.py | 46 +++++++++++++++++ 4 files changed, 111 insertions(+), 7 deletions(-) create mode 100644 arctic/store/bitemporal_store.py create mode 100644 tests/integration/store/test_bitemporal_store.py diff --git a/arctic/arctic.py b/arctic/arctic.py index 4cf2697524590..0894e63babbad 100644 --- a/arctic/arctic.py +++ b/arctic/arctic.py @@ -3,15 +3,14 @@ import pymongo from pymongo.errors import OperationFailure, AutoReconnect +from ._util import indent from .auth import authenticate, get_auth -from .hooks import get_mongodb_uri from .decorators import mongo_retry -from ._util import indent - from .exceptions import LibraryNotFoundException, ArcticException, QuotaExceededException -from .store import version_store -from .tickstore import tickstore -from .tickstore import toplevel +from .hooks import get_mongodb_uri +from .store import version_store, bitemporal_store +from .tickstore import tickstore, toplevel + __all__ = ['Arctic', 'VERSION_STORE', 'TICK_STORE', 'register_library_type'] @@ -20,10 +19,12 @@ # Default Arctic application name: 'arctic' APPLICATION_NAME = 'arctic' VERSION_STORE = version_store.VERSION_STORE_TYPE +BITEMPORAL_STORE = bitemporal_store.BITEMPORAL_STORE_TYPE TICK_STORE = tickstore.TICK_STORE_TYPE LIBRARY_TYPES = {version_store.VERSION_STORE_TYPE: version_store.VersionStore, tickstore.TICK_STORE_TYPE: tickstore.TickStore, - toplevel.TICK_STORE_TYPE: toplevel.TopLevelTickStore + toplevel.TICK_STORE_TYPE: toplevel.TopLevelTickStore, + bitemporal_store.BITEMPORAL_STORE_TYPE: bitemporal_store.BitemporalStore } diff --git a/arctic/fixtures/arctic.py b/arctic/fixtures/arctic.py index bf61081240a59..f16cc59e96755 100644 --- a/arctic/fixtures/arctic.py +++ b/arctic/fixtures/arctic.py @@ -73,6 +73,12 @@ def library(arctic, library_name): return arctic.get_library(library_name) +@pytest.fixture(scope="function") +def bitemporal_library(arctic, library_name): + arctic.initialize_library(library_name, m.BITEMPORAL_STORE, segment='month') # TODO: segment=month?? + return arctic.get_library(library_name) + + @pytest.fixture(scope="function") def library_secondary(arctic_secondary, library_name): arctic_secondary.initialize_library(library_name, m.VERSION_STORE, segment='month') diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py new file mode 100644 index 0000000000000..e2419cb236485 --- /dev/null +++ b/arctic/store/bitemporal_store.py @@ -0,0 +1,51 @@ +from collections import namedtuple +from datetime import datetime as dt + +from arctic.multi_index import fancy_group_by +import pandas as pd + +from .version_store import VersionStore + + +BITEMPORAL_STORE_TYPE = 'BitemporalStore' + +BitemporalItem = namedtuple('BitemporalItem', 'data, metadata') + + +class BitemporalStore(VersionStore): + + def __init__(self, arctic_lib, observe_column='observed_dt', sample_column='sample_dt'): + super(BitemporalStore, self).__init__(arctic_lib) + self.observe_column = observe_column + self.sample_column = sample_column + + def read(self, symbol, as_of=None, from_version=None, **kwargs): + item = super(BitemporalStore, self).read(symbol, from_version=from_version, **kwargs) + + result = BitemporalItem(data=fancy_group_by(item.data, grouping_level=self.observe_column, + aggregate_level=self.sample_column, max_=as_of), + metadata=item.metadata) + return result + + def append(self, symbol, data, metadata=None, upsert=True, **kwargs): + data = self._preprocess_incoming_data(data) + if upsert and not self.has_symbol(symbol): + df = data + else: + df = super(BitemporalStore, self).read(symbol, **kwargs).data.append(data) + super(BitemporalStore, self).write(symbol, df, metadata=metadata, prune_previous_version=True) + + def write(self, *args, **kwargs): + # TODO: may be diff + append? + raise NotImplementedError('Direct write for BitemporalStore is not supported. Use append instead' + 'to add / modify timeseries.') + + def _preprocess_incoming_data(self, df): + if self.sample_column not in df: + # TODO: Move this to multi_index + now = dt.now() + df = pd.concat([df, pd.DataFrame([now] * len(df), index=df.index, columns=[self.sample_column])], axis=1) + df.set_index(self.sample_column, append=True, inplace=True) + return df + + diff --git a/tests/integration/store/test_bitemporal_store.py b/tests/integration/store/test_bitemporal_store.py new file mode 100644 index 0000000000000..5e5146befaf77 --- /dev/null +++ b/tests/integration/store/test_bitemporal_store.py @@ -0,0 +1,46 @@ +''' +Created on 25 Aug 2015 + +@author: ateng +''' +from pandas.util.testing import assert_frame_equal +import pytest + +from tests.util import read_str_as_pandas + + +ts1 = read_str_as_pandas(""" observed_dt | near + 2012-09-08 17:06:11.040 | 1.0 + 2012-10-08 17:06:11.040 | 2.0 + 2012-10-09 17:06:11.040 | 2.5 + 2012-11-08 17:06:11.040 | 3.0""") + +ts1_append = read_str_as_pandas(""" observed_dt | near + 2012-09-08 17:06:11.040 | 1.0 + 2012-10-08 17:06:11.040 | 2.0 + 2012-10-09 17:06:11.040 | 2.5 + 2012-11-08 17:06:11.040 | 3.0 + 2012-11-09 17:06:11.040 | 3.5""") + + +def test_new_ts_read_write(bitemporal_library): + bitemporal_library.append('spam', ts1) + assert_frame_equal(ts1, bitemporal_library.read('spam').data) + + +def test_existing_ts_append_and_read(bitemporal_library): + bitemporal_library.append('spam', ts1) + bitemporal_library.append('spam', ts1_append[-1:]) + assert_frame_equal(ts1_append, bitemporal_library.read('spam').data) + + +def test_existing_ts_update_existing_data_and_read(bitemporal_library): + bitemporal_library.append('spam', ts1) + bitemporal_library.append('spam', read_str_as_pandas(""" observed_dt | near + 2012-10-09 17:06:11.040 | 4.2""")) + expected_ts = read_str_as_pandas(""" observed_dt | near + 2012-09-08 17:06:11.040 | 1.0 + 2012-10-08 17:06:11.040 | 2.0 + 2012-10-09 17:06:11.040 | 4.2 + 2012-11-08 17:06:11.040 | 3.0""") + assert_frame_equal(expected_ts, bitemporal_library.read('spam').data) From 6575853a9b0be95b0b6afb3eda8a219c2515e251 Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Wed, 26 Aug 2015 12:34:37 +0100 Subject: [PATCH 03/22] Adding 'as_of' support for append --- arctic/store/bitemporal_store.py | 19 ++++++------ .../store/test_bitemporal_store.py | 29 +++++++++++++++++++ 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py index e2419cb236485..53b2c3f41662a 100644 --- a/arctic/store/bitemporal_store.py +++ b/arctic/store/bitemporal_store.py @@ -9,7 +9,7 @@ BITEMPORAL_STORE_TYPE = 'BitemporalStore' -BitemporalItem = namedtuple('BitemporalItem', 'data, metadata') +BitemporalItem = namedtuple('BitemporalItem', 'symbol, library, data, metadata') class BitemporalStore(VersionStore): @@ -22,13 +22,15 @@ def __init__(self, arctic_lib, observe_column='observed_dt', sample_column='samp def read(self, symbol, as_of=None, from_version=None, **kwargs): item = super(BitemporalStore, self).read(symbol, from_version=from_version, **kwargs) - result = BitemporalItem(data=fancy_group_by(item.data, grouping_level=self.observe_column, + result = BitemporalItem(symbol=symbol, + library=self._arctic_lib.get_name(), + data=fancy_group_by(item.data, grouping_level=self.observe_column, aggregate_level=self.sample_column, max_=as_of), metadata=item.metadata) return result - def append(self, symbol, data, metadata=None, upsert=True, **kwargs): - data = self._preprocess_incoming_data(data) + def append(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs): + data = self._preprocess_incoming_data(data, as_of) if upsert and not self.has_symbol(symbol): df = data else: @@ -40,12 +42,11 @@ def write(self, *args, **kwargs): raise NotImplementedError('Direct write for BitemporalStore is not supported. Use append instead' 'to add / modify timeseries.') - def _preprocess_incoming_data(self, df): + def _preprocess_incoming_data(self, df, as_of): if self.sample_column not in df: # TODO: Move this to multi_index - now = dt.now() - df = pd.concat([df, pd.DataFrame([now] * len(df), index=df.index, columns=[self.sample_column])], axis=1) + if not as_of: + as_of = dt.now() + df = pd.concat([df, pd.DataFrame([as_of] * len(df), index=df.index, columns=[self.sample_column])], axis=1) df.set_index(self.sample_column, append=True, inplace=True) return df - - diff --git a/tests/integration/store/test_bitemporal_store.py b/tests/integration/store/test_bitemporal_store.py index 5e5146befaf77..fca3712632391 100644 --- a/tests/integration/store/test_bitemporal_store.py +++ b/tests/integration/store/test_bitemporal_store.py @@ -3,6 +3,9 @@ @author: ateng ''' +from datetime import datetime as dt + +from mock import patch from pandas.util.testing import assert_frame_equal import pytest @@ -44,3 +47,29 @@ def test_existing_ts_update_existing_data_and_read(bitemporal_library): 2012-10-09 17:06:11.040 | 4.2 2012-11-08 17:06:11.040 | 3.0""") assert_frame_equal(expected_ts, bitemporal_library.read('spam').data) + + +def test_read_ts_as_of_historical_date(bitemporal_library): + with patch('arctic.store.bitemporal_store.dt') as mock_dt: + mock_dt.now.return_value = dt(2015, 5, 1) + mock_dt.side_effect = lambda *args, **kwargs: dt(*args, **kwargs) + bitemporal_library.append('spam', ts1) + + bitemporal_library.append('spam', read_str_as_pandas(""" observed_dt | near + 2012-10-09 17:06:11.040 | 4.2"""), + as_of=dt(2015, 5, 2)) + + bitemporal_library.append('spam', read_str_as_pandas(""" observed_dt | near + 2012-10-09 17:06:11.040 | 6.6"""), + as_of=dt(2015, 5, 3)) + + expected_ts = read_str_as_pandas(""" observed_dt | near + 2012-09-08 17:06:11.040 | 1.0 + 2012-10-08 17:06:11.040 | 2.0 + 2012-10-09 17:06:11.040 | 4.2 + 2012-11-08 17:06:11.040 | 3.0""") + + assert_frame_equal(expected_ts, bitemporal_library.read('spam', as_of=dt(2015, 5, 2, 10)).data) + +# bitemporal_library.append('spam', read_str_as_pandas(""" observed_dt | sample_dt | near +# 2012-10-09 17:06:11.040 | 2015-05-02 00:00:00.000 | 4.2""")) From 356667f5a4b8ad2f3b55a5ec970e7113cfedf354 Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Wed, 26 Aug 2015 13:20:19 +0100 Subject: [PATCH 04/22] Extra test cases on read as_of --- arctic/store/bitemporal_store.py | 8 +++---- .../store/test_bitemporal_store.py | 23 +++++++++++-------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py index 53b2c3f41662a..a1b4d5022d623 100644 --- a/arctic/store/bitemporal_store.py +++ b/arctic/store/bitemporal_store.py @@ -19,11 +19,11 @@ def __init__(self, arctic_lib, observe_column='observed_dt', sample_column='samp self.observe_column = observe_column self.sample_column = sample_column - def read(self, symbol, as_of=None, from_version=None, **kwargs): - item = super(BitemporalStore, self).read(symbol, from_version=from_version, **kwargs) + def read(self, symbol, as_of=None, **kwargs): + # TODO: shall we block from_version from getting into super.read? + item = super(BitemporalStore, self).read(symbol, **kwargs) - result = BitemporalItem(symbol=symbol, - library=self._arctic_lib.get_name(), + result = BitemporalItem(symbol=symbol, library=self._arctic_lib.get_name(), data=fancy_group_by(item.data, grouping_level=self.observe_column, aggregate_level=self.sample_column, max_=as_of), metadata=item.metadata) diff --git a/tests/integration/store/test_bitemporal_store.py b/tests/integration/store/test_bitemporal_store.py index fca3712632391..453993fae56c8 100644 --- a/tests/integration/store/test_bitemporal_store.py +++ b/tests/integration/store/test_bitemporal_store.py @@ -49,7 +49,7 @@ def test_existing_ts_update_existing_data_and_read(bitemporal_library): assert_frame_equal(expected_ts, bitemporal_library.read('spam').data) -def test_read_ts_as_of_historical_date(bitemporal_library): +def test_read_ts_with_historical_update(bitemporal_library): with patch('arctic.store.bitemporal_store.dt') as mock_dt: mock_dt.now.return_value = dt(2015, 5, 1) mock_dt.side_effect = lambda *args, **kwargs: dt(*args, **kwargs) @@ -63,13 +63,18 @@ def test_read_ts_as_of_historical_date(bitemporal_library): 2012-10-09 17:06:11.040 | 6.6"""), as_of=dt(2015, 5, 3)) - expected_ts = read_str_as_pandas(""" observed_dt | near - 2012-09-08 17:06:11.040 | 1.0 - 2012-10-08 17:06:11.040 | 2.0 - 2012-10-09 17:06:11.040 | 4.2 - 2012-11-08 17:06:11.040 | 3.0""") + assert_frame_equal(bitemporal_library.read('spam', as_of=dt(2015, 5, 2, 10)).data, read_str_as_pandas( + """observed_dt | near + 2012-09-08 17:06:11.040 | 1.0 + 2012-10-08 17:06:11.040 | 2.0 + 2012-10-09 17:06:11.040 | 4.2 + 2012-11-08 17:06:11.040 | 3.0""")) + + assert_frame_equal(bitemporal_library.read('spam').data, read_str_as_pandas("""observed_dt | near + 2012-09-08 17:06:11.040 | 1.0 + 2012-10-08 17:06:11.040 | 2.0 + 2012-10-09 17:06:11.040 | 6.6 + 2012-11-08 17:06:11.040 | 3.0""")) - assert_frame_equal(expected_ts, bitemporal_library.read('spam', as_of=dt(2015, 5, 2, 10)).data) + assert_frame_equal(bitemporal_library.read('spam', as_of=dt(2015, 5, 1, 10)).data, ts1) -# bitemporal_library.append('spam', read_str_as_pandas(""" observed_dt | sample_dt | near -# 2012-10-09 17:06:11.040 | 2015-05-02 00:00:00.000 | 4.2""")) From 908e7a66841d2d77657b7d44cd7cd0b6359d6a5d Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Wed, 26 Aug 2015 15:49:11 +0100 Subject: [PATCH 05/22] Put observe_column / sample_column the right way --- arctic/store/bitemporal_store.py | 10 +++++----- .../integration/store/test_bitemporal_store.py | 17 ++++++++--------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py index a1b4d5022d623..9d3f83f933e87 100644 --- a/arctic/store/bitemporal_store.py +++ b/arctic/store/bitemporal_store.py @@ -24,8 +24,8 @@ def read(self, symbol, as_of=None, **kwargs): item = super(BitemporalStore, self).read(symbol, **kwargs) result = BitemporalItem(symbol=symbol, library=self._arctic_lib.get_name(), - data=fancy_group_by(item.data, grouping_level=self.observe_column, - aggregate_level=self.sample_column, max_=as_of), + data=fancy_group_by(item.data, grouping_level=self.sample_column, + aggregate_level=self.observe_column, max_=as_of), metadata=item.metadata) return result @@ -43,10 +43,10 @@ def write(self, *args, **kwargs): 'to add / modify timeseries.') def _preprocess_incoming_data(self, df, as_of): - if self.sample_column not in df: + if self.observe_column not in df: # TODO: Move this to multi_index if not as_of: as_of = dt.now() - df = pd.concat([df, pd.DataFrame([as_of] * len(df), index=df.index, columns=[self.sample_column])], axis=1) - df.set_index(self.sample_column, append=True, inplace=True) + df = pd.concat([df, pd.DataFrame([as_of] * len(df), index=df.index, columns=[self.observe_column])], axis=1) + df.set_index(self.observe_column, append=True, inplace=True) return df diff --git a/tests/integration/store/test_bitemporal_store.py b/tests/integration/store/test_bitemporal_store.py index 453993fae56c8..6455c89bd492d 100644 --- a/tests/integration/store/test_bitemporal_store.py +++ b/tests/integration/store/test_bitemporal_store.py @@ -12,13 +12,13 @@ from tests.util import read_str_as_pandas -ts1 = read_str_as_pandas(""" observed_dt | near +ts1 = read_str_as_pandas(""" sample_dt | near 2012-09-08 17:06:11.040 | 1.0 2012-10-08 17:06:11.040 | 2.0 2012-10-09 17:06:11.040 | 2.5 2012-11-08 17:06:11.040 | 3.0""") -ts1_append = read_str_as_pandas(""" observed_dt | near +ts1_append = read_str_as_pandas(""" sample_dt | near 2012-09-08 17:06:11.040 | 1.0 2012-10-08 17:06:11.040 | 2.0 2012-10-09 17:06:11.040 | 2.5 @@ -39,9 +39,9 @@ def test_existing_ts_append_and_read(bitemporal_library): def test_existing_ts_update_existing_data_and_read(bitemporal_library): bitemporal_library.append('spam', ts1) - bitemporal_library.append('spam', read_str_as_pandas(""" observed_dt | near + bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near 2012-10-09 17:06:11.040 | 4.2""")) - expected_ts = read_str_as_pandas(""" observed_dt | near + expected_ts = read_str_as_pandas(""" sample_dt | near 2012-09-08 17:06:11.040 | 1.0 2012-10-08 17:06:11.040 | 2.0 2012-10-09 17:06:11.040 | 4.2 @@ -55,26 +55,25 @@ def test_read_ts_with_historical_update(bitemporal_library): mock_dt.side_effect = lambda *args, **kwargs: dt(*args, **kwargs) bitemporal_library.append('spam', ts1) - bitemporal_library.append('spam', read_str_as_pandas(""" observed_dt | near + bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near 2012-10-09 17:06:11.040 | 4.2"""), as_of=dt(2015, 5, 2)) - bitemporal_library.append('spam', read_str_as_pandas(""" observed_dt | near + bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near 2012-10-09 17:06:11.040 | 6.6"""), as_of=dt(2015, 5, 3)) assert_frame_equal(bitemporal_library.read('spam', as_of=dt(2015, 5, 2, 10)).data, read_str_as_pandas( - """observed_dt | near + """sample_dt | near 2012-09-08 17:06:11.040 | 1.0 2012-10-08 17:06:11.040 | 2.0 2012-10-09 17:06:11.040 | 4.2 2012-11-08 17:06:11.040 | 3.0""")) - assert_frame_equal(bitemporal_library.read('spam').data, read_str_as_pandas("""observed_dt | near + assert_frame_equal(bitemporal_library.read('spam').data, read_str_as_pandas(""" sample_dt | near 2012-09-08 17:06:11.040 | 1.0 2012-10-08 17:06:11.040 | 2.0 2012-10-09 17:06:11.040 | 6.6 2012-11-08 17:06:11.040 | 3.0""")) assert_frame_equal(bitemporal_library.read('spam', as_of=dt(2015, 5, 1, 10)).data, ts1) - From 66e49ba150812a6c706f4018f4791300acc21652 Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Thu, 27 Aug 2015 13:22:23 +0100 Subject: [PATCH 06/22] Refactor the appending logic + extra test cases --- arctic/store/bitemporal_store.py | 21 ++++++----- .../store/test_bitemporal_store.py | 36 +++++++++++++++++++ 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py index 9d3f83f933e87..85037198843b3 100644 --- a/arctic/store/bitemporal_store.py +++ b/arctic/store/bitemporal_store.py @@ -30,11 +30,17 @@ def read(self, symbol, as_of=None, **kwargs): return result def append(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs): - data = self._preprocess_incoming_data(data, as_of) + assert self.observe_column not in data + if not as_of: + as_of = dt.now() + data = self._add_observe_dt_index(data, as_of) if upsert and not self.has_symbol(symbol): df = data else: - df = super(BitemporalStore, self).read(symbol, **kwargs).data.append(data) + existing_item = super(BitemporalStore, self).read(symbol, **kwargs) + if metadata is None: + metadata = existing_item.metadata + df = existing_item.data.append(data) super(BitemporalStore, self).write(symbol, df, metadata=metadata, prune_previous_version=True) def write(self, *args, **kwargs): @@ -42,11 +48,8 @@ def write(self, *args, **kwargs): raise NotImplementedError('Direct write for BitemporalStore is not supported. Use append instead' 'to add / modify timeseries.') - def _preprocess_incoming_data(self, df, as_of): - if self.observe_column not in df: - # TODO: Move this to multi_index - if not as_of: - as_of = dt.now() - df = pd.concat([df, pd.DataFrame([as_of] * len(df), index=df.index, columns=[self.observe_column])], axis=1) - df.set_index(self.observe_column, append=True, inplace=True) + def _add_observe_dt_index(self, df, as_of): + df = df.set_index(pd.MultiIndex.from_product([df.index, as_of], + names=[self.sample_column, self.observe_column]), + inplace=False) return df diff --git a/tests/integration/store/test_bitemporal_store.py b/tests/integration/store/test_bitemporal_store.py index 6455c89bd492d..146d435d56898 100644 --- a/tests/integration/store/test_bitemporal_store.py +++ b/tests/integration/store/test_bitemporal_store.py @@ -77,3 +77,39 @@ def test_read_ts_with_historical_update(bitemporal_library): 2012-11-08 17:06:11.040 | 3.0""")) assert_frame_equal(bitemporal_library.read('spam', as_of=dt(2015, 5, 1, 10)).data, ts1) + + +def test_read_ts_with_historical_update_and_new_row(bitemporal_library): + with patch('arctic.store.bitemporal_store.dt') as mock_dt: + mock_dt.now.return_value = dt(2015, 5, 1) + mock_dt.side_effect = lambda *args, **kwargs: dt(*args, **kwargs) + bitemporal_library.append('spam', ts1) + + bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + 2012-10-09 17:06:11.040 | 4.2 + 2012-12-01 17:06:11.040 | 100"""), + as_of=dt(2015, 5, 2)) + + assert_frame_equal(bitemporal_library.read('spam').data, read_str_as_pandas(""" sample_dt | near + 2012-09-08 17:06:11.040 | 1.0 + 2012-10-08 17:06:11.040 | 2.0 + 2012-10-09 17:06:11.040 | 4.2 + 2012-11-08 17:06:11.040 | 3.0 + 2012-12-01 17:06:11.040 | 100""")) + + assert_frame_equal(bitemporal_library.read('spam', as_of=dt(2015, 5, 1, 10)).data, ts1) + + +def test_insert_new_rows_in_middle_remains_sorted(bitemporal_library): + bitemporal_library.append('spam', ts1) + bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + 2012-10-09 12:00:00.000 | 30.0 + 2012-12-01 17:06:11.040 | 100""")) + + assert_frame_equal(bitemporal_library.read('spam').data, read_str_as_pandas(""" sample_dt | near + 2012-09-08 17:06:11.040 | 1.0 + 2012-10-08 17:06:11.040 | 2.0 + 2012-10-09 12:00:00.000 | 30.0 + 2012-10-09 17:06:11.040 | 2.5 + 2012-11-08 17:06:11.040 | 3.0 + 2012-12-01 17:06:11.040 | 100""")) From fb642673239203594ac0d36f21d37a15df0d7b98 Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Thu, 27 Aug 2015 15:55:43 +0100 Subject: [PATCH 07/22] Extra test case on inserting versions in between --- .../store/test_bitemporal_store.py | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/integration/store/test_bitemporal_store.py b/tests/integration/store/test_bitemporal_store.py index 146d435d56898..88e7ca926db92 100644 --- a/tests/integration/store/test_bitemporal_store.py +++ b/tests/integration/store/test_bitemporal_store.py @@ -113,3 +113,29 @@ def test_insert_new_rows_in_middle_remains_sorted(bitemporal_library): 2012-10-09 17:06:11.040 | 2.5 2012-11-08 17:06:11.040 | 3.0 2012-12-01 17:06:11.040 | 100""")) + + +def test_insert_versions_inbetween_works_ok(bitemporal_library): + bitemporal_library.append('spam', ts1, as_of=dt(2015, 5, 1)) + bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + 2012-12-01 17:06:11.040 | 100"""), + as_of=dt(2015, 5, 10)) + + bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + 2012-12-01 17:06:11.040 | 25"""), + as_of=dt(2015, 5, 8)) + + assert_frame_equal(bitemporal_library.read('spam').data, read_str_as_pandas(""" sample_dt | near + 2012-09-08 17:06:11.040 | 1.0 + 2012-10-08 17:06:11.040 | 2.0 + 2012-10-09 17:06:11.040 | 2.5 + 2012-11-08 17:06:11.040 | 3.0 + 2012-12-01 17:06:11.040 | 100""")) + + assert_frame_equal(bitemporal_library.read('spam', as_of=dt(2015, 5, 9)).data, read_str_as_pandas( + """ sample_dt | near + 2012-09-08 17:06:11.040 | 1.0 + 2012-10-08 17:06:11.040 | 2.0 + 2012-10-09 17:06:11.040 | 2.5 + 2012-11-08 17:06:11.040 | 3.0 + 2012-12-01 17:06:11.040 | 25""")) From 289c9e1dbb01890d3124b859d7e0d1158728e708 Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Thu, 27 Aug 2015 16:59:02 +0100 Subject: [PATCH 08/22] Add support to read raw bitemporal df & docstring & test --- arctic/store/bitemporal_store.py | 65 ++++++++++++++++--- .../store/test_bitemporal_store.py | 36 +++++++++- tests/util.py | 18 ++--- 3 files changed, 102 insertions(+), 17 deletions(-) diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py index 85037198843b3..a4443796bac5c 100644 --- a/arctic/store/bitemporal_store.py +++ b/arctic/store/bitemporal_store.py @@ -13,23 +13,72 @@ class BitemporalStore(VersionStore): + """ A versioned pandas DataFrame store. (currently only supports single index df) - def __init__(self, arctic_lib, observe_column='observed_dt', sample_column='sample_dt'): + As the name hinted, this holds versions of DataFrame by maintaining an extra 'insert time' index internally. + """ + + def __init__(self, arctic_lib, sample_column='sample_dt', observe_column='observed_dt'): + """ + Parameters + ---------- + arctic_lib : `ArcticLibraryBinding` + sample_column : `str` + Column name for the datetime index that represents that samaple time of the data. + observe_column : `str` + Column name for the datetime index that represents the insertion time of a row of data. This column is + internal to this store. + """ super(BitemporalStore, self).__init__(arctic_lib) self.observe_column = observe_column self.sample_column = sample_column - def read(self, symbol, as_of=None, **kwargs): + def read(self, symbol, as_of=None, raw=False, **kwargs): # TODO: shall we block from_version from getting into super.read? - item = super(BitemporalStore, self).read(symbol, **kwargs) + """Read data for the named symbol. Returns a BitemporalItem object with + a data and metdata element (as passed into write). + + Parameters + ---------- + symbol : `str` + symbol name for the item + as_of : `datetime.datetime` + Return the data as it was as_of the point in time. + raw : `bool` + If True, will return the full bitemporal dataframe (i.e. all versions of the data). This also means as_of is + ignored. - result = BitemporalItem(symbol=symbol, library=self._arctic_lib.get_name(), - data=fancy_group_by(item.data, grouping_level=self.sample_column, - aggregate_level=self.observe_column, max_=as_of), - metadata=item.metadata) - return result + Returns + ------- + BitemporalItem namedtuple which contains a .data and .metadata element + """ + item = super(BitemporalStore, self).read(symbol, **kwargs) + if raw: + return BitemporalItem(symbol=symbol, library=self._arctic_lib.get_name(), data=item.data, + metadata=item.metadata) + else: + return BitemporalItem(symbol=symbol, library=self._arctic_lib.get_name(), + data=fancy_group_by(item.data, grouping_level=self.sample_column, + aggregate_level=self.observe_column, max_=as_of), + metadata=item.metadata) def append(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs): + """ Append 'data' under the specified 'symbol' name to this library. + + Parameters + ---------- + symbol : `str` + symbol name for the item + data : `pd.DataFrame` + to be persisted + metadata : `dict` + An optional dictionary of metadata to persist along with the symbol. If None and there are existing + metadata, current metadata will be maintained + upsert : `bool` + Write 'data' if no previous version exists. + as_of : `datetime.datetime` + The "insert time". Default to datetime.now()t + """ assert self.observe_column not in data if not as_of: as_of = dt.now() diff --git a/tests/integration/store/test_bitemporal_store.py b/tests/integration/store/test_bitemporal_store.py index 88e7ca926db92..e204cec92e023 100644 --- a/tests/integration/store/test_bitemporal_store.py +++ b/tests/integration/store/test_bitemporal_store.py @@ -7,7 +7,6 @@ from mock import patch from pandas.util.testing import assert_frame_equal -import pytest from tests.util import read_str_as_pandas @@ -31,6 +30,16 @@ def test_new_ts_read_write(bitemporal_library): assert_frame_equal(ts1, bitemporal_library.read('spam').data) +def test_read_ts_raw(bitemporal_library): + bitemporal_library.append('spam', ts1, as_of=dt(2015, 5, 1)) + assert_frame_equal(bitemporal_library.read('spam', raw=True).data, read_str_as_pandas( + """ sample_dt | observed_dt | near + 2012-09-08 17:06:11.040 | 2015-05-01 | 1.0 + 2012-10-08 17:06:11.040 | 2015-05-01 | 2.0 + 2012-10-09 17:06:11.040 | 2015-05-01 | 2.5 + 2012-11-08 17:06:11.040 | 2015-05-01 | 3.0""", num_index=2)) + + def test_existing_ts_append_and_read(bitemporal_library): bitemporal_library.append('spam', ts1) bitemporal_library.append('spam', ts1_append[-1:]) @@ -139,3 +148,28 @@ def test_insert_versions_inbetween_works_ok(bitemporal_library): 2012-10-09 17:06:11.040 | 2.5 2012-11-08 17:06:11.040 | 3.0 2012-12-01 17:06:11.040 | 25""")) + + +def test_read_ts_raw_all_version_ok(bitemporal_library): + bitemporal_library.append('spam', ts1, as_of=dt(2015, 5, 1)) + bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + 2012-12-01 17:06:11.040 | 25"""), + as_of=dt(2015, 5, 5)) + bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + 2012-11-08 17:06:11.040 | 42"""), + as_of=dt(2015, 5, 3)) + bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + 2012-10-08 17:06:11.040 | 42 + 2013-01-01 17:06:11.040 | 100"""), + as_of=dt(2015, 5, 10,)) + assert_frame_equal(bitemporal_library.read('spam', raw=True).data, read_str_as_pandas( + """ sample_dt | observed_dt | near + 2012-09-08 17:06:11.040 | 2015-05-01 | 1.0 + 2012-10-08 17:06:11.040 | 2015-05-01 | 2.0 + 2012-10-09 17:06:11.040 | 2015-05-01 | 2.5 + 2012-11-08 17:06:11.040 | 2015-05-01 | 3.0 + 2012-11-08 17:06:11.040 | 2015-05-03 | 42 + 2012-12-01 17:06:11.040 | 2015-05-05 | 25 + 2012-10-08 17:06:11.040 | 2015-05-10 | 42 + 2013-01-01 17:06:11.040 | 2015-05-10 | 100""", num_index=2)) + diff --git a/tests/util.py b/tests/util.py index c1b25b751689a..2770e046b5861 100644 --- a/tests/util.py +++ b/tests/util.py @@ -1,20 +1,22 @@ -from contextlib import contextmanager from cStringIO import StringIO -from dateutil.rrule import rrule, DAILY -import dateutil +from contextlib import contextmanager from datetime import datetime as dt +import sys + +import dateutil +from dateutil.rrule import rrule, DAILY import pandas + import numpy as np -import sys -def read_str_as_pandas(ts_str): +def read_str_as_pandas(ts_str, num_index=1): labels = [x.strip() for x in ts_str.split('\n')[0].split('|')] - pd = pandas.read_csv(StringIO(ts_str), sep='|', index_col=0, + pd = pandas.read_csv(StringIO(ts_str), sep='|', index_col=range(num_index), date_parser=dateutil.parser.parse) # Trim the whitespace on the column names - pd.columns = labels[1:] - pd.index.name = labels[0] + pd.columns = labels[num_index:] + pd.index.names = labels[0:num_index] return pd From 559cd12d57e21beef749175066fb8c9a06fe410d Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Thu, 27 Aug 2015 17:06:01 +0100 Subject: [PATCH 09/22] read raw should be sorted by observed_dt --- arctic/store/bitemporal_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py index a4443796bac5c..fc354c4d29c53 100644 --- a/arctic/store/bitemporal_store.py +++ b/arctic/store/bitemporal_store.py @@ -55,7 +55,7 @@ def read(self, symbol, as_of=None, raw=False, **kwargs): item = super(BitemporalStore, self).read(symbol, **kwargs) if raw: return BitemporalItem(symbol=symbol, library=self._arctic_lib.get_name(), data=item.data, - metadata=item.metadata) + metadata=item.metadata).sort(self.observe_column) else: return BitemporalItem(symbol=symbol, library=self._arctic_lib.get_name(), data=fancy_group_by(item.data, grouping_level=self.sample_column, From 6889f008e253187812915ee621a2aac8a641bdcb Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Thu, 27 Aug 2015 17:36:30 +0100 Subject: [PATCH 10/22] Add unit test --- tests/unit/store/test_bitemporal_store.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 tests/unit/store/test_bitemporal_store.py diff --git a/tests/unit/store/test_bitemporal_store.py b/tests/unit/store/test_bitemporal_store.py new file mode 100644 index 0000000000000..b91a1b5273b11 --- /dev/null +++ b/tests/unit/store/test_bitemporal_store.py @@ -0,0 +1,23 @@ +from datetime import datetime as dt +from mock import create_autospec +from pandas.util.testing import assert_frame_equal + +from arctic.store.bitemporal_store import BitemporalStore +from tests.util import read_str_as_pandas + + +ts1 = read_str_as_pandas(""" sample_dt | near + 2012-09-08 17:06:11.040 | 1.0 + 2012-10-08 17:06:11.040 | 2.0 + 2012-10-09 17:06:11.040 | 2.5 + 2012-11-08 17:06:11.040 | 3.0""") + +def test_add_observe_dt_index(): + self = create_autospec(BitemporalStore, observe_column='col_a', + sample_column='col_b') + assert_frame_equal(BitemporalStore._add_observe_dt_index(self, ts1, as_of=dt(2001, 1, 1)), + read_str_as_pandas("""col_b | col_a | near + 2012-09-08 17:06:11.040 | 2001-01-01 | 1.0 + 2012-10-08 17:06:11.040 | 2001-01-01 | 2.0 + 2012-10-09 17:06:11.040 | 2001-01-01 | 2.5 + 2012-11-08 17:06:11.040 | 2001-01-01 | 3.0""", num_index=2)) From fed2e7f1783f397aed97b04c8f0649f86e75f886 Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Wed, 2 Sep 2015 19:05:13 +0100 Subject: [PATCH 11/22] Clean up typo --- arctic/multi_index.py | 10 ++++++---- arctic/store/bitemporal_store.py | 12 ++++++------ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/arctic/multi_index.py b/arctic/multi_index.py index 2214d64fa8e21..5d7d07d46cd5c 100644 --- a/arctic/multi_index.py +++ b/arctic/multi_index.py @@ -1,13 +1,15 @@ ''' Utility functions for multi-index dataframes. Useful for creating bi-temporal timeseries. ''' +from datetime import datetime import logging import types -from datetime import datetime + +from pandas.tseries.tools import to_datetime as dt import numpy as np import pandas as pd -from pandas.tseries.tools import to_datetime as dt + logger = logging.getLogger(__name__) @@ -27,7 +29,7 @@ def fancy_group_by(df, grouping_level=0, aggregate_level=1, method='last', max_= Index level to aggregate by. Defaults to 1. method: ``str`` Aggregation method. One of - last: Use the last (lexigraphically) value from each group + last: Use the last (lexicographically) value from each group first: Use the first value from each group within: Any type supported by the index, or ``DateOffset``/timedelta-like for ``DatetimeIndex``. If set, will limit results to those having aggregate level values within this range of the group value @@ -39,7 +41,7 @@ def fancy_group_by(df, grouping_level=0, aggregate_level=1, method='last', max_= if method not in ('first', 'last'): raise ValueError('Invalid method') - if isinstance(aggregate_level, types.StringType): + if isinstance(aggregate_level, basestring): aggregate_level = df.index.names.index(aggregate_level) agg_idx = df.index.get_level_values(aggregate_level) diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py index fc354c4d29c53..a97bb6ed09126 100644 --- a/arctic/store/bitemporal_store.py +++ b/arctic/store/bitemporal_store.py @@ -1,7 +1,7 @@ from collections import namedtuple from datetime import datetime as dt -from arctic.multi_index import fancy_group_by +from arctic.multi_index import groupby_asof import pandas as pd from .version_store import VersionStore @@ -24,7 +24,7 @@ def __init__(self, arctic_lib, sample_column='sample_dt', observe_column='observ ---------- arctic_lib : `ArcticLibraryBinding` sample_column : `str` - Column name for the datetime index that represents that samaple time of the data. + Column name for the datetime index that represents that sample time of the data. observe_column : `str` Column name for the datetime index that represents the insertion time of a row of data. This column is internal to this store. @@ -58,9 +58,9 @@ def read(self, symbol, as_of=None, raw=False, **kwargs): metadata=item.metadata).sort(self.observe_column) else: return BitemporalItem(symbol=symbol, library=self._arctic_lib.get_name(), - data=fancy_group_by(item.data, grouping_level=self.sample_column, - aggregate_level=self.observe_column, max_=as_of), - metadata=item.metadata) + data=groupby_asof(item.data, as_of=as_of, dt_col=self.sample_column, + asof_col=self.observe_column), + metadata=item.metadata, last_updated=max(item.data.index, key=lambda x: x[1])) def append(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs): """ Append 'data' under the specified 'symbol' name to this library. @@ -77,7 +77,7 @@ def append(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs) upsert : `bool` Write 'data' if no previous version exists. as_of : `datetime.datetime` - The "insert time". Default to datetime.now()t + The "insert time". Default to datetime.now() """ assert self.observe_column not in data if not as_of: From a52aef6903937fbf3a0bddbee3013382c406c400 Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Thu, 3 Sep 2015 09:48:26 +0100 Subject: [PATCH 12/22] Ensure stored dataframe is lex-sorted --- arctic/store/bitemporal_store.py | 7 ++++--- tests/integration/store/test_bitemporal_store.py | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py index a97bb6ed09126..58d31cec645dd 100644 --- a/arctic/store/bitemporal_store.py +++ b/arctic/store/bitemporal_store.py @@ -9,7 +9,7 @@ BITEMPORAL_STORE_TYPE = 'BitemporalStore' -BitemporalItem = namedtuple('BitemporalItem', 'symbol, library, data, metadata') +BitemporalItem = namedtuple('BitemporalItem', 'symbol, library, data, metadata, last_updated') class BitemporalStore(VersionStore): @@ -55,7 +55,8 @@ def read(self, symbol, as_of=None, raw=False, **kwargs): item = super(BitemporalStore, self).read(symbol, **kwargs) if raw: return BitemporalItem(symbol=symbol, library=self._arctic_lib.get_name(), data=item.data, - metadata=item.metadata).sort(self.observe_column) + metadata=item.metadata, + last_updated=max(item.data.index, key=lambda x: x[1])) else: return BitemporalItem(symbol=symbol, library=self._arctic_lib.get_name(), data=groupby_asof(item.data, as_of=as_of, dt_col=self.sample_column, @@ -89,7 +90,7 @@ def append(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs) existing_item = super(BitemporalStore, self).read(symbol, **kwargs) if metadata is None: metadata = existing_item.metadata - df = existing_item.data.append(data) + df = existing_item.data.append(data).sort() super(BitemporalStore, self).write(symbol, df, metadata=metadata, prune_previous_version=True) def write(self, *args, **kwargs): diff --git a/tests/integration/store/test_bitemporal_store.py b/tests/integration/store/test_bitemporal_store.py index e204cec92e023..0edb8682f390b 100644 --- a/tests/integration/store/test_bitemporal_store.py +++ b/tests/integration/store/test_bitemporal_store.py @@ -166,10 +166,10 @@ def test_read_ts_raw_all_version_ok(bitemporal_library): """ sample_dt | observed_dt | near 2012-09-08 17:06:11.040 | 2015-05-01 | 1.0 2012-10-08 17:06:11.040 | 2015-05-01 | 2.0 + 2012-10-08 17:06:11.040 | 2015-05-10 | 42 2012-10-09 17:06:11.040 | 2015-05-01 | 2.5 2012-11-08 17:06:11.040 | 2015-05-01 | 3.0 2012-11-08 17:06:11.040 | 2015-05-03 | 42 2012-12-01 17:06:11.040 | 2015-05-05 | 25 - 2012-10-08 17:06:11.040 | 2015-05-10 | 42 2013-01-01 17:06:11.040 | 2015-05-10 | 100""", num_index=2)) From 63ba9c063ce9273299a6664c1911e61c32bce503 Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Thu, 3 Sep 2015 17:04:02 +0100 Subject: [PATCH 13/22] Convert BitempralStore to a simple VersionStore wrapper --- arctic/arctic.py | 4 +--- arctic/fixtures/arctic.py | 7 ++++--- arctic/store/bitemporal_store.py | 25 +++++++++++-------------- 3 files changed, 16 insertions(+), 20 deletions(-) diff --git a/arctic/arctic.py b/arctic/arctic.py index 0894e63babbad..bac653d6b4f4e 100644 --- a/arctic/arctic.py +++ b/arctic/arctic.py @@ -8,7 +8,7 @@ from .decorators import mongo_retry from .exceptions import LibraryNotFoundException, ArcticException, QuotaExceededException from .hooks import get_mongodb_uri -from .store import version_store, bitemporal_store +from .store import version_store from .tickstore import tickstore, toplevel @@ -19,12 +19,10 @@ # Default Arctic application name: 'arctic' APPLICATION_NAME = 'arctic' VERSION_STORE = version_store.VERSION_STORE_TYPE -BITEMPORAL_STORE = bitemporal_store.BITEMPORAL_STORE_TYPE TICK_STORE = tickstore.TICK_STORE_TYPE LIBRARY_TYPES = {version_store.VERSION_STORE_TYPE: version_store.VersionStore, tickstore.TICK_STORE_TYPE: tickstore.TickStore, toplevel.TICK_STORE_TYPE: toplevel.TopLevelTickStore, - bitemporal_store.BITEMPORAL_STORE_TYPE: bitemporal_store.BitemporalStore } diff --git a/arctic/fixtures/arctic.py b/arctic/fixtures/arctic.py index f16cc59e96755..e35655a590367 100644 --- a/arctic/fixtures/arctic.py +++ b/arctic/fixtures/arctic.py @@ -4,10 +4,11 @@ import pytest as pytest from .. import arctic as m +from ..store.bitemporal_store import BitemporalStore from ..tickstore.tickstore import TICK_STORE_TYPE - from .mongo import mongo_proc, mongodb + logger = logging.getLogger(__name__) mongo_proc2 = mongo_proc(executable="mongod", port="?", @@ -75,8 +76,8 @@ def library(arctic, library_name): @pytest.fixture(scope="function") def bitemporal_library(arctic, library_name): - arctic.initialize_library(library_name, m.BITEMPORAL_STORE, segment='month') # TODO: segment=month?? - return arctic.get_library(library_name) + arctic.initialize_library(library_name, m.VERSION_STORE, segment='month') # TODO: segment=month?? + return BitemporalStore(arctic.get_library(library_name)) @pytest.fixture(scope="function") diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py index 58d31cec645dd..2c3f8321fa29a 100644 --- a/arctic/store/bitemporal_store.py +++ b/arctic/store/bitemporal_store.py @@ -4,32 +4,29 @@ from arctic.multi_index import groupby_asof import pandas as pd -from .version_store import VersionStore - - -BITEMPORAL_STORE_TYPE = 'BitemporalStore' BitemporalItem = namedtuple('BitemporalItem', 'symbol, library, data, metadata, last_updated') -class BitemporalStore(VersionStore): +class BitemporalStore(object): """ A versioned pandas DataFrame store. (currently only supports single index df) As the name hinted, this holds versions of DataFrame by maintaining an extra 'insert time' index internally. """ - def __init__(self, arctic_lib, sample_column='sample_dt', observe_column='observed_dt'): + def __init__(self, version_store, sample_column='sample_dt', observe_column='observed_dt'): """ Parameters ---------- - arctic_lib : `ArcticLibraryBinding` + version_store : `VersionStore` + The version store that keeps the underlying data frames sample_column : `str` Column name for the datetime index that represents that sample time of the data. observe_column : `str` Column name for the datetime index that represents the insertion time of a row of data. This column is internal to this store. """ - super(BitemporalStore, self).__init__(arctic_lib) + self._store = version_store self.observe_column = observe_column self.sample_column = sample_column @@ -52,13 +49,13 @@ def read(self, symbol, as_of=None, raw=False, **kwargs): ------- BitemporalItem namedtuple which contains a .data and .metadata element """ - item = super(BitemporalStore, self).read(symbol, **kwargs) + item = self._store.read(symbol, **kwargs) if raw: - return BitemporalItem(symbol=symbol, library=self._arctic_lib.get_name(), data=item.data, + return BitemporalItem(symbol=symbol, library=self._store._arctic_lib.get_name(), data=item.data, metadata=item.metadata, last_updated=max(item.data.index, key=lambda x: x[1])) else: - return BitemporalItem(symbol=symbol, library=self._arctic_lib.get_name(), + return BitemporalItem(symbol=symbol, library=self._store._arctic_lib.get_name(), data=groupby_asof(item.data, as_of=as_of, dt_col=self.sample_column, asof_col=self.observe_column), metadata=item.metadata, last_updated=max(item.data.index, key=lambda x: x[1])) @@ -84,14 +81,14 @@ def append(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs) if not as_of: as_of = dt.now() data = self._add_observe_dt_index(data, as_of) - if upsert and not self.has_symbol(symbol): + if upsert and not self._store.has_symbol(symbol): df = data else: - existing_item = super(BitemporalStore, self).read(symbol, **kwargs) + existing_item = self._store.read(symbol, **kwargs) if metadata is None: metadata = existing_item.metadata df = existing_item.data.append(data).sort() - super(BitemporalStore, self).write(symbol, df, metadata=metadata, prune_previous_version=True) + self._store.write(symbol, df, metadata=metadata, prune_previous_version=True) def write(self, *args, **kwargs): # TODO: may be diff + append? From 261282a6461c6c080bfed3192902f9e3d77fc129 Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Fri, 4 Sep 2015 15:29:52 +0100 Subject: [PATCH 14/22] Ensure read/append supports timezone --- arctic/store/bitemporal_store.py | 4 +++ .../store/test_bitemporal_store.py | 28 +++++++++++++++---- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py index 2c3f8321fa29a..6b34132ac8d43 100644 --- a/arctic/store/bitemporal_store.py +++ b/arctic/store/bitemporal_store.py @@ -1,6 +1,7 @@ from collections import namedtuple from datetime import datetime as dt +from arctic.date._mktz import mktz from arctic.multi_index import groupby_asof import pandas as pd @@ -78,8 +79,11 @@ def append(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs) The "insert time". Default to datetime.now() """ assert self.observe_column not in data + local_tz = mktz() if not as_of: as_of = dt.now() + if as_of.tzinfo is None: + as_of = as_of.replace(tzinfo=local_tz) data = self._add_observe_dt_index(data, as_of) if upsert and not self._store.has_symbol(symbol): df = data diff --git a/tests/integration/store/test_bitemporal_store.py b/tests/integration/store/test_bitemporal_store.py index 0edb8682f390b..e390bc2b4eb05 100644 --- a/tests/integration/store/test_bitemporal_store.py +++ b/tests/integration/store/test_bitemporal_store.py @@ -8,6 +8,9 @@ from mock import patch from pandas.util.testing import assert_frame_equal +from arctic.date._mktz import mktz +from arctic.fixtures.arctic import bitemporal_library +import pandas as pd from tests.util import read_str_as_pandas @@ -31,7 +34,7 @@ def test_new_ts_read_write(bitemporal_library): def test_read_ts_raw(bitemporal_library): - bitemporal_library.append('spam', ts1, as_of=dt(2015, 5, 1)) + bitemporal_library.append('spam', ts1, as_of=dt(2015, 5, 1, tzinfo=mktz('UTC'))) assert_frame_equal(bitemporal_library.read('spam', raw=True).data, read_str_as_pandas( """ sample_dt | observed_dt | near 2012-09-08 17:06:11.040 | 2015-05-01 | 1.0 @@ -151,17 +154,17 @@ def test_insert_versions_inbetween_works_ok(bitemporal_library): def test_read_ts_raw_all_version_ok(bitemporal_library): - bitemporal_library.append('spam', ts1, as_of=dt(2015, 5, 1)) + bitemporal_library.append('spam', ts1, as_of=dt(2015, 5, 1, tzinfo=mktz('UTC'))) bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near 2012-12-01 17:06:11.040 | 25"""), - as_of=dt(2015, 5, 5)) + as_of=dt(2015, 5, 5, tzinfo=mktz('UTC'))) bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near 2012-11-08 17:06:11.040 | 42"""), - as_of=dt(2015, 5, 3)) + as_of=dt(2015, 5, 3, tzinfo=mktz('UTC'))) bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near 2012-10-08 17:06:11.040 | 42 2013-01-01 17:06:11.040 | 100"""), - as_of=dt(2015, 5, 10,)) + as_of=dt(2015, 5, 10, tzinfo=mktz('UTC'))) assert_frame_equal(bitemporal_library.read('spam', raw=True).data, read_str_as_pandas( """ sample_dt | observed_dt | near 2012-09-08 17:06:11.040 | 2015-05-01 | 1.0 @@ -173,3 +176,18 @@ def test_read_ts_raw_all_version_ok(bitemporal_library): 2012-12-01 17:06:11.040 | 2015-05-05 | 25 2013-01-01 17:06:11.040 | 2015-05-10 | 100""", num_index=2)) + +def test_bitemporal_store_saves_as_of_with_timezone(bitemporal_library): + local_tz = mktz() + bitemporal_library.append('spam', ts1, as_of=dt(2015, 5, 1)) + df = bitemporal_library.read('spam', raw=True).data + assert all([x[1].replace(tzinfo=mktz('UTC')) == dt(2015, 5, 1, tzinfo=local_tz) for x in df.index]) + + +def test_bitemporal_store_read_as_of_timezone(bitemporal_library): + bitemporal_library.append('spam', ts1, as_of=dt(2015, 5, 1, tzinfo=mktz('Europe/London'))) + bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + 2012-12-01 17:06:11.040 | 25"""), + as_of=dt(2015, 5, 2, tzinfo=mktz('Europe/London'))) + df = bitemporal_library.read('spam', as_of=dt(2015, 5, 2, tzinfo=mktz('Asia/Hong_Kong'))).data + assert_frame_equal(df, ts1) From e7c95503200e2039b920c02283c1846a5c310adb Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Mon, 7 Sep 2015 17:19:26 +0100 Subject: [PATCH 15/22] Refactor append -> update --- arctic/store/bitemporal_store.py | 2 +- .../store/test_bitemporal_store.py | 52 +++++++++---------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py index 6b34132ac8d43..0b4162452ea35 100644 --- a/arctic/store/bitemporal_store.py +++ b/arctic/store/bitemporal_store.py @@ -61,7 +61,7 @@ def read(self, symbol, as_of=None, raw=False, **kwargs): asof_col=self.observe_column), metadata=item.metadata, last_updated=max(item.data.index, key=lambda x: x[1])) - def append(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs): + def update(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs): """ Append 'data' under the specified 'symbol' name to this library. Parameters diff --git a/tests/integration/store/test_bitemporal_store.py b/tests/integration/store/test_bitemporal_store.py index e390bc2b4eb05..13f80b61d9bbf 100644 --- a/tests/integration/store/test_bitemporal_store.py +++ b/tests/integration/store/test_bitemporal_store.py @@ -20,7 +20,7 @@ 2012-10-09 17:06:11.040 | 2.5 2012-11-08 17:06:11.040 | 3.0""") -ts1_append = read_str_as_pandas(""" sample_dt | near +ts1_update = read_str_as_pandas(""" sample_dt | near 2012-09-08 17:06:11.040 | 1.0 2012-10-08 17:06:11.040 | 2.0 2012-10-09 17:06:11.040 | 2.5 @@ -29,12 +29,12 @@ def test_new_ts_read_write(bitemporal_library): - bitemporal_library.append('spam', ts1) + bitemporal_library.update('spam', ts1) assert_frame_equal(ts1, bitemporal_library.read('spam').data) def test_read_ts_raw(bitemporal_library): - bitemporal_library.append('spam', ts1, as_of=dt(2015, 5, 1, tzinfo=mktz('UTC'))) + bitemporal_library.update('spam', ts1, as_of=dt(2015, 5, 1, tzinfo=mktz('UTC'))) assert_frame_equal(bitemporal_library.read('spam', raw=True).data, read_str_as_pandas( """ sample_dt | observed_dt | near 2012-09-08 17:06:11.040 | 2015-05-01 | 1.0 @@ -43,15 +43,15 @@ def test_read_ts_raw(bitemporal_library): 2012-11-08 17:06:11.040 | 2015-05-01 | 3.0""", num_index=2)) -def test_existing_ts_append_and_read(bitemporal_library): - bitemporal_library.append('spam', ts1) - bitemporal_library.append('spam', ts1_append[-1:]) - assert_frame_equal(ts1_append, bitemporal_library.read('spam').data) +def test_existing_ts_update_and_read(bitemporal_library): + bitemporal_library.update('spam', ts1) + bitemporal_library.update('spam', ts1_update[-1:]) + assert_frame_equal(ts1_update, bitemporal_library.read('spam').data) def test_existing_ts_update_existing_data_and_read(bitemporal_library): - bitemporal_library.append('spam', ts1) - bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + bitemporal_library.update('spam', ts1) + bitemporal_library.update('spam', read_str_as_pandas(""" sample_dt | near 2012-10-09 17:06:11.040 | 4.2""")) expected_ts = read_str_as_pandas(""" sample_dt | near 2012-09-08 17:06:11.040 | 1.0 @@ -65,13 +65,13 @@ def test_read_ts_with_historical_update(bitemporal_library): with patch('arctic.store.bitemporal_store.dt') as mock_dt: mock_dt.now.return_value = dt(2015, 5, 1) mock_dt.side_effect = lambda *args, **kwargs: dt(*args, **kwargs) - bitemporal_library.append('spam', ts1) + bitemporal_library.update('spam', ts1) - bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + bitemporal_library.update('spam', read_str_as_pandas(""" sample_dt | near 2012-10-09 17:06:11.040 | 4.2"""), as_of=dt(2015, 5, 2)) - bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + bitemporal_library.update('spam', read_str_as_pandas(""" sample_dt | near 2012-10-09 17:06:11.040 | 6.6"""), as_of=dt(2015, 5, 3)) @@ -95,9 +95,9 @@ def test_read_ts_with_historical_update_and_new_row(bitemporal_library): with patch('arctic.store.bitemporal_store.dt') as mock_dt: mock_dt.now.return_value = dt(2015, 5, 1) mock_dt.side_effect = lambda *args, **kwargs: dt(*args, **kwargs) - bitemporal_library.append('spam', ts1) + bitemporal_library.update('spam', ts1) - bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + bitemporal_library.update('spam', read_str_as_pandas(""" sample_dt | near 2012-10-09 17:06:11.040 | 4.2 2012-12-01 17:06:11.040 | 100"""), as_of=dt(2015, 5, 2)) @@ -113,8 +113,8 @@ def test_read_ts_with_historical_update_and_new_row(bitemporal_library): def test_insert_new_rows_in_middle_remains_sorted(bitemporal_library): - bitemporal_library.append('spam', ts1) - bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + bitemporal_library.update('spam', ts1) + bitemporal_library.update('spam', read_str_as_pandas(""" sample_dt | near 2012-10-09 12:00:00.000 | 30.0 2012-12-01 17:06:11.040 | 100""")) @@ -128,12 +128,12 @@ def test_insert_new_rows_in_middle_remains_sorted(bitemporal_library): def test_insert_versions_inbetween_works_ok(bitemporal_library): - bitemporal_library.append('spam', ts1, as_of=dt(2015, 5, 1)) - bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + bitemporal_library.update('spam', ts1, as_of=dt(2015, 5, 1)) + bitemporal_library.update('spam', read_str_as_pandas(""" sample_dt | near 2012-12-01 17:06:11.040 | 100"""), as_of=dt(2015, 5, 10)) - bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + bitemporal_library.update('spam', read_str_as_pandas(""" sample_dt | near 2012-12-01 17:06:11.040 | 25"""), as_of=dt(2015, 5, 8)) @@ -154,14 +154,14 @@ def test_insert_versions_inbetween_works_ok(bitemporal_library): def test_read_ts_raw_all_version_ok(bitemporal_library): - bitemporal_library.append('spam', ts1, as_of=dt(2015, 5, 1, tzinfo=mktz('UTC'))) - bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + bitemporal_library.update('spam', ts1, as_of=dt(2015, 5, 1, tzinfo=mktz('UTC'))) + bitemporal_library.update('spam', read_str_as_pandas(""" sample_dt | near 2012-12-01 17:06:11.040 | 25"""), as_of=dt(2015, 5, 5, tzinfo=mktz('UTC'))) - bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + bitemporal_library.update('spam', read_str_as_pandas(""" sample_dt | near 2012-11-08 17:06:11.040 | 42"""), as_of=dt(2015, 5, 3, tzinfo=mktz('UTC'))) - bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + bitemporal_library.update('spam', read_str_as_pandas(""" sample_dt | near 2012-10-08 17:06:11.040 | 42 2013-01-01 17:06:11.040 | 100"""), as_of=dt(2015, 5, 10, tzinfo=mktz('UTC'))) @@ -179,14 +179,14 @@ def test_read_ts_raw_all_version_ok(bitemporal_library): def test_bitemporal_store_saves_as_of_with_timezone(bitemporal_library): local_tz = mktz() - bitemporal_library.append('spam', ts1, as_of=dt(2015, 5, 1)) + bitemporal_library.update('spam', ts1, as_of=dt(2015, 5, 1)) df = bitemporal_library.read('spam', raw=True).data assert all([x[1].replace(tzinfo=mktz('UTC')) == dt(2015, 5, 1, tzinfo=local_tz) for x in df.index]) def test_bitemporal_store_read_as_of_timezone(bitemporal_library): - bitemporal_library.append('spam', ts1, as_of=dt(2015, 5, 1, tzinfo=mktz('Europe/London'))) - bitemporal_library.append('spam', read_str_as_pandas(""" sample_dt | near + bitemporal_library.update('spam', ts1, as_of=dt(2015, 5, 1, tzinfo=mktz('Europe/London'))) + bitemporal_library.update('spam', read_str_as_pandas(""" sample_dt | near 2012-12-01 17:06:11.040 | 25"""), as_of=dt(2015, 5, 2, tzinfo=mktz('Europe/London'))) df = bitemporal_library.read('spam', as_of=dt(2015, 5, 2, tzinfo=mktz('Asia/Hong_Kong'))).data From 1e50a82e910eb58b7ebdc39b56f8d36cedfbeb8b Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Tue, 8 Sep 2015 15:09:45 +0100 Subject: [PATCH 16/22] Allow string index in read_str_as_pandas --- tests/util.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/util.py b/tests/util.py index 2770e046b5861..c477127f10e80 100644 --- a/tests/util.py +++ b/tests/util.py @@ -10,10 +10,16 @@ import numpy as np +def dt_or_str_parser(string): + try: + return dateutil.parser.parse(string) + except ValueError: + return string.strip() + + def read_str_as_pandas(ts_str, num_index=1): labels = [x.strip() for x in ts_str.split('\n')[0].split('|')] - pd = pandas.read_csv(StringIO(ts_str), sep='|', index_col=range(num_index), - date_parser=dateutil.parser.parse) + pd = pandas.read_csv(StringIO(ts_str), sep='|', index_col=range(num_index), date_parser=dt_or_str_parser) # Trim the whitespace on the column names pd.columns = labels[num_index:] pd.index.names = labels[0:num_index] From aea66e549ec614326e34303623b7733f525917fa Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Wed, 9 Sep 2015 15:50:35 +0100 Subject: [PATCH 17/22] Add multi-index of depth > 2 support --- arctic/multi_index.py | 13 ++-- arctic/store/bitemporal_store.py | 17 +++--- .../store/test_bitemporal_store.py | 59 ++++++++++++++++++- tests/unit/test_multi_index.py | 28 ++++++++- 4 files changed, 98 insertions(+), 19 deletions(-) diff --git a/arctic/multi_index.py b/arctic/multi_index.py index 5d7d07d46cd5c..0433fdd47e374 100644 --- a/arctic/multi_index.py +++ b/arctic/multi_index.py @@ -23,7 +23,7 @@ def fancy_group_by(df, grouping_level=0, aggregate_level=1, method='last', max_= ---------- df: ``DataFrame`` Pandas dataframe with a MultiIndex - grouping_level: ``int`` or ``str`` + grouping_level: ``int`` or ``str`` or ``list`` of ``str`` Index level to group by. Defaults to 0. aggregate_level: ``int`` or ``str`` Index level to aggregate by. Defaults to 1. @@ -31,12 +31,13 @@ def fancy_group_by(df, grouping_level=0, aggregate_level=1, method='last', max_= Aggregation method. One of last: Use the last (lexicographically) value from each group first: Use the first value from each group - within: Any type supported by the index, or ``DateOffset``/timedelta-like for ``DatetimeIndex``. - If set, will limit results to those having aggregate level values within this range of the group value max_: If set, will limit results to those having aggregate level values <= this value min_: If set, will limit results to those having aggregate level values >= this value + within: Any type supported by the index, or ``DateOffset``/timedelta-like for ``DatetimeIndex``. + If set, will limit results to those having aggregate level values within this range of the group value. + Note that this is currently unsupported for Multi-index of depth > 2 """ if method not in ('first', 'last'): raise ValueError('Invalid method') @@ -44,17 +45,16 @@ def fancy_group_by(df, grouping_level=0, aggregate_level=1, method='last', max_= if isinstance(aggregate_level, basestring): aggregate_level = df.index.names.index(aggregate_level) - agg_idx = df.index.get_level_values(aggregate_level) - group_idx = df.index.get_level_values(grouping_level) - # Trim any rows outside the aggregate value bounds if max_ is not None or min_ is not None or within is not None: + agg_idx = df.index.get_level_values(aggregate_level) mask = np.full(len(agg_idx), True, dtype='b1') if max_ is not None: mask &= (agg_idx <= max_) if min_ is not None: mask &= (agg_idx >= min_) if within is not None: + group_idx = df.index.get_level_values(grouping_level) if isinstance(agg_idx, pd.DatetimeIndex): mask &= (group_idx >= agg_idx.shift(-1, freq=within)) else: @@ -121,4 +121,3 @@ def insert_at(df, sample_date, values): """ observed_dt = dt(datetime.now()) return multi_index_insert_row(df, [sample_date, observed_dt], values) - diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py index 0b4162452ea35..d6b86656b1b5e 100644 --- a/arctic/store/bitemporal_store.py +++ b/arctic/store/bitemporal_store.py @@ -15,21 +15,18 @@ class BitemporalStore(object): As the name hinted, this holds versions of DataFrame by maintaining an extra 'insert time' index internally. """ - def __init__(self, version_store, sample_column='sample_dt', observe_column='observed_dt'): + def __init__(self, version_store, observe_column='observed_dt'): """ Parameters ---------- version_store : `VersionStore` The version store that keeps the underlying data frames - sample_column : `str` - Column name for the datetime index that represents that sample time of the data. observe_column : `str` Column name for the datetime index that represents the insertion time of a row of data. This column is internal to this store. """ self._store = version_store self.observe_column = observe_column - self.sample_column = sample_column def read(self, symbol, as_of=None, raw=False, **kwargs): # TODO: shall we block from_version from getting into super.read? @@ -56,9 +53,10 @@ def read(self, symbol, as_of=None, raw=False, **kwargs): metadata=item.metadata, last_updated=max(item.data.index, key=lambda x: x[1])) else: + index_names = list(item.data.index.names) + index_names.remove(self.observe_column) return BitemporalItem(symbol=symbol, library=self._store._arctic_lib.get_name(), - data=groupby_asof(item.data, as_of=as_of, dt_col=self.sample_column, - asof_col=self.observe_column), + data=groupby_asof(item.data, as_of=as_of, dt_col=index_names, asof_col=self.observe_column), metadata=item.metadata, last_updated=max(item.data.index, key=lambda x: x[1])) def update(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs): @@ -100,7 +98,8 @@ def write(self, *args, **kwargs): 'to add / modify timeseries.') def _add_observe_dt_index(self, df, as_of): - df = df.set_index(pd.MultiIndex.from_product([df.index, as_of], - names=[self.sample_column, self.observe_column]), - inplace=False) + index_names = list(df.index.names) + index_names.append(self.observe_column) + index = [x + (as_of,) if df.index.nlevels > 1 else (x, as_of) for x in df.index.tolist()] + df = df.set_index(pd.MultiIndex.from_tuples(index, names=index_names), inplace=False) return df diff --git a/tests/integration/store/test_bitemporal_store.py b/tests/integration/store/test_bitemporal_store.py index 13f80b61d9bbf..203a4c17767df 100644 --- a/tests/integration/store/test_bitemporal_store.py +++ b/tests/integration/store/test_bitemporal_store.py @@ -9,10 +9,11 @@ from pandas.util.testing import assert_frame_equal from arctic.date._mktz import mktz -from arctic.fixtures.arctic import bitemporal_library import pandas as pd from tests.util import read_str_as_pandas +pytest_plugins = ['arctic.fixtures.arctic'] + ts1 = read_str_as_pandas(""" sample_dt | near 2012-09-08 17:06:11.040 | 1.0 @@ -191,3 +192,59 @@ def test_bitemporal_store_read_as_of_timezone(bitemporal_library): as_of=dt(2015, 5, 2, tzinfo=mktz('Europe/London'))) df = bitemporal_library.read('spam', as_of=dt(2015, 5, 2, tzinfo=mktz('Asia/Hong_Kong'))).data assert_frame_equal(df, ts1) + + +def test_multi_index_ts_read_write(bitemporal_library): + ts = read_str_as_pandas(""" index 1 | index 2 | near + 2012-09-08 17:06:11.040 | SPAM Index | 1.0 + 2012-10-08 17:06:11.040 | SPAM Index | 2.0 + 2012-10-09 17:06:11.040 | SPAM Index | 2.5 + 2012-11-08 17:06:11.040 | SPAM Index | 3.0""", num_index=2) + bitemporal_library.update('spam', ts) + assert_frame_equal(ts, bitemporal_library.read('spam').data) + + +def test_multi_index_ts_read_raw(bitemporal_library): + ts = read_str_as_pandas(""" index 1 | index 2 | near + 2012-09-08 17:06:11.040 | SPAM Index | 1.0 + 2012-10-08 17:06:11.040 | SPAM Index | 2.0 + 2012-10-09 17:06:11.040 | SPAM Index | 2.5 + 2012-11-08 17:06:11.040 | SPAM Index | 3.0""", num_index=2) + + expected_ts = read_str_as_pandas(""" index 1 | index 2 | observed_dt | near + 2012-09-08 17:06:11.040 | SPAM Index | 2015-01-01 | 1.0 + 2012-10-08 17:06:11.040 | SPAM Index | 2015-01-01 | 2.0 + 2012-10-09 17:06:11.040 | SPAM Index | 2015-01-01 | 2.5 + 2012-11-08 17:06:11.040 | SPAM Index | 2015-01-01 | 3.0""", num_index=3) + bitemporal_library.update('spam', ts, as_of=dt(2015, 1, 1)) + assert_frame_equal(expected_ts, bitemporal_library.read('spam', raw=True).data) + + +def test_multi_index_update(bitemporal_library): + ts = read_str_as_pandas(""" index 1 | index 2 | near + 2012-09-08 17:06:11.040 | SPAM Index | 1.0 + 2012-09-08 17:06:11.040 | EGG Index | 1.1 + 2012-10-08 17:06:11.040 | SPAM Index | 2.0 + 2012-10-08 17:06:11.040 | EGG Index | 2.1 + 2012-10-09 17:06:11.040 | SPAM Index | 2.5 + 2012-10-09 17:06:11.040 | EGG Index | 2.6 + 2012-11-08 17:06:11.040 | SPAM Index | 3.0 + 2012-11-08 17:06:11.040 | EGG Index | 3.1""", num_index=2) + ts2 = read_str_as_pandas(""" index 1 | index 2 | near + 2012-09-08 17:06:11.040 | SPAM Index | 1.2 + 2012-09-08 17:06:11.040 | EGG Index | 1.6 + 2012-12-08 17:06:11.040 | SPAM Index | 4.0""", num_index=2) + expected_ts = read_str_as_pandas(""" index 1 | index 2 | near + 2012-09-08 17:06:11.040 | EGG Index | 1.6 + 2012-09-08 17:06:11.040 | SPAM Index | 1.2 + 2012-10-08 17:06:11.040 | EGG Index | 2.1 + 2012-10-08 17:06:11.040 | SPAM Index | 2.0 + 2012-10-09 17:06:11.040 | EGG Index | 2.6 + 2012-10-09 17:06:11.040 | SPAM Index | 2.5 + 2012-11-08 17:06:11.040 | EGG Index | 3.1 + 2012-11-08 17:06:11.040 | SPAM Index | 3.0 + 2012-12-08 17:06:11.040 | EGG Index | + 2012-12-08 17:06:11.040 | SPAM Index | 4.0""", num_index=2) + bitemporal_library.update('spam', ts) + bitemporal_library.update('spam', ts2) + assert_frame_equal(expected_ts, bitemporal_library.read('spam').data) diff --git a/tests/unit/test_multi_index.py b/tests/unit/test_multi_index.py index 1b322f696adbe..4eb50dbc2e6a6 100644 --- a/tests/unit/test_multi_index.py +++ b/tests/unit/test_multi_index.py @@ -1,11 +1,13 @@ from datetime import timedelta import itertools -import numpy as np -import pandas as pd from pandas.tseries.tools import to_datetime as dt +from pandas.util.testing import assert_frame_equal from arctic.multi_index import groupby_asof, fancy_group_by, insert_at +import numpy as np +import pandas as pd +from tests.util import read_str_as_pandas def get_bitemporal_test_data(): @@ -93,6 +95,28 @@ def test__get_ts__unsorted_index(): assert all(df['CLOSE'] == [10.1, 20.1, 30.1, 40.1]) +def test_fancy_group_by_multi_index(): + ts = read_str_as_pandas(""" index 1 | index 2 | observed_dt | near + 2012-09-08 17:06:11.040 | SPAM Index | 2015-01-01 | 1.0 + 2012-09-08 17:06:11.040 | EGG Index | 2015-01-01 | 1.6 + 2012-10-08 17:06:11.040 | SPAM Index | 2015-01-01 | 2.0 + 2012-10-08 17:06:11.040 | SPAM Index | 2015-01-05 | 4.2 + 2012-10-08 17:06:11.040 | EGG Index | 2015-01-01 | 2.1 + 2012-10-09 17:06:11.040 | SPAM Index | 2015-01-01 | 2.5 + 2012-10-09 17:06:11.040 | EGG Index | 2015-01-01 | 2.6 + 2012-11-08 17:06:11.040 | SPAM Index | 2015-01-01 | 3.0""", num_index=3) + expected_ts = read_str_as_pandas(""" index 1 | index 2 | near + 2012-09-08 17:06:11.040 | EGG Index | 1.6 + 2012-09-08 17:06:11.040 | SPAM Index | 1.0 + 2012-10-08 17:06:11.040 | EGG Index | 2.1 + 2012-10-08 17:06:11.040 | SPAM Index | 4.2 + 2012-10-09 17:06:11.040 | EGG Index | 2.6 + 2012-10-09 17:06:11.040 | SPAM Index | 2.5 + 2012-11-08 17:06:11.040 | EGG Index | + 2012-11-08 17:06:11.040 | SPAM Index | 3.0""", num_index=2) + assert_frame_equal(expected_ts, groupby_asof(ts, dt_col=['index 1', 'index 2'], asof_col='observed_dt')) + + # --------- Min/Max using numeric index ----------- # def get_numeric_index_test_data(): From 32a9a773e736b2b32f28ccf1909851a058041239 Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Wed, 9 Sep 2015 16:23:33 +0100 Subject: [PATCH 18/22] refactor last_update to support mult-index --- arctic/store/bitemporal_store.py | 11 ++++++----- tests/integration/store/test_bitemporal_store.py | 11 +++++++++-- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py index d6b86656b1b5e..3728057007438 100644 --- a/arctic/store/bitemporal_store.py +++ b/arctic/store/bitemporal_store.py @@ -10,7 +10,7 @@ class BitemporalStore(object): - """ A versioned pandas DataFrame store. (currently only supports single index df) + """ A versioned pandas DataFrame store. As the name hinted, this holds versions of DataFrame by maintaining an extra 'insert time' index internally. """ @@ -22,8 +22,8 @@ def __init__(self, version_store, observe_column='observed_dt'): version_store : `VersionStore` The version store that keeps the underlying data frames observe_column : `str` - Column name for the datetime index that represents the insertion time of a row of data. This column is - internal to this store. + Column name for the datetime index that represents the insertion time of a row of data. Unless you intent to + read raw data out, this column is internal to this store. """ self._store = version_store self.observe_column = observe_column @@ -48,16 +48,17 @@ def read(self, symbol, as_of=None, raw=False, **kwargs): BitemporalItem namedtuple which contains a .data and .metadata element """ item = self._store.read(symbol, **kwargs) + last_updated = max(item.data.index.get_level_values(self.observe_column)) if raw: return BitemporalItem(symbol=symbol, library=self._store._arctic_lib.get_name(), data=item.data, metadata=item.metadata, - last_updated=max(item.data.index, key=lambda x: x[1])) + last_updated=last_updated) else: index_names = list(item.data.index.names) index_names.remove(self.observe_column) return BitemporalItem(symbol=symbol, library=self._store._arctic_lib.get_name(), data=groupby_asof(item.data, as_of=as_of, dt_col=index_names, asof_col=self.observe_column), - metadata=item.metadata, last_updated=max(item.data.index, key=lambda x: x[1])) + metadata=item.metadata, last_updated=last_updated) def update(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs): """ Append 'data' under the specified 'symbol' name to this library. diff --git a/tests/integration/store/test_bitemporal_store.py b/tests/integration/store/test_bitemporal_store.py index 203a4c17767df..fae9ec88e2e3f 100644 --- a/tests/integration/store/test_bitemporal_store.py +++ b/tests/integration/store/test_bitemporal_store.py @@ -44,6 +44,12 @@ def test_read_ts_raw(bitemporal_library): 2012-11-08 17:06:11.040 | 2015-05-01 | 3.0""", num_index=2)) +def test_last_update(bitemporal_library): + bitemporal_library.update('spam', ts1, as_of=dt(2015, 1, 1)) + bitemporal_library.update('spam', ts1, as_of=dt(2015, 1, 2)) + assert bitemporal_library.read('spam').last_updated == dt(2015, 1, 2) + + def test_existing_ts_update_and_read(bitemporal_library): bitemporal_library.update('spam', ts1) bitemporal_library.update('spam', ts1_update[-1:]) @@ -245,6 +251,7 @@ def test_multi_index_update(bitemporal_library): 2012-11-08 17:06:11.040 | SPAM Index | 3.0 2012-12-08 17:06:11.040 | EGG Index | 2012-12-08 17:06:11.040 | SPAM Index | 4.0""", num_index=2) - bitemporal_library.update('spam', ts) - bitemporal_library.update('spam', ts2) + bitemporal_library.update('spam', ts, as_of=dt(2015, 1, 1)) + bitemporal_library.update('spam', ts2, as_of=dt(2015, 1, 2)) assert_frame_equal(expected_ts, bitemporal_library.read('spam').data) + assert bitemporal_library.read('spam').last_updated == dt(2015, 1, 2) From 8c51e9f1773769e01a03f146e7050f2b6666c761 Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Fri, 11 Sep 2015 15:50:37 +0100 Subject: [PATCH 19/22] Clean up --- arctic/store/bitemporal_store.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py index 3728057007438..85ef62294732e 100644 --- a/arctic/store/bitemporal_store.py +++ b/arctic/store/bitemporal_store.py @@ -22,7 +22,7 @@ def __init__(self, version_store, observe_column='observed_dt'): version_store : `VersionStore` The version store that keeps the underlying data frames observe_column : `str` - Column name for the datetime index that represents the insertion time of a row of data. Unless you intent to + Column name for the datetime index that represents the insertion time of a row of data. Unless you intend to read raw data out, this column is internal to this store. """ self._store = version_store @@ -57,7 +57,8 @@ def read(self, symbol, as_of=None, raw=False, **kwargs): index_names = list(item.data.index.names) index_names.remove(self.observe_column) return BitemporalItem(symbol=symbol, library=self._store._arctic_lib.get_name(), - data=groupby_asof(item.data, as_of=as_of, dt_col=index_names, asof_col=self.observe_column), + data=groupby_asof(item.data, as_of=as_of, dt_col=index_names, + asof_col=self.observe_column), metadata=item.metadata, last_updated=last_updated) def update(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs): @@ -77,7 +78,8 @@ def update(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs) as_of : `datetime.datetime` The "insert time". Default to datetime.now() """ - assert self.observe_column not in data + if self.observe_column in data: + raise ValueError("Column {} is not allowed as it is being used by bitemporal store interally.".format(self.observe_column)) local_tz = mktz() if not as_of: as_of = dt.now() From f5f6a788f27d3b01782a2e2e1e82489bff80559a Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Fri, 11 Sep 2015 16:59:20 +0100 Subject: [PATCH 20/22] Fix column name check --- arctic/store/bitemporal_store.py | 2 +- tests/unit/store/test_bitemporal_store.py | 27 ++++++++++++++++------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py index 85ef62294732e..b92228ef55a97 100644 --- a/arctic/store/bitemporal_store.py +++ b/arctic/store/bitemporal_store.py @@ -78,7 +78,7 @@ def update(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs) as_of : `datetime.datetime` The "insert time". Default to datetime.now() """ - if self.observe_column in data: + if self.observe_column in data.index.names: raise ValueError("Column {} is not allowed as it is being used by bitemporal store interally.".format(self.observe_column)) local_tz = mktz() if not as_of: diff --git a/tests/unit/store/test_bitemporal_store.py b/tests/unit/store/test_bitemporal_store.py index b91a1b5273b11..b90bc2014199e 100644 --- a/tests/unit/store/test_bitemporal_store.py +++ b/tests/unit/store/test_bitemporal_store.py @@ -1,6 +1,8 @@ from datetime import datetime as dt -from mock import create_autospec + +from mock import create_autospec, sentinel from pandas.util.testing import assert_frame_equal +import pytest from arctic.store.bitemporal_store import BitemporalStore from tests.util import read_str_as_pandas @@ -12,12 +14,21 @@ 2012-10-09 17:06:11.040 | 2.5 2012-11-08 17:06:11.040 | 3.0""") + def test_add_observe_dt_index(): - self = create_autospec(BitemporalStore, observe_column='col_a', - sample_column='col_b') + self = create_autospec(BitemporalStore, observe_column='col_a') assert_frame_equal(BitemporalStore._add_observe_dt_index(self, ts1, as_of=dt(2001, 1, 1)), - read_str_as_pandas("""col_b | col_a | near - 2012-09-08 17:06:11.040 | 2001-01-01 | 1.0 - 2012-10-08 17:06:11.040 | 2001-01-01 | 2.0 - 2012-10-09 17:06:11.040 | 2001-01-01 | 2.5 - 2012-11-08 17:06:11.040 | 2001-01-01 | 3.0""", num_index=2)) + read_str_as_pandas("""sample_dt | col_a | near + 2012-09-08 17:06:11.040 | 2001-01-01 | 1.0 + 2012-10-08 17:06:11.040 | 2001-01-01 | 2.0 + 2012-10-09 17:06:11.040 | 2001-01-01 | 2.5 + 2012-11-08 17:06:11.040 | 2001-01-01 | 3.0""", num_index=2)) + + +def test_update_with_observe_column_fails(): + self = create_autospec(BitemporalStore, observe_column='col_a') + with pytest.raises(ValueError) as e: + BitemporalStore.update(self, sentinel.symbol, read_str_as_pandas( + """col_b | col_a | near + 2012-09-08 17:06:11.040 | 2001-01-01 | 1.0""", num_index=2)) + assert str(e.value) == "Column col_a is not allowed as it is being used by bitemporal store interally." From 0b4d927cec8ea33be3a40ef61ae0557e25195631 Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Mon, 14 Sep 2015 11:19:13 +0100 Subject: [PATCH 21/22] Remove restriction on column name --- arctic/fixtures/arctic.py | 2 +- arctic/store/bitemporal_store.py | 2 -- tests/integration/store/test_bitemporal_store.py | 10 ++++++++++ tests/unit/store/test_bitemporal_store.py | 12 +----------- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/arctic/fixtures/arctic.py b/arctic/fixtures/arctic.py index e35655a590367..c1f8d71caa0c8 100644 --- a/arctic/fixtures/arctic.py +++ b/arctic/fixtures/arctic.py @@ -76,7 +76,7 @@ def library(arctic, library_name): @pytest.fixture(scope="function") def bitemporal_library(arctic, library_name): - arctic.initialize_library(library_name, m.VERSION_STORE, segment='month') # TODO: segment=month?? + arctic.initialize_library(library_name, m.VERSION_STORE, segment='month') return BitemporalStore(arctic.get_library(library_name)) diff --git a/arctic/store/bitemporal_store.py b/arctic/store/bitemporal_store.py index b92228ef55a97..7d6c67d074d7b 100644 --- a/arctic/store/bitemporal_store.py +++ b/arctic/store/bitemporal_store.py @@ -78,8 +78,6 @@ def update(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs) as_of : `datetime.datetime` The "insert time". Default to datetime.now() """ - if self.observe_column in data.index.names: - raise ValueError("Column {} is not allowed as it is being used by bitemporal store interally.".format(self.observe_column)) local_tz = mktz() if not as_of: as_of = dt.now() diff --git a/tests/integration/store/test_bitemporal_store.py b/tests/integration/store/test_bitemporal_store.py index fae9ec88e2e3f..5d68defe131ea 100644 --- a/tests/integration/store/test_bitemporal_store.py +++ b/tests/integration/store/test_bitemporal_store.py @@ -44,6 +44,16 @@ def test_read_ts_raw(bitemporal_library): 2012-11-08 17:06:11.040 | 2015-05-01 | 3.0""", num_index=2)) +def test_write_ts_with_column_name_same_as_observed_dt_ok(bitemporal_library): + ts1 = read_str_as_pandas(""" sample_dt | observed_dt | near + 2012-09-08 17:06:11.040 | 2015-1-1 | 1.0 + 2012-10-08 17:06:11.040 | 2015-1-1 | 2.0 + 2012-10-09 17:06:11.040 | 2015-1-1 | 2.5 + 2012-11-08 17:06:11.040 | 2015-1-1 | 3.0""") + bitemporal_library.update('spam', ts1) + assert_frame_equal(ts1, bitemporal_library.read('spam').data) + + def test_last_update(bitemporal_library): bitemporal_library.update('spam', ts1, as_of=dt(2015, 1, 1)) bitemporal_library.update('spam', ts1, as_of=dt(2015, 1, 2)) diff --git a/tests/unit/store/test_bitemporal_store.py b/tests/unit/store/test_bitemporal_store.py index b90bc2014199e..c24175bef8b85 100644 --- a/tests/unit/store/test_bitemporal_store.py +++ b/tests/unit/store/test_bitemporal_store.py @@ -1,8 +1,7 @@ from datetime import datetime as dt -from mock import create_autospec, sentinel +from mock import create_autospec from pandas.util.testing import assert_frame_equal -import pytest from arctic.store.bitemporal_store import BitemporalStore from tests.util import read_str_as_pandas @@ -23,12 +22,3 @@ def test_add_observe_dt_index(): 2012-10-08 17:06:11.040 | 2001-01-01 | 2.0 2012-10-09 17:06:11.040 | 2001-01-01 | 2.5 2012-11-08 17:06:11.040 | 2001-01-01 | 3.0""", num_index=2)) - - -def test_update_with_observe_column_fails(): - self = create_autospec(BitemporalStore, observe_column='col_a') - with pytest.raises(ValueError) as e: - BitemporalStore.update(self, sentinel.symbol, read_str_as_pandas( - """col_b | col_a | near - 2012-09-08 17:06:11.040 | 2001-01-01 | 1.0""", num_index=2)) - assert str(e.value) == "Column col_a is not allowed as it is being used by bitemporal store interally." From 3e6c98cd46e4f6015a80a2eccb348d3a9f619a87 Mon Sep 17 00:00:00 2001 From: Adrian Teng Date: Mon, 14 Sep 2015 15:47:48 +0100 Subject: [PATCH 22/22] Fix tests due to groupby changes on 0.16 --- tests/integration/store/test_bitemporal_store.py | 1 - tests/unit/test_multi_index.py | 1 - 2 files changed, 2 deletions(-) diff --git a/tests/integration/store/test_bitemporal_store.py b/tests/integration/store/test_bitemporal_store.py index 5d68defe131ea..5b54bbd237542 100644 --- a/tests/integration/store/test_bitemporal_store.py +++ b/tests/integration/store/test_bitemporal_store.py @@ -259,7 +259,6 @@ def test_multi_index_update(bitemporal_library): 2012-10-09 17:06:11.040 | SPAM Index | 2.5 2012-11-08 17:06:11.040 | EGG Index | 3.1 2012-11-08 17:06:11.040 | SPAM Index | 3.0 - 2012-12-08 17:06:11.040 | EGG Index | 2012-12-08 17:06:11.040 | SPAM Index | 4.0""", num_index=2) bitemporal_library.update('spam', ts, as_of=dt(2015, 1, 1)) bitemporal_library.update('spam', ts2, as_of=dt(2015, 1, 2)) diff --git a/tests/unit/test_multi_index.py b/tests/unit/test_multi_index.py index 4eb50dbc2e6a6..e98a478c05432 100644 --- a/tests/unit/test_multi_index.py +++ b/tests/unit/test_multi_index.py @@ -112,7 +112,6 @@ def test_fancy_group_by_multi_index(): 2012-10-08 17:06:11.040 | SPAM Index | 4.2 2012-10-09 17:06:11.040 | EGG Index | 2.6 2012-10-09 17:06:11.040 | SPAM Index | 2.5 - 2012-11-08 17:06:11.040 | EGG Index | 2012-11-08 17:06:11.040 | SPAM Index | 3.0""", num_index=2) assert_frame_equal(expected_ts, groupby_asof(ts, dt_col=['index 1', 'index 2'], asof_col='observed_dt'))