Standardize collectOutput method for PostProcessor #1471

Merged · 13 commits · Mar 15, 2021
19 changes: 5 additions & 14 deletions framework/Models/PostProcessors/BasicStatistics.py
@@ -140,11 +140,11 @@ def __init__(self, runInfoDict):
self.sampleTag = None # Tag used to track samples
self.pbPresent = False # True if the ProbabilityWeight is available
self.realizationWeight = None # The joint probabilities
self.outputDataset = False # True if the user wants to dump the outputs to dataset
self.steMetaIndex = 'targets' # when Dataset is requested as output, the default index of ste metadata is ['targets', self.pivotParameter]
self.multipleFeatures = True # True if multiple features are employed in linear regression as feature inputs
self.sampleSize = None # number of samples
self.calculations = {}
self.validDataType = ['PointSet', 'HistorySet', 'DataSet'] # The list of accepted types of DataObject

def inputToInternal(self, currentInp):
"""
@@ -1296,22 +1296,13 @@ def run(self, inputIn):
outputSet = self.__runLocal(inputData)
return outputSet

def collectOutput(self, finishedJob, output):
def collectOutput(self, finishedJob, output, options=None):
"""
Function to place all of the computed data into the output object
@ In, finishedJob, JobHandler External or Internal instance, A JobHandler object that is in charge of running this post-processor
@ In, output, dataObjects, The object where we want to place our computed results
@ In, options, dict, optional, not used in PostProcessor.
dictionary of options that can be passed in when the output collection is performed by another model (e.g., EnsembleModel)
@ Out, None
"""
evaluation = finishedJob.getEvaluation()
outputRealization = evaluation[1]
if output.type in ['PointSet','HistorySet']:
if self.outputDataset:
self.raiseAnError(IOError, "DataSet output is required, but the provided type of DataObject is",output.type)
self.raiseADebug('Dumping output in data object named ' + output.name)
output.addRealization(outputRealization)
elif output.type in ['DataSet']:
self.raiseADebug('Dumping output in DataSet named ' + output.name)
output.load(outputRealization,style='dataset')
else:
self.raiseAnError(IOError, 'Output type ' + str(output.type) + ' unknown.')
PostProcessor.collectOutput(self, finishedJob, output, options=options)
73 changes: 20 additions & 53 deletions framework/Models/PostProcessors/DataClassifier.py
@@ -74,6 +74,8 @@ def __init__(self, runInfoDict):
self.mapping = {} # dictionary for mapping input space between different DataObjects {'variableName':'externalFunctionName'}
self.funcDict = {} # Contains the function to be used {'variableName':externalFunctionInstance}
self.label = None # ID of the variable which contains the label values
self.outputMultipleRealizations = True # True indicates multiple realizations are returned
Collaborator:
is this variable being used?

Collaborator (Author):
Yes, this is used to indicate which method will be used to collect the output into the DataObjects. Currently, we use addRealization for a single realization and the load method for multiple realizations.
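For readers following this thread, here is a minimal sketch of the dispatch this flag presumably enables in the standardized base-class collectOutput. This is a hypothetical reconstruction from the diffs and the reply above; the actual base-class implementation is not part of this diff.

```python
# Hypothetical sketch of the standardized PostProcessor.collectOutput dispatch;
# the real base-class code is not shown in this PR.
def collectOutput(self, finishedJob, output, options=None):
    evaluation = finishedJob.getEvaluation()
    outputRealization = evaluation[1]  # the run() return value, per the diffs below
    if self.outputMultipleRealizations:
        # run() returned a {'data': ..., 'dims': ...} payload: bulk-load it
        output.load(outputRealization['data'], style='dict',
                    dims=outputRealization['dims'])
    else:
        # run() returned a single realization: add it directly
        output.addRealization(outputRealization)
```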


# assembler objects to be requested
self.addAssemblerObject('Function', InputData.Quantity.one_to_infinity)

@@ -143,6 +145,8 @@ def inputToInternal(self, currentInput):
self.raiseAnError(IOError, "Only PointSet is allowed as classifier, but HistorySet", inputObject.name, "is provided!")
else:
dataType = 'target'
newInput[dataType]['data'] = inputObject.asDataset(outType='dict')['data']
newInput[dataType]['dims'] = inputObject.getDimensions()
if not haveTarget:
haveTarget = True
else:
@@ -189,14 +193,10 @@ def run(self, inputIn):
targetDict = inputDict['target']
classifierDict = inputDict['classifier']
outputDict = {}
outputDict.update(inputDict['target']['data'])
outputType = targetDict['type']
outputDict['dataType'] = outputType
outputDict['dataFrom'] = targetDict['name']
if outputType == 'HistorySet':
outputDict['historySizes'] = copy.copy(targetDict['historySizes'])

numRlz = utils.first(targetDict['input'].values()).size
outputDict[self.label] = np.empty(numRlz)
outputDict[self.label] = []
for i in range(numRlz):
tempTargDict = {}
for param, vals in targetDict['input'].items():
@@ -214,55 +214,22 @@
labelIndex = labelIndex & set(inds)
if len(labelIndex) != 1:
self.raiseAnError(IOError, "The parameters", ",".join(tempTargDict.keys()), "with values", ",".join([str(el) for el in tempTargDict.values()]), "could not be put in any class!")
outputDict[self.label][i] = classifierDict['output'][self.label][list(labelIndex)[0]]

label = classifierDict['output'][self.label][list(labelIndex)[0]]
if outputType == 'PointSet':
outputDict[self.label].append(label)
else:
outputDict[self.label].append(np.asarray([label]*targetDict['historySizes'][i]))
outputDict[self.label] = np.asarray(outputDict[self.label])
outputDict = {'data': outputDict, 'dims':inputDict['target']['dims']}
return outputDict

def collectOutput(self, finishedJob, output):
def collectOutput(self, finishedJob, output, options=None):
"""
Method to place all of the computed data into output object
@ In, finishedJob, object, JobHandler object that is in charge of running this postprocessor
@ In, output, object, the object where we want to place our computed results
Function to place all of the computed data into the output object
@ In, finishedJob, JobHandler External or Internal instance, A JobHandler object that is in charge of running this post-processor
@ In, output, dataObjects, The object where we want to place our computed results
@ In, options, dict, optional, not used in PostProcessor.
dictionary of options that can be passed in when the output collection is performed by another model (e.g., EnsembleModel)
@ Out, None
"""
evaluation = finishedJob.getEvaluation()
inputObjects, outputDict = evaluation

if isinstance(output, Files.File):
self.raiseAnError(IOError, "Dump results to files is not yet implemented!")

for inp in inputObjects:
if inp.name == outputDict['dataFrom']:
inputObject = inp
break
if inputObject != output:
## Copy any data you need from the input DataObject before adding new data
rlzs = inputObject.asDataset(outType='dict')['data']
if output.type == 'PointSet':
output.load(rlzs, style='dict')
elif output.type == 'HistorySet':
if inputObject.type != 'HistorySet':
self.raiseAnError(IOError, "Copying the data from input PointSet", inputObject.name, "to output HistorySet", output.name, "is currently not allowed!")
output.load(rlzs, style='dict', dims=inputObject.getDimensions())

if output.type == 'PointSet':
output.addVariable(self.label, copy.copy(outputDict[self.label]), classify='output')
elif output.type == 'HistorySet':
numRlzs = output.size
labelValues = np.zeros(numRlzs, dtype=object)
pivotParams = tuple(output.indexes)
slices = output.sliceByIndex('RAVEN_sample_ID')
coordList = []
for i in range(numRlzs):
coordDict = {}
for elem in pivotParams:
coordDict[elem] = slices[i].dropna(elem)[elem]
coordList.append(coordDict)

for i in range(numRlzs):
histSize = outputDict['historySizes'][i]
values = np.empty(histSize)
values.fill(outputDict[self.label][i])
xrArray = xr.DataArray(values, dims=pivotParams, coords=coordList[i])
labelValues[i] = xrArray
output.addVariable(self.label, labelValues, classify='output')
PostProcessor.collectOutput(self, finishedJob, output, options=options)
38 changes: 15 additions & 23 deletions framework/Models/PostProcessors/ETImporter.py
@@ -52,6 +52,12 @@ def __init__(self, runInfoDict):
# original tree
self.fileFormat = None # chosen format of the ET file
self.allowedFormats = ['OpenPSA'] # ET formats that are supported
self.validDataType = ['PointSet'] # The list of accepted types of DataObject
## Currently, we use both DataObject.addRealization and DataObject.load to
## collect the outputs returned by the PostProcessor. DataObject.addRealization is used
## to collect a single realization, while DataObject.load is used to collect multiple
## realizations. However, DataObject.load cannot be directly used to collect a single realization.
self.outputMultipleRealizations = True

@classmethod
def getInputSpecification(cls):
@@ -95,35 +101,21 @@ def _handleInput(self, paramInput):
def run(self, inputs):
"""
This method executes the PostProcessor action.
@ In, inputs, list, list of file objects
@ Out, None
@ In, inputs, list, list of file objects
@ Out, outputDict, dict, dictionary of outputs
"""
eventTreeModel = ETStructure(self.expand, inputs)
return eventTreeModel.returnDict()
outputDict, variables = eventTreeModel.returnDict()
outputDict = {'data': outputDict, 'dims':{}}
return outputDict

def collectOutput(self, finishedJob, output):
def collectOutput(self, finishedJob, output, options=None):
"""
Function to place all of the computed data into the output object, (DataObjects)
@ In, finishedJob, object, JobHandler object that is in charge of running this PostProcessor
@ In, output, object, the object where we want to place our computed results
@ In, options, dict, optional, not used in PostProcessor.
dictionary of options that can be passed in when the output collection is performed by another model (e.g., EnsembleModel)
@ Out, None
"""
evaluation = finishedJob.getEvaluation()
outputDict ={}
outputDict['data'], variables = evaluation[1]
if not set(output.getVars('input')) == set(variables):
self.raiseAnError(RuntimeError, ' ETImporter: set of branching variables in the '
'ET ( ' + str(variables) + ' ) is not identical to the'
' set of input variables specified in the PointSet (' + str(output.getParaKeys('inputs')) +')')
# Output to file
if set(outputDict['data'].keys()) != set(output.getVars(subset='input')+output.getVars(subset='output')):
self.raiseAnError(RuntimeError, 'ETImporter failed: set of variables specified in the output '
'dataObject (' + str(set(outputDict['data'].keys())) + ') is different from the set of '
'variables specified in the ET (' + str(set(output.getVars(subset='input')+output.getVars(subset='output'))))
if output.type in ['PointSet']:
outputDict['dims'] = {}
for key in outputDict.keys():
outputDict['dims'][key] = []
output.load(outputDict['data'], style='dict', dims=outputDict['dims'])
else:
self.raiseAnError(RuntimeError, 'ETImporter failed: Output type ' + str(output.type) + ' is not supported.')
PostProcessor.collectOutput(self, finishedJob, output, options=options)
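Note the pattern repeating across these importer PostProcessors: run() now wraps its results in one standardized payload that the base collectOutput can load uniformly. An illustrative payload follows; the variable names are made up for the example and do not come from this PR.

```python
import numpy as np

# Illustrative {'data': ..., 'dims': ...} payload for a PointSet output,
# with hypothetical variable names; each array holds one entry per realization.
outputDict = {
    'data': {
        'sequence':  np.array([0, 1, 2]),
        'branchVar': np.array([0.0, 1.0, 1.0]),
    },
    'dims': {},  # empty for PointSet outputs: no index/pivot dimensions
}
```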
30 changes: 15 additions & 15 deletions framework/Models/PostProcessors/FTImporter.py
@@ -62,6 +62,12 @@ def __init__(self, runInfoDict):
self.printTag = 'POSTPROCESSOR FT IMPORTER'
self.FTFormat = None # chosen format of the FT file
self.topEventID = None
self.validDataType = ['PointSet'] # The list of accepted types of DataObject
## Currently, we use both DataObject.addRealization and DataObject.load to
## collect the outputs returned by the PostProcessor. DataObject.addRealization is used
## to collect a single realization, while DataObject.load is used to collect multiple
## realizations. However, DataObject.load cannot be directly used to collect a single realization.
self.outputMultipleRealizations = True

def initialize(self, runInfo, inputs, initDict) :
"""
@@ -89,26 +95,20 @@ def run(self, inputs):
"""
This method executes the postprocessor action.
@ In, inputs, list, list of file objects
@ Out, out, dict, dict containing the processed FT
@ Out, outputDict, dict, dict containing the processed FT
"""
faultTreeModel = FTStructure(inputs, self.topEventID)
return faultTreeModel.returnDict()
outputDict = faultTreeModel.returnDict()
outputDict = {'data': outputDict, 'dims':{}}
return outputDict

def collectOutput(self, finishedJob, output):
def collectOutput(self, finishedJob, output, options=None):
"""
Function to place all of the computed data into the output object, (DataObjects)
@ In, finishedJob, object, JobHandler object that is in charge of running this postprocessor
@ In, finishedJob, object, JobHandler object that is in charge of running this PostProcessor
@ In, output, object, the object where we want to place our computed results
@ In, options, dict, optional, not used in PostProcessor.
Collaborator:
should we specify it since PP doesn't use it?

Collaborator (Author):
It inherits from the Model base class.

dictionary of options that can be passed in when the output collection is performed by another model (e.g., EnsembleModel)
@ Out, None
"""
evaluation = finishedJob.getEvaluation()
outputDict ={}
outputDict['data'] = evaluation[1]

outputDict['dims'] = {}
for key in outputDict['data'].keys():
outputDict['dims'][key] = []
if output.type in ['PointSet']:
output.load(outputDict['data'], style='dict', dims=outputDict['dims'])
else:
self.raiseAnError(RuntimeError, 'FTImporter failed: Output type ' + str(output.type) + ' is not supported.')
PostProcessor.collectOutput(self, finishedJob, output, options=options)
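To make the inheritance point from the review exchange above concrete: the signature is fixed by the Model base class, so subclasses keep the options keyword even when they ignore it. A hedged sketch of the relationship — the Model base class is not shown in this diff, and everything beyond the collectOutput signature is an assumption:

```python
# Sketch (assumed, not part of this diff): Model fixes the collectOutput
# signature, so a coupling model such as EnsembleModel can call any model
# uniformly and pass collection options through.
class Model:
    def collectOutput(self, finishedJob, output, options=None):
        raise NotImplementedError

class PostProcessor(Model):
    def collectOutput(self, finishedJob, output, options=None):
        # options is accepted for interface compatibility but not used here
        ...
```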
19 changes: 12 additions & 7 deletions framework/Models/PostProcessors/InterfacedPostProcessor.py
@@ -85,6 +85,11 @@ def __init__(self, runInfoDict):
"""
PostProcessor.__init__(self, runInfoDict)
self.methodToRun = None
## Currently, we use both DataObject.addRealization and DataObject.load to
## collect the outputs returned by the PostProcessor. DataObject.addRealization is used
## to collect a single realization, while DataObject.load is used to collect multiple
## realizations. However, DataObject.load cannot be directly used to collect a single realization.
self.outputMultipleRealizations = True

def initialize(self, runInfo, inputs, initDict):
"""
@@ -199,13 +204,13 @@ def returnFormat(self,location):
form = self.postProcessor.outputFormat
return form

def collectOutput(self, finishedJob, output):
def collectOutput(self, finishedJob, output, options=None):
"""
Function to place all of the computed data into the output object
@ In, finishedJob, JobHandler External or Internal instance, A JobHandler object that is in charge of running this post-processor
@ In, output, dataObjects, The object where we want to place our computed results
Function to place all of the computed data into the output object, (DataObjects)
@ In, finishedJob, object, JobHandler object that is in charge of running this PostProcessor
@ In, output, object, the object where we want to place our computed results
@ In, options, dict, optional, not used in PostProcessor.
dictionary of options that can be passed in when the output collection is performed by another model (e.g., EnsembleModel)
@ Out, None
"""
evaluations = finishedJob.getEvaluation()
evaluation = evaluations[1]
output.load(evaluation['data'], style='dict', dims=evaluation['dims'])
PostProcessor.collectOutput(self, finishedJob, output, options=options)
24 changes: 11 additions & 13 deletions framework/Models/PostProcessors/MCSimporter.py
@@ -50,6 +50,12 @@ def __init__(self, runInfoDict):
self.expand = None # option that controls the structure of the ET. If True, the tree is expanded so that
# all possible sequences are generated. Sequence label is maintained according to the
# original tree
self.validDataType = ['PointSet'] # The list of accepted types of DataObject
## Currently, we use both DataObject.addRealization and DataObject.load to
## collect the outputs returned by the PostProcessor. DataObject.addRealization is used
## to collect a single realization, while DataObject.load is used to collect multiple
## realizations. However, DataObject.load cannot be directly used to collect a single realization.
self.outputMultipleRealizations = True

@classmethod
def getInputSpecification(cls):
@@ -138,27 +144,19 @@
for be in mcs:
mcsPointSet[be][counter] = 1.0
counter = counter+1

mcsPointSet = {'data': mcsPointSet, 'dims': {}}
return mcsPointSet

def collectOutput(self, finishedJob, output):
def collectOutput(self, finishedJob, output, options=None):
"""
Function to place all of the computed data into the output object, (DataObjects)
@ In, finishedJob, object, JobHandler object that is in charge of running this PostProcessor
@ In, output, object, the object where we want to place our computed results
@ In, options, dict, optional, not used in PostProcessor.
dictionary of options that can be passed in when the output collection is performed by another model (e.g., EnsembleModel)
@ Out, None
"""
evaluation = finishedJob.getEvaluation()
outputDict ={}
outputDict['data'] = evaluation[1]

if output.type in ['PointSet']:
outputDict['dims'] = {}
for key in outputDict.keys():
outputDict['dims'][key] = []
output.load(outputDict['data'], style='dict', dims=outputDict['dims'])
else:
self.raiseAnError(RuntimeError, 'MCSImporter failed: Output type ' + str(output.type) + ' is not supported.')
PostProcessor.collectOutput(self, finishedJob, output, options=options)

def mcsReader(mcsListFile):
"""
23 changes: 8 additions & 15 deletions framework/Models/PostProcessors/ParetoFrontierPostProcessor.py
@@ -44,6 +44,8 @@ def __init__(self, runInfoDict):
self.costLimit = None # variable associated with the upper limit of the cost dimension
self.invCost = False # variable which indicates if the cost dimension is inverted (e.g., it represents savings rather than costs)
self.invValue = False # variable which indicates if the value dimension is inverted (e.g., it represents a lost value rather than value)
self.validDataType = ['PointSet'] # The list of accepted types of DataObject
self.outputMultipleRealizations = True # True indicates multiple realizations are returned

@classmethod
def getInputSpecification(cls):
@@ -161,25 +163,16 @@ def run(self, inputIn):
paretoFrontierDict = {}
for index,varID in enumerate(sortedData.data_vars):
paretoFrontierDict[varID] = paretoFrontierData[:,index]

paretoFrontierDict = {'data':paretoFrontierDict, 'dims':{}}
return paretoFrontierDict

def collectOutput(self, finishedJob, output):
def collectOutput(self, finishedJob, output, options=None):
"""
Function to place all of the computed data into the output object
@ In, finishedJob, JobHandler External or Internal instance, A JobHandler object that is in charge of running this post-processor
@ In, output, DataObject.DataObject, The object where we want to place our computed results
@ In, output, dataObjects, The object where we want to place our computed results
@ In, options, dict, optional, not used in PostProcessor.
dictionary of options that can be passed in when the output collection is performed by another model (e.g., EnsembleModel)
@ Out, None
"""
evaluation = finishedJob.getEvaluation()

outputDict ={}
outputDict['data'] = evaluation[1]

if output.type in ['PointSet']:
outputDict['dims'] = {}
for key in outputDict.keys():
outputDict['dims'][key] = []
output.load(outputDict['data'], style='dict', dims=outputDict['dims'])
else:
self.raiseAnError(RuntimeError, 'ParetoFrontier failed: Output type ' + str(output.type) + ' is not supported.')
PostProcessor.collectOutput(self, finishedJob, output, options=options)
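Taken together, the file diffs converge on a single subclass pattern. The following is a composite sketch abstracted from the changes above, not a verbatim copy of any one file, and the subclass name is hypothetical:

```python
# Composite sketch of the standardized pattern in this PR: run() wraps results
# in the {'data': ..., 'dims': ...} payload, and collectOutput delegates to the
# base class, which knows how to load that payload into the DataObject.
class SomeImporter(PostProcessor):  # hypothetical subclass name
    def __init__(self, runInfoDict):
        PostProcessor.__init__(self, runInfoDict)
        self.validDataType = ['PointSet']       # accepted DataObject types
        self.outputMultipleRealizations = True  # collected via DataObject.load

    def run(self, inputs):
        resultsDict = {}  # varName -> np.ndarray, one entry per realization
        return {'data': resultsDict, 'dims': {}}

    def collectOutput(self, finishedJob, output, options=None):
        PostProcessor.collectOutput(self, finishedJob, output, options=options)
```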