
Commit 006e251

Merge pull request pandas-dev#30 from manahl/feature/multi-index-dataframe

Feature/multi index dataframe
AdrianTeng committed Sep 14, 2015
2 parents ed0e729 + 3e6c98c commit 006e251
Showing 8 changed files with 802 additions and 16 deletions.
11 changes: 5 additions & 6 deletions arctic/arctic.py
@@ -3,15 +3,14 @@
 import pymongo
 from pymongo.errors import OperationFailure, AutoReconnect
 
-from ._util import indent
 from .auth import authenticate, get_auth
-from .hooks import get_mongodb_uri
 from .decorators import mongo_retry
+from ._util import indent
 
 from .exceptions import LibraryNotFoundException, ArcticException, QuotaExceededException
+from .hooks import get_mongodb_uri
 from .store import version_store
-from .tickstore import tickstore
-from .tickstore import toplevel
+from .tickstore import tickstore, toplevel
 
 
 __all__ = ['Arctic', 'VERSION_STORE', 'TICK_STORE', 'register_library_type']

@@ -23,7 +22,7 @@
 TICK_STORE = tickstore.TICK_STORE_TYPE
 LIBRARY_TYPES = {version_store.VERSION_STORE_TYPE: version_store.VersionStore,
                  tickstore.TICK_STORE_TYPE: tickstore.TickStore,
-                 toplevel.TICK_STORE_TYPE: toplevel.TopLevelTickStore
+                 toplevel.TICK_STORE_TYPE: toplevel.TopLevelTickStore,
                  }


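For orientation: the LIBRARY_TYPES mapping above is the registry that initialize_library consults when creating a library. A minimal sketch of exercising the built-in store types (the MongoDB host and library names here are hypothetical, not part of the commit):

    from arctic import Arctic, VERSION_STORE, TICK_STORE

    store = Arctic('localhost')                          # hypothetical MongoDB host
    store.initialize_library('user.eod', VERSION_STORE)  # hypothetical library names
    store.initialize_library('user.ticks', TICK_STORE)
    print store.list_libraries()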
9 changes: 8 additions & 1 deletion arctic/fixtures/arctic.py
@@ -4,10 +4,11 @@
 import pytest as pytest
 
 from .. import arctic as m
+from ..store.bitemporal_store import BitemporalStore
+from ..tickstore.tickstore import TICK_STORE_TYPE
 
 from .mongo import mongo_proc, mongodb
 
 
 logger = logging.getLogger(__name__)
 
 mongo_proc2 = mongo_proc(executable="mongod", port="?",
@@ -73,6 +74,12 @@ def library(arctic, library_name):
     return arctic.get_library(library_name)
 
 
+@pytest.fixture(scope="function")
+def bitemporal_library(arctic, library_name):
+    arctic.initialize_library(library_name, m.VERSION_STORE, segment='month')
+    return BitemporalStore(arctic.get_library(library_name))
+
+
 @pytest.fixture(scope="function")
 def library_secondary(arctic_secondary, library_name):
     arctic_secondary.initialize_library(library_name, m.VERSION_STORE, segment='month')
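A sketch of how the new bitemporal_library fixture might be used in a test (hypothetical test module, symbol, and values; it relies on the arctic and library_name fixtures already defined in this file):

    from datetime import datetime as dt

    import pandas as pd


    def test_bitemporal_update_and_read(bitemporal_library):
        ts = pd.DataFrame({'near': [1.0, 2.0]},
                          index=pd.DatetimeIndex([dt(2015, 1, 1), dt(2015, 1, 2)], name='sample_dt'))
        bitemporal_library.update('spam', ts)
        assert len(bitemporal_library.read('spam').data) == 2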
123 changes: 123 additions & 0 deletions arctic/multi_index.py
@@ -0,0 +1,123 @@
'''
Utility functions for multi-index dataframes. Useful for creating bi-temporal timeseries.
'''
from datetime import datetime
import logging
import types

from pandas.tseries.tools import to_datetime as dt

import numpy as np
import pandas as pd


logger = logging.getLogger(__name__)


# ----------------------- Grouping and Aggregating ---------------------------- #

def fancy_group_by(df, grouping_level=0, aggregate_level=1, method='last', max_=None, min_=None, within=None):
    """ Dataframe group-by operation that supports aggregating by different methods on the index.

    Parameters
    ----------
    df: ``DataFrame``
        Pandas dataframe with a MultiIndex
    grouping_level: ``int`` or ``str`` or ``list`` of ``str``
        Index level to group by. Defaults to 0.
    aggregate_level: ``int`` or ``str``
        Index level to aggregate by. Defaults to 1.
    method: ``str``
        Aggregation method. One of
            last: Use the last (lexicographically) value from each group
            first: Use the first value from each group
    max_: <any>
        If set, will limit results to those having aggregate level values <= this value
    min_: <any>
        If set, will limit results to those having aggregate level values >= this value
    within: Any type supported by the index, or ``DateOffset``/timedelta-like for ``DatetimeIndex``.
        If set, will limit results to those having aggregate level values within this range of the group value.
        Note that this is currently unsupported for a MultiIndex of depth > 2.
    """
    if method not in ('first', 'last'):
        raise ValueError('Invalid method')

    if isinstance(aggregate_level, basestring):
        aggregate_level = df.index.names.index(aggregate_level)

    # Trim any rows outside the aggregate value bounds
    if max_ is not None or min_ is not None or within is not None:
        agg_idx = df.index.get_level_values(aggregate_level)
        mask = np.full(len(agg_idx), True, dtype='b1')
        if max_ is not None:
            mask &= (agg_idx <= max_)
        if min_ is not None:
            mask &= (agg_idx >= min_)
        if within is not None:
            group_idx = df.index.get_level_values(grouping_level)
            if isinstance(agg_idx, pd.DatetimeIndex):
                mask &= (group_idx >= agg_idx.shift(-1, freq=within))
            else:
                mask &= (group_idx >= (agg_idx - within))
        df = df.loc[mask]

    # The sort order must be correct in order of grouping_level -> aggregate_level for the aggregation methods
    # to work properly. We can check the sortdepth to see if this is in fact the case and resort if necessary.
    # TODO: this might need tweaking if the levels are around the wrong way
    if df.index.lexsort_depth < (aggregate_level + 1):
        df = df.sortlevel(level=grouping_level)

    gb = df.groupby(level=grouping_level)
    if method == 'last':
        return gb.last()
    return gb.first()


# --------- Common as-of-date use case -------------- #

def groupby_asof(df, as_of=None, dt_col='sample_dt', asof_col='observed_dt'):
    ''' Common use case for selecting the latest rows from a bitemporal dataframe as-of a certain date.

    Parameters
    ----------
    df: ``pd.DataFrame``
        Dataframe with a MultiIndex index
    as_of: ``datetime``
        Return a timeseries with values observed <= this as-of date. By default, the latest observed
        values will be returned.
    dt_col: ``str`` or ``int`` or ``list`` of ``str``
        Name or index of the column in the MultiIndex that is the sample date
    asof_col: ``str`` or ``int``
        Name or index of the column in the MultiIndex that is the observed date
    '''
    return fancy_group_by(df,
                          grouping_level=dt_col,
                          aggregate_level=asof_col,
                          method='last',
                          max_=as_of)


# ----------------------- Insert/Append ---------------------------- #


def multi_index_insert_row(df, index_row, values_row):
    """ Return a new dataframe with a row inserted for a multi-index dataframe.
    This will sort the rows according to the ordered multi-index levels.
    """
    row_index = pd.MultiIndex(levels=[[i] for i in index_row],
                              labels=[[0] for i in index_row])
    row = pd.DataFrame(values_row, index=row_index, columns=df.columns)
    df = pd.concat((df, row))
    if df.index.lexsort_depth == len(index_row) and df.index[-2] < df.index[-1]:
        # We've just appended a row to an already-sorted dataframe
        return df
    # The df wasn't sorted or the row has to be put in the middle somewhere
    return df.sortlevel()


def insert_at(df, sample_date, values):
    """ Insert some values into a bi-temporal dataframe.
    This is like what would happen when we get a price correction.
    """
    observed_dt = dt(datetime.now())
    return multi_index_insert_row(df, [sample_date, observed_dt], values)
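An illustrative sketch of the as-of query path above (hypothetical symbols and values; Python 2 and the pandas API of this vintage, matching the module):

    from datetime import datetime

    import pandas as pd

    from arctic.multi_index import groupby_asof

    ts = pd.DataFrame(
        {'price': [100.0, 101.0, 101.5]},
        index=pd.MultiIndex.from_tuples(
            [(datetime(2015, 9, 1), datetime(2015, 9, 1, 17, 0)),   # original 1 Sep observation
             (datetime(2015, 9, 2), datetime(2015, 9, 2, 17, 0)),   # original 2 Sep observation
             (datetime(2015, 9, 1), datetime(2015, 9, 3, 9, 0))],   # correction for 1 Sep
            names=['sample_dt', 'observed_dt']))

    print groupby_asof(ts)                                     # latest view: 1 Sep -> 101.5
    print groupby_asof(ts, as_of=datetime(2015, 9, 2, 23, 0))  # correction not yet observed: 1 Sep -> 100.0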
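And a sketch of recording a correction with insert_at (hypothetical values; the observed time is stamped inside insert_at via to_datetime(datetime.now())):

    from datetime import datetime

    from arctic.multi_index import insert_at

    # 'ts' as above; a late correction for 2 Sep arrives now
    ts2 = insert_at(ts, datetime(2015, 9, 2), {'price': [101.25]})
    print len(ts2)   # 4 rows: the correction is a new (sample_dt, observed_dt) pair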
106 changes: 106 additions & 0 deletions arctic/store/bitemporal_store.py
@@ -0,0 +1,106 @@
from collections import namedtuple
from datetime import datetime as dt

from arctic.date._mktz import mktz
from arctic.multi_index import groupby_asof
import pandas as pd


BitemporalItem = namedtuple('BitemporalItem', 'symbol, library, data, metadata, last_updated')


class BitemporalStore(object):
""" A versioned pandas DataFrame store.
As the name hinted, this holds versions of DataFrame by maintaining an extra 'insert time' index internally.
"""

def __init__(self, version_store, observe_column='observed_dt'):
"""
Parameters
----------
version_store : `VersionStore`
The version store that keeps the underlying data frames
observe_column : `str`
Column name for the datetime index that represents the insertion time of a row of data. Unless you intend to
read raw data out, this column is internal to this store.
"""
self._store = version_store
self.observe_column = observe_column

def read(self, symbol, as_of=None, raw=False, **kwargs):
# TODO: shall we block from_version from getting into super.read?
"""Read data for the named symbol. Returns a BitemporalItem object with
a data and metdata element (as passed into write).
Parameters
----------
symbol : `str`
symbol name for the item
as_of : `datetime.datetime`
Return the data as it was as_of the point in time.
raw : `bool`
If True, will return the full bitemporal dataframe (i.e. all versions of the data). This also means as_of is
ignored.
Returns
-------
BitemporalItem namedtuple which contains a .data and .metadata element
"""
item = self._store.read(symbol, **kwargs)
last_updated = max(item.data.index.get_level_values(self.observe_column))
if raw:
return BitemporalItem(symbol=symbol, library=self._store._arctic_lib.get_name(), data=item.data,
metadata=item.metadata,
last_updated=last_updated)
else:
index_names = list(item.data.index.names)
index_names.remove(self.observe_column)
return BitemporalItem(symbol=symbol, library=self._store._arctic_lib.get_name(),
data=groupby_asof(item.data, as_of=as_of, dt_col=index_names,
asof_col=self.observe_column),
metadata=item.metadata, last_updated=last_updated)

def update(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs):
""" Append 'data' under the specified 'symbol' name to this library.
Parameters
----------
symbol : `str`
symbol name for the item
data : `pd.DataFrame`
to be persisted
metadata : `dict`
An optional dictionary of metadata to persist along with the symbol. If None and there are existing
metadata, current metadata will be maintained
upsert : `bool`
Write 'data' if no previous version exists.
as_of : `datetime.datetime`
The "insert time". Default to datetime.now()
"""
local_tz = mktz()
if not as_of:
as_of = dt.now()
if as_of.tzinfo is None:
as_of = as_of.replace(tzinfo=local_tz)
data = self._add_observe_dt_index(data, as_of)
if upsert and not self._store.has_symbol(symbol):
df = data
else:
existing_item = self._store.read(symbol, **kwargs)
if metadata is None:
metadata = existing_item.metadata
df = existing_item.data.append(data).sort()
self._store.write(symbol, df, metadata=metadata, prune_previous_version=True)

def write(self, *args, **kwargs):
# TODO: may be diff + append?
raise NotImplementedError('Direct write for BitemporalStore is not supported. Use append instead'
'to add / modify timeseries.')

def _add_observe_dt_index(self, df, as_of):
index_names = list(df.index.names)
index_names.append(self.observe_column)
index = [x + (as_of,) if df.index.nlevels > 1 else (x, as_of) for x in df.index.tolist()]
df = df.set_index(pd.MultiIndex.from_tuples(index, names=index_names), inplace=False)
return df
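Putting the store together, a minimal end-to-end sketch (hypothetical host, library, symbol, and prices; not part of the commit):

    from datetime import datetime as dt

    import pandas as pd

    from arctic import Arctic, VERSION_STORE
    from arctic.store.bitemporal_store import BitemporalStore

    a = Arctic('localhost')                             # hypothetical MongoDB host
    a.initialize_library('user.bitemp', VERSION_STORE)  # hypothetical library name
    store = BitemporalStore(a.get_library('user.bitemp'))

    ts = pd.DataFrame({'price': [100.0, 101.0]},
                      index=pd.DatetimeIndex([dt(2015, 9, 1), dt(2015, 9, 2)], name='sample_dt'))
    store.update('spam', ts)                            # first write (upsert)

    fix = pd.DataFrame({'price': [100.5]},
                       index=pd.DatetimeIndex([dt(2015, 9, 1)], name='sample_dt'))
    store.update('spam', fix, as_of=dt(2015, 9, 3, 9, 0))  # a correction, with an explicit insert time

    print store.read('spam').data             # latest view: 1 Sep shows the corrected 100.5
    print store.read('spam', raw=True).data   # full history, observed_dt level included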
