Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix defect ensemble model (with Code) and genetic algorithm #2317

Merged
merged 19 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions ravenframework/JobHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ def reAddJob(self, runner):
runner.trackTime('queue')
self.__submittedJobs.append(runner.identifier)

def addClientJob(self, args, functionToRun, identifier, metadata=None, uniqueHandler="any"):
def addClientJob(self, args, functionToRun, identifier, metadata=None, uniqueHandler="any", groupInfo = None):
"""
Method to add an internal run (function execution), without consuming
resources (free spots). This can be used for client handling (see
Expand All @@ -719,11 +719,19 @@ def addClientJob(self, args, functionToRun, identifier, metadata=None, uniqueHan
this runner. For example, if present, to retrieve this runner using the
method jobHandler.getFinished, the uniqueHandler needs to be provided.
If uniqueHandler == 'any', every "client" can get this runner.
@ In, groupInfo, dict, optional, {id:string, size:int}.
- "id": it is a special keyword attached to
this runner to identify that this runner belongs to a special set of runs that need to be
grouped together (all will be retrievable only when all the runs ended).
- "size", number of runs in this group self.__batching
NOTE: If the "size" of the group is only set the first time a job of this group is added.
Consequentially the size is immutable
@ Out, None
"""
self.addJob(args, functionToRun, identifier, metadata,
forceUseThreads = True, uniqueHandler = uniqueHandler,
clientQueue = True)
clientQueue = True, groupInfo = groupInfo)


def addFinishedJob(self, data, metadata=None, uniqueHandler="any", profile=False):
"""
Expand Down
61 changes: 40 additions & 21 deletions ravenframework/Models/EnsembleModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module where the base class and the specialization of different type of Model are
"""

#for future compatibility with Python 3--------------------------------------------------------------
from __future__ import division, print_function, unicode_literals, absolute_import
#End compatibility block for Python 3----------------------------------------------------------------
Expand All @@ -38,7 +36,8 @@

class EnsembleModel(Dummy):
"""
EnsembleModel class. This class is aimed to create a comunication 'pipe' among different models in terms of Input/Output relation
EnsembleModel class. This class is aimed to create a comunication 'pipe' among
different models in terms of Input/Output relation
"""
@classmethod
def specializeValidateDict(cls):
Expand Down Expand Up @@ -118,7 +117,9 @@ def localInputAndChecks(self,xmlNode):
# list(metadataToTranfer, ModelSource,Alias (optional))
if 'source' not in childChild.attrib.keys():
self.raiseAnError(IOError, 'when metadataToTransfer XML block is defined, the "source" attribute must be inputted!')
self.modelsInputDictionary[modelName][childChild.tag].append([childChild.text.strip(),childChild.attrib['source'],childChild.attrib.get("alias",None)])
self.modelsInputDictionary[modelName][childChild.tag].append([childChild.text.strip(),
childChild.attrib['source'],
childChild.attrib.get("alias",None)])
else:
try:
self.modelsInputDictionary[modelName][childChild.tag].append(childChild.text.strip())
Expand Down Expand Up @@ -158,7 +159,7 @@ def __readSettings(self, xmlNode):
elif child.tag == 'initialConditions':
for var in child:
if "repeat" in var.attrib.keys():
self.initialConditions[var.tag] = np.repeat([float(var.text.split()[0])], int(var.attrib['repeat'])) #np.array([float(var.text.split()[0]) for _ in range(int(var.attrib['repeat']))])
self.initialConditions[var.tag] = np.repeat([float(var.text.split()[0])], int(var.attrib['repeat']))
else:
try:
values = var.text.split()
Expand Down Expand Up @@ -342,7 +343,8 @@ def initialize(self,runInfo,inputs,initDict=None):
self.raiseAnError(IOError,"Picard's iterations mode activated but no initial conditions provided!")
else:
if len(self.initialStartModels) !=0:
self.raiseAnError(IOError, "The 'initialStartModels' xml node is not needed for non-Picard calculations, since the running sequence can be automatically determined by the code! Please delete this node to avoid a mistake.")
self.raiseAnError(IOError, "The 'initialStartModels' xml node is not needed for non-Picard calculations, "
"since the running sequence can be automatically determined by the code! Please delete this node to avoid a mistake.")
self.raiseAMessage("EnsembleModel connections determined a linear system. Picard's iterations not activated!")

for modelIn in self.modelsDictionary.keys():
Expand Down Expand Up @@ -395,7 +397,7 @@ def __selectInputSubset(self,modelName, kwargs):
for key in kwargs["SampledVars"].keys():
if key in self.modelsDictionary[modelName]['Input']:
selectedkwargs['SampledVars'][key] = kwargs["SampledVars"][key]
selectedkwargs['SampledVarsPb'][key] = kwargs["SampledVarsPb"][key] if 'SampledVarsPb' in kwargs.keys() and key in kwargs["SampledVarsPb"].keys() else 1.0
selectedkwargs['SampledVarsPb'][key] = kwargs["SampledVarsPb"][key] if 'SampledVarsPb' in kwargs and key in kwargs["SampledVarsPb"] else 1.
return selectedkwargs

def createNewInput(self,myInput,samplerType,**kwargs):
Expand All @@ -422,7 +424,7 @@ def createNewInput(self,myInput,samplerType,**kwargs):
for modelIn, specs in self.modelsDictionary.items():
for inp in specs['Input']:
if inp not in allCoveredVariables:
self.raiseAnError(RuntimeError,"for sub-model "+ modelIn + " the input "+inp+" has not been found among other models' outputs and sampled variables!")
self.raiseAnError(RuntimeError,f"for sub-model {modelIn} the input {inp} has not been found among other models' outputs and sampled variables!")

## Now prepare the new inputs for each model
for modelIn, specs in self.modelsDictionary.items():
Expand Down Expand Up @@ -488,8 +490,8 @@ def collectOutput(self,finishedJob,output):

def getAdditionalInputEdits(self,inputInfo):
"""
Collects additional edits for the sampler to use when creating a new input. In this case, it calls all the getAdditionalInputEdits methods
of the sub-models
Collects additional edits for the sampler to use when creating a new input. In this case,
it calls all the getAdditionalInputEdits methods of the sub-models
@ In, inputInfo, dict, dictionary in which to add edits
@ Out, None.
"""
Expand Down Expand Up @@ -549,6 +551,7 @@ def submit(self,myInput,samplerType,jobHandler,**kwargs):
for index in range(nRuns):
if batchMode:
kw = kwargs['batchInfo']['batchRealizations'][index]
kw['batchRun'] = index + 1
else:
kw = kwargs

Expand All @@ -563,7 +566,14 @@ def submit(self,myInput,samplerType,jobHandler,**kwargs):
uniqueHandler=uniqueHandler, forceUseThreads=forceThreads,
groupInfo={'id': kwargs['batchInfo']['batchId'], 'size': nRuns} if batchMode else None)
else:
jobHandler.addClientJob((self, myInput, samplerType, kwargs), self.__class__.evaluateSample, prefix, kwargs)
# for parallel strategy 2, the ensemble model works as a step => it needs the jobHandler
kw['jobHandler'] = jobHandler
# for parallel strategy 2, we need to make sure that the batchMode is set to False in the inner runs since only the
# ensemble model evaluation should be batched (THIS IS REQUIRED because the CODE does not submit runs like the other models)
kw['batchMode'] = False
jobHandler.addClientJob((self, myInput, samplerType, kw), self.__class__.evaluateSample, prefix, metadata=metadata,
uniqueHandler=uniqueHandler,
groupInfo={'id': kwargs['batchInfo']['batchId'], 'size': nRuns} if batchMode else None)

def __retrieveDependentOutput(self,modelIn,listOfOutputs, typeOutputs):
"""
Expand Down Expand Up @@ -660,16 +670,19 @@ def _externalRun(self,inRun, jobHandler = None):#, jobHandler):
else:
self.raiseAnError(IOError,"No initial conditions provided for variable "+ initialConditionToSet)
# set new identifiers
inputKwargs[modelIn]['prefix'] = modelIn+utils.returnIdSeparator()+identifier
inputKwargs[modelIn]['uniqueHandler'] = self.name+identifier
suffix = ''
if 'batchRun' in inputKwargs[modelIn]:
suffix = f"{utils.returnIdSeparator()}{inputKwargs[modelIn]['batchRun']}"
inputKwargs[modelIn]['prefix'] = f"{modelIn}{utils.returnIdSeparator()}{identifier}{suffix}"
inputKwargs[modelIn]['uniqueHandler'] = f"{self.name}{identifier}{suffix}"
if metadataToTransfer is not None:
inputKwargs[modelIn]['metadataToTransfer'] = metadataToTransfer

for key, value in dependentOutput.items():
inputKwargs[modelIn]["SampledVars" ][key] = dependentOutput[key]
## FIXME it is a mistake (Andrea). The SampledVarsPb for this variable should be transferred from outside
## Who has this information? -- DPM 4/11/17
inputKwargs[modelIn]["SampledVarsPb"][key] = 1.0
inputKwargs[modelIn]["SampledVarsPb"][key] = 1.
self._replaceVariablesNamesWithAliasSystem(inputKwargs[modelIn]["SampledVars" ],'input',False)
self._replaceVariablesNamesWithAliasSystem(inputKwargs[modelIn]["SampledVarsPb"],'input',False)
## FIXME: this will come after we rework the "runInfo" collection in the code
Expand All @@ -696,8 +709,10 @@ def _externalRun(self,inRun, jobHandler = None):#, jobHandler):
if iterationCount == 1:
residueContainer[modelIn]['iterValues'][1][out] = np.zeros(len(residueContainer[modelIn]['iterValues'][0][out]))
for out in gotOutputs[modelCnt].keys():
residueContainer[modelIn]['residue'][out] = abs(np.asarray(residueContainer[modelIn]['iterValues'][0][out]) - np.asarray(residueContainer[modelIn]['iterValues'][1][out]))
residueContainer[modelIn]['Norm'] = np.linalg.norm(np.asarray(list(residueContainer[modelIn]['iterValues'][1].values()))-np.asarray(list(residueContainer[modelIn]['iterValues'][0].values())))
residueContainer[modelIn]['residue'][out] = abs(np.asarray(residueContainer[modelIn]['iterValues'][0][out]) -
np.asarray(residueContainer[modelIn]['iterValues'][1][out]))
residueContainer[modelIn]['Norm'] = np.linalg.norm(np.asarray(list(residueContainer[modelIn]['iterValues'][1].values()))-
np.asarray(list(residueContainer[modelIn]['iterValues'][0].values())))

# if nonlinear system, check the total residue and convergence
if self.activatePicard:
Expand Down Expand Up @@ -735,9 +750,11 @@ def __advanceModel(self, identifier, modelToExecute, origInputList, inputKwargs,
@ Out, evaluation, dict, the evaluation dictionary with the "unprojected" data
"""
returnDict = {}

suffix = ''
if 'batchRun' in inputKwargs:
suffix = f"{utils.returnIdSeparator()}{inputKwargs['batchRun']}"
self.raiseADebug('Submitting model',modelToExecute['Instance'].name)
localIdentifier = modelToExecute['Instance'].name+utils.returnIdSeparator()+identifier
localIdentifier = f"{modelToExecute['Instance'].name}{utils.returnIdSeparator()}{identifier}{suffix}"
if self.parallelStrategy == 1:
# we evaluate the model directly
try:
Expand All @@ -756,7 +773,7 @@ def __advanceModel(self, identifier, modelToExecute, origInputList, inputKwargs,
time.sleep(1.e-3)
moveOn = True
# get job that just finished to gather the results
finishedRun = jobHandler.getFinished(jobIdentifier = localIdentifier, uniqueHandler=self.name+identifier)
finishedRun = jobHandler.getFinished(jobIdentifier = localIdentifier, uniqueHandler=f"{self.name}{identifier}{suffix}")
evaluation = finishedRun[0].getEvaluation()
if isinstance(evaluation, rerror):
if finishedRun[0].exceptionTrace is not None:
Expand All @@ -767,7 +784,9 @@ def __advanceModel(self, identifier, modelToExecute, origInputList, inputKwargs,
evaluation = None
# the model failed
for modelToRemove in list(set(self.orderList) - set([modelToExecute['Instance'].name])):
jobHandler.getFinished(jobIdentifier = modelToRemove + utils.returnIdSeparator() + identifier, uniqueHandler = self.name + identifier)
jobHandler.getFinished(jobIdentifier = f"{modelToRemove}{utils.returnIdSeparator()}{identifier}{suffix}",
uniqueHandler = f"{self.name}{identifier}{suffix}")

else:
# collect the target evaluation
modelToExecute['Instance'].collectOutput(finishedRun[0],inRunTargetEvaluations)
Expand Down
2 changes: 1 addition & 1 deletion ravenframework/Steps/MultiRun.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def _localTakeAstepRun(self, inDictionary):
for idx in currentFailures:
finishedJobList.pop(idx)

if type(finishedJobObjs).__name__ in 'list': # TODO: should be consistent, if no batching should batch size be 1 or 0 ?
if isinstance(finishedJobObjs, list): # TODO: should be consistent, if no batching should batch size be 1 or 0 ?
# if sampler claims it's batching, then only collect once, since it will collect the batch
# together, not one-at-a-time
# FIXME: IN HERE WE SEND IN THE INSTANCE OF THE FIRST JOB OF A BATCH
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright 2017 Battelle Energy Alliance, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

def run(self, Input):
self.sum = self.A + self.B + self.C + self.D
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Copyright Nucube Energy, Inc.

def evaluate(self):
return self.decay_A+0.0001

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?xml version="1.0" ?>
<AnalyticalBateman>
<totalTime>300</totalTime>
<powerHistory>1 1 1</powerHistory>
<flux>1e14 1e14 1e14</flux>
<stepDays>0 100 200 400</stepDays>
<timeSteps>100 100 100</timeSteps>
<nuclides>
<A>
<equationType>N1</equationType>
<initialMass>1.0</initialMass>
<decayConstant>$RAVEN-decay_A|10$</decayConstant>
<sigma>$RAVEN-sigma-A|10$</sigma>
<ANumber>230</ANumber>
</A>
<B>
<equationType>N2</equationType>
<initialMass>1.0</initialMass>
<decayConstant>$RAVEN-decay_B:0.000000006$</decayConstant>
<sigma>$RAVEN-sigma-B:5$</sigma>
<ANumber>200</ANumber>
</B>
<C>
<equationType>N3</equationType>
<initialMass>1.0</initialMass>
<decayConstant>$RAVEN-decay-C:0.000000008$</decayConstant>
<sigma>$RAVEN-sigma-C:3$</sigma>
<ANumber>150</ANumber>
</C>
<D>
<equationType>N4</equationType>
<initialMass>1.0</initialMass>
<decayConstant>$RAVEN-decay-D:0.000000009$</decayConstant>
<sigma>$RAVEN-sigma-D:1$</sigma>
<ANumber>100</ANumber>
</D>
</nuclides>
</AnalyticalBateman>

Loading