convert external post processor to use new DataObjects #479
Changes from 3 commits
File 1 of 2:
@@ -379,6 +379,9 @@ def load(self,fName,style='netCDF',**kwargs):
      @ In, kwargs, dict, optional, additional arguments to pass to reading function
      @ Out, None
    """
    if len(self) > 0:
      self.raiseAnError(IOError, "Attempting to load data from a '{}', but target DataObject is not empty!".format(style)
                                 + " This operation is not permitted; try outputting to a clean DataObject.")
    style = style.lower()
    # if fileToLoad in kwargs, then filename is actually fName/fileToLoad
    if 'fileToLoad' in kwargs.keys():
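The new emptiness guard makes `load` fail fast instead of silently merging freshly loaded data into an already-populated object. A minimal, self-contained sketch of the pattern (the class, `__len__`, and `raiseAnError` below are simplified stand-ins for illustration, not the actual RAVEN implementations):

import io

class ToyDataObject:
  """Toy stand-in for a RAVEN DataObject; only the guard logic mirrors the PR."""
  def __init__(self):
    self._realizations = []

  def __len__(self):
    # number of realizations currently stored
    return len(self._realizations)

  def raiseAnError(self, etype, msg):
    # RAVEN wraps this with logging; here we simply raise
    raise etype(msg)

  def load(self, fName, style='netCDF', **kwargs):
    # refuse to load into a non-empty object, mirroring the new check
    if len(self) > 0:
      self.raiseAnError(IOError, "Attempting to load data from a '{}', but target "
                                 "DataObject is not empty!".format(style))
    self._realizations.append(fName)  # placeholder for the real reading logic

obj = ToyDataObject()
obj.load('results')          # first load succeeds
try:
  obj.load('more_results')   # second load now raises IOError
except IOError as e:
  print(e)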
@@ -503,7 +506,6 @@ def remove(self,realization=None,variable=None):
        varlist.remove(variable)
      # remove from pivotParams, and remove any indexes without keys
      for pivot in self.indexes:
        print('DEBUGG checking pivot',pivot,self._pivotParams[pivot])
        if variable in self._pivotParams[pivot]:
          self._pivotParams[pivot].remove(variable)
          if len(self._pivotParams[pivot]) == 0:
@@ -938,8 +940,6 @@ def _fromCSV(self,fName,**kwargs):
      @ In, kwargs, dict, optional arguments
      @ Out, None
    """
    assert(self._data is None)
    assert(self._collector is None)
    # first, try to read from csv
    try:
      panda = pd.read_csv(fName+'.csv')
@@ -1032,8 +1032,6 @@ def _fromDict(self,source,dims=None,**kwargs):
      @ In, kwargs, dict, optional, additional arguments
      @ Out, None
    """
    assert(self._data is None)
    assert(self._collector is None)

Review comment: same as above?
Reply: the check is moved to the load method

    # not safe to default to dict, so if "dims" not specified set it here
    if dims is None:
      dims = {}
@@ -1043,11 +1041,7 @@ def _fromDict(self,source,dims=None,**kwargs):
    # etc
    ## check that all inputs, outputs required are provided
    providedVars = set(source.keys())
    requiredVars = set(self.getVars('input')+self.getVars('output'))
    ## determine what vars are metadata (the "extra" stuff that isn't output or input)
    # TODO don't take "extra", check registered meta explicitly
    extra = list(e for e in providedVars - requiredVars if e not in self.indexes)
    self._metavars = extra
    requiredVars = set(self.getVars())
    ## figure out who's missing from the IO space
    missing = requiredVars - providedVars
    if len(missing) > 0:
@@ -1057,17 +1051,23 @@ def _fromDict(self,source,dims=None,**kwargs):
    cols = len(self.getVars())
    data = np.zeros([rows,cols],dtype=object)
    for i,var in enumerate(itertools.chain(self._inputs,self._outputs,self._metavars)):
      values = source[var]
      # TODO consistency checking with dimensions requested by the user? Or override them?
      # -> currently overriding them
      varDims = dims.get(var,[])
      # TODO: This wastefully copies the source data, but it preserves the object dtype.
      # Without this, a numpy array of numpy arrays will recast the xr.DataArray as a simple numpy array.
      if len(varDims) != 0:
        values = np.zeros(len(source[var]), dtype=object)
      else:
        values = source[var]
      # format higher-than-one-dimensional variables into a list of xr.DataArray
      for dim in varDims:
        ## first, make sure we have all the dimensions for this variable
        if dim not in source.keys():
          self.raiseAnError(KeyError,'Variable "{}" depends on dimension "{}" but it was not provided to _fromDict in the "source"!'.format(var,dim))
      ## construct ND arrays
      for v,val in enumerate(values):

      for v,val in enumerate(source[var]):
        ## coordinates come from each dimension, specific to the "vth" realization
        coords = dict((dim,source[dim][v]) for dim in varDims)
        ## swap-in-place the construction; this will likely error if there's inconsistencies
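The object-dtype copy introduced above guards against a NumPy coercion quirk: building an array from a sequence of equal-length arrays silently stacks them into a 2-D numeric array, so each element is no longer its own object. A small standalone demonstration (plain NumPy arrays; the same effect applies to xr.DataArray elements):

import numpy as np

histories = [np.array([1.0, 2.0, 3.0]), np.array([4.0, 5.0, 6.0])]

# Naive construction: equal-length rows get stacked into a 2-D float array,
# losing the per-element array objects.
stacked = np.array(histories)
print(stacked.dtype, stacked.shape)      # float64 (2, 3)

# Pre-allocating with dtype=object keeps each row as its own object,
# which is what _fromDict relies on to hold xr.DataArray entries.
preserved = np.zeros(len(histories), dtype=object)
for i, h in enumerate(histories):
  preserved[i] = h
print(preserved.dtype, preserved.shape)  # object (2,)
print(type(preserved[0]))                # <class 'numpy.ndarray'>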
@@ -1094,8 +1094,6 @@ def _fromNetCDF(self,fName, **kwargs):
    """
    # TODO set up to use dask for on-disk operations -> or is that a different data object?
    # TODO are these fair assertions?
    assert(self._data is None)
    assert(self._collector is None)
    self._data = xr.open_dataset(fName)
    # convert metadata back to XML files
    for key,val in self._data.attrs.items():
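For reference, `_fromNetCDF` leans on xarray's netCDF reader, with metadata traveling in the dataset attributes. A minimal sketch of that round trip (file name and attribute are illustrative only; writing requires a netCDF backend such as netCDF4 or scipy):

import numpy as np
import xarray as xr

# write a tiny dataset with an attribute, then read it back
ds = xr.Dataset({'x': ('RAVEN_sample_ID', np.arange(3.0))})
ds.attrs['note'] = 'metadata travels in attrs'
ds.to_netcdf('tiny.nc')

loaded = xr.open_dataset('tiny.nc')
for key, val in loaded.attrs.items():
  print(key, '->', val)  # attrs survive the round trip, as _fromNetCDF expects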
File 2 of 2:
@@ -22,6 +22,7 @@

#External Modules---------------------------------------------------------------
import numpy as np
import copy
#External Modules End-----------------------------------------------------------

#Internal Modules---------------------------------------------------------------
@@ -90,52 +91,44 @@ def inputToInternal(self, currentInput):
    if type(currentInput) == dict and 'targets' in currentInput.keys():
      return

    if type(currentInput) != list:
      currentInput = [currentInput]

    inputDict = {'targets':{}, 'metadata':{}}
    metadata = []
    for item in currentInput:
      inType = None
      if hasattr(item, 'type'):
        inType = item.type
      elif type(item) in [list]:
        inType = "list"

      if isinstance(item,Files.File):
        if currentInput.subtype == 'csv':
          self.raiseAWarning(self, 'Input type ' + inType + ' not yet implemented. I am going to skip it.')
      elif inType == 'HDF5':
        # TODO
        self.raiseAWarning(self, 'Input type ' + inType + ' not yet implemented. I am going to skip it.')
      elif inType == 'PointSet':
        for param in item.getParaKeys('input'):
          inputDict['targets'][param] = item.getParam('input', param)
        for param in item.getParaKeys('output'):
          inputDict['targets'][param] = item.getParam('output', param)
        metadata.append(item.getAllMetadata())
      elif inType == 'HistorySet':
        outs, ins = item.getOutParametersValues(nodeId = 'ending'), item.getInpParametersValues(nodeId = 'ending')
        for param in item.getParaKeys('output'):
          inputDict['targets'][param] = [value[param] for value in outs.values()]
        for param in item.getParaKeys('input'):
          inputDict['targets'][param] = [value[param] for value in ins.values()]
        metadata.append(item.getAllMetadata())
      elif inType != 'list':
        self.raiseAWarning(self, 'Input type ' + type(item).__name__ + ' not recognized. I am going to skip it.')

    # Not sure if we need it, but keep a copy of every input's metadata
    inputDict['metadata'] = metadata
    if type(currentInput) == list:
      if len(currentInput) != 1:
        self.raiseAnError(IOError, "The postprocessor ", self.name, "only allows one input DataObject,"
                          + " but multiple inputs are provided!")
      else:
        currentInput = currentInput[-1]
    if hasattr(currentInput, 'type'):
      inType = currentInput.type
    else:
      self.raiseAnError(IOError, "Input type ", type(currentInput).__name__, ' is not recognized!')

Review comment: In raven, all the objects have the …
Review comment: Is it true? we used to check this.
Reply: fixed

    if len(inputDict['targets'].keys()) == 0:
      self.raiseAnError(IOError, 'No input variables have been found in the input objects!')
    if inType in ['PointSet', 'HistorySet']:
      dataSet = currentInput.asDataset()
    else:
      self.raiseAnError(IOError, "Input type ", inType, ' is not yet implemented!')

    if len(currentInput) == 0:
      self.raiseAnError(IOError, 'The Input object ', currentInput.name, ' is empty!')
    inputDict = {}
    if inType == 'PointSet':
      for param in currentInput.getVars():
        inputDict[param] = copy.copy(dataSet[param].values)
    elif inType == 'HistorySet':
      sliceList = currentInput.sliceByIndex('RAVEN_sample_ID')
      indexes = currentInput.indexes
      for param in currentInput.getVars('output'):
        inputDict[param] = [sliceData[param].dropna(indexes[-1]).values for sliceData in sliceList]
      for param in currentInput.getVars('input'):
        inputDict[param] = [sliceData[param].values for sliceData in sliceList]
      for param in indexes:
        inputDict[param] = [sliceData[param].dropna(indexes[-1]).values for sliceData in sliceList]

    for interface in self.externalInterfaces:
      for _ in self.methodsToRun:
        # The function should reference self and use the same variable names
        # as the xml file
        for param in interface.parameterNames():
          if param not in inputDict['targets']:
          if param not in inputDict.keys():
            self.raiseAnError(IOError, self, 'variable "' + param
                              + '" unknown. Please verify your '
                              + 'external script ('
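The new HistorySet branch slices the dataset one realization at a time and uses `dropna` to strip the NaN padding that xarray introduces when histories have different lengths. A standalone illustration of that padding and recovery (the variable names here are made up; `sliceByIndex` itself is RAVEN's API and is not reproduced):

import numpy as np
import xarray as xr

# Two histories of different lengths; aligning them on a shared 'time'
# coordinate forces xarray to pad the shorter one with NaN.
h1 = xr.DataArray([1.0, 2.0, 3.0], dims='time', coords={'time': [0, 1, 2]})
h2 = xr.DataArray([4.0, 5.0], dims='time', coords={'time': [0, 1]})
ds = xr.concat([h1, h2], dim='RAVEN_sample_ID')

for sample in range(ds.sizes['RAVEN_sample_ID']):
  sliceData = ds.isel(RAVEN_sample_ID=sample)
  # dropna('time') recovers the original, unpadded history for this sample
  print(sliceData.dropna('time').values)
# [1. 2. 3.]
# [4. 5.]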
@@ -198,113 +191,11 @@ def collectOutput(self, finishedJob, output):

    if isinstance(output,Files.File):
      self.raiseAWarning('Output type File not yet implemented. I am going to skip it.')
    elif output.type == 'DataObjects':
      self.raiseAWarning('Output type ' + type(output).__name__
                         + ' not yet implemented. I am going to skip it.')
    elif output.type == 'HDF5':
      self.raiseAWarning('Output type ' + type(output).__name__
                         + ' not yet implemented. I am going to skip it.')
    elif output.type in ['PointSet','HistorySet']:
      requestedInput = output.getParaKeys('input')
      ## If you want to be able to dynamically add columns to your data, then
      ## you should use this commented line, otherwise only the information
      ## asked for by the user in the output data object will be available

      # requestedOutput = list(set(output.getParaKeys('output') + self.methodsToRun))
      requestedOutput = output.getParaKeys('output')

      ## The user can simply ask for a computation that may exist in multiple
      ## interfaces; in that case, we will need to qualify their names for the
      ## output. The names should already be qualified from the outputDict.
      ## However, the user may have already qualified the name, so make sure to
      ## test whether the unqualified name exists in the requestedOutput before
      ## replacing it.
      for key, replacements in outputDict['qualifiedNames'].iteritems():
        if key in requestedOutput:
          requestedOutput.remove(key)
          requestedOutput.extend(replacements)

      ## Grab all data from the outputDict; anything else requested but not
      ## present in the outputDict will be copied from the input data.
      ## TODO: User may want to specify which dataset the parameter comes from.
      ##       For now, we assume that if we find more than one an error will
      ##       occur.
      ## FIXME: There is an issue that the data size should be determined before
      ##        entering this loop; otherwise if, say, a scalar is added first,
      ##        then dataLength will be 1 and everything longer will be placed
      ##        in the metadata.
      ##        How do we know what size the output data should be?
      dataLength = None
      for key in requestedInput + requestedOutput:
        storeInOutput = True
        value = []
        if key in outputDict:
          value = outputDict[key]
        else:
          foundCount = 0
          if key in requestedInput:
            for inputData in inputList:
              if key in inputData.getParametersValues('input',nodeId = 'ending').keys() if inputData.type == 'PointSet' else inputData.getParametersValues('input',nodeId = 'ending').values()[-1].keys():
                if inputData.type == 'PointSet':
                  value = inputData.getParametersValues('input',nodeId = 'ending')[key]
                else:
                  value = [value[key] for value in inputData.getParametersValues('input',nodeId = 'ending').values()]
                foundCount += 1
          else:
            for inputData in inputList:
              if key in inputData.getParametersValues('output',nodeId = 'ending').keys() if inputData.type == 'PointSet' else inputData.getParametersValues('output',nodeId = 'ending').values()[-1].keys():
                if inputData.type == 'PointSet':
                  value = inputData.getParametersValues('output',nodeId = 'ending')[key]
                else:
                  value = [value[key] for value in inputData.getParametersValues('output',nodeId = 'ending').values()]
                foundCount += 1

          if foundCount == 0:
            self.raiseAnError(IOError, key + ' not found in the input '
                              + 'object or the computed output '
                              + 'object.')
          elif foundCount > 1:
            self.raiseAnError(IOError, key + ' is ambiguous since it occurs'
                              + ' in multiple input objects.')

        ## We need the size to ensure the data size is consistent, but there
        ## is no guarantee the data is not scalar, so this check is necessary
        myLength = 1
        if not hasattr(value, "__iter__"):
          value = [value]
        myLength = len(value)

        if dataLength is None:
          dataLength = myLength
        elif dataLength != myLength:
          self.raiseAWarning('Requested output for ' + key + ' has a'
                             + ' non-conformant data size ('
                             + str(dataLength) + ' vs ' + str(myLength)
                             + '), it is being placed in the metadata.')
          storeInOutput = False

        ## Finally, no matter what, place the requested data somewhere
        ## accessible
        if storeInOutput:
          if key in requestedInput:
            for histNum, val in enumerate(value):
              param = key if output.type == 'PointSet' else [histNum+1,key]
              output.updateInputValue(param, val)
          else:
            for histNum, val in enumerate(value):
              if output.type == 'HistorySet':
                if histNum+1 in dataLenghtHistory.keys():
                  if dataLenghtHistory[histNum+1] != len(val):
                    self.raiseAnError(IOError, key + ' the size of the arrays for history '+str(histNum+1)+' are different!')
                else:
                  dataLenghtHistory[histNum+1] = len(val)
              param = key if output.type == 'PointSet' else [histNum+1,key]
              output.updateOutputValue(param, val)
        else:
          if not hasattr(value, "__iter__"):
            value = [value]
          for val in value:
            output.updateMetadata(key, val)
    elif output.type in ['PointSet', 'HistorySet']:
      output.load(outputDict, style='dict', dims=output.getDimensions())
    else:
      self.raiseAnError(IOError, 'Unknown output type: ' + str(output.type))
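With this change, all the manual bookkeeping above collapses into a single `load(..., style='dict')` call. A hedged sketch of the dict shape that call expects, inferred from the `_fromDict` hunk earlier (the variable names and dims below are invented for illustration; the real API is RAVEN's `DataObject.load`):

import numpy as np

# One key per variable; each value holds one entry per realization.
# Scalar variables map to 1-D arrays; time-dependent ones to object arrays
# of per-realization (possibly ragged) histories.
outputDict = {
  'x':    np.array([0.1, 0.2]),                                          # scalar input
  'y':    np.array([np.array([1., 2., 3.]), np.array([4., 5.])], dtype=object),  # history
  'time': np.array([np.arange(3.), np.arange(2.)], dtype=object),        # index
}
dims = {'x': [], 'y': ['time'], 'time': ['time']}   # assumed shape of getDimensions()

# output.load(outputDict, style='dict', dims=dims)  # actual RAVEN call, not runnable here
for var, vals in outputDict.items():
  print(var, 'dims:', dims[var], 'nRealizations:', len(vals))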
@@ -315,8 +206,8 @@ def run(self, inputIn):
      @ In, inputIn, dict, dictionary of data to process
      @ Out, outputDict, dict, Dictionary containing the post-processed results
    """
    input = self.inputToInternal(inputIn)
    outputDict = {'qualifiedNames' : {}}
    inputDict = self.inputToInternal(inputIn)
    outputDict = {}
    ## This will map the name to its appropriate interface and method.
    ## In the case of a function being defined in two separate files, we
    ## qualify the output by appending the name of the interface from which it
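The hunk below switches the qualifier separator from '.' to '_', which keeps qualified method names usable as DataObject variable names. A toy sketch of the collision handling as changed here (the interface class is invented for the example):

class FakeInterface:
  """Stand-in for a RAVEN external-function interface."""
  def __init__(self, name, methods):
    self.name = name
    self.methods = methods

interfaces = [FakeInterface('fileA', ['mean']), FakeInterface('fileB', ['mean'])]
methodsToRun = ['mean']

methodMap = {}
for method in methodsToRun:
  matching = [i for i in interfaces if method in i.methods]
  # every match now gets a qualified key, so two files defining 'mean' coexist
  for interface in matching:
    methodMap[interface.name + '_' + method] = (interface, method)

print(sorted(methodMap))  # ['fileA_mean', 'fileB_mean']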
@@ -332,42 +223,55 @@ def run(self, inputIn):
          matchingInterfaces.append(interface)
      if len(matchingInterfaces) == 0:
        self.raiseAWarning(method + ' not found. I will skip it.')
      elif len(matchingInterfaces) == 1:
        methodMap[method] = (matchingInterfaces[0], method)
      #elif len(matchingInterfaces) == 1:
      #  methodMap[method] = (matchingInterfaces[0], method)
      else:
        outputDict['qualifiedNames'][method] = []
        for interface in matchingInterfaces:
          methodName = interface.name + '.' + method
          methodName = interface.name + '_' + method
          methodMap[methodName] = (interface, method)
          outputDict['qualifiedNames'][method].append(methodName)

    ## Evaluate the method and add it to the outputDict; also, if the method
    ## adjusts the input data, then you should update it as well.
    warningMessages = []
    for methodName, (interface, method) in methodMap.iteritems():
      outputDict[methodName] = interface.evaluate(method, input['targets'])
      # The deep copy is needed since the interface postprocessor will change the values of inputDict
      tempInputDict = copy.deepcopy(inputDict)
      outputDict[methodName] = np.atleast_1d(copy.copy(interface.evaluate(method, tempInputDict)))
      if outputDict[methodName] is None:
        self.raiseAnError(Exception,"the method "+methodName+" has not produced any result. It needs to return a result!")
      for target in input['targets']:
      for target in tempInputDict.keys():
        if hasattr(interface, target):
          #if target not in outputDict.keys():
          if target not in methodMap.keys():
            attributeInSelf = getattr(interface, target)
            if len(np.atleast_1d(attributeInSelf)) != len(np.atleast_1d(input['targets'][target])) or (np.atleast_1d(attributeInSelf) - np.atleast_1d(input['targets'][target])).all():
            if (np.atleast_1d(attributeInSelf)).shape != (np.atleast_1d(inputDict[target])).shape or (np.atleast_1d(attributeInSelf) - np.atleast_1d(inputDict[target])).all():
              if target in outputDict.keys():
                self.raiseAWarning("In Post-Processor "+ self.name +" the modified variable "+target+
                                   " has the same name of a one already modified throuhg another Function method." +
                                   " has the same name of a one already modified through another Function method." +
                                   " This method overwrites the input DataObject variable value")
                outputDict[target] = attributeInSelf
                outputDict[target] = np.atleast_1d(attributeInSelf)
              else:
                warningMessages.append("In Post-Processor "+ self.name +" the method "+method+
                                       " has the same name of a variable contained in the input DataObject." +
                                       " This method overwrites the input DataObject variable value")
    for msg in list(set(warningMessages)):
      self.raiseAWarning(msg)

    for target in input['targets'].keys():
      if target not in outputDict.keys() and target in input['targets'].keys():
        outputDict[target] = input['targets'][target]
    # TODO: We assume the structure of input to the external pp is the same as the structure of output of this external pp.
    #       An interface pp should be used if the user wants to merge two data objects, or change the structures of input data
    #       objects.
    numRlz = len(outputDict.values()[0])
    for val in outputDict.values():
      if len(val) != numRlz:
        self.raiseAnError(IOError, "The return results from the external functions have different number of lengths!"
                          + " This postprocessor ", self.name, " requests all the returned values should have the same lengths.")

Review reply: fixed

    for target in inputDict.keys():
      if target not in outputDict.keys():
        if len(inputDict[target]) != numRlz:
          self.raiseAWarning("Parameter ", target, " is available in the provided input DataObjects,"
                             + " but it has different length from the returned values from the external functions."
                             + " Thus this parameter will not be accessible by the output DataObjects!")
        else:
          outputDict[target] = np.atleast_1d(inputDict[target])

    return outputDict
Review comment: did you need to remove this?
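The `np.atleast_1d` wrapping added in `run` is what lets the realization-count check treat scalar and array returns uniformly. A quick standalone illustration:

import numpy as np

# External functions may return a scalar, a list, or an array.
returns = {'sum': 42.0, 'delta': [0.1, 0.2, 0.3], 'mean': np.array([1.5])}

normalized = {k: np.atleast_1d(v) for k, v in returns.items()}
for name, vals in normalized.items():
  print(name, vals.shape, len(vals))  # every value now has a well-defined length
# sum (1,) 1 / delta (3,) 3 / mean (1,) 1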