Skip to content

Commit

Permalink
Support for audit logs in ChunkStore (pandas-dev#319)
Browse files Browse the repository at this point in the history
  • Loading branch information
bmoscon authored Jan 30, 2017
1 parent 43840f7 commit c5951f0
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 17 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
* Bugfix: #300 to_datetime deprecated in pandas, use to_pydatetime instead
* Bugfix: #309 formatting change for DateRange ```__str__```
* Feature: #313 set and read user specified metadata in chunkstore
* Feature: #319 Audit log support in ChunkStore
* Bugfix: #216 Tickstore write fails with named index column


### 1.36 (2016-12-13)

* Feature: Default to hashed based sharding
Expand Down
92 changes: 80 additions & 12 deletions arctic/chunkstore/chunkstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def __init__(self, arctic_lib):
self._collection = arctic_lib.get_top_level_collection()
self._symbols = self._collection.symbols
self._mdata = self._collection.metadata
self._audit = self._collection.audit

def __getstate__(self):
return {'arctic_lib': self._arctic_lib}
Expand All @@ -107,7 +108,7 @@ def _checksum(self, fields, data):
sha.update(data)
return Binary(sha.digest())

def delete(self, symbol, chunk_range=None):
def delete(self, symbol, chunk_range=None, audit=None):
"""
Delete all chunks for a symbol, or optionally, chunks within a range
Expand All @@ -117,6 +118,8 @@ def delete(self, symbol, chunk_range=None):
symbol name for the item
chunk_range: range object
a date range to delete
audit: dict
dict to store in the audit log
"""
if chunk_range is not None:
sym = self._get_symbol_info(symbol)
Expand Down Expand Up @@ -144,6 +147,16 @@ def delete(self, symbol, chunk_range=None):
self._collection.delete_many(query)
self._symbols.delete_many(query)
self._mdata.delete_many(query)

if audit is not None:
audit['symbol'] = symbol
if chunk_range is not None:
audit['rows_deleted'] = row_adjust
audit['action'] = 'range delete'
else:
audit['action'] = 'symbol delete'

self._audit.insert_one(audit)

def list_symbols(self, partial_match=None):
"""
Expand All @@ -166,7 +179,7 @@ def list_symbols(self, partial_match=None):
def _get_symbol_info(self, symbol):
return self._symbols.find_one({SYMBOL: symbol})

def rename(self, from_symbol, to_symbol):
def rename(self, from_symbol, to_symbol, audit=None):
"""
Rename a symbol
Expand All @@ -176,6 +189,8 @@ def rename(self, from_symbol, to_symbol):
the existing symbol that will be renamed
to_symbol: str
the new symbol name
audit: dict
audit information
"""

sym = self._get_symbol_info(from_symbol)
Expand All @@ -191,6 +206,14 @@ def rename(self, from_symbol, to_symbol):
{'$set': {SYMBOL: to_symbol}})
mongo_retry(self._mdata.update_many)({SYMBOL: from_symbol},
{'$set': {SYMBOL: to_symbol}})
mongo_retry(self._audit.update_many)({'symbol': from_symbol},
{'$set': {'symbol': to_symbol}})
if audit is not None:
audit['symbol'] = to_symbol
audit['action'] = 'symbol rename'
audit['old_symbol'] = from_symbol
self._audit.insert_one(audit)


def read(self, symbol, chunk_range=None, filter_data=True, **kwargs):
"""
Expand Down Expand Up @@ -245,8 +268,25 @@ def read(self, symbol, chunk_range=None, filter_data=True, **kwargs):
if not filter_data or chunk_range is None:
return data
return CHUNKER_MAP[sym[CHUNKER]].filter(data, chunk_range)

def read_audit_log(self, symbol=None):
    """
    Reads the audit log

    Parameters
    ----------
    symbol: str
        optionally only retrieve specific symbol's audit information

    Returns
    -------
    list of dicts
    """
    # Filter by symbol when one is given; an empty filter returns every entry.
    query = {'symbol': symbol} if symbol else {}
    # Project out MongoDB's internal _id so callers get plain audit dicts.
    return list(self._audit.find(query, {'_id': False}))

def write(self, symbol, item, metadata=None, chunker=DateChunker(), **kwargs):
def write(self, symbol, item, metadata=None, chunker=DateChunker(), audit=None, **kwargs):
"""
Writes data from item to symbol in the database
Expand All @@ -260,6 +300,8 @@ def write(self, symbol, item, metadata=None, chunker=DateChunker(), **kwargs):
optional per symbol metadata
chunker: Object of type Chunker
A chunker that chunks the data in item
audit: dict
audit information
kwargs:
optional keyword args that are passed to the chunker. Includes:
chunk_size:
Expand Down Expand Up @@ -336,8 +378,13 @@ def write(self, symbol, item, metadata=None, chunker=DateChunker(), **kwargs):
mongo_retry(self._symbols.update_one)({SYMBOL: symbol},
{'$set': doc},
upsert=True)
if audit is not None:
audit['symbol'] = symbol
audit['action'] = 'write'
audit['chunks'] = chunk_count
self._audit.insert_one(audit)

def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=None):
def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=None, audit=None):
'''
helper method used by update and append since they very closely
resemble eachother. Really differ only by the combine method.
Expand All @@ -361,6 +408,8 @@ def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=No
op = False
chunker = CHUNKER_MAP[sym[CHUNKER]]

appended = 0
new_chunks = 0
for start, end, _, record in chunker.to_chunks(item, chunk_size=sym[CHUNK_SIZE]):
# read out matching chunks
df = self.read(symbol, chunk_range=chunker.to_range(start, end), filter_data=False)
Expand All @@ -371,10 +420,12 @@ def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=No
if record is None or record.equals(df):
continue

sym[APPEND_COUNT] += len(record)
sym[APPEND_COUNT] += len(record) - len(df)
appended += len(record) - len(df)
sym[LEN] += len(record) - len(df)
else:
sym[CHUNK_COUNT] += 1
new_chunks += 1
sym[LEN] += len(record)

data = SER_MAP[sym[SERIALIZER]].serialize(record)
Expand Down Expand Up @@ -420,8 +471,14 @@ def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=No

sym[USERMETA] = metadata
self._symbols.replace_one({SYMBOL: symbol}, sym)

def append(self, symbol, item, metadata=None):
if audit is not None:
if new_chunks > 0:
audit['new_chunks'] = new_chunks
if appended > 0:
audit['appended_rows'] = appended
self._audit.insert_one(audit)

def append(self, symbol, item, metadata=None, audit=None):
"""
Appends data from item to symbol's data in the database.
Expand All @@ -435,13 +492,18 @@ def append(self, symbol, item, metadata=None):
the data to append
metadata: ?
optional per symbol metadata
audit: dict
optional audit information
"""
sym = self._get_symbol_info(symbol)
if not sym:
raise NoDataFoundException("Symbol does not exist.")
self.__update(sym, item, metadata=metadata, combine_method=SER_MAP[sym[SERIALIZER]].combine)
if audit is not None:
audit['symbol'] = symbol
audit['action'] = 'append'
self.__update(sym, item, metadata=metadata, combine_method=SER_MAP[sym[SERIALIZER]].combine, audit=audit)

def update(self, symbol, item, metadata=None, chunk_range=None, upsert=False, **kwargs):
def update(self, symbol, item, metadata=None, chunk_range=None, upsert=False, audit=None, **kwargs):
"""
Overwrites data in DB with data in item for the given symbol.
Expand All @@ -462,6 +524,8 @@ def update(self, symbol, item, metadata=None, chunk_range=None, upsert=False, **
original data.
upsert: bool
if True, will write the data even if the symbol does not exist.
audit: dict
optional audit information
kwargs:
optional keyword args passed to write during an upsert. Includes:
chunk_size
Expand All @@ -470,15 +534,18 @@ def update(self, symbol, item, metadata=None, chunk_range=None, upsert=False, **
sym = self._get_symbol_info(symbol)
if not sym:
if upsert:
return self.write(symbol, item, metadata=metadata, **kwargs)
return self.write(symbol, item, metadata=metadata, audit=audit, **kwargs)
else:
raise NoDataFoundException("Symbol does not exist.")
if audit is not None:
audit['symbol'] = symbol
audit['action'] = 'update'
if chunk_range is not None:
if len(CHUNKER_MAP[sym[CHUNKER]].filter(item, chunk_range)) == 0:
raise Exception('Range must be inclusive of data')
self.__update(sym, item, metadata=metadata, combine_method=self.serializer.combine, chunk_range=chunk_range)
self.__update(sym, item, metadata=metadata, combine_method=self.serializer.combine, chunk_range=chunk_range, audit=audit)
else:
self.__update(sym, item, metadata=metadata, combine_method=lambda old, new: new, chunk_range=chunk_range)
self.__update(sym, item, metadata=metadata, combine_method=lambda old, new: new, chunk_range=chunk_range, audit=audit)

def get_info(self, symbol):
"""
Expand All @@ -499,6 +566,7 @@ def get_info(self, symbol):
ret = {}
ret['chunk_count'] = sym[CHUNK_COUNT]
ret['len'] = sym[LEN]
ret['appended_rows'] = sym[APPEND_COUNT]
ret['metadata'] = sym[METADATA]
ret['chunker'] = sym[CHUNKER]
ret['chunk_size'] = sym[CHUNK_SIZE]
Expand Down
62 changes: 57 additions & 5 deletions tests/integration/chunkstore/test_chunkstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import random
import pytest
import pymongo
import pickle

from arctic.chunkstore.chunkstore import START, SYMBOL
from arctic.chunkstore.passthrough_chunker import PassthroughChunker
Expand Down Expand Up @@ -661,6 +662,7 @@ def test_get_info(chunkstore_lib):
)
chunkstore_lib.write('test_df', df)
info = {'len': 3,
'appended_rows': 0,
'chunk_count': 3,
'metadata': {'columns': [u'date', u'id', u'data']},
'chunker': u'date',
Expand Down Expand Up @@ -688,6 +690,7 @@ def test_get_info_after_append(chunkstore_lib):
assert_frame_equal(chunkstore_lib.read('test_df'), pd.concat([df, df2]).sort_index())

info = {'len': 6,
'appended_rows': 2,
'chunk_count': 4,
'metadata': {'columns': [u'date', u'id', u'data']},
'chunker': u'date',
Expand Down Expand Up @@ -715,6 +718,7 @@ def test_get_info_after_update(chunkstore_lib):
chunkstore_lib.update('test_df', df2)

info = {'len': 4,
'appended_rows': 0,
'chunk_count': 4,
'metadata': {'columns': [u'date', u'id', u'data']},
'chunker': u'date',
Expand Down Expand Up @@ -991,6 +995,10 @@ def test_rename(chunkstore_lib):
with pytest.raises(Exception) as e:
chunkstore_lib.rename('new_name', 'new_name')
assert('already exists' in str(e))

with pytest.raises(NoDataFoundException) as e:
chunkstore_lib.rename('doesnt_exist', 'temp')
assert('No data found for doesnt_exist' in str(e))

assert('test' not in chunkstore_lib.list_symbols())

Expand Down Expand Up @@ -1225,12 +1233,12 @@ def test_stats(chunkstore_lib):


def test_metadata(chunkstore_lib):
df = DataFrame(data={'data': np.random.randint(0, 100, size=2)},
df = DataFrame(data={'data': np.random.randint(0, 100, size=2)},
index=pd.date_range('2016-01-01', '2016-01-02'))
df.index.name = 'date'
chunkstore_lib.write('data', df, metadata = 'some metadata')
m = chunkstore_lib.read_metadata('data')
assert(m == u'some metadata')
df.index.name = 'date'
chunkstore_lib.write('data', df, metadata = 'some metadata')
m = chunkstore_lib.read_metadata('data')
assert(m == u'some metadata')


def test_metadata_update(chunkstore_lib):
Expand Down Expand Up @@ -1281,3 +1289,47 @@ def test_write_metadata(chunkstore_lib):
def test_write_metadata_nosymbol(chunkstore_lib):
with pytest.raises(NoDataFoundException):
chunkstore_lib.write_metadata('doesnt_exist', 'meta')


def test_audit(chunkstore_lib):
    # Every mutating ChunkStore call below passes an ``audit`` dict; the test
    # then checks that read_audit_log() records one entry per operation with
    # the expected action / row-count / chunk-count metadata.
    def dated_frame(rows, start, end):
        # Random-integer DataFrame over a named daily date index.
        frame = DataFrame(data={'data': np.random.randint(0, 100, size=rows)},
                          index=pd.date_range(start, end))
        frame.index.name = 'date'
        return frame

    frame = dated_frame(2, '2016-01-01', '2016-01-02')
    chunkstore_lib.write('data', frame, audit={'user': 'test_user'})
    frame = dated_frame(10, '2016-01-01', '2016-01-10')
    chunkstore_lib.write('data', frame, audit={'user': 'other_user'})

    # Two writes -> two log entries, retrievable globally or per symbol.
    assert len(chunkstore_lib.read_audit_log()) == 2
    assert len(chunkstore_lib.read_audit_log(symbol='data')) == 2
    assert len(chunkstore_lib.read_audit_log(symbol='none')) == 0

    chunkstore_lib.append('data', frame, audit={'user': 'test_user'})
    assert chunkstore_lib.read_audit_log()[-1]['appended_rows'] == 10

    frame = dated_frame(5, '2017-01-01', '2017-01-5')
    chunkstore_lib.update('data', frame, audit={'user': 'other_user'})
    assert chunkstore_lib.read_audit_log()[-1]['new_chunks'] == 5

    chunkstore_lib.rename('data', 'data_new', audit={'user': 'temp_user'})
    assert chunkstore_lib.read_audit_log()[-1]['action'] == 'symbol rename'

    # Range delete followed by full symbol delete; log order is preserved.
    chunkstore_lib.delete('data_new', chunk_range=DateRange('2016-01-01', '2016-01-02'), audit={'user': 'test_user'})
    chunkstore_lib.delete('data_new', audit={'user': 'test_user'})
    assert chunkstore_lib.read_audit_log()[-1]['action'] == 'symbol delete'
    assert chunkstore_lib.read_audit_log()[-2]['action'] == 'range delete'


def test_chunkstore_misc(chunkstore_lib):
    # The store must survive a pickle round-trip (e.g. when handed between
    # processes) and expose a useful, consistent string representation.
    clone = pickle.loads(pickle.dumps(chunkstore_lib))
    assert chunkstore_lib._arctic_lib.get_name() == clone._arctic_lib.get_name()

    text = str(chunkstore_lib)
    assert "arctic_test.TEST" in text
    assert text == repr(chunkstore_lib)


0 comments on commit c5951f0

Please sign in to comment.