Skip to content

Commit

Permalink
Improve dump implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
fpagnoux committed Aug 27, 2018
1 parent 8d61900 commit e1dbbb4
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 186 deletions.
29 changes: 14 additions & 15 deletions openfisca_core/data_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,21 +167,20 @@ def get_known_periods(self):

def restore(self):
self._files = files = {}
if os.path.isdir(self.storage_dir):
# Restore self._files from content of storage_dir.
for filename in os.listdir(self.storage_dir):
if not filename.endswith('.npy'):
continue
path = os.path.join(self.storage_dir, filename)
filename_core = filename.rsplit('.', 1)[0]
if '_' in filename_core:
period, extra_params_str = filename_core.split('_', 1)
period = periods.period(period)
extra_params = tuple(extra_params_str.split('_'))
files.setdefault(period, {})[extra_params] = path
else:
period = periods.period(filename_core)
files[period] = path
# Restore self._files from content of storage_dir.
for filename in os.listdir(self.storage_dir):
if not filename.endswith('.npy'):
continue
path = os.path.join(self.storage_dir, filename)
filename_core = filename.rsplit('.', 1)[0]
if '_' in filename_core:
period, extra_params_str = filename_core.split('_', 1)
period = periods.period(period)
extra_params = tuple(extra_params_str.split('_'))
files.setdefault(period, {})[extra_params] = path
else:
period = periods.period(filename_core)
files[period] = path

def __del__(self):
if self.preserve_storage_dir:
Expand Down
8 changes: 6 additions & 2 deletions openfisca_core/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import traceback
import warnings
import textwrap
import os
from os import linesep

import numpy as np
import dpath
Expand Down Expand Up @@ -171,7 +171,7 @@ def to_json(cls):
def check_variable_defined_for_entity(self, variable_name):
variable_entity = self.simulation.tax_benefit_system.get_variable(variable_name, check_existence = True).entity
if not isinstance(self, variable_entity):
message = os.linesep.join([
message = linesep.join([
"You tried to compute the variable '{0}' for the entity '{1}';".format(variable_name, self.plural),
"however the variable '{0}' is defined for '{1}'.".format(variable_name, variable_entity.plural),
"Learn more about entities in our documentation:",
Expand Down Expand Up @@ -463,6 +463,10 @@ def members_position(self):
def members_role(self, members_role):
self._members_role = members_role

@members_position.setter
def members_position(self, members_position):
self._members_position = members_position

@property
def roles_count(self):
warnings.warn(' '.join([
Expand Down
17 changes: 0 additions & 17 deletions openfisca_core/holders.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,23 +141,6 @@ def get_known_periods(self):
return list(self._memory_storage.get_known_periods()) + list((
self._disk_storage.get_known_periods() if self._disk_storage else []))

def restore(self):
"""
Restore data to disk from previous dump.
Note: In practice, this method only clears data from memory, so that it will be read from disk later.
"""
memory_storage = self._memory_storage
if memory_storage is not None:
# Create disk storage when it doesn't exist to be able to restore
# holder from the dump of another simulation.
disk_storage = self._disk_storage
if disk_storage is None:
disk_storage = self.create_disk_storage()
disk_storage.restore()

memory_storage._arrays.clear()

def set_input(self, period, array):
"""
Set a variable's value (``array``) for a given period (``period``)
Expand Down
3 changes: 1 addition & 2 deletions openfisca_core/scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ def json_to_instance(value, state = None):
value = value, state = state or conv.default_state)
return json_to_instance

def new_simulation(self, data_storage_dir = None, debug = False, use_baseline = False, trace = False, opt_out_cache = False):
def new_simulation(self, debug = False, use_baseline = False, trace = False, opt_out_cache = False):
assert isinstance(use_baseline, (bool, int)), \
'Parameter use_baseline must be a boolean. When True, the baseline tax-benefit system is used.'
tax_benefit_system = self.tax_benefit_system
Expand All @@ -369,7 +369,6 @@ def new_simulation(self, data_storage_dir = None, debug = False, use_baseline =
break
tax_benefit_system = baseline
simulation = simulations.Simulation(
data_storage_dir = data_storage_dir,
debug = debug,
period = self.period,
tax_benefit_system = tax_benefit_system,
Expand Down
27 changes: 1 addition & 26 deletions openfisca_core/simulations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@


from __future__ import unicode_literals, print_function, division, absolute_import
import os
from os import linesep
import tempfile
import logging
Expand All @@ -14,7 +13,6 @@
from openfisca_core.commons import empty_clone, stringify_array, basestring_type, to_unicode
from openfisca_core.tracers import Tracer
from openfisca_core.indexed_enums import Enum, EnumArray
from openfisca_core.tools.simulation_dumper import dump_simulation, restore_simulation


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -82,7 +80,7 @@ def __init__(
self.opt_out_cache = opt_out_cache

self.memory_config = memory_config
self._data_storage_dir = data_storage_dir
self._data_storage_dir = None
self.instantiate_entities(simulation_json)

def instantiate_entities(self, simulation_json):
Expand Down Expand Up @@ -422,12 +420,6 @@ def get_memory_usage(self, variables = None):

# ----- Misc ----- #

def dump(self, directory):
"""
Write all simulation data to a directory, so that it can be restored later.
"""
dump_simulation(self, directory)

def get_variable_entity(self, variable_name):

variable = self.tax_benefit_system.get_variable(variable_name, check_existence = True)
Expand Down Expand Up @@ -471,23 +463,6 @@ def clone(self, debug = False, trace = False):

return new

def restore(self):
variables_names = set()
# Restore existing variables.
for entity in self.entities.values():
for variable_name, holder in entity._holders.items():
holder.restore()
variables_names.add(variable_name)

# Restore other variables dumped by previous simulation.
storage_dir = self.data_storage_dir
if os.path.isdir(storage_dir):
for variable_name in os.listdir(storage_dir):
if '.' in variable_name or variable_name in variables_names:
continue
holder = self.get_holder(variable_name)
holder.restore()


def check_type(input, input_type, path = []):
json_type_map = {
Expand Down
105 changes: 105 additions & 0 deletions openfisca_core/tools/simulation_dumper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-

from __future__ import unicode_literals, print_function, division, absolute_import

import os

import numpy as np

from openfisca_core.simulations import Simulation
from openfisca_core.data_storage import OnDiskStorage
from openfisca_core.periods import ETERNITY


def dump_simulation(simulation, directory):
"""
Write data to disk, so that it can be restored later.
"""
if not os.path.isdir(directory):
os.mkdir(directory)

entities_dump_dir = os.path.join(directory, "__entities__")
os.mkdir(entities_dump_dir)

for entity in simulation.entities.values():
# Dump entity structure
_dump_entity(entity, entities_dump_dir)

# Dump variable values
for holder in entity._holders.values():
_dump_holder(holder, directory)


def _dump_holder(holder, directory):
disk_storage = holder.create_disk_storage(directory, preserve = True)
for period in holder.get_known_periods():
value = holder.get_array(period)
disk_storage.put(value, period)


def _dump_entity(entity, directory):
path = os.path.join(directory, entity.key)
os.mkdir(path)
np.save(os.path.join(path, "id.npy"), entity.ids)

if entity.is_person:
return

np.save(os.path.join(path, "members_position.npy"), entity.members_position)
np.save(os.path.join(path, "members_entity_id.npy"), entity.members_entity_id)
np.save(os.path.join(path, "members_legacy_role.npy"), entity.members_legacy_role)
encoded_roles = np.select(
[entity.members_role == role for role in entity.flattened_roles],
[role.key for role in entity.flattened_roles],
)
np.save(os.path.join(path, "members_role.npy"), encoded_roles)


def _restore_entity(entity, directory):
path = os.path.join(directory, entity.key)

entity.ids = np.load(os.path.join(path, "id.npy"))
entity.count = len(entity.ids)

if entity.is_person:
return

entity.members_position = np.load(os.path.join(path, "members_position.npy"))
entity.members_entity_id = np.load(os.path.join(path, "members_entity_id.npy"))
entity.members_legacy_role = np.load(os.path.join(path, "members_legacy_role.npy"))
encoded_roles = np.load(os.path.join(path, "members_role.npy"))
entity.members_role = np.select(
[encoded_roles == role.key for role in entity.flattened_roles],
[role for role in entity.flattened_roles],
)


def restore_simulation(directory, tax_benefit_system, **kwargs):
simulation = Simulation(tax_benefit_system, **kwargs)

entities_dump_dir = os.path.join(directory, "__entities__")
for entity in simulation.entities.values():
_restore_entity(entity, entities_dump_dir)

variables_to_restore = (variable for variable in os.listdir(directory) if variable != "__entities__")
for variable in variables_to_restore:
_restore_holder(simulation, variable, directory)

return simulation


def _restore_holder(simulation, variable, directory):
storage_dir = os.path.join(directory, variable)
is_variable_eternal = simulation.tax_benefit_system.get_variable(variable).definition_period == ETERNITY
disk_storage = OnDiskStorage(
storage_dir,
is_eternal = is_variable_eternal,
preserve_storage_dir = True
)
disk_storage.restore()

holder = simulation.get_holder(variable)

for period in disk_storage.get_known_periods():
value = disk_storage.get(period)
holder.put_in_cache(value, period)
Loading

0 comments on commit e1dbbb4

Please sign in to comment.