Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

I12b unscented kalman edits #180

Merged
merged 10 commits into from
Feb 7, 2024
116 changes: 116 additions & 0 deletions examples/scripts/exp_UKF.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import pybop
import pybamm
import numpy as np
from examples.standalone.model import ExponentialDecay

# Parameter set and model definition
parameter_set = pybamm.ParameterValues({"k": "[input]", "y0": "[input]"})
model = ExponentialDecay(parameter_set=parameter_set, n_states=1)
x0 = np.array([0.1, 1.0])

# Fitting parameters
parameters = [
pybop.Parameter(
"k",
prior=pybop.Gaussian(0.1, 0.05),
bounds=[0, 1],
),
pybop.Parameter(
"y0",
prior=pybop.Gaussian(1, 0.05),
bounds=[0, 3],
),
]

# Verification: save fixed inputs for testing
inputs = dict()
for i, param in enumerate(parameters):
inputs[param.name] = x0[i]

# Make a prediction with measurement noise
sigma = 1e-2
t_eval = np.linspace(0, 20, 10)
values = model.predict(t_eval=t_eval, inputs=inputs)
values = values["2y"].data
corrupt_values = values + np.random.normal(0, sigma, len(t_eval))

# Verification step: compute the analytical solution for 2y
expected_values = 2 * inputs["y0"] * np.exp(-inputs["k"] * t_eval)

# Verification step: make another prediction using the Observer class
model.build(parameters=parameters)
simulator = pybop.Observer(parameters, model, signal=["2y"], x0=x0)
simulator._time_data = t_eval
measurements = simulator.evaluate(x0)
measurements = measurements[:, 0]

# Verification step: Compare by plotting
go = pybop.PlotlyManager().go
line1 = go.Scatter(x=t_eval, y=corrupt_values, name="Corrupt values", mode="markers")
line2 = go.Scatter(
x=t_eval, y=expected_values, name="Expected trajectory", mode="lines"
)
line3 = go.Scatter(x=t_eval, y=measurements, name="Observed values", mode="markers")
fig = go.Figure(data=[line1, line2, line3])

# Form dataset
dataset = pybop.Dataset(
{
"Time [s]": t_eval,
"Current function [A]": 0 * t_eval, # placeholder
"2y": corrupt_values,
}
)

# Build the model to get the number of states
model.build(dataset=dataset.data, parameters=parameters)

# Define the UKF observer
signal = ["2y"]
n_states = model.n_states
n_signals = len(signal)
covariance = np.diag([sigma**2] * n_states)
process_noise = np.diag([1e-6] * n_states)
measurement_noise = np.diag([sigma**2] * n_signals)
observer = pybop.UnscentedKalmanFilterObserver(
parameters,
model,
covariance,
process_noise,
measurement_noise,
dataset,
signal=signal,
x0=x0,
)

# Verification step: Find the maximum likelihood estimate given the true parameters
estimation = observer.evaluate(x0)
estimation = estimation[:, 0]

# Verification step: Add the estimate to the plot
line4 = go.Scatter(x=t_eval, y=estimation, name="Estimated trajectory", mode="lines")
fig.add_trace(line4)
fig.show()

# Generate problem, cost function, and optimisation class
cost = pybop.ObserverCost(observer)
optim = pybop.Optimisation(cost, optimiser=pybop.CMAES, verbose=True)

# Run optimisation
x, final_cost = optim.run()
print("Estimated parameters:", x)

# Plot the timeseries output (requires model that returns Voltage)
pybop.quick_plot(x, cost, title="Optimised Comparison")

# Plot convergence
pybop.plot_convergence(optim)

# Plot the parameter traces
pybop.plot_parameters(optim)

# Plot the cost landscape
pybop.plot_cost2d(cost, steps=15)

# Plot the cost landscape with optimisation path
pybop.plot_cost2d(cost, optim=optim, steps=15)
82 changes: 82 additions & 0 deletions examples/scripts/spm_UKF.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import pybop
import numpy as np

# Parameter set and model definition
parameter_set = pybop.ParameterSet.pybamm("Chen2020")
model = pybop.lithium_ion.SPM(parameter_set=parameter_set)

# Fitting parameters
parameters = [
pybop.Parameter(
"Negative electrode active material volume fraction",
prior=pybop.Gaussian(0.6, 0.05),
bounds=[0.5, 0.8],
),
pybop.Parameter(
"Positive electrode active material volume fraction",
prior=pybop.Gaussian(0.48, 0.05),
bounds=[0.4, 0.7],
),
]

# Make a prediction with measurement noise
sigma = 0.001
t_eval = np.arange(0, 300, 2)
values = model.predict(t_eval=t_eval)
corrupt_values = values["Voltage [V]"].data + np.random.normal(0, sigma, len(t_eval))

# Form dataset
dataset = pybop.Dataset(
{
"Time [s]": t_eval,
"Current function [A]": values["Current [A]"].data,
"Voltage [V]": corrupt_values,
}
)

# Build the model to get the number of states
model.build(dataset=dataset.data, parameters=parameters)

# Define the UKF observer, setting the particle boundaries as uncertain states
signal = ["Voltage [V]"]
n_states = model.n_states
n_signals = len(signal)
covariance = np.diag([0] * 29 + [sigma**2] + [0] * 9 + [sigma**2])
process_noise = np.diag([0] * 29 + [1e-6] + [0] * 9 + [1e-6])
measurement_noise = np.diag([sigma**2])
observer = pybop.UnscentedKalmanFilterObserver(
parameters,
model,
covariance,
process_noise,
measurement_noise,
dataset,
signal=signal,
)

# Generate problem, cost function, and optimisation class
cost = pybop.ObserverCost(observer)
optim = pybop.Optimisation(cost, optimiser=pybop.PSO, verbose=True)

# Parameter identification using the current observer implementation is very slow
# so let's restrict the number of iterations and reduce the number of plots
optim.set_max_iterations(5)

# Run optimisation
x, final_cost = optim.run()
print("Estimated parameters:", x)

# Plot the timeseries output (requires model that returns Voltage)
pybop.quick_plot(x, cost, title="Optimised Comparison")

# # Plot convergence
# pybop.plot_convergence(optim)

# # Plot the parameter traces
# pybop.plot_parameters(optim)

# # Plot the cost landscape
# pybop.plot_cost2d(cost, steps=5)

# # Plot the cost landscape with optimisation path
# pybop.plot_cost2d(cost, optim=optim, steps=5)
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
def __init__(
self,
name: str = "Constant Model",
parameters: pybamm.ParameterValues = None,
nstate: int = 1,
parameter_set: pybamm.ParameterValues = None,
n_states: int = 1,
):
super().__init__()
self.nstate = nstate
if nstate < 1:
raise ValueError("nstate must be greater than 0")
self.n_states = n_states
if n_states < 1:
raise ValueError("The number of states (n_states) must be greater than 0")

Check warning on line 24 in examples/standalone/model.py

View check run for this annotation

Codecov / codecov/patch

examples/standalone/model.py#L24

Added line #L24 was not covered by tests
self.pybamm_model = pybamm.BaseModel()
ys = [pybamm.Variable(f"y_{i}") for i in range(nstate)]
ys = [pybamm.Variable(f"y_{i}") for i in range(n_states)]
k = pybamm.Parameter("k")
y0 = pybamm.Parameter("y0")
self.pybamm_model.rhs = {y: -k * y for y in ys}
Expand All @@ -41,7 +41,7 @@
self.name = name

self.default_parameter_values = (
default_parameter_values if parameters is None else parameters
default_parameter_values if parameter_set is None else parameter_set
)
self._parameter_set = self.default_parameter_values
self._unprocessed_parameter_set = self._parameter_set
Expand Down
30 changes: 14 additions & 16 deletions pybop/_costs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import numpy as np

from pybop.observers.observer import Observer
from pybop._problem import FittingProblem


class BaseCost:
Expand All @@ -26,6 +25,8 @@ class BaseCost:
The bounds for the model parameters.
n_parameters : int
The number of parameters in the model.
n_outputs : int
The number of outputs in the model.
"""

def __init__(self, problem):
Expand All @@ -35,6 +36,7 @@ def __init__(self, problem):
self.x0 = problem.x0
self.bounds = problem.bounds
self.n_parameters = problem.n_parameters
self.n_outputs = problem.n_outputs

def __call__(self, x, grad=None):
"""
Expand Down Expand Up @@ -255,13 +257,13 @@ def _evaluateS1(self, x):
y, dy = self.problem.evaluateS1(x)
if len(y) < len(self._target):
e = np.float64(np.inf)
de = self._de * np.ones(self.problem.n_parameters)
de = self._de * np.ones(self.n_parameters)
else:
dy = dy.reshape(
(
self.problem.n_time_data,
self.problem.n_outputs,
self.problem.n_parameters,
self.n_outputs,
self.n_parameters,
)
)
r = y - self._target
Expand Down Expand Up @@ -297,11 +299,11 @@ class ObserverCost(BaseCost):

"""

def __init__(self, problem: FittingProblem, observer: Observer):
super(ObserverCost, self).__init__(problem)
def __init__(self, observer: Observer):
super().__init__(problem=observer)
self._observer = observer

def __call__(self, x, grad=None):
def _evaluate(self, x, grad=None):
"""
Calculate the observer cost for a given set of parameters.

Expand All @@ -317,16 +319,12 @@ def __call__(self, x, grad=None):
-------
float
The observer cost (negative of the log likelihood).

"""
try:
inputs = {key: x[i] for i, key in enumerate(self._observer._model.fit_keys)}
log_likelihood = self._observer.log_likelihood(
self.problem.target(), self.problem.time_data(), inputs
)
return -log_likelihood
except Exception as e:
raise ValueError(f"Error in cost calculation: {e}")
inputs = {key: x[i] for i, key in enumerate(self._observer._model.fit_keys)}
log_likelihood = self._observer.log_likelihood(
self._target, self._observer.time_data(), inputs
)
return -log_likelihood

def evaluateS1(self, x):
"""
Expand Down
12 changes: 5 additions & 7 deletions pybop/_problem.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
from typing import Optional
import numpy as np

from pybop.observers.observer import Observer


class BaseProblem:
"""
Expand All @@ -16,6 +13,8 @@ class BaseProblem:
The model to be used for the problem (default: None).
check_model : bool, optional
Flag to indicate if the model should be checked (default: True).
signal: List[str]
The signal to observe.
init_soc : float, optional
Initial state of charge (default: None).
x0 : np.ndarray, optional
Expand Down Expand Up @@ -132,8 +131,8 @@ class FittingProblem(BaseProblem):
The model to fit.
parameters : list
List of parameters for the problem.
dataset : list
List of data objects to fit the model to.
dataset : Dataset
Dataset object containing the data to fit the model to.
signal : str, optional
The signal to fit (default: "Voltage [V]").
"""
Expand All @@ -147,15 +146,14 @@ def __init__(
signal=["Voltage [V]"],
init_soc=None,
x0=None,
observer: Optional[Observer] = None,
):
super().__init__(parameters, model, check_model, signal, init_soc, x0)
self._dataset = dataset.data

# Check that the dataset contains time and current
for name in ["Time [s]", "Current function [A]"] + self.signal:
if name not in self._dataset:
raise ValueError(f"expected {name} in list of dataset")
raise ValueError(f"Expected {name} in list of dataset")

self._time_data = self._dataset["Time [s]"]
self.n_time_data = len(self._time_data)
Expand Down
8 changes: 5 additions & 3 deletions pybop/models/base_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ def build(
# Clear solver and setup model
self._solver._model_set_up = {}

self.n_states = self._built_model.len_rhs_and_alg # len_rhs + len_alg

def set_init_soc(self, init_soc):
"""
Set the initial state of charge for the battery model.
Expand Down Expand Up @@ -214,7 +216,7 @@ def step(self, state: TimeSeriesState, time: np.ndarray) -> TimeSeriesState:
The time to predict the system to (in whatever time units the model is in)
"""
dt = time - state.t
new_sol = self.solver.step(
new_sol = self._solver.step(
state.sol, self.built_model, dt, npts=2, inputs=state.inputs, save=False
)
return TimeSeriesState(sol=new_sol, inputs=state.inputs, t=time)
Expand Down Expand Up @@ -252,7 +254,7 @@ def simulate(self, inputs, t_eval) -> np.ndarray[np.float64]:
inputs=inputs,
allow_infeasible_solutions=self.allow_infeasible_solutions,
):
sol = self.solver.solve(self.built_model, inputs=inputs, t_eval=t_eval)
sol = self._solver.solve(self.built_model, inputs=inputs, t_eval=t_eval)

predictions = [sol[signal].data for signal in self.signal]

Expand Down Expand Up @@ -295,7 +297,7 @@ def simulateS1(self, inputs, t_eval):
inputs=inputs,
allow_infeasible_solutions=self.allow_infeasible_solutions,
):
sol = self.solver.solve(
sol = self._solver.solve(
self.built_model,
inputs=inputs,
t_eval=t_eval,
Expand Down
Loading
Loading