Skip to content

Commit

Permalink
Merge pull request pandas-dev#18 from mstampfer/master
Browse files Browse the repository at this point in the history
Snapshot specific symbol versions
  • Loading branch information
jamesblackburn committed Jul 29, 2015
2 parents 38b9667 + 8b1640b commit f6d73b7
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 9 deletions.
19 changes: 11 additions & 8 deletions arctic/store/version_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,9 +666,9 @@ def _write_audit(self, user, message, changed_version):
# Create the audit entry
mongo_retry(self._audit.insert_one)(audit)

def snapshot(self, snap_name, metadata=None, skip_symbols=None):
def snapshot(self, snap_name, metadata=None, skip_symbols=None, versions=None):
"""
Snapshot the current versions of symbols in the library. Can be used like:
Snapshot versions of symbols in the library. Can be used like:
Parameters
----------
Expand All @@ -678,6 +678,8 @@ def snapshot(self, snap_name, metadata=None, skip_symbols=None):
an optional dictionary of metadata to persist along with the symbol.
skip_symbols : `collections.Iterable`
optional symbols to be excluded from the snapshot
versions: `dict`
an optional dictionary of versions of the symbols to be snapshot
"""
# Ensure the user doesn't insert duplicates
snapshot = self._snapshots.find_one({'name': snap_name})
Expand All @@ -688,22 +690,23 @@ def snapshot(self, snap_name, metadata=None, skip_symbols=None):
snapshot = {'_id': bson.ObjectId()}
snapshot['name'] = snap_name
snapshot['metadata'] = metadata

skip_symbols = set() if skip_symbols is None else set(skip_symbols)

if skip_symbols is None:
skip_symbols = set()
else:
skip_symbols = set(skip_symbols)
if versions is None:
versions = {sym: None for sym in set(self.list_symbols()) - skip_symbols}

# Loop over, and snapshot all versions except those we've been asked to skip
for sym in set(self.list_symbols()) - skip_symbols:
for sym in versions:
try:
sym = self._read_metadata(sym, read_preference=ReadPreference.PRIMARY)
sym = self._read_metadata(sym, read_preference=ReadPreference.PRIMARY, as_of=versions[sym])
# Update the parents field of the version document
mongo_retry(self._versions.update_one)({'_id': sym['_id']},
{'$addToSet': {'parent': snapshot['_id']}})
except NoDataFoundException:
# Version has been deleted, not included in the snapshot
pass

mongo_retry(self._snapshots.insert_one)(snapshot)

def delete_snapshot(self, snap_name):
Expand Down
40 changes: 40 additions & 0 deletions tests/integration/store/test_version_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,46 @@ def test_snapshot(library):
assert versions[2]['snapshots'] == ['current']


def test_snapshot_with_versions(library):
    """Snapshots taken with an explicit ``versions`` mapping tag only those versions."""
    library.write(symbol, ts1)
    library.write(symbol, ts2)

    # Pin version 1 explicitly: the entry listed first must stay untagged,
    # the next one carries the new snapshot name.
    library.snapshot('previous', versions={symbol: 1})
    history = library.list_versions(symbol)
    assert history[0]['snapshots'] == []
    assert history[1]['snapshots'] == ['previous']
    assert_frame_equal(library.read(symbol, as_of='previous').data, ts1)

    # A plain snapshot (no ``versions``) is expected to tag version 2 and
    # leave the earlier 'previous' tag on version 1 untouched.
    library.snapshot('new')
    history = library.list_versions(symbol)
    newest = history[0]
    assert newest['snapshots'] == ['new']
    assert newest['version'] == 2
    assert_frame_equal(library.read(symbol, as_of='new').data, ts2)

    older = history[1]
    assert older['snapshots'] == ['previous']
    assert older['version'] == 1
    assert_frame_equal(library.read(symbol, as_of='previous').data, ts1)

    # Write a third version, then snapshot version 1 again under another name:
    # the freshly written version must not pick up the tag.
    library.write(symbol, ts1, prune_previous_version=True)
    library.snapshot('another', versions={symbol: 1})
    history = library.list_versions(symbol)

    newest = history[0]
    assert newest['snapshots'] == []
    assert newest['version'] == 3
    assert_frame_equal(library.read(symbol).data, ts1)

    middle = history[1]
    assert middle['snapshots'] == ['new']
    assert middle['version'] == 2

    oldest = history[2]
    assert oldest['snapshots'] == ['previous', 'another']
    assert oldest['version'] == 1
    assert_frame_equal(library.read(symbol, as_of='another').data, ts1)


def test_snapshot_exclusion(library):
library.write(symbol, ts1)
library.snapshot('current', skip_symbols=[symbol])
Expand Down
23 changes: 22 additions & 1 deletion tests/unit/store/test_version_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from arctic.store import version_store
from arctic.store.version_store import VersionStore, VersionedItem
from arctic.arctic import ArcticLibraryBinding, Arctic
from arctic.exceptions import ConcurrentModificationException
from arctic.exceptions import ConcurrentModificationException, DuplicateSnapshotException
from pymongo.errors import OperationFailure
from pymongo.collection import Collection

Expand Down Expand Up @@ -202,3 +202,24 @@ def test_read_reports_random_errors():
VersionStore.read(self, sentinel.symbol, sentinel.as_of, sentinel.from_version)
assert 'bad' in str(e)
assert le.call_count == 1


def test_snapshot():
    """Snapshotting without ``versions`` reads metadata once per listed symbol.

    Each symbol returned by ``list_symbols`` must be looked up with
    ``as_of=None`` on the primary. The order of those lookups is not
    asserted, because ``snapshot`` derives the symbols from a set.
    """
    vs = create_autospec(VersionStore, _snapshots=Mock(),
                         _collection=Mock(),
                         _versions=Mock())
    # No snapshot with this name exists yet, so the duplicate check passes.
    vs._snapshots.find_one.return_value = False
    # NOTE(review): presumably mongo_retry reads __name__ from the callable it
    # wraps, which a bare Mock attribute does not provide -- confirm.
    vs._versions.update_one.__name__ = 'name'
    vs._snapshots.insert_one.__name__ = 'name'
    vs.list_symbols.return_value = ['foo', 'bar']

    VersionStore.snapshot(vs, "symbol")

    expected_calls = [call('foo', as_of=None, read_preference=ReadPreference.PRIMARY),
                      call('bar', as_of=None, read_preference=ReadPreference.PRIMARY)]
    actual_calls = vs._read_metadata.call_args_list
    # Same number of calls plus membership of every expected call pins the
    # exact set of lookups without depending on set iteration order.
    assert len(actual_calls) == len(expected_calls)
    for expected in expected_calls:
        assert expected in actual_calls


def test_snapshot_duplicate_raises_exception():
    """Snapshotting under an already-used name raises DuplicateSnapshotException."""
    vs = create_autospec(VersionStore, _snapshots=Mock())
    # A truthy find_one result stands in for an existing snapshot document.
    vs._snapshots.find_one.return_value = True

    # Only the call expected to raise lives inside the context manager;
    # statements placed after it within the block would never execute.
    with pytest.raises(DuplicateSnapshotException) as e:
        VersionStore.snapshot(vs, 'symbol')

    assert "Snapshot 'symbol' already exists" in str(e.value)

0 comments on commit f6d73b7

Please sign in to comment.