diff --git a/activity_browser/app/bwutils/manager.py b/activity_browser/app/bwutils/manager.py new file mode 100644 index 000000000..3d7d3551a --- /dev/null +++ b/activity_browser/app/bwutils/manager.py @@ -0,0 +1,228 @@ +# -*- coding: utf-8 -*- +from abc import abstractmethod +from collections.abc import Iterator +import itertools +from typing import Iterable, List, Optional, Tuple + +from asteval import Interpreter +from bw2data.backends.peewee import ExchangeDataset +from bw2data.parameters import ( + ProjectParameter, DatabaseParameter, ActivityParameter, + ParameterizedExchange, get_new_symbols +) +from bw2parameters import ParameterSet +from bw2parameters.errors import MissingName +import numpy as np +from stats_arrays import MCRandomNumberGenerator, UncertaintyBase + +from .utils import Index, Parameters, Indices, StaticParameters + + +class ParameterManager(object): + """A manager for Brightway2 parameters, allowing for formula evaluation + without touching the database. + """ + def __init__(self): + self.parameters: Parameters = Parameters.from_bw_parameters() + self.initial: StaticParameters = StaticParameters() + self.indices: Indices = self.construct_indices() + + def construct_indices(self) -> Indices: + """Given that ParameterizedExchanges will always have the same order of + indices, construct them once and reuse when needed. + """ + indices = Indices() + for p in self.initial.act_by_group_db: + params = self.initial.exc_by_group(p.group) + indices.extend( + Index.build_from_exchange(ExchangeDataset.get_by_id(pk)) + for pk in params + ) + return indices + + def recalculate_project_parameters(self) -> dict: + data = self.initial.project() + if not data: + return {} + + new_values = self.parameters.data_by_group("project") + + for name, amount in new_values.items(): + data[name]["amount"] = amount + + ParameterSet(data).evaluate_and_set_amount_field() + return StaticParameters.prune_result_data(data) + + def recalculate_database_parameters(self, database: str, global_params: dict = None) -> dict: + data = self.initial.by_database(database) + if not data: + return {} + + glo = global_params or {} + new_values = self.parameters.data_by_group(database) + for name, amount in new_values.items(): + data[name]["amount"] = amount + + new_symbols = get_new_symbols(data.values(), set(data)) + missing = new_symbols.difference(glo) + if missing: + raise MissingName("The following variables aren't defined:\n{}".format("|".join(missing))) + + glo = Parameters.static(glo, needed=new_symbols) if new_symbols else None + + ParameterSet(data, glo).evaluate_and_set_amount_field() + return StaticParameters.prune_result_data(data) + + def process_database_parameters(self, global_params: dict = None) -> dict: + glo = global_params or self.recalculate_project_parameters() + all_db = {} + for database in self.initial.databases: + db = self.recalculate_database_parameters(database, glo) + all_db[database] = {x: y for x, y in db.items()} if db else {} + return all_db + + def recalculate_activity_parameters(self, group: str, global_params: dict = None) -> dict: + data = self.initial.act_by_group(group) + if not data: + return {} + + new_values = self.parameters.data_by_group(group) + glo = global_params or {} + for name, amount in new_values.items(): + data[name]["amount"] = amount + + new_symbols = get_new_symbols(data.values(), set(data)) + missing = new_symbols.difference(global_params) + if missing: + raise MissingName("The following variables aren't defined:\n{}".format("|".join(missing))) + + glo = Parameters.static(glo, needed=new_symbols) if new_symbols else None + + ParameterSet(data, glo).evaluate_and_set_amount_field() + return StaticParameters.prune_result_data(data) + + def recalculate_exchanges(self, group: str, global_params: dict = None) -> Iterable[Tuple[int, float]]: + """ Constructs a list of exc.id/amount tuples for the + ParameterizedExchanges in the given group. + """ + params = self.initial.exc_by_group(group) + if not params: + return [] + + glo = global_params or {} + interpreter = Interpreter() + interpreter.symtable.update(glo) + return [(k, interpreter(v)) for k, v in params.items()] + + def process_exchanges(self, global_params: dict = None, db_params: dict = None, + build_indices: bool = True) -> np.ndarray: + dbs = db_params or {} + complete_data = np.zeros(len(self.indices)) + + offset = 0 + for p in self.initial.act_by_group_db: + combination = {x: y for x, y in global_params.items()} if global_params else {} + combination.update(dbs.get(p.database, {})) + combination.update(self.recalculate_activity_parameters(p.group, combination)) + + recalculated = self.recalculate_exchanges(p.group, global_params=combination) + # If the parameter group contains no ParameterizedExchanges, skip. + if not recalculated: + continue + # `data` contains the recalculated amounts for the exchanges. + _, data = zip(*recalculated) + complete_data[offset:len(data) + offset] = data + offset += len(data) + + return complete_data + + def calculate(self) -> np.ndarray: + """ Convenience function that takes calculates the current parameters + and returns a fully-formed set of exchange amounts and indices. + + All parameter types are recalculated in turn before interpreting the + ParameterizedExchange formulas into amounts. + """ + global_project = self.recalculate_project_parameters() + all_db = self.process_database_parameters(global_project) + data = self.process_exchanges(global_project, all_db) + return data + + @abstractmethod + def recalculate(self, values: List[float]) -> np.ndarray: + """ Convenience function that takes the given new values and recalculates. + Returning a fully-formed set of exchange amounts and indices. + + All parameter types are recalculated in turn before interpreting the + ParameterizedExchange formulas into amounts. + """ + self.parameters.update(values) + return self.calculate() + + @staticmethod + def has_parameterized_exchanges() -> bool: + """ Test if ParameterizedExchanges exist, no point to using this manager + otherwise. + """ + return ParameterizedExchange.select().exists() + + +class MonteCarloParameterManager(ParameterManager, Iterator): + """Use to sample the uncertainty of parameter values, mostly for use in + Monte Carlo calculations. + + Each iteration will sample the parameter uncertainty, after which + all parameters and parameterized exchanges are recalculated. These + recalculated values are then returned as a simplified `params` array, + which is similar to the `tech_params` and `bio_params` arrays in the + LCA classes. + + Makes use of the `MCRandomNumberGenerator` to sample from all of the + distributions in the same way. + + """ + + def __init__(self, seed: Optional[int] = None): + super().__init__() + parameters = itertools.chain( + ProjectParameter.select(), DatabaseParameter.select(), + ActivityParameter.select() + ) + self.uncertainties = UncertaintyBase.from_dicts( + *[getattr(p, "data", {}) for p in parameters] + ) + self.mc_generator = MCRandomNumberGenerator(self.uncertainties, seed=seed) + + def __iter__(self): + return self + + def __next__(self): + return self.next() + + def recalculate(self, iterations: int = 10) -> np.ndarray: + assert iterations > 0, "Must have a positive amount of iterations" + if iterations == 1: + return self.next() + # Construct indices, prepare sized array and sample parameter + # uncertainty distributions `interations` times. + all_data = np.empty((iterations, len(self.indices)), dtype=Indices.array_dtype) + random_bounded_values = self.mc_generator.generate(iterations) + + # Now, repeatedly replace parameter amounts with sampled data and + # recalculate. Every processed row is added to the sized array. + for i in range(iterations): + values = random_bounded_values.take(i, axis=1) + self.parameters.update(values) + data = self.calculate() + all_data[i] = self.indices.mock_params(data) + + return all_data + + def next(self) -> np.ndarray: + """Similar to `recalculate` but only performs a single sampling and + recalculation. + """ + values = self.mc_generator.next() + self.parameters.update(values) + data = self.calculate() + return self.indices.mock_params(data) diff --git a/activity_browser/app/bwutils/montecarlo.py b/activity_browser/app/bwutils/montecarlo.py index b6e1303e9..a7a1f008d 100644 --- a/activity_browser/app/bwutils/montecarlo.py +++ b/activity_browser/app/bwutils/montecarlo.py @@ -1,24 +1,31 @@ # -*- coding: utf-8 -*- +from time import time + import brightway2 as bw -from stats_arrays.random import MCRandomNumberGenerator from bw2calc.utils import get_seed import numpy as np import pandas as pd -from time import time +from stats_arrays import MCRandomNumberGenerator + +from .manager import MonteCarloParameterManager class CSMonteCarloLCA(object): """A Monte Carlo LCA for multiple functional units and methods loaded from a calculation setup.""" - def __init__(self, cs_name, seed=None): - try: - cs = bw.calculation_setups[cs_name] - self.cs_name = cs_name - except KeyError: + def __init__(self, cs_name): + if cs_name not in bw.calculation_setups: raise ValueError( "{} is not a known `calculation_setup`.".format(cs_name) ) - # self.seed = seed or get_seed() + self.cs_name = cs_name + cs = bw.calculation_setups[cs_name] + self.seed = None + self.cf_rngs = {} + self.CF_rng_vectors = {} + self.include_parameters = False + self.param_rng = None + self.param_cols = ["input", "output", "type"] # functional units self.func_units = cs['inv'] @@ -50,38 +57,53 @@ def __init__(self, cs_name, seed=None): self.lca = bw.LCA(demand=self.func_units_dict, method=self.methods[0]) - def load_data(self): self.lca.load_lci_data() self.lca.tech_rng = MCRandomNumberGenerator(self.lca.tech_params, seed=self.seed) self.lca.bio_rng = MCRandomNumberGenerator(self.lca.bio_params, seed=self.seed) if self.lca.lcia: - self.cf_rngs = dict() # we need as many cf_rng as impact categories, because they are of different size - for method in self.methods: - self.lca.switch_method(method) + self.cf_rngs = {} # we need as many cf_rng as impact categories, because they are of different size + for m in self.methods: + self.lca.switch_method(m) self.lca.load_lcia_data() - self.cf_rngs[method] = MCRandomNumberGenerator(self.lca.cf_params, seed=self.seed) + self.cf_rngs[m] = MCRandomNumberGenerator(self.lca.cf_params, seed=self.seed) + # Construct the MC parameter manager + if self.include_parameters: + self.param_rng = MonteCarloParameterManager(seed=self.seed) - def calculate(self, iterations=10, seed=None): + def calculate(self, iterations=10, seed: int = None, parameters: bool = False): start = time() self.seed = seed or get_seed() + self.include_parameters = parameters self.load_data() self.results = np.zeros((iterations, len(self.func_units), len(self.methods))) for iteration in range(iterations): if not hasattr(self.lca, "tech_rng"): self.load_data() - self.lca.rebuild_technosphere_matrix(self.lca.tech_rng.next()) - self.lca.rebuild_biosphere_matrix(self.lca.bio_rng.next()) + + tech_vector = self.lca.tech_rng.next() + bio_vector = self.lca.bio_rng.next() + if self.include_parameters: + param_exchanges = self.param_rng.next() + # combination of 'input', 'output', 'type' columns is unique + # For each recalculated exchange, match it to either matrix and + # override the value within that matrix. + for p in param_exchanges: + tech_vector[self.lca.tech_params[self.param_cols] == p[self.param_cols]] = p["amount"] + bio_vector[self.lca.bio_params[self.param_cols] == p[self.param_cols]] = p["amount"] + + self.lca.rebuild_technosphere_matrix(tech_vector) + self.lca.rebuild_biosphere_matrix(bio_vector) if not hasattr(self.lca, "demand_array"): self.lca.build_demand_array() self.lca.lci_calculation() # pre-calculating CF vectors enables the use of the SAME CF vector for each FU in a given run - self.CF_rngs = dict() - for method in self.methods: - self.CF_rngs[method] = self.cf_rngs[method].next() + cf_vectors = {} + for m in self.methods: + cf_vectors[m] = self.cf_rngs[m].next() # lca_scores = np.zeros((len(self.func_units), len(self.methods))) @@ -90,9 +112,9 @@ def calculate(self, iterations=10, seed=None): self.lca.redo_lci(func_unit) # lca calculation # iterate over methods - for col, method in self.rev_method_index.items(): - self.lca.switch_method(method) - self.lca.rebuild_characterization_matrix(self.CF_rngs[method]) + for col, m in self.rev_method_index.items(): + self.lca.switch_method(m) + self.lca.rebuild_characterization_matrix(cf_vectors[m]) self.lca.lcia_calculation() # lca_scores[row, col] = self.lca.score self.results[iteration, row, col] = self.lca.score @@ -104,7 +126,7 @@ def calculate(self, iterations=10, seed=None): # self.results[(method, func_unit)] = lca_scores @property - def func_units_dict(self): + def func_units_dict(self) -> dict: """Return a dictionary of functional units (key, demand).""" return {key: 1 for func_unit in self.func_units for key in func_unit} @@ -113,7 +135,8 @@ def get_results_by(self, act_key=None, method=None): - if a method is provided, results will be given for all functional units and runs - if a functional unit is provided, results will be given for all methods and runs - if a functional unit and method is provided, results will be given for all runs of that combination - - if nothing is given, all results are returned""" + - if nothing is given, all results are returned + """ if act_key: act_index = self.activity_index.get(act_key) print('Activity key provided:', act_key, act_index) @@ -132,12 +155,12 @@ def get_results_by(self, act_key=None, method=None): return np.squeeze(self.results[:, act_index, method_index]) def get_results_dataframe(self, act_key=None, method=None, labelled=True): - """ -Return a Pandas DataFrame with results for all runs either for -- all functional units and a selected method or -- all methods and a selected functional unit. + """Return a Pandas DataFrame with results for all runs either for + - all functional units and a selected method or + - all methods and a selected functional unit. -If labelled=True, then the activity keys are converted to a human readable format. + If labelled=True, then the activity keys are converted to a human + readable format. """ if act_key and method or not act_key and not method: raise ValueError('Must provide activity key or method, but not both.') @@ -156,13 +179,16 @@ def get_results_dataframe(self, act_key=None, method=None, labelled=True): return df - def get_labels(self, key_list, fields=['name', 'reference product', 'location', 'database'], - separator=' | ', max_length=False): - keys = [k for k in key_list] # need to do this as the keys come from a pd.Multiindex - translated_keys = [] - for k in keys: - act = bw.get_activity(k).as_dict() - translated_keys.append(separator.join([act.get(field, '') for field in fields])) + @staticmethod + def get_labels(key_list, fields: list = None, separator=' | ', + max_length: int = None) -> list: + fields = fields or ['name', 'reference product', 'location', 'database'] + # need to do this as the keys come from a pd.Multiindex + acts = (bw.get_activity(key).as_dict() for key in (k for k in key_list)) + translated_keys = [ + separator.join([act.get(field, '') for field in fields]) + for act in acts + ] # if max_length: # translated_keys = [wrap_text(k, max_length=max_length) for k in translated_keys] return translated_keys diff --git a/activity_browser/app/bwutils/presamples/__init__.py b/activity_browser/app/bwutils/presamples/__init__.py index b3c698807..b55c6f8d6 100644 --- a/activity_browser/app/bwutils/presamples/__init__.py +++ b/activity_browser/app/bwutils/presamples/__init__.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -from .manager import PresamplesParameterManager, process_brightway_parameters +from .manager import PresamplesParameterManager from .presamples_mlca import PresamplesContributions, PresamplesMLCA from .utils import ( count_presample_packages, find_all_package_names, get_package_path, diff --git a/activity_browser/app/bwutils/presamples/manager.py b/activity_browser/app/bwutils/presamples/manager.py index eae9912f3..3487872c5 100644 --- a/activity_browser/app/bwutils/presamples/manager.py +++ b/activity_browser/app/bwutils/presamples/manager.py @@ -1,226 +1,37 @@ # -*- coding: utf-8 -*- -import itertools from typing import Iterable, List, Optional, Tuple -from asteval import Interpreter -import brightway2 as bw -from bw2data.backends.peewee import ExchangeDataset -from bw2data.parameters import (ActivityParameter, DatabaseParameter, - ParameterizedExchange, ProjectParameter, - get_new_symbols) -from bw2parameters import ParameterSet -from bw2parameters.errors import MissingName import numpy as np from peewee import IntegrityError import presamples as ps +from ..manager import ParameterManager -class PresamplesParameterManager(object): - """ Used to recalculate brightway parameters without editing the database - - The `param_values` property and `get_altered_values` method are used to - retrieve either the whole list of prepared parameter values or a - subset of it selected by group. - - The `recalculate_*` methods are used to perform the actual calculations - and will read out the relevant parameters from the database. Each of the - methods will optionally return a dictionary of the parameter names - with their recalculated amounts. +class PresamplesParameterManager(ParameterManager): + """ Used to recalculate brightway parameters without editing the database """ - __slots__ = 'parameter_values' - - def __init__(self): - self.parameter_values: List[tuple] = [] - - @property - def param_values(self) -> Iterable[tuple]: - return self.parameter_values - - @param_values.setter - def param_values(self, values: Iterable[tuple]) -> None: - if isinstance(values, list): - self.parameter_values = values - else: - self.parameter_values = list(values) - - def get_altered_values(self, group: str) -> dict: - """ Parses the `param_values` to extract the relevant subset of - changed parameters. - """ - return {k: v for k, g, v in self.param_values if g == group} - - @classmethod - def construct(cls, scenario_values: Iterable[float] = None) -> 'PresamplesParameterManager': - """ Construct an instance of itself and populate it with either the - default parameter values or altered values. - - If altered values are given, demands that the amount of values - is equal to the amount of parameters. - """ - param_list = list(process_brightway_parameters()) - - ppm = cls() - if scenario_values: - scenario = list(scenario_values) - assert len(param_list) == len(scenario) - ppm.param_values = replace_amounts(param_list, scenario) - else: - ppm.param_values = param_list - return ppm - - @staticmethod - def _static(data: dict, needed: set) -> dict: - """ Similar to the `static` method for each Parameter class where the - ``needed`` variable is a set of the keys actually needed from ``data``. - """ - return {k: data[k] for k in data.keys() & needed} - - @staticmethod - def _prune_result_data(data: dict) -> dict: - """ Takes a str->dict dictionary and extracts the amount field from - the dictionary. - """ - return {k: v.get("amount") for k, v in data.items()} - - def recalculate_project_parameters(self) -> Optional[dict]: - new_values = self.get_altered_values("project") - - data = ProjectParameter.load() - if not data: - return - - for name, amount in new_values.items(): - data[name]["amount"] = amount - - ParameterSet(data).evaluate_and_set_amount_field() - return self._prune_result_data(data) - - def recalculate_database_parameters(self, database: str, global_params: dict = None) -> Optional[dict]: - new_values = self.get_altered_values(database) - if global_params is None: - global_params = {} - - data = DatabaseParameter.load(database) - if not data: - return - - for name, amount in new_values.items(): - data[name]["amount"] = amount - - new_symbols = get_new_symbols(data.values(), set(data)) - missing = new_symbols.difference(global_params) - if missing: - raise MissingName("The following variables aren't defined:\n{}".format("|".join(missing))) - - glo = self._static(global_params, needed=new_symbols) if new_symbols else None - - ParameterSet(data, glo).evaluate_and_set_amount_field() - return self._prune_result_data(data) - - def recalculate_activity_parameters(self, group: str, global_params: dict = None) -> Optional[dict]: - new_values = self.get_altered_values(group) - if global_params is None: - global_params = {} - - data = ActivityParameter.load(group) - if not data: - return - - for name, amount in new_values.items(): - data[name]["amount"] = amount - - new_symbols = get_new_symbols(data.values(), set(data)) - missing = new_symbols.difference(global_params) - if missing: - raise MissingName("The following variables aren't defined:\n{}".format("|".join(missing))) - - glo = self._static(global_params, needed=new_symbols) if new_symbols else None - - ParameterSet(data, glo).evaluate_and_set_amount_field() - return self._prune_result_data(data) - - @staticmethod - def recalculate_exchanges(group: str, global_params: dict = None) -> List[Tuple[int, float]]: - """ Constructs a list of exc.id/amount tuples for the - ParameterizedExchanges in the given group. - """ - if global_params is None: - global_params = {} - - params = (ParameterizedExchange.select() - .where(ParameterizedExchange.group == group)) - - if not params.exists(): - return [] - - interpreter = Interpreter() - interpreter.symtable.update(global_params) - return [(p.exchange, interpreter(p.formula)) for p in params] - - def recalculate_scenario(self, scenario_values: Iterable[float]) -> (np.ndarray, np.ndarray): - """ Convenience function that takes new parameter values and returns - a fully-formed set of exchange amounts and indices. - - All parameter types are recalculated in turn before interpreting the - ParameterizedExchange formulas into amounts. - """ - self.param_values = replace_amounts(self.param_values, scenario_values) - global_project = self.recalculate_project_parameters() - all_db = {} - for p in DatabaseParameter.select(DatabaseParameter.database).distinct(): - db = self.recalculate_database_parameters(p.database, global_project) - all_db[p.database] = {x: y for x, y in db.items()} if db else {} - - complete_data = [] - complete_indices = [] - - for p in ActivityParameter.select(ActivityParameter.group, ActivityParameter.database).distinct(): - combination = {x: y for x, y in global_project.items()} if global_project else {} - combination.update(all_db.get(p.database, {})) - act = self.recalculate_activity_parameters(p.group, combination) - combination.update(act) - - recalculated = self.recalculate_exchanges(p.group, global_params=combination) - # If the parameter group contains no ParameterizedExchanges, skip. - if not recalculated: - continue - # `data` contains the recalculated amounts for the exchanges. - ids, data = zip(*recalculated) - indices = [] - for pk in ids: - exc = ExchangeDataset.get_by_id(pk) - input_key = (exc.input_database, exc.input_code) - output_key = (exc.output_database, exc.output_code) - if exc.input_database == bw.config.biosphere: - indices.append((input_key, output_key, "biosphere")) - else: - indices.append((input_key, output_key, "technosphere")) - complete_data.extend(data) - complete_indices.extend(indices) - - # After recalculating all the exchanges and adding all samples and indices - # to lists, format them according to presamples requirements: - # eg: samples as a column of floats and indices as a row of tuples. - samples = np.array(complete_data) - samples = samples.reshape(1, -1).T - indices = np.array(complete_indices) - return samples, indices - - @staticmethod - def can_build_presamples() -> bool: - """ Test if ParameterizedExchanges exist, no point to building presamples - otherwise - """ - return ParameterizedExchange.select().exists() + def recalculate(self, values: List[float]) -> np.ndarray: + data = super().recalculate(values) + # After recalculating all the exchanges format them according to + # presamples requirements: samples as a column of floats. + samples = data.reshape(1, -1).T + return samples + + def reformat_indices(self) -> np.ndarray: + """Additional information is required for storing the indices as presamples.""" + result = np.zeros(len(self.indices), dtype=object) + for i, idx in enumerate(self.indices): + result[i] = (idx.input, idx.output, idx.input.database_type) + return result def presamples_from_scenarios(self, name: str, scenarios: Iterable[Tuple[str, Iterable]]) -> (str, str): """ When given a iterable of multiple parameter scenarios, construct a presamples package with all of the recalculated exchange amounts. """ - sample_data, indice_data = zip(*(self.recalculate_scenario(values) for _, values in scenarios)) + sample_data = [self.recalculate(list(values)) for _, values in scenarios] samples = np.concatenate(sample_data, axis=1) - indices = next(iter(indice_data)) + indices = self.reformat_indices() arrays = ps.split_inventory_presamples(samples, indices) ps_id, ps_path = ps.create_presamples_package( @@ -247,23 +58,3 @@ def store_presamples_as_resource(name: str, path: str, description: str = None) resource = ps.PresampleResource.get(name=name) finally: return resource - - -def process_brightway_parameters() -> Iterable[tuple]: - """ Converts brightway parameters of all types into a simple structure - in order of possible dependency. - """ - return itertools.chain( - ((p.name, "project", p.amount) for p in ProjectParameter.select()), - ((p.name, p.database, p.amount) for p in DatabaseParameter.select()), - ((p.name, p.group, p.amount) for p in ActivityParameter.select()) - ) - - -def replace_amounts(parameters: Iterable[tuple], amounts: Iterable[float]) -> Iterable[tuple]: - """ Specifically does not check for the length of both values to - allow the use of generators. - """ - return ( - (n, g, amount) for ((n, g, _), amount) in zip(parameters, amounts) - ) diff --git a/activity_browser/app/bwutils/utils.py b/activity_browser/app/bwutils/utils.py new file mode 100644 index 000000000..5a8a31625 --- /dev/null +++ b/activity_browser/app/bwutils/utils.py @@ -0,0 +1,197 @@ +# -*- coding: utf-8 -*- +from collections import UserList +from itertools import chain +from typing import Iterable, List, NamedTuple, Optional + +from bw2data import config +from bw2data.backends.peewee import ActivityDataset, ExchangeDataset +from bw2data.parameters import ( + ProjectParameter, DatabaseParameter, ActivityParameter, + ParameterizedExchange, +) +from bw2data.utils import TYPE_DICTIONARY +import numpy as np + + +""" +This script is a collection of simple NamedTuple classes as well as Iterators +or UserLists specifically built to hold these objects. + +While not strictly required to run all of the brightway code, these classes do allow +for a significant amount of repeated logic or heavy IO calls to be avoided by either +holding values in memory or allowing simple shortcuts to retrieve them. +""" + + +class Parameter(NamedTuple): + name: str + group: str + amount: float = 1.0 + param_type: Optional[str] = None + + +class Key(NamedTuple): + database: str + code: str + + @property + def database_type(self) -> str: + return "biosphere" if self.database == config.biosphere else "technosphere" + + +class Index(NamedTuple): + input: Key + output: Key + + @classmethod + def build_from_exchange(cls, exc: ExchangeDataset) -> 'Index': + return cls( + input=Key(exc.input_database, exc.input_code), + output=Key(exc.output_database, exc.output_code) + ) + + @property + def input_document_id(self) -> int: + return ActivityDataset.get( + ActivityDataset.code == self.input.code, + ActivityDataset.database == self.input.database + ).id + + @property + def output_document_id(self) -> int: + return ActivityDataset.get( + ActivityDataset.code == self.output.code, + ActivityDataset.database == self.output.database + ).id + + @property + def exchange_type(self) -> int: + exc_type = ExchangeDataset.get( + ExchangeDataset.input_code == self.input.code, + ExchangeDataset.input_database == self.input.database, + ExchangeDataset.output_code == self.output.code, + ExchangeDataset.output_database == self.output.database).type + return TYPE_DICTIONARY.get(exc_type, -1) + + @property + def ids_exc_type(self) -> (int, int, int): + return self.input_document_id, self.output_document_id, self.exchange_type + + +class Parameters(UserList): + data: List[Parameter] + + @classmethod + def from_bw_parameters(cls) -> 'Parameters': + """Construct a Parameters list from brightway2 parameters.""" + return cls(chain( + (Parameter(p.name, "project", p.amount, "project") + for p in ProjectParameter.select()), + (Parameter(p.name, p.database, p.amount, "database") + for p in DatabaseParameter.select()), + (Parameter(p.name, p.group, p.amount, "activity") + for p in ActivityParameter.select()), + )) + + def by_group(self, group: str) -> Iterable[Parameter]: + return (p for p in self.data if p.group == group) + + def data_by_group(self, group: str) -> dict: + """Parses the `data` to extract the relevant subset of parameters.""" + return {p.name: p.amount for p in self.data if p.group == group} + + @staticmethod + def static(data: dict, needed: set) -> dict: + """Similar to the `static` method for each Parameter class where the + ``needed`` variable is a set of the keys actually needed from ``data``. + """ + return {k: data[k] for k in data.keys() & needed} + + def update(self, values: Iterable[float]) -> None: + """Replace parameters in the list if their linked value is not + NaN. + """ + assert len(values) == len(self.data) + for i, (p, v) in enumerate(zip(self.data, values)): + if not np.isnan(v): + self.data[i] = p._replace(amount=v) + + +class Indices(UserList): + data: List[Index] + + array_dtype = [ + ('input', ' np.ndarray: + """Using the given values, construct a numpy array that can be used + to match against the `tech_params` and `bio_params` arrays of the + brightway LCA classes. + """ + assert len(self.data) == len(values) + data = np.zeros(len(self.data), dtype=self.array_dtype) + for i, d in enumerate(self.data): + data[i] = (*d.ids_exc_type, values[i]) + return data + + +class StaticParameters(object): + """Contains the initial values for all the parameters in the project. + + This object should be initialized once, after which the methods can be + used to read out parameter information as it was stored in the database + originally. This avoids a lot of database calls in repeated recalculations. + """ + + def __init__(self): + self._project_params = ProjectParameter.load() + self._db_params = { + p.database: DatabaseParameter.load(p.database) + for p in DatabaseParameter.select(DatabaseParameter.database).distinct() + } + self._act_params = { + p.group: ActivityParameter.load(p.group) + for p in ActivityParameter.select(ActivityParameter.group).distinct() + } + self._distinct_act_params = [ + p for p in (ActivityParameter + .select(ActivityParameter.group, ActivityParameter.database) + .distinct()) + ] + self._exc_params = [p for p in ParameterizedExchange.select()] + + def project(self) -> dict: + """Mirrors `ProjectParameter.load()`.""" + return {k: v for k, v in self._project_params.items()} + + @property + def databases(self) -> set: + return set(self._db_params) + + def by_database(self, database: str) -> dict: + """Mirrors `DatabaseParameter.load(database)`.""" + return {k: v for k, v in self._db_params.get(database, {}).items()} + + @property + def groups(self) -> set: + groups = set(self._act_params) + return groups.union(p.group for p in self._exc_params) + + def act_by_group(self, group: str) -> dict: + """Mirrors `ActivityParameter.load(group)`""" + return {k: v for k, v in self._act_params.get(group, {}).items()} + + @property + def act_by_group_db(self) -> list: + return self._distinct_act_params + + def exc_by_group(self, group: str) -> dict: + """Mirrors `ParameterizedExchange.load(group)`""" + return { + p.exchange: p.formula for p in self._exc_params if p.group == group + } + + @staticmethod + def prune_result_data(data: dict) -> dict: + return {k: v.get("amount") for k, v in data.items()} diff --git a/activity_browser/app/ui/tables/scenarios.py b/activity_browser/app/ui/tables/scenarios.py index 29a20802f..2a084e62c 100644 --- a/activity_browser/app/ui/tables/scenarios.py +++ b/activity_browser/app/ui/tables/scenarios.py @@ -6,6 +6,7 @@ from PySide2.QtCore import Slot from PySide2.QtWidgets import QComboBox +from ...bwutils.utils import Parameters from ...bwutils import presamples as ps_utils from ...signals import signals from .views import ABDataFrameSimpleCopy, dataframe_sync @@ -70,7 +71,7 @@ def sync(self, df: pd.DataFrame = None) -> None: """ Construct the dataframe from the existing parameters, if ``df`` is given, perform a merge to possibly include additional columns. """ - data = ps_utils.process_brightway_parameters() + data = [p[:3] for p in Parameters.from_bw_parameters()] self.dataframe = pd.DataFrame(data, columns=self.HEADERS) if df is not None: required = set(self.MATCH_COLS) diff --git a/activity_browser/app/ui/tabs/LCA_results_tabs.py b/activity_browser/app/ui/tabs/LCA_results_tabs.py index 5e55d8828..b72847ba6 100644 --- a/activity_browser/app/ui/tabs/LCA_results_tabs.py +++ b/activity_browser/app/ui/tabs/LCA_results_tabs.py @@ -923,6 +923,8 @@ def __init__(self, parent=None): self.layout.addLayout(get_header_layout('Monte Carlo Simulation')) self.scenario_label = QLabel("Scenario:") + self.include_parameters = QCheckBox(self) + self.include_parameters.setChecked(False) self.add_MC_ui_elements() @@ -938,7 +940,7 @@ def __init__(self, parent=None): self.connect_signals() def connect_signals(self): - self.button_run.clicked.connect(self.calculate_MC_LCA) + self.button_run.clicked.connect(self.calculate_mc_lca) # signals.monte_carlo_ready.connect(self.update_mc) # self.combobox_fu.currentIndexChanged.connect(self.update_plot) self.combobox_methods.currentIndexChanged.connect( @@ -981,6 +983,8 @@ def add_MC_ui_elements(self): self.hlayout_run.addWidget(self.iterations) self.hlayout_run.addWidget(self.label_seed) self.hlayout_run.addWidget(self.seed) + self.hlayout_run.addWidget(QLabel("Explore parameter uncertainty:")) + self.hlayout_run.addWidget(self.include_parameters) self.hlayout_run.addStretch(1) self.layout_mc.addLayout(self.hlayout_run) @@ -1036,8 +1040,14 @@ def build_export(self, has_table: bool = True, has_plot: bool = True) -> QWidget export_widget.hide() return export_widget - def calculate_MC_LCA(self): + @QtCore.Slot(name="calculateMcLca") + def calculate_mc_lca(self): + self.method_selection_widget.hide() + self.plot.hide() + self.export_widget.hide() + iterations = int(self.iterations.text()) + seed = None if self.seed.text(): print('SEED: ', self.seed.text()) try: @@ -1046,14 +1056,11 @@ def calculate_MC_LCA(self): traceback.print_exc() QMessageBox.warning(self, 'Warning', 'Seed value must be an integer number or left empty.') self.seed.setText('') - else: - seed = None - self.method_selection_widget.hide() - self.plot.hide() - self.export_widget.hide() + return + include_params = self.include_parameters.isChecked() try: - self.parent.mc.calculate(iterations=iterations, seed=seed) + self.parent.mc.calculate(iterations=iterations, seed=seed, parameters=include_params) self.update_mc() except InvalidParamsError as e: # This can occur if uncertainty data is missing or otherwise broken # print(e) diff --git a/activity_browser/app/ui/tabs/parameters.py b/activity_browser/app/ui/tabs/parameters.py index 5b9676069..ea337d4e4 100644 --- a/activity_browser/app/ui/tabs/parameters.py +++ b/activity_browser/app/ui/tabs/parameters.py @@ -342,10 +342,9 @@ def save_scenarios(self): QMessageBox.Ok, QMessageBox.Ok ) - @Slot(name="createPresamplesPackage") def calculate_scenarios(self): - if not ps_utils.PresamplesParameterManager.can_build_presamples(): + if not ps_utils.PresamplesParameterManager.has_parameterized_exchanges(): QMessageBox.warning( self, "No parameterized exchanges", "Please set formulas on exchanges to make use of scenario analysis.", @@ -372,7 +371,7 @@ def calculate_scenarios(self): def build_presamples_packages(self, name: str): """ Calculate and store presamples arrays from parameter scenarios. """ - ppm = ps_utils.PresamplesParameterManager.construct() + ppm = ps_utils.PresamplesParameterManager() names, data = zip(*self.tbl.iterate_scenarios()) ps_id, path = ppm.presamples_from_scenarios(name, zip(names, data)) description = "{}".format(tuple(names))