Skip to content

Commit

Permalink
Fix defect ensemble model (with Code) and genetic algorithm (#2317)
Browse files Browse the repository at this point in the history
* Closes #2304

* added test

* Update tests/framework/Optimizers/GeneticAlgorithms/continuous/unconstrained/test_ensembleModel_withGA_Code_and_Functions.xml

* removed trailing

* added missing input file

* Update tests/framework/Optimizers/GeneticAlgorithms/continuous/unconstrained/metaModelWithCodeAndFunctionsAndGenetic/decayConstantB.py

* isinstance

* fixed parallel execution

* readded heron and baycal?

* removed trailing spaces

* some cleaning

* fixed test name and rook error message

* module ensemble doc

* Apply suggestions from code review

* removed trailing whitespaces

* fix for 'cvxpy'...this is not a very maintainable way to handle such exceptions

* changed approach
  • Loading branch information
alfoa authored May 16, 2024
1 parent 579eb05 commit 782ccc0
Show file tree
Hide file tree
Showing 15 changed files with 348 additions and 543 deletions.
12 changes: 10 additions & 2 deletions ravenframework/JobHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ def reAddJob(self, runner):
runner.trackTime('queue')
self.__submittedJobs.append(runner.identifier)

def addClientJob(self, args, functionToRun, identifier, metadata=None, uniqueHandler="any"):
def addClientJob(self, args, functionToRun, identifier, metadata=None, uniqueHandler="any", groupInfo = None):
"""
Method to add an internal run (function execution), without consuming
resources (free spots). This can be used for client handling (see
Expand All @@ -719,11 +719,19 @@ def addClientJob(self, args, functionToRun, identifier, metadata=None, uniqueHan
this runner. For example, if present, to retrieve this runner using the
method jobHandler.getFinished, the uniqueHandler needs to be provided.
If uniqueHandler == 'any', every "client" can get this runner.
      @ In, groupInfo, dict, optional, {id:string, size:int}.
        - "id": a special keyword attached to
          this runner to identify that this runner belongs to a special set of runs that need to be
          grouped together (all will be retrievable only when all the runs have ended).
        - "size": number of runs in this group
        NOTE: The "size" of the group is only set the first time a job of this group is added.
        Consequently, the size is immutable.
@ Out, None
"""
self.addJob(args, functionToRun, identifier, metadata,
forceUseThreads = True, uniqueHandler = uniqueHandler,
clientQueue = True)
clientQueue = True, groupInfo = groupInfo)


def addFinishedJob(self, data, metadata=None, uniqueHandler="any", profile=False):
"""
Expand Down
60 changes: 41 additions & 19 deletions ravenframework/Models/EnsembleModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module where the base class and the specialization of different type of Model are
  EnsembleModel module, containing the class and methods to create a communication 'pipeline' among
different models in terms of Input/Output relation
"""
#for future compatibility with Python 3--------------------------------------------------------------
from __future__ import division, print_function, unicode_literals, absolute_import
Expand All @@ -38,7 +39,8 @@

class EnsembleModel(Dummy):
"""
    EnsembleModel class. This class is aimed to create a communication 'pipe' among
    different models in terms of Input/Output relation
different models in terms of Input/Output relation
"""
@classmethod
def specializeValidateDict(cls):
Expand Down Expand Up @@ -118,7 +120,9 @@ def localInputAndChecks(self,xmlNode):
# list(metadataToTranfer, ModelSource,Alias (optional))
if 'source' not in childChild.attrib.keys():
self.raiseAnError(IOError, 'when metadataToTransfer XML block is defined, the "source" attribute must be inputted!')
self.modelsInputDictionary[modelName][childChild.tag].append([childChild.text.strip(),childChild.attrib['source'],childChild.attrib.get("alias",None)])
self.modelsInputDictionary[modelName][childChild.tag].append([childChild.text.strip(),
childChild.attrib['source'],
childChild.attrib.get("alias",None)])
else:
try:
self.modelsInputDictionary[modelName][childChild.tag].append(childChild.text.strip())
Expand Down Expand Up @@ -158,7 +162,7 @@ def __readSettings(self, xmlNode):
elif child.tag == 'initialConditions':
for var in child:
if "repeat" in var.attrib.keys():
self.initialConditions[var.tag] = np.repeat([float(var.text.split()[0])], int(var.attrib['repeat'])) #np.array([float(var.text.split()[0]) for _ in range(int(var.attrib['repeat']))])
self.initialConditions[var.tag] = np.repeat([float(var.text.split()[0])], int(var.attrib['repeat']))
else:
try:
values = var.text.split()
Expand Down Expand Up @@ -342,7 +346,8 @@ def initialize(self,runInfo,inputs,initDict=None):
self.raiseAnError(IOError,"Picard's iterations mode activated but no initial conditions provided!")
else:
if len(self.initialStartModels) !=0:
self.raiseAnError(IOError, "The 'initialStartModels' xml node is not needed for non-Picard calculations, since the running sequence can be automatically determined by the code! Please delete this node to avoid a mistake.")
self.raiseAnError(IOError, "The 'initialStartModels' xml node is not needed for non-Picard calculations, "
"since the running sequence can be automatically determined by the code! Please delete this node to avoid a mistake.")
self.raiseAMessage("EnsembleModel connections determined a linear system. Picard's iterations not activated!")

for modelIn in self.modelsDictionary.keys():
Expand Down Expand Up @@ -395,7 +400,7 @@ def __selectInputSubset(self,modelName, kwargs):
for key in kwargs["SampledVars"].keys():
if key in self.modelsDictionary[modelName]['Input']:
selectedkwargs['SampledVars'][key] = kwargs["SampledVars"][key]
selectedkwargs['SampledVarsPb'][key] = kwargs["SampledVarsPb"][key] if 'SampledVarsPb' in kwargs.keys() and key in kwargs["SampledVarsPb"].keys() else 1.0
selectedkwargs['SampledVarsPb'][key] = kwargs["SampledVarsPb"][key] if 'SampledVarsPb' in kwargs and key in kwargs["SampledVarsPb"] else 1.
return selectedkwargs

def createNewInput(self,myInput,samplerType,**kwargs):
Expand All @@ -422,7 +427,7 @@ def createNewInput(self,myInput,samplerType,**kwargs):
for modelIn, specs in self.modelsDictionary.items():
for inp in specs['Input']:
if inp not in allCoveredVariables:
self.raiseAnError(RuntimeError,"for sub-model "+ modelIn + " the input "+inp+" has not been found among other models' outputs and sampled variables!")
self.raiseAnError(RuntimeError,f"for sub-model {modelIn} the input {inp} has not been found among other models' outputs and sampled variables!")

## Now prepare the new inputs for each model
for modelIn, specs in self.modelsDictionary.items():
Expand Down Expand Up @@ -488,8 +493,8 @@ def collectOutput(self,finishedJob,output):

def getAdditionalInputEdits(self,inputInfo):
"""
Collects additional edits for the sampler to use when creating a new input. In this case, it calls all the getAdditionalInputEdits methods
of the sub-models
Collects additional edits for the sampler to use when creating a new input. In this case,
it calls all the getAdditionalInputEdits methods of the sub-models
@ In, inputInfo, dict, dictionary in which to add edits
@ Out, None.
"""
Expand Down Expand Up @@ -549,6 +554,7 @@ def submit(self,myInput,samplerType,jobHandler,**kwargs):
for index in range(nRuns):
if batchMode:
kw = kwargs['batchInfo']['batchRealizations'][index]
kw['batchRun'] = index + 1
else:
kw = kwargs

Expand All @@ -563,7 +569,14 @@ def submit(self,myInput,samplerType,jobHandler,**kwargs):
uniqueHandler=uniqueHandler, forceUseThreads=forceThreads,
groupInfo={'id': kwargs['batchInfo']['batchId'], 'size': nRuns} if batchMode else None)
else:
jobHandler.addClientJob((self, myInput, samplerType, kwargs), self.__class__.evaluateSample, prefix, kwargs)
# for parallel strategy 2, the ensemble model works as a step => it needs the jobHandler
kw['jobHandler'] = jobHandler
# for parallel strategy 2, we need to make sure that the batchMode is set to False in the inner runs since only the
# ensemble model evaluation should be batched (THIS IS REQUIRED because the CODE does not submit runs like the other models)
kw['batchMode'] = False
jobHandler.addClientJob((self, myInput, samplerType, kw), self.__class__.evaluateSample, prefix, metadata=metadata,
uniqueHandler=uniqueHandler,
groupInfo={'id': kwargs['batchInfo']['batchId'], 'size': nRuns} if batchMode else None)

def __retrieveDependentOutput(self,modelIn,listOfOutputs, typeOutputs):
"""
Expand Down Expand Up @@ -660,16 +673,19 @@ def _externalRun(self,inRun, jobHandler = None):#, jobHandler):
else:
self.raiseAnError(IOError,"No initial conditions provided for variable "+ initialConditionToSet)
# set new identifiers
inputKwargs[modelIn]['prefix'] = modelIn+utils.returnIdSeparator()+identifier
inputKwargs[modelIn]['uniqueHandler'] = self.name+identifier
suffix = ''
if 'batchRun' in inputKwargs[modelIn]:
suffix = f"{utils.returnIdSeparator()}{inputKwargs[modelIn]['batchRun']}"
inputKwargs[modelIn]['prefix'] = f"{modelIn}{utils.returnIdSeparator()}{identifier}{suffix}"
inputKwargs[modelIn]['uniqueHandler'] = f"{self.name}{identifier}{suffix}"
if metadataToTransfer is not None:
inputKwargs[modelIn]['metadataToTransfer'] = metadataToTransfer

for key, value in dependentOutput.items():
inputKwargs[modelIn]["SampledVars" ][key] = dependentOutput[key]
## FIXME it is a mistake (Andrea). The SampledVarsPb for this variable should be transferred from outside
## Who has this information? -- DPM 4/11/17
inputKwargs[modelIn]["SampledVarsPb"][key] = 1.0
inputKwargs[modelIn]["SampledVarsPb"][key] = 1.
self._replaceVariablesNamesWithAliasSystem(inputKwargs[modelIn]["SampledVars" ],'input',False)
self._replaceVariablesNamesWithAliasSystem(inputKwargs[modelIn]["SampledVarsPb"],'input',False)
## FIXME: this will come after we rework the "runInfo" collection in the code
Expand All @@ -696,8 +712,10 @@ def _externalRun(self,inRun, jobHandler = None):#, jobHandler):
if iterationCount == 1:
residueContainer[modelIn]['iterValues'][1][out] = np.zeros(len(residueContainer[modelIn]['iterValues'][0][out]))
for out in gotOutputs[modelCnt].keys():
residueContainer[modelIn]['residue'][out] = abs(np.asarray(residueContainer[modelIn]['iterValues'][0][out]) - np.asarray(residueContainer[modelIn]['iterValues'][1][out]))
residueContainer[modelIn]['Norm'] = np.linalg.norm(np.asarray(list(residueContainer[modelIn]['iterValues'][1].values()))-np.asarray(list(residueContainer[modelIn]['iterValues'][0].values())))
residueContainer[modelIn]['residue'][out] = abs(np.asarray(residueContainer[modelIn]['iterValues'][0][out]) -
np.asarray(residueContainer[modelIn]['iterValues'][1][out]))
residueContainer[modelIn]['Norm'] = np.linalg.norm(np.asarray(list(residueContainer[modelIn]['iterValues'][1].values()))-
np.asarray(list(residueContainer[modelIn]['iterValues'][0].values())))

# if nonlinear system, check the total residue and convergence
if self.activatePicard:
Expand Down Expand Up @@ -735,9 +753,11 @@ def __advanceModel(self, identifier, modelToExecute, origInputList, inputKwargs,
@ Out, evaluation, dict, the evaluation dictionary with the "unprojected" data
"""
returnDict = {}

suffix = ''
if 'batchRun' in inputKwargs:
suffix = f"{utils.returnIdSeparator()}{inputKwargs['batchRun']}"
self.raiseADebug('Submitting model',modelToExecute['Instance'].name)
localIdentifier = modelToExecute['Instance'].name+utils.returnIdSeparator()+identifier
localIdentifier = f"{modelToExecute['Instance'].name}{utils.returnIdSeparator()}{identifier}{suffix}"
if self.parallelStrategy == 1:
# we evaluate the model directly
try:
Expand All @@ -756,7 +776,7 @@ def __advanceModel(self, identifier, modelToExecute, origInputList, inputKwargs,
time.sleep(1.e-3)
moveOn = True
# get job that just finished to gather the results
finishedRun = jobHandler.getFinished(jobIdentifier = localIdentifier, uniqueHandler=self.name+identifier)
finishedRun = jobHandler.getFinished(jobIdentifier = localIdentifier, uniqueHandler=f"{self.name}{identifier}{suffix}")
evaluation = finishedRun[0].getEvaluation()
if isinstance(evaluation, rerror):
if finishedRun[0].exceptionTrace is not None:
Expand All @@ -767,7 +787,9 @@ def __advanceModel(self, identifier, modelToExecute, origInputList, inputKwargs,
evaluation = None
# the model failed
for modelToRemove in list(set(self.orderList) - set([modelToExecute['Instance'].name])):
jobHandler.getFinished(jobIdentifier = modelToRemove + utils.returnIdSeparator() + identifier, uniqueHandler = self.name + identifier)
jobHandler.getFinished(jobIdentifier = f"{modelToRemove}{utils.returnIdSeparator()}{identifier}{suffix}",
uniqueHandler = f"{self.name}{identifier}{suffix}")

else:
# collect the target evaluation
modelToExecute['Instance'].collectOutput(finishedRun[0],inRunTargetEvaluations)
Expand Down
2 changes: 1 addition & 1 deletion ravenframework/Steps/MultiRun.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def _localTakeAstepRun(self, inDictionary):
for idx in currentFailures:
finishedJobList.pop(idx)

if type(finishedJobObjs).__name__ in 'list': # TODO: should be consistent, if no batching should batch size be 1 or 0 ?
if isinstance(finishedJobObjs, list): # TODO: should be consistent, if no batching should batch size be 1 or 0 ?
# if sampler claims it's batching, then only collect once, since it will collect the batch
# together, not one-at-a-time
# FIXME: IN HERE WE SEND IN THE INSTANCE OF THE FIRST JOB OF A BATCH
Expand Down
3 changes: 3 additions & 0 deletions rook/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,9 @@ def term_handler(signum, _):
for child in node.children:
#print(test_name,"child",child)
child_type = child.attrib['type']
if child_type not in differs:
raise IOError("Differ type '" + child_type + "' unknown! Available are: " +
', '.join(list(differs.keys())) + ". Test file: "+ test_file)
child_param_handler = differs[child_type].get_valid_params()
if not child_param_handler.check_for_required(child.attrib):
raise IOError("Missing Parameters in: " + child.tag + "/" + node.tag +
Expand Down
3 changes: 3 additions & 0 deletions scripts/library_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ def checkSingleLibrary(lib, version=None, useImportCheck=False):
## this avoids actually importing the modules
if usePackageMeta and not useImportCheck:
found, msg, foundVersion = findLibAndVersion(lib, version=version)
if not found:
# try slower approach
found, msg, foundVersion = findLibAndVersionSubprocess(lib, version=version)
# otherwise, use the slower subprocess method
else:
found, msg, foundVersion = findLibAndVersionSubprocess(lib, version=version)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright 2017 Battelle Energy Alliance, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

def run(self, Input):
  """
    External model entry point: stores in self.sum the sum of the
    attributes A, B, C and D (set on self by the driver before this call).
    @ In, self, object, container carrying the sampled variables A, B, C, D
    @ In, Input, dict, input dictionary (unused here; values are read from self)
    @ Out, None
  """
  self.sum = self.A + self.B + self.C + self.D
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Copyright Nucube Energy, Inc.

def evaluate(self):
  """
    Computes the decay constant for this nuclide as the sampled
    decay_A value shifted by a small fixed offset.
    @ In, self, object, container carrying the sampled variable decay_A
    @ Out, evaluate, float, decay_A plus the fixed offset 0.0001
  """
  offset = 0.0001
  return self.decay_A + offset

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?xml version="1.0" ?>
<!-- Input template for the AnalyticalBateman code: four nuclides (A..D) with
     RAVEN wildcards ($RAVEN-name|width$ or $RAVEN-name:default$) substituted
     at run time for the decay constants and cross sections (sigma). -->
<AnalyticalBateman>
    <totalTime>300</totalTime>
    <powerHistory>1 1 1</powerHistory>
    <flux>1e14 1e14 1e14</flux>
    <stepDays>0 100 200 400</stepDays>
    <timeSteps>100 100 100</timeSteps>
    <nuclides>
        <A>
            <equationType>N1</equationType>
            <initialMass>1.0</initialMass>
            <decayConstant>$RAVEN-decay_A|10$</decayConstant>
            <sigma>$RAVEN-sigma-A|10$</sigma>
            <ANumber>230</ANumber>
        </A>
        <B>
            <equationType>N2</equationType>
            <initialMass>1.0</initialMass>
            <decayConstant>$RAVEN-decay_B:0.000000006$</decayConstant>
            <sigma>$RAVEN-sigma-B:5$</sigma>
            <ANumber>200</ANumber>
        </B>
        <C>
            <equationType>N3</equationType>
            <initialMass>1.0</initialMass>
            <decayConstant>$RAVEN-decay-C:0.000000008$</decayConstant>
            <sigma>$RAVEN-sigma-C:3$</sigma>
            <ANumber>150</ANumber>
        </C>
        <D>
            <equationType>N4</equationType>
            <initialMass>1.0</initialMass>
            <decayConstant>$RAVEN-decay-D:0.000000009$</decayConstant>
            <sigma>$RAVEN-sigma-D:1$</sigma>
            <ANumber>100</ANumber>
        </D>
    </nuclides>
</AnalyticalBateman>

Loading

0 comments on commit 782ccc0

Please sign in to comment.