diff --git a/CHANGES.md b/CHANGES.md
index b6ed14d0381b0..45e0d1b34042d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -4,8 +4,10 @@
   * Bugfix: #300 to_datetime deprecated in pandas, use to_pydatetime instead
   * Bugfix: #309 formatting change for DateRange ```__str__```
   * Feature: #313 set and read user specified metadata in chunkstore
+  * Feature: #319 Audit log support in ChunkStore
   * Bugfix: #216 Tickstore write fails with named index column
+

 ### 1.36 (2016-12-13)
   * Feature: Default to hashed based sharding
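For context before the implementation diff below: the audit-log feature announced above is driven by an optional `audit` dict on every mutating ChunkStore call. A minimal usage sketch, not part of this PR — the MongoDB host, the library name `user.chunks`, and the `'user'`/`'ticket'` keys are placeholders:

```python
import numpy as np
import pandas as pd

from arctic import Arctic, CHUNK_STORE

# Placeholder connection and library name.
a = Arctic('localhost')
a.initialize_library('user.chunks', lib_type=CHUNK_STORE)
lib = a['user.chunks']

df = pd.DataFrame({'data': np.random.randint(0, 100, size=2)},
                  index=pd.date_range('2016-01-01', '2016-01-02'))
df.index.name = 'date'

# Any mutating call may be given an arbitrary audit dict; ChunkStore fills in
# 'symbol', 'action', and per-action counters before storing it.
lib.write('my_symbol', df, audit={'user': 'jdoe', 'ticket': 'DATA-123'})
print(lib.read_audit_log('my_symbol'))
```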
diff --git a/arctic/chunkstore/chunkstore.py b/arctic/chunkstore/chunkstore.py
index 5742500b30131..905ca0fbf923e 100644
--- a/arctic/chunkstore/chunkstore.py
+++ b/arctic/chunkstore/chunkstore.py
@@ -83,6 +83,7 @@ def __init__(self, arctic_lib):
         self._collection = arctic_lib.get_top_level_collection()
         self._symbols = self._collection.symbols
         self._mdata = self._collection.metadata
+        self._audit = self._collection.audit

     def __getstate__(self):
         return {'arctic_lib': self._arctic_lib}
@@ -107,7 +108,7 @@ def _checksum(self, fields, data):
             sha.update(data)
         return Binary(sha.digest())

-    def delete(self, symbol, chunk_range=None):
+    def delete(self, symbol, chunk_range=None, audit=None):
         """
         Delete all chunks for a symbol, or optionally, chunks within a range

@@ -117,6 +118,8 @@
             symbol name for the item
         chunk_range: range object
             a date range to delete
+        audit: dict
+            dict to store in the audit log
         """
         if chunk_range is not None:
             sym = self._get_symbol_info(symbol)
@@ -144,6 +147,16 @@
         self._collection.delete_many(query)
         self._symbols.delete_many(query)
         self._mdata.delete_many(query)
+
+        if audit is not None:
+            audit['symbol'] = symbol
+            if chunk_range is not None:
+                audit['rows_deleted'] = row_adjust
+                audit['action'] = 'range delete'
+            else:
+                audit['action'] = 'symbol delete'
+
+            self._audit.insert_one(audit)

     def list_symbols(self, partial_match=None):
         """
@@ -166,7 +179,7 @@
     def _get_symbol_info(self, symbol):
         return self._symbols.find_one({SYMBOL: symbol})

-    def rename(self, from_symbol, to_symbol):
+    def rename(self, from_symbol, to_symbol, audit=None):
         """
         Rename a symbol

@@ -176,6 +189,8 @@
             the existing symbol that will be renamed
         to_symbol: str
             the new symbol name
+        audit: dict
+            audit information
         """
         sym = self._get_symbol_info(from_symbol)

@@ -191,6 +206,14 @@
                                              {'$set': {SYMBOL: to_symbol}})
         mongo_retry(self._mdata.update_many)({SYMBOL: from_symbol},
                                              {'$set': {SYMBOL: to_symbol}})
+        mongo_retry(self._audit.update_many)({'symbol': from_symbol},
+                                             {'$set': {'symbol': to_symbol}})
+        if audit is not None:
+            audit['symbol'] = to_symbol
+            audit['action'] = 'symbol rename'
+            audit['old_symbol'] = from_symbol
+            self._audit.insert_one(audit)
+

     def read(self, symbol, chunk_range=None, filter_data=True, **kwargs):
         """
@@ -245,8 +268,25 @@
         if not filter_data or chunk_range is None:
             return data
         return CHUNKER_MAP[sym[CHUNKER]].filter(data, chunk_range)
+
+    def read_audit_log(self, symbol=None):
+        """
+        Reads the audit log
+
+        Parameters
+        ----------
+        symbol: str
+            optionally only retrieve specific symbol's audit information
+
+        Returns
+        -------
+        list of dicts
+        """
+        if symbol:
+            return [x for x in self._audit.find({'symbol': symbol}, {'_id': False})]
+        return [x for x in self._audit.find({}, {'_id': False})]

-    def write(self, symbol, item, metadata=None, chunker=DateChunker(), **kwargs):
+    def write(self, symbol, item, metadata=None, chunker=DateChunker(), audit=None, **kwargs):
         """
         Writes data from item to symbol in the database

@@ -260,6 +300,8 @@
             optional per symbol metadata
         chunker: Object of type Chunker
             A chunker that chunks the data in item
+        audit: dict
+            audit information
         kwargs:
             optional keyword args that are passed to the chunker. Includes:
             chunk_size:
@@ -336,8 +378,13 @@
         mongo_retry(self._symbols.update_one)({SYMBOL: symbol},
                                               {'$set': doc},
                                               upsert=True)
+        if audit is not None:
+            audit['symbol'] = symbol
+            audit['action'] = 'write'
+            audit['chunks'] = chunk_count
+            self._audit.insert_one(audit)

-    def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=None):
+    def __update(self, sym, item, metadata=None, combine_method=None, chunk_range=None, audit=None):
         '''
         helper method used by update and append since they very closely
         resemble eachother. Really differ only by the combine method.
@@ -361,6 +408,8 @@
             op = False
         chunker = CHUNKER_MAP[sym[CHUNKER]]

+        appended = 0
+        new_chunks = 0
         for start, end, _, record in chunker.to_chunks(item, chunk_size=sym[CHUNK_SIZE]):
             # read out matching chunks
             df = self.read(symbol, chunk_range=chunker.to_range(start, end), filter_data=False)
@@ -371,10 +420,12 @@
                 if record is None or record.equals(df):
                     continue

-                sym[APPEND_COUNT] += len(record)
+                sym[APPEND_COUNT] += len(record) - len(df)
+                appended += len(record) - len(df)
                 sym[LEN] += len(record) - len(df)
             else:
                 sym[CHUNK_COUNT] += 1
+                new_chunks += 1
                 sym[LEN] += len(record)

             data = SER_MAP[sym[SERIALIZER]].serialize(record)
@@ -420,8 +471,14 @@
             sym[USERMETA] = metadata
         self._symbols.replace_one({SYMBOL: symbol}, sym)
-
-    def append(self, symbol, item, metadata=None):
+        if audit is not None:
+            if new_chunks > 0:
+                audit['new_chunks'] = new_chunks
+            if appended > 0:
+                audit['appended_rows'] = appended
+            self._audit.insert_one(audit)
+
+    def append(self, symbol, item, metadata=None, audit=None):
         """
         Appends data from item to symbol's data in the database.

@@ -435,13 +492,18 @@
             the data to append
         metadata: ?
             optional per symbol metadata
+        audit: dict
+            optional audit information
         """
         sym = self._get_symbol_info(symbol)
         if not sym:
             raise NoDataFoundException("Symbol does not exist.")
-        self.__update(sym, item, metadata=metadata, combine_method=SER_MAP[sym[SERIALIZER]].combine)
+        if audit is not None:
+            audit['symbol'] = symbol
+            audit['action'] = 'append'
+        self.__update(sym, item, metadata=metadata, combine_method=SER_MAP[sym[SERIALIZER]].combine, audit=audit)

-    def update(self, symbol, item, metadata=None, chunk_range=None, upsert=False, **kwargs):
+    def update(self, symbol, item, metadata=None, chunk_range=None, upsert=False, audit=None, **kwargs):
         """
         Overwrites data in DB with data in item for the given symbol.

@@ -462,6 +524,8 @@
             original data.
         upsert: bool
             if True, will write the data even if the symbol does not exist.
+        audit: dict
+            optional audit information
         kwargs:
             optional keyword args passed to write during an upsert. Includes:
             chunk_size
@@ -470,15 +534,18 @@
         sym = self._get_symbol_info(symbol)
         if not sym:
             if upsert:
-                return self.write(symbol, item, metadata=metadata, **kwargs)
+                return self.write(symbol, item, metadata=metadata, audit=audit, **kwargs)
             else:
                 raise NoDataFoundException("Symbol does not exist.")
+        if audit is not None:
+            audit['symbol'] = symbol
+            audit['action'] = 'update'
         if chunk_range is not None:
             if len(CHUNKER_MAP[sym[CHUNKER]].filter(item, chunk_range)) == 0:
                 raise Exception('Range must be inclusive of data')
-            self.__update(sym, item, metadata=metadata, combine_method=self.serializer.combine, chunk_range=chunk_range)
+            self.__update(sym, item, metadata=metadata, combine_method=self.serializer.combine, chunk_range=chunk_range, audit=audit)
         else:
-            self.__update(sym, item, metadata=metadata, combine_method=lambda old, new: new, chunk_range=chunk_range)
+            self.__update(sym, item, metadata=metadata, combine_method=lambda old, new: new, chunk_range=chunk_range, audit=audit)

     def get_info(self, symbol):
         """
@@ -499,6 +566,7 @@
         ret = {}
         ret['chunk_count'] = sym[CHUNK_COUNT]
         ret['len'] = sym[LEN]
+        ret['appended_rows'] = sym[APPEND_COUNT]
         ret['metadata'] = sym[METADATA]
         ret['chunker'] = sym[CHUNKER]
         ret['chunk_size'] = sym[CHUNK_SIZE]
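To summarize the hooks above: each audited call stores one document in the new `audit` collection, preserving the caller's keys and adding its own fields. A sketch of the resulting documents — the `'user'` key and all values are illustrative, the field names come from the code above:

```python
# Illustrative audit documents, one per action (field values made up).
examples = [
    {'user': 'jdoe', 'symbol': 'sym', 'action': 'write', 'chunks': 3},
    {'user': 'jdoe', 'symbol': 'sym', 'action': 'append', 'appended_rows': 10},
    {'user': 'jdoe', 'symbol': 'sym', 'action': 'update', 'new_chunks': 5},
    {'user': 'jdoe', 'symbol': 'new_sym', 'action': 'symbol rename', 'old_symbol': 'sym'},
    {'user': 'jdoe', 'symbol': 'sym', 'action': 'range delete', 'rows_deleted': 2},
    {'user': 'jdoe', 'symbol': 'sym', 'action': 'symbol delete'},
]

# __update only sets 'appended_rows' / 'new_chunks' when they are non-zero,
# so consumers should read them defensively:
total_appended = sum(e.get('appended_rows', 0) for e in examples)
```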
diff --git a/tests/integration/chunkstore/test_chunkstore.py b/tests/integration/chunkstore/test_chunkstore.py
index 46fb8b5dcd243..214f74e5a51c0 100644
--- a/tests/integration/chunkstore/test_chunkstore.py
+++ b/tests/integration/chunkstore/test_chunkstore.py
@@ -8,6 +8,7 @@
 import random
 import pytest
 import pymongo
+import pickle

 from arctic.chunkstore.chunkstore import START, SYMBOL
 from arctic.chunkstore.passthrough_chunker import PassthroughChunker
@@ -661,6 +662,7 @@ def test_get_info(chunkstore_lib):
                    )
     chunkstore_lib.write('test_df', df)
     info = {'len': 3,
+            'appended_rows': 0,
             'chunk_count': 3,
             'metadata': {'columns': [u'date', u'id', u'data']},
             'chunker': u'date',
@@ -688,6 +690,7 @@ def test_get_info_after_append(chunkstore_lib):
     assert_frame_equal(chunkstore_lib.read('test_df'), pd.concat([df, df2]).sort_index())

     info = {'len': 6,
+            'appended_rows': 2,
             'chunk_count': 4,
             'metadata': {'columns': [u'date', u'id', u'data']},
             'chunker': u'date',
@@ -715,6 +718,7 @@ def test_get_info_after_update(chunkstore_lib):
     chunkstore_lib.update('test_df', df2)

     info = {'len': 4,
+            'appended_rows': 0,
             'chunk_count': 4,
             'metadata': {'columns': [u'date', u'id', u'data']},
             'chunker': u'date',
@@ -991,6 +995,10 @@ def test_rename(chunkstore_lib):
     with pytest.raises(Exception) as e:
         chunkstore_lib.rename('new_name', 'new_name')
     assert('already exists' in str(e))
+
+    with pytest.raises(NoDataFoundException) as e:
+        chunkstore_lib.rename('doesnt_exist', 'temp')
+    assert('No data found for doesnt_exist' in str(e))

     assert('test' not in chunkstore_lib.list_symbols())
@@ -1225,12 +1233,12 @@ def test_stats(chunkstore_lib):


 def test_metadata(chunkstore_lib):
-    df = DataFrame(data={'data': np.random.randint(0, 100, size=2)}, 
+    df = DataFrame(data={'data': np.random.randint(0, 100, size=2)},
                    index=pd.date_range('2016-01-01', '2016-01-02'))
-    df.index.name = 'date' 
-    chunkstore_lib.write('data', df, metadata = 'some metadata') 
-    m = chunkstore_lib.read_metadata('data') 
-    assert(m == u'some metadata') 
+    df.index.name = 'date'
+    chunkstore_lib.write('data', df, metadata = 'some metadata')
+    m = chunkstore_lib.read_metadata('data')
+    assert(m == u'some metadata')


 def test_metadata_update(chunkstore_lib):
@@ -1281,3 +1289,47 @@ def test_write_metadata(chunkstore_lib):
 def test_write_metadata_nosymbol(chunkstore_lib):
     with pytest.raises(NoDataFoundException):
         chunkstore_lib.write_metadata('doesnt_exist', 'meta')
+
+
+def test_audit(chunkstore_lib):
+    df = DataFrame(data={'data': np.random.randint(0, 100, size=2)},
+                   index=pd.date_range('2016-01-01', '2016-01-02'))
+    df.index.name = 'date'
+    chunkstore_lib.write('data', df, audit={'user': 'test_user'})
+    df = DataFrame(data={'data': np.random.randint(0, 100, size=10)},
+                   index=pd.date_range('2016-01-01', '2016-01-10'))
+    df.index.name = 'date'
+    chunkstore_lib.write('data', df, audit={'user': 'other_user'})
+
+    assert(len(chunkstore_lib.read_audit_log()) == 2)
+    assert(len(chunkstore_lib.read_audit_log(symbol='data')) == 2)
+    assert(len(chunkstore_lib.read_audit_log(symbol='none')) == 0)
+
+    chunkstore_lib.append('data', df, audit={'user': 'test_user'})
+    assert(chunkstore_lib.read_audit_log()[-1]['appended_rows'] == 10)
+
+    df = DataFrame(data={'data': np.random.randint(0, 100, size=5)},
+                   index=pd.date_range('2017-01-01', '2017-01-05'))
+    df.index.name = 'date'
+    chunkstore_lib.update('data', df, audit={'user': 'other_user'})
+    assert(chunkstore_lib.read_audit_log()[-1]['new_chunks'] == 5)
+
+    chunkstore_lib.rename('data', 'data_new', audit={'user': 'temp_user'})
+    assert(chunkstore_lib.read_audit_log()[-1]['action'] == 'symbol rename')
+
+    chunkstore_lib.delete('data_new', chunk_range=DateRange('2016-01-01', '2016-01-02'), audit={'user': 'test_user'})
+    chunkstore_lib.delete('data_new', audit={'user': 'test_user'})
+    assert(chunkstore_lib.read_audit_log()[-1]['action'] == 'symbol delete')
+    assert(chunkstore_lib.read_audit_log()[-2]['action'] == 'range delete')
+
+
+def test_chunkstore_misc(chunkstore_lib):
+
+    p = pickle.dumps(chunkstore_lib)
+    c = pickle.loads(p)
+    assert(chunkstore_lib._arctic_lib.get_name() == c._arctic_lib.get_name())
+
+    assert("arctic_test.TEST" in str(chunkstore_lib))
+    assert(str(chunkstore_lib) == repr(chunkstore_lib))
+
+
\ No newline at end of file
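One note on querying: `read_audit_log` filters on symbol only. Ad-hoc filters such as "all actions by one user" can be run directly against the underlying audit collection that the constructor change wires up. A sketch, not part of this PR — `lib` is the ChunkStore from the earlier sketch, and `_audit` is a private attribute:

```python
# Sketch only (not part of this PR): read_audit_log() filters by symbol alone,
# so other filters go straight to the underlying pymongo collection.
# `_audit` is private API (self._collection.audit in the constructor above).
by_user = list(lib._audit.find({'user': 'jdoe'}, {'_id': False}))
```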