Skip to content

Commit

Permalink
pylinting
Browse files Browse the repository at this point in the history
  • Loading branch information
treyra committed Jul 29, 2024
1 parent 69727ac commit d2418b1
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 38 deletions.
21 changes: 3 additions & 18 deletions failurePy/load/estimatorConstructors.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,24 +136,9 @@ def loadPhysicalStateSubEstimator(inputDict,linear,numState,generalFaultFlag,sil
from failurePy.estimators.beliefInitialization import uniformFailureBeliefMarginalizedFEJKalman as beliefInitializationF
#UKF
elif subEstimatorFString in {"unscentedKalmanFilter".lower(), "unscentedKalman".lower(), "unscented","ukf"}:
#Import estimator
from failurePy.estimators.unscentedKalmanFilter import predictAndUpdateAll as physicalStateSubEstimatorFUnWrapped
from failurePy.estimators.unscentedKalmanFilter import createWeights
#We need to wrap up the Estimator parameters tuple here, so we don't need to worry about it later when calling it!
#We'll use a nested function. Note that since we only define this once, with no looping, we shouldn't need to worry about late closing bindings
#But if we were to modify the values of that tuple at any point, this could introduce hard to spot errors.
alpha = loadOptionalParameter("alpha",inputDict,.001,silent=silent)
beta = loadOptionalParameter("beta",inputDict,2,silent=silent)
kappa = loadOptionalParameter("kappa",inputDict,0,silent=silent)
meanWeights,covarianceWeights = createWeights(alpha,beta,kappa,numState)
filterParametersTuple = (alpha,kappa,meanWeights,covarianceWeights) #beta isn't needed anymore
#Define physicalStateSubEstimatorF to be unscented estimator, with the weighting parameters fixed
def physicalStateSubEstimatorF(nominalAction,observation,previousFilters,possibleFailures,systemF,systemParametersTuple,physicalStateJacobianF):
return physicalStateSubEstimatorFUnWrapped(nominalAction,observation,previousFilters,possibleFailures,systemF,systemParametersTuple,physicalStateJacobianF,filterParametersTuple)

from failurePy.estimators.kalmanFilterCommon import sampleFromFilter as physicalStateSubEstimatorSampleF
#initialize to uniform belief for now
from failurePy.estimators.beliefInitialization import uniformFailureBeliefMarginalizedKalman as beliefInitializationF
#Not a development priority at this pint
futureCapability = "An unscentedKalmanFilter compatible with our marginalized filter is possible future work, but is not currently implemented"
raise NotImplementedError(futureCapability)

else:
raiseSpecificationNotFoundError(subEstimatorFString,"physicalStateSubEstimator")
Expand Down
4 changes: 1 addition & 3 deletions failurePy/load/systemConstructors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@
Module for loading and constructing each system
"""

import os

import numbers

import jax.numpy as jnp
from jax.scipy.linalg import block_diag as blockDiag

from failurePy.load.yamlLoaderUtilityMethods import checkParameter, loadOptionalParameter, loadRequiredParameter, raiseIncompatibleSpecifications, getInputDict, raiseSpecificationNotFoundError
from failurePy.load.yamlLoaderUtilityMethods import checkParameter, loadOptionalParameter, loadRequiredParameter, raiseIncompatibleSpecifications

#We do a lot of conditional importing to only load in the models, solvers, estimators as needed, and bind them to shared names to pass back to pipeline.
#We could change this to be done in sub modules, or import everything and conditionally bind the names, but we'd be importing a lot more than we need to. Maybe look into manifests?
Expand Down
25 changes: 12 additions & 13 deletions failurePy/load/yamlLoader.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,23 +155,22 @@ def loadExperimentParams(inputDict,extraData=False,silent=False):
futureCapability = "The distributed version of s-FEAST is intended future work, but is not currently implemented"
raise NotImplementedError(futureCapability)

#Single agent otherwise
else:
#Load dimensions and linearity, from this construct system parameters. dt is pretty universal, so making an exception for it
systemF, systemParametersTuple, physicalStateJacobianF, dim, linear, dt, sigmaW, sigmaV, numAct= loadAndBuildSingleAgentSystem(inputDict,providedFailure,generalFaultFlag,silent) # pylint: disable=invalid-name
#Single agent only for now
#Load dimensions and linearity, from this construct system parameters. dt is pretty universal, so making an exception for it
systemF, systemParametersTuple, physicalStateJacobianF, dim, linear, dt, sigmaW, sigmaV, numAct= loadAndBuildSingleAgentSystem(inputDict,providedFailure,generalFaultFlag,silent) # pylint: disable=invalid-name

#To get the number of states, this is length of covariance matrix, which is the -2 element of the systemParametersTuple
covarianceQ = systemParametersTuple[-2]
numState = len(covarianceQ)
#To get the number of states, this is length of covariance matrix, which is the -2 element of the systemParametersTuple
covarianceQ = systemParametersTuple[-2]
numState = len(covarianceQ)

#Load estimator and belief initialization function
estimatorF,physicalStateSubEstimatorF,physicalStateSubEstimatorSampleF, beliefInitializationF = loadEstimatorAndBelief(inputDict,linear,numState,generalFaultFlag,silent=silent)
#Load estimator and belief initialization function
estimatorF,physicalStateSubEstimatorF,physicalStateSubEstimatorSampleF, beliefInitializationF = loadEstimatorAndBelief(inputDict,linear,numState,generalFaultFlag,silent=silent)

#Load reward function
rewardF, safetyFunctionF = loadRewardF(inputDict,physicalStateSubEstimatorSampleF,silent)
#Load reward function
rewardF, safetyFunctionF = loadRewardF(inputDict,physicalStateSubEstimatorSampleF,silent)

#Load the solver(s) to run, and return parameters needed. Names are used for data logging
solverFList, solverParametersTuplesList, solverNamesList = loadSolvers(inputDict,systemParametersTuple,dim,linear,legacyPaperCodeFlag,safetyFunctionF,silent)
#Load the solver(s) to run, and return parameters needed. Names are used for data logging
solverFList, solverParametersTuplesList, solverNamesList = loadSolvers(inputDict,systemParametersTuple,dim,linear,legacyPaperCodeFlag,safetyFunctionF,silent)


#Load plotting parameters (these are all optional and silent)
Expand Down
8 changes: 4 additions & 4 deletions failurePy/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,8 @@ def runSingleAgentTrial(initializationDict,nSimulationsPerTree,nExperimentSteps,
#return results!
return makeTrialResultDict(physicalStateList,failureStateList,beliefList,rewards,actionList,initialFailureParticles,success,timeStep,wctStartTime,saveTreeFlag,treeList,computeSafetyAtEnd)

def runNetworkTrial(initializationDict,nSimulationsPerTree,nExperimentSteps,systemF,systemParametersTuple,solverF,solverParametersTuple,estimatorF,
physicalStateSubEstimatorF,physicalStateJacobianF,physicalStateSubEstimatorSampleF,rewardF,initialFailureParticles, generalFaultDict, diagnosisThreshold,filterDivergenceHandlingMethod,
def runNetworkTrial(initializationDict,nSimulationsPerTree,nExperimentSteps,systemF,systemParametersTuple,solverF,solverParametersTuple,estimatorF, # pylint: disable=unused-argument
physicalStateSubEstimatorF,physicalStateJacobianF,physicalStateSubEstimatorSampleF,rewardF,initialFailureParticles, generalFaultDict, diagnosisThreshold,filterDivergenceHandlingMethod, # pylint: disable=unused-argument
saveTreeFlag, rngKey, computeSuccessAtEnd=False,computeSafetyAtEnd=True, numWarmStart=0): # pylint: disable=unused-argument
"""
Networked systems runner
Expand Down Expand Up @@ -807,8 +807,8 @@ def visualize(experimentParamsDict,saveDirectoryPath):
if experimentParamsDict["networkFlag"]:
futureCapability = "The distributed version of s-FEAST is intended future work, but is not currently implemented"
raise NotImplementedError(futureCapability)
else:
visualizeFirstTrajectory(saveDirectoryPath,experimentParamsDict,outputFilePath)
#else:
visualizeFirstTrajectory(saveDirectoryPath,experimentParamsDict,outputFilePath)

def getCommandLineArgs():
"""
Expand Down

0 comments on commit d2418b1

Please sign in to comment.