diff --git a/esmvalcore/_data_finder.py b/esmvalcore/_data_finder.py index 7a27dee4f6..17c13f6488 100644 --- a/esmvalcore/_data_finder.py +++ b/esmvalcore/_data_finder.py @@ -240,7 +240,7 @@ def get_input_filelist(variable, rootpath, drs): return (files, dirnames, filenames) -def get_output_file(variable, preproc_dir): +def get_output_file(variable, preproc_dir, fx_var_alias=None): """Return the full path to the output (preprocessed) file.""" cfg = get_project_config(variable['project']) @@ -249,11 +249,12 @@ def get_output_file(variable, preproc_dir): variable = dict(variable) variable['exp'] = '-'.join(variable['exp']) + tag_source = variable if fx_var_alias is None else fx_var_alias outfile = os.path.join( preproc_dir, variable['diagnostic'], variable['variable_group'], - _replace_tags(cfg['output_file'], variable)[0], + _replace_tags(cfg['output_file'], tag_source)[0], ) if variable['frequency'] != 'fx': outfile += '_{start_year}-{end_year}'.format(**variable) diff --git a/esmvalcore/_recipe.py b/esmvalcore/_recipe.py index f21c8aa206..cc030ad5d3 100644 --- a/esmvalcore/_recipe.py +++ b/esmvalcore/_recipe.py @@ -12,7 +12,8 @@ from . import __version__ from . import _recipe_checks as check -from ._config import TAGS, get_activity, get_institutes, replace_tags +from ._config import (TAGS, get_activity, get_institutes, get_project_config, + replace_tags) from ._data_finder import (get_input_filelist, get_output_file, get_statistic_output_file) from ._provenance import TrackedFile, get_recipe_provenance @@ -22,7 +23,7 @@ from .cmor.table import CMOR_TABLES from .preprocessor import (DEFAULT_ORDER, FINAL_STEPS, INITIAL_STEPS, MULTI_MODEL_FUNCTIONS, PreprocessingTask, - PreprocessorFile) + PreprocessorFile, TIME_PREPROCESSORS) from .preprocessor._derive import get_required from .preprocessor._download import synda_search from .preprocessor._io import DATASET_KEYS, concatenate_callback @@ -128,13 +129,13 @@ def _special_name_to_dataset(variable, special_name): if special_name in ('reference_dataset', 'alternative_dataset'): if special_name not in variable: raise RecipeError( - "Preprocessor {} uses {}, but {} is not defined for " - "variable {} of diagnostic {}".format( - variable['preprocessor'], - special_name, - special_name, - variable['short_name'], - variable['diagnostic'], + "Preprocessor {preproc} uses {name}, but {name} is not " + "defined for variable {short_name} of diagnostic " + "{diagnostic}".format( + preproc=variable['preprocessor'], + name=special_name, + short_name=variable['short_name'], + diagnostic=variable['diagnostic'], )) special_name = variable[special_name] @@ -174,8 +175,8 @@ def _update_target_levels(variable, variables, settings, config_user): short_name=variable_data['short_name'], mip=variable_data['mip'], frequency=variable_data['frequency'], - fix_dir=os.path.splitext( - variable_data['filename'])[0] + '_fixed', + fix_dir=os.path.splitext(variable_data['filename'])[0] + + '_fixed', ) @@ -231,8 +232,8 @@ def _dataset_to_file(variable, config_user): for required_var in required_vars: _augment(required_var, variable) _add_cmor_info(required_var, override=True) - (files, dirnames, filenames) = _get_input_files(required_var, - config_user) + (files, dirnames, + filenames) = _get_input_files(required_var, config_user) if files: variable = required_var break @@ -261,8 +262,7 @@ def _limit_datasets(variables, profile, max_datasets=0): if variable not in limited: limited.append(variable) - logger.info("Only considering %s", - ', '.join(v['alias'] for v in limited)) + logger.info("Only considering %s", ', '.join(v['alias'] for v in limited)) return limited @@ -352,59 +352,68 @@ def _get_default_settings(variable, config_user, derive=False): def _add_fxvar_keys(fx_var_dict, variable): """Add keys specific to fx variable to use get_input_filelist.""" fx_variable = dict(variable) + fx_variable.update(fx_var_dict) # set variable names fx_variable['variable_group'] = fx_var_dict['short_name'] - fx_variable['short_name'] = fx_var_dict['short_name'] - # specificities of project + # add special ensemble for CMIP5 only if fx_variable['project'] == 'CMIP5': - fx_variable['mip'] = 'fx' fx_variable['ensemble'] = 'r0i0p0' - elif fx_variable['project'] == 'CMIP6': - fx_variable['grid'] = variable['grid'] - if 'mip' in fx_var_dict: - fx_variable['mip'] = fx_var_dict['mip'] - elif fx_variable['project'] in ['OBS', 'OBS6', 'obs4mips']: - fx_variable['mip'] = 'fx' + # add missing cmor info _add_cmor_info(fx_variable, override=True) + return fx_variable -def _get_correct_fx_file(variable, fx_varname, config_user): +def _get_correct_fx_file(variable, fx_variable, config_user): """Get fx files (searching all possible mips).""" - # TODO: allow user to specify certain mip if desired + # make it a dict + if isinstance(fx_variable, str): + fx_varname = fx_variable + fx_variable = {'short_name': fx_varname} + else: + fx_varname = fx_variable['short_name'] + + # assemble info from master variable var = dict(variable) var_project = variable['project'] + get_project_config(var_project) cmor_table = CMOR_TABLES[var_project] + valid_fx_vars = [] - # Get all fx-related mips ('fx' always first, original mip last) - fx_mips = ['fx'] - fx_mips.extend( - [key for key in cmor_table.tables if 'fx' in key and key != 'fx']) - fx_mips.append(variable['mip']) + # force only the mip declared by user + if 'mip' in fx_variable: + fx_mips = [fx_variable['mip']] + else: + # Get all fx-related mips (original var mip, + # 'fx' and extend from cmor tables) + fx_mips = [variable['mip'], 'fx'] + fx_mips.extend( + [key for key in cmor_table.tables if 'fx' in key and key != 'fx']) # Search all mips for available variables + # priority goes to user specified mip if available searched_mips = [] + fx_files = [] for fx_mip in fx_mips: - fx_variable = cmor_table.get_variable(fx_mip, fx_varname) - if fx_variable is not None: + fx_cmor_variable = cmor_table.get_variable(fx_mip, fx_varname) + if fx_cmor_variable is not None: + fx_var_dict = dict(fx_variable) searched_mips.append(fx_mip) - fx_var = _add_fxvar_keys( - {'short_name': fx_varname, 'mip': fx_mip}, var) - logger.debug("For CMIP6 fx variable '%s', found table '%s'", - fx_varname, fx_mip) - fx_files = _get_input_files(fx_var, config_user)[0] + fx_var_dict['mip'] = fx_mip + fx_var_dict = _add_fxvar_keys(fx_var_dict, var) + valid_fx_vars.append(fx_var_dict) + logger.debug("For fx variable '%s', found table '%s'", fx_varname, + fx_mip) + fx_files = _get_input_files(fx_var_dict, config_user)[0] # If files found, return them if fx_files: - logger.debug("Found CMIP6 fx variables '%s':\n%s", - fx_varname, pformat(fx_files)) + logger.debug("Found fx variables '%s':\n%s", fx_varname, + pformat(fx_files)) break - else: - # No files found - fx_files = [] # If fx variable was not found in any table, raise exception if not searched_mips: @@ -412,22 +421,17 @@ def _get_correct_fx_file(variable, fx_varname, config_user): f"Requested fx variable '{fx_varname}' not available in " f"any 'fx'-related CMOR table ({fx_mips}) for '{var_project}'") + # flag a warning + if not fx_files: + logger.warning("Missing data for fx variable '%s'", fx_varname) + # allow for empty lists corrected for by NE masks if fx_files: fx_files = fx_files[0] + if valid_fx_vars: + valid_fx_vars = valid_fx_vars[0] - return fx_files - - -def _get_landsea_fraction_fx_dict(variable, config_user): - """Get dict of available ``sftlf`` and ``sftof`` variables.""" - fx_dict = {} - fx_vars = ['sftlf'] - if variable['project'] != 'obs4mips': - fx_vars.append('sftof') - for fx_var in fx_vars: - fx_dict[fx_var] = _get_correct_fx_file(variable, fx_var, config_user) - return fx_dict + return fx_files, valid_fx_vars def _exclude_dataset(settings, variable, step): @@ -449,41 +453,67 @@ def _update_weighting_settings(settings, variable): _exclude_dataset(settings, variable, 'weighting_landsea_fraction') +def _update_fx_files(step, settings, variable, config_user, fx_vars): + """Update settings with mask fx file list or dict.""" + logger.debug('Getting fx settings for {} now...'.format(step)) + non_preproc_fx_steps = [ + 'mask_landsea', 'mask_landseaice', 'weighting_landsea_fraction' + ] + preproc_fx_steps = [ + 'area_statistics', 'volume_statistics', 'zonal_statistics' + ] + + fx_vars = [ + _get_correct_fx_file(variable, fxvar, config_user) + for fxvar in fx_vars + ] + + if step in non_preproc_fx_steps: + fx_dict = {fx_var[1]['short_name']: fx_var[0] for fx_var in fx_vars} + elif step in preproc_fx_steps: + fx_dict = { + fx_var[1]['short_name']: + get_output_file(variable, + config_user['preproc_dir'], + fx_var_alias=fx_var[1]) + for fx_var in fx_vars if fx_var[1] + } + settings['fx_files'] = fx_dict + logger.info('Using fx_files: %s', pformat(settings['fx_files'])) + + def _update_fx_settings(settings, variable, config_user): - """Find and set the FX mask settings.""" - msg = f"Using fx files for %s of dataset {variable['dataset']}:\n%s" - if 'mask_landsea' in settings: - logger.debug('Getting fx mask settings now...') - fx_dict = _get_landsea_fraction_fx_dict(variable, config_user) - fx_list = [fx_file for fx_file in fx_dict.values() if fx_file] - settings['mask_landsea']['fx_files'] = fx_list - logger.info(msg, 'land/sea masking', pformat(fx_dict)) - - if 'mask_landseaice' in settings: - logger.debug('Getting fx mask settings now...') - settings['mask_landseaice']['fx_files'] = [] - fx_files_dict = { - 'sftgif': _get_correct_fx_file(variable, 'sftgif', config_user)} - if fx_files_dict['sftgif']: - settings['mask_landseaice']['fx_files'].append( - fx_files_dict['sftgif']) - logger.info(msg, 'land/sea ice masking', pformat(fx_files_dict)) - - if 'weighting_landsea_fraction' in settings: - logger.debug("Getting fx files for landsea fraction weighting now...") - fx_dict = _get_landsea_fraction_fx_dict(variable, config_user) - settings['weighting_landsea_fraction']['fx_files'] = fx_dict - logger.info(msg, 'land/sea fraction weighting', pformat(fx_dict)) - - for step in ('area_statistics', 'volume_statistics'): - if settings.get(step, {}).get('fx_files'): - var = dict(variable) - var['fx_files'] = settings.get(step, {}).get('fx_files') - fx_files_dict = { - fxvar: _get_correct_fx_file(variable, fxvar, config_user) - for fxvar in var['fx_files']} - settings[step]['fx_files'] = fx_files_dict - logger.info(msg, step, pformat(fx_files_dict)) + """Update fx settings depending on the needed method.""" + msg = f"Using fx files for %s of dataset %s:\n%s" + + # get fx variables either from user defined attribute or fixed + def _get_fx_vars_from_attribute(settings, step_name): + user_fx_vars = settings.get(step_name, {}).get('fx_files') + if not user_fx_vars: + user_fx_vars = ['sftlf'] + if variable['project'] != 'obs4mips': + user_fx_vars.append('sftof') + if step_name == 'mask_landseaice': + user_fx_vars = ['sftgif'] + return user_fx_vars + + fx_steps = [ + 'mask_landsea', 'mask_landseaice', 'weighting_landsea_fraction', + 'area_statistics', 'volume_statistics', 'zonal_statistics' + ] + update_methods = { + step: (_update_fx_files, { + 'fx_vars': _get_fx_vars_from_attribute(settings, step) + }) + for step in fx_steps + } + for step_name, step_settings in settings.items(): + update_method, kwargs = update_methods.get(step_name, (None, {})) + if update_method: + update_method(step_name, step_settings, variable, config_user, + **kwargs) + logger.info(msg, variable['short_name'], variable['dataset'], + step_name) def _read_attributes(filename): @@ -501,10 +531,10 @@ def _read_attributes(filename): def _get_input_files(variable, config_user): """Get the input files for a single dataset (locally and via download).""" - (input_files, dirnames, filenames) = get_input_filelist( - variable=variable, - rootpath=config_user['rootpath'], - drs=config_user['drs']) + (input_files, dirnames, + filenames) = get_input_filelist(variable=variable, + rootpath=config_user['rootpath'], + drs=config_user['drs']) # Set up downloading using synda if requested. # Do not download if files are already available locally. @@ -518,8 +548,8 @@ def _get_input_files(variable, config_user): def _get_ancestors(variable, config_user): """Get the input files for a single dataset and setup provenance.""" - (input_files, dirnames, filenames) = _get_input_files(variable, - config_user) + (input_files, dirnames, + filenames) = _get_input_files(variable, config_user) logger.info("Using input files for variable %s of dataset %s:\n%s", variable['short_name'], variable['dataset'], @@ -664,14 +694,31 @@ def get_matching(attributes): return grouped_products -def _get_preprocessor_products(variables, profile, order, ancestor_products, - config_user): - """Get preprocessor product definitions for a set of datasets.""" +def _get_preprocessor_products(variables, + profile, + order, + ancestor_products, + config_user, + var_fxvar_coupling=False): + """ + Get preprocessor product definitions for a set of datasets. + + + It updates recipe settings as needed by various preprocessors + and sets the correct ancestry. + """ products = set() - for variable in variables: - variable['filename'] = get_output_file(variable, - config_user['preproc_dir']) + if not var_fxvar_coupling: + for variable in variables: + variable['filename'] = get_output_file(variable, + config_user['preproc_dir']) + else: + parent_variable = variables.pop() + for variable in variables: + variable['filename'] = get_output_file(parent_variable, + config_user['preproc_dir'], + fx_var_alias=variable) if ancestor_products: grouped_ancestors = _match_products(ancestor_products, variables) @@ -694,9 +741,9 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products, ) _update_extract_shape(settings, config_user) _update_weighting_settings(settings, variable) - _update_fx_settings( - settings=settings, variable=variable, - config_user=config_user) + _update_fx_settings(settings=settings, + variable=variable, + config_user=config_user) _update_target_grid( variable=variable, variables=variables, @@ -705,7 +752,11 @@ def _get_preprocessor_products(variables, profile, order, ancestor_products, ) _update_regrid_time(variable, settings) ancestors = grouped_ancestors.get(variable['filename']) - if not ancestors: + fx_files_in_settings = [ + setting.get('fx_files') for setting in settings.values() + if setting.get('fx_files') is not None + ] + if not ancestors or fx_files_in_settings: ancestors = _get_ancestors(variable, config_user) if config_user.get('skip-nonexistent') and not ancestors: logger.info("Skipping: no data found for %s", variable) @@ -729,7 +780,8 @@ def _get_single_preprocessor_task(variables, profile, config_user, name, - ancestor_tasks=None): + ancestor_tasks=None, + var_fxvar_coupling=False): """Create preprocessor tasks for a set of datasets w/ special case fx.""" if ancestor_tasks is None: ancestor_tasks = [] @@ -746,7 +798,7 @@ def _get_single_preprocessor_task(variables, order=order, ancestor_products=ancestor_products, config_user=config_user, - ) + var_fxvar_coupling=var_fxvar_coupling) if not products: raise RecipeError( @@ -820,8 +872,7 @@ def append(group_prefix, var): for variable in variables: group_prefix = variable['variable_group'] + '_derive_input_' if not variable.get('force_derivation') and _get_input_files( - variable, - config_user)[0]: + variable, config_user)[0]: # No need to derive, just process normally up to derive step var = deepcopy(variable) append(group_prefix, var) @@ -845,6 +896,56 @@ def append(group_prefix, var): return derive_input +def _get_filtered_fxprofile(fxprofile): + """Remove all time preprocessors from the fx profile (returns a copy).""" + fxprofile = deepcopy(fxprofile) + for key in fxprofile: + if key in TIME_PREPROCESSORS: + fxprofile[key] = False + + return fxprofile + + +def _add_fxvar_preprocessing_ancestors(step, profile, variables, config_user, + task_name): + """Bolt on the fx vars preproc ancestors, if needed.""" + fx_preproc_tasks = [] + # conserve profile + fx_profile = deepcopy(profile) + fx_vars = profile.get(step, {}).get('fx_files') + + # Create tasks to prepare the input data for the fx var + order = _extract_preprocessor_order(fx_profile) + for var in variables: + fx_variables = [ + _get_correct_fx_file(var, fx_var, config_user)[1] + for fx_var in fx_vars + ] + for fx_variable in fx_variables: + # list may be (intentionally) empty - catch it here + if not fx_variable: + raise RecipeError(f"One or more of {step} fx data " + f"for {var['short_name']} are missing. " + f"Task can not be performed since there is " + f"no fx data found.") + before, _ = _split_settings(fx_profile, step, order) + # remove time preprocessors for any fx/Ofx/Efx/etc + # that dont have time coords + if fx_variable['frequency'] == 'fx': + before = _get_filtered_fxprofile(before) + fx_name = task_name.split( + TASKSEP)[0] + TASKSEP + 'fx_area-volume_stats_' + \ + fx_variable['variable_group'] + task = _get_single_preprocessor_task([fx_variable, var], + before, + config_user, + name=fx_name, + var_fxvar_coupling=True) + fx_preproc_tasks.append(task) + + return fx_preproc_tasks + + def _get_preprocessor_task(variables, profiles, config_user, task_name): """Create preprocessor task(s) for a set of datasets.""" # First set up the preprocessor profile @@ -863,6 +964,7 @@ def _get_preprocessor_task(variables, profiles, config_user, task_name): _add_cmor_info(variable) # Create preprocessor task(s) derive_tasks = [] + # set up tasks if variable.get('derive'): # Create tasks to prepare the input data for the derive step derive_profile, profile = _split_derive_profile(profile) @@ -881,6 +983,13 @@ def _get_preprocessor_task(variables, profiles, config_user, task_name): ) derive_tasks.append(task) + # special case: fx variable pre-processing + for step in ('area_statistics', 'volume_statistics', 'zonal_statistics'): + if profile.get(step, {}).get('fx_files'): + derive_tasks.extend( + _add_fxvar_preprocessing_ancestors(step, profile, variables, + config_user, task_name)) + # Create (final) preprocessor task task = _get_single_preprocessor_task( variables, @@ -893,14 +1002,33 @@ def _get_preprocessor_task(variables, profiles, config_user, task_name): return task +def _check_duplication(task): + """Check and remove duplicate ancestry tasks.""" + ancestors_filename_dict = OrderedDict() + filenames = [] + if isinstance(task, PreprocessingTask): + for ancestor_task in task.ancestors: + for anc_product in ancestor_task.products: + ancestors_filename_dict[anc_product.filename] = ancestor_task + filenames.append(anc_product.filename) + set_ancestors = { + ancestors_filename_dict[filename] + for filename in filenames + } + task.ancestors = [ + ancestor for ancestor in task.ancestors + if ancestor in set_ancestors + ] + + return task + + class Recipe: """Recipe object.""" - info_keys = ( - 'project', 'activity', 'dataset', 'exp', 'ensemble', 'version' - ) + info_keys = ('project', 'activity', 'dataset', 'exp', 'ensemble', + 'version') """List of keys to be used to compose the alias, ordered by priority.""" - def __init__(self, raw_recipe, config_user, @@ -982,9 +1110,9 @@ def _initialize_datasets(raw_datasets): @staticmethod def _expand_ensemble(variables): """ - Expand ensemble members to multiple datasets + Expand ensemble members to multiple datasets. - Expansion only support ensembles defined as strings, not lists + Expansion only supports ensembles defined as strings, not lists. """ expanded = [] regex = re.compile(r'\(\d+:\d+\)') @@ -997,7 +1125,7 @@ def _expand_ensemble(variables): if not match: expanded.append(variable) continue - start, end = match.group(0)[1: -1].split(':') + start, end = match.group(0)[1:-1].split(':') for i in range(int(start), int(end) + 1): expand = deepcopy(variable) expand['ensemble'] = regex.sub(str(i), ensemble, 1) @@ -1074,8 +1202,7 @@ def _initialize_preprocessor_output(self, diagnostic_name, raw_variables, return preprocessor_output def _set_alias(self, preprocessor_output): - """ - Add unique alias for datasets. + """Add unique alias for datasets. Generates a unique alias for each dataset that will be shared by all variables. Tries to make it as small as possible to make it useful for @@ -1092,7 +1219,7 @@ def _set_alias(self, preprocessor_output): Function will not modify alias if it is manually added to the recipe but it will use the dataset info to compute the others - Examples: + Examples -------- - {project: CMIP5, model: EC-Earth, ensemble: r1i1p1} - {project: CMIP6, model: EC-Earth, ensemble: r1i1p1f1} @@ -1116,7 +1243,7 @@ def _set_alias(self, preprocessor_output): - {project: CMIP5, model: EC-Earth, experiment: historical} will generate alias 'EC-Earth' - Parameters: + Parameters ---------- preprocessor_output : dict preprocessor output dictionary @@ -1263,6 +1390,11 @@ def initialize_tasks(self): config_user=self._cfg, task_name=task_name, ) + # remove possible duplicate ancestor tasks that + # preprocess the same + # fx var file needed by different var["activity"] but with + # the same output fx var file + _check_duplication(task) for task0 in task.flatten(): task0.priority = priority tasks.add(task) diff --git a/esmvalcore/preprocessor/_mask.py b/esmvalcore/preprocessor/_mask.py index a1dcf72555..49a25ed5bb 100644 --- a/esmvalcore/preprocessor/_mask.py +++ b/esmvalcore/preprocessor/_mask.py @@ -12,10 +12,10 @@ import cartopy.io.shapereader as shpreader import iris -from iris.analysis import Aggregator -from iris.util import rolling_window import numpy as np import shapely.vectorized as shp_vect +from iris.analysis import Aggregator +from iris.util import rolling_window logger = logging.getLogger(__name__) @@ -100,8 +100,8 @@ def mask_landsea(cube, fx_files, mask_out, always_use_ne_mask=False): cube: iris.cube.Cube data cube to be masked. - fx_files: list - list holding the full paths to fx files. + fx_files: dict + dict: keys: fx variables, values: full paths to fx files. mask_out: str either "land" to mask out land mass or "sea" to mask out seas. @@ -131,7 +131,8 @@ def mask_landsea(cube, fx_files, mask_out, always_use_ne_mask=False): 'sea': os.path.join(cwd, 'ne_masks/ne_50m_ocean.shp') } - if fx_files and not always_use_ne_mask: + fx_files = fx_files.values() + if any(fx_files) and not always_use_ne_mask: fx_cubes = {} for fx_file in fx_files: fxfile_members = os.path.basename(fx_file).split('_') @@ -192,8 +193,8 @@ def mask_landseaice(cube, fx_files, mask_out): cube: iris.cube.Cube data cube to be masked. - fx_files: list - list holding the full paths to fx files. + fx_files: dict + dict: keys: fx variables, values: full paths to fx files. mask_out: str either "landsea" to mask out landsea or "ice" to mask out ice. @@ -211,8 +212,9 @@ def mask_landseaice(cube, fx_files, mask_out): Error raised if fx_files list is empty. """ - # sftgif is the only one so far - if fx_files: + # sftgif is the only one so far but users can set others + fx_files = fx_files.values() + if any(fx_files): for fx_file in fx_files: fx_cube = iris.load_cube(fx_file) diff --git a/tests/integration/preprocessor/_mask/test_mask.py b/tests/integration/preprocessor/_mask/test_mask.py index c89c091551..afd96ac16f 100644 --- a/tests/integration/preprocessor/_mask/test_mask.py +++ b/tests/integration/preprocessor/_mask/test_mask.py @@ -63,8 +63,10 @@ def test_mask_landsea(self): dim_coords_and_dims=self.coords_spec) # mask with fx files - result_land = mask_landsea(new_cube_land, ['sftlf_test.nc'], 'land') - result_sea = mask_landsea(new_cube_sea, ['sftlf_test.nc'], 'sea') + result_land = mask_landsea(new_cube_land, + {'sftlf': 'sftlf_test.nc'}, 'land') + result_sea = mask_landsea(new_cube_sea, + {'sftlf': 'sftlf_test.nc'}, 'sea') expected = np.ma.empty((3, 3)) expected.data[:] = 200. expected.mask = np.ones((3, 3), bool) @@ -83,10 +85,10 @@ def test_mask_landsea(self): dim_coords_and_dims=self.coords_spec) new_cube_sea = iris.cube.Cube(self.new_cube_data, dim_coords_and_dims=self.coords_spec) - result_land = mask_landsea(new_cube_land, ['sftlf_test.nc'], + result_land = mask_landsea(new_cube_land, {'sftlf': 'sftlf_test.nc'}, 'land', always_use_ne_mask=True) - result_sea = mask_landsea(new_cube_sea, ['sftlf_test.nc'], + result_sea = mask_landsea(new_cube_sea, {'sftlf': 'sftlf_test.nc'}, 'sea', always_use_ne_mask=True) @@ -106,8 +108,8 @@ def test_mask_landsea(self): dim_coords_and_dims=self.coords_spec) new_cube_sea = iris.cube.Cube(self.new_cube_data, dim_coords_and_dims=self.coords_spec) - result_land = mask_landsea(new_cube_land, None, 'land') - result_sea = mask_landsea(new_cube_sea, None, 'sea') + result_land = mask_landsea(new_cube_land, {}, 'land') + result_sea = mask_landsea(new_cube_sea, {}, 'sea') # bear in mind all points are in the ocean np.ma.set_fill_value(result_land.data, 1e+20) @@ -122,7 +124,8 @@ def test_mask_landseaice(self): iris.save(self.fx_mask, 'sftgif_test.nc') new_cube_ice = iris.cube.Cube(self.new_cube_data, dim_coords_and_dims=self.coords_spec) - result_ice = mask_landseaice(new_cube_ice, ['sftgif_test.nc'], 'ice') + result_ice = mask_landseaice(new_cube_ice, + {'sftgif': 'sftgif_test.nc'}, 'ice') expected = np.ma.empty((3, 3)) expected.data[:] = 200. expected.mask = np.ones((3, 3), bool) diff --git a/tests/integration/test_recipe.py b/tests/integration/test_recipe.py index fbc40fc787..52a90251a8 100644 --- a/tests/integration/test_recipe.py +++ b/tests/integration/test_recipe.py @@ -1721,13 +1721,91 @@ def test_landmask(tmp_path, patched_datafinder, config_user): assert len(settings) == 2 assert settings['mask_out'] == 'sea' fx_files = settings['fx_files'] - assert isinstance(fx_files, list) + assert isinstance(fx_files, dict) + fx_files = fx_files.values() if product.attributes['project'] == 'obs4mips': assert len(fx_files) == 1 else: assert len(fx_files) == 2 +def test_landseamask_change_fxvar(tmp_path, patched_datafinder, config_user): + content = dedent(""" + preprocessors: + landmask: + mask_landsea: + mask_out: sea + fx_files: [{'short_name': 'sftlf', 'exp': 'piControl'}] + + diagnostics: + diagnostic_name: + variables: + gpp: + preprocessor: landmask + project: CMIP5 + mip: Lmon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1 + additional_datasets: + - {dataset: CanESM2} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check custom fx variables + task = recipe.tasks.pop() + product = task.products.pop() + settings = product.settings['mask_landsea'] + assert len(settings) == 2 + assert settings['mask_out'] == 'sea' + fx_files = settings['fx_files'] + assert isinstance(fx_files, dict) + assert len(fx_files) == 1 + assert '_fx_' in fx_files['sftlf'] + assert '_piControl_' in fx_files['sftlf'] + + +def test_landseaicemask_change_fxvar(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + preprocessors: + landmask: + mask_landseaice: + mask_out: sea + fx_files: [{'short_name': 'sftgif', 'exp': 'piControl'}] + + diagnostics: + diagnostic_name: + variables: + gpp: + preprocessor: landmask + project: CMIP5 + mip: Lmon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1 + additional_datasets: + - {dataset: CanESM2} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check custom fx variables + task = recipe.tasks.pop() + product = task.products.pop() + settings = product.settings['mask_landseaice'] + assert len(settings) == 2 + assert settings['mask_out'] == 'sea' + fx_files = settings['fx_files'] + assert isinstance(fx_files, dict) + assert len(fx_files) == 1 + assert '_fx_' in fx_files['sftgif'] + assert '_piControl_' in fx_files['sftgif'] + + def test_landmask_no_fx(tmp_path, patched_failing_datafinder, config_user): content = dedent(""" preprocessors: @@ -1771,8 +1849,9 @@ def test_landmask_no_fx(tmp_path, patched_failing_datafinder, config_user): assert settings['mask_out'] == 'sea' assert settings['always_use_ne_mask'] is False fx_files = settings['fx_files'] - assert isinstance(fx_files, list) - assert fx_files == [] + assert isinstance(fx_files, dict) + fx_files = fx_files.values() + assert not any(fx_files) def test_fx_vars_mip_change_cmip6(tmp_path, patched_datafinder, config_user): @@ -1838,7 +1917,8 @@ def test_fx_vars_mip_change_cmip6(tmp_path, patched_datafinder, config_user): assert len(settings) == 2 assert settings['mask_out'] == 'sea' fx_files = settings['fx_files'] - assert isinstance(fx_files, list) + assert isinstance(fx_files, dict) + fx_files = fx_files.values() assert len(fx_files) == 2 for fx_file in fx_files: if 'sftlf' in fx_file: @@ -1891,10 +1971,366 @@ def test_fx_vars_volcello_in_ofx_cmip6(tmp_path, patched_datafinder, fx_files = settings['fx_files'] assert isinstance(fx_files, dict) assert len(fx_files) == 1 - assert '_Ofx_' in fx_files['volcello'] + assert '_Omon_' in fx_files['volcello'] + assert '_Ofx_' not in fx_files['volcello'] + + +def test_fx_dicts_volcello_in_ofx_cmip6(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + volume_statistics: + operator: mean + fx_files: [{'short_name': 'volcello', 'mip': 'Oyr', + 'exp': 'piControl'}] + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f1 + grid: gn + additional_datasets: + - {dataset: CanESM5} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 1 + task = recipe.tasks.pop() + assert task.name == 'diagnostic_name' + TASKSEP + 'tos' + assert len(task.products) == 1 + product = task.products.pop() + + # Check volume_statistics + assert 'volume_statistics' in product.settings + settings = product.settings['volume_statistics'] + assert len(settings) == 2 + assert settings['operator'] == 'mean' + fx_files = settings['fx_files'] + assert isinstance(fx_files, dict) + assert len(fx_files) == 1 + assert '_Oyr_' in fx_files['volcello'] + assert '_piControl_' in fx_files['volcello'] assert '_Omon_' not in fx_files['volcello'] +def test_fx_vars_list_no_preproc_cmip6(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + regrid: + target_grid: 1x1 + scheme: linear + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + convert_units: + units: K + area_statistics: + operator: mean + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f1 + grid: gn + additional_datasets: + - {dataset: CanESM5} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 1 + task = recipe.tasks.pop() + assert task.name == 'diagnostic_name' + TASKSEP + 'tos' + assert len(task.ancestors) == 0 + assert len(task.products) == 1 + product = task.products.pop() + assert product.attributes['short_name'] == 'tos' + assert product.files + assert 'area_statistics' in product.settings + settings = product.settings['area_statistics'] + assert len(settings) == 2 + assert settings['operator'] == 'mean' + assert 'fx_files' in settings + + +def test_fx_vars_list_preproc_cmip6(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + regrid: + target_grid: 1x1 + scheme: linear + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + convert_units: + units: K + volume_statistics: + operator: mean + fx_files: ['areacello', 'volcello'] + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f1 + grid: gn + additional_datasets: + - {dataset: CanESM5} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 1 + task = recipe.tasks.pop() + assert task.name == 'diagnostic_name' + TASKSEP + 'tos' + assert len(task.ancestors) == 2 + assert len(task.ancestors[0].ancestors) == 0 + assert 'diagnostic_name' + TASKSEP + 'fx_area-volume_stats_areacello' in [ + t.name for t in task.ancestors + ] + assert 'diagnostic_name' + TASKSEP + 'fx_area-volume_stats_volcello' in [ + t.name for t in task.ancestors + ] + for product in task.products: + assert 'volume_statistics' in product.settings + assert product.attributes['short_name'] == 'tos' + for ancestor_product in task.ancestors[0].products: + assert ancestor_product.attributes['short_name'] == 'areacello' + assert 'volume_statistics' not in ancestor_product.settings + assert 'convert_units' not in ancestor_product.settings + assert 'regrid' in ancestor_product.settings + assert 'extract_volume' in ancestor_product.settings + for ancestor_product in task.ancestors[1].products: + assert ancestor_product.attributes['short_name'] == 'volcello' + assert 'volume_statistics' not in ancestor_product.settings + assert 'convert_units' not in ancestor_product.settings + assert 'regrid' in ancestor_product.settings + assert 'extract_volume' in ancestor_product.settings + assert len(task.products) == 1 + product = task.products.pop() + assert product.attributes['short_name'] == 'tos' + assert product.files + assert 'volume_statistics' in product.settings + settings = product.settings['volume_statistics'] + assert len(settings) == 2 + assert settings['operator'] == 'mean' + fx_files = settings['fx_files'] + assert isinstance(fx_files, dict) + assert 'preproc' in fx_files['areacello'] + assert 'CMIP6_CanESM5_Ofx_historical_r1i1p1f1_areacello_2000-2005.nc' in \ + fx_files['areacello'] + + +def test_fx_vars_list_preproc_cmip6_fail(tmp_path, patched_failing_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + regrid: + target_grid: 1x1 + scheme: linear + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + convert_units: + units: K + volume_statistics: + operator: mean + fx_files: ['areacello', 'volcello'] + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f1 + grid: gn + additional_datasets: + - {dataset: CanESM5} + scripts: null + """) + with pytest.raises(RecipeError) as rec_err: + get_recipe(tmp_path, content, config_user) + msg = ('One or more of volume_statistics fx data for tos are missing. ' + 'Task can not be performed since there is no fx data found.') + assert rec_err == msg + + +def test_fx_vars_dicts_preproc_cmip6(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + custom_order: true + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + volume_statistics: + operator: mean + fx_files: [{'short_name': 'areacello', 'mip': 'Ofx', + 'exp': 'piControl'}, + {'short_name': 'volcello', 'mip': 'Omon', + 'exp': 'historical'}] + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + exp: historical + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f1 + grid: gn + additional_datasets: + - {dataset: CanESM5} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 1 + task = recipe.tasks.pop() + assert task.name == 'diagnostic_name' + TASKSEP + 'tos' + assert len(task.ancestors) == 2 + assert len(task.ancestors[0].ancestors) == 0 + assert 'diagnostic_name' + TASKSEP + 'fx_area-volume_stats_areacello' in [ + t.name for t in task.ancestors + ] + assert 'diagnostic_name' + TASKSEP + 'fx_area-volume_stats_volcello' in [ + t.name for t in task.ancestors + ] + for product in task.products: + assert 'volume_statistics' in product.settings + assert product.attributes['short_name'] == 'tos' + for ancestor_product in task.ancestors[0].products: + assert ancestor_product.attributes['short_name'] == 'areacello' + assert ancestor_product.attributes['mip'] == 'Ofx' + assert ancestor_product.attributes['exp'] == 'piControl' + assert 'annual_statistics' not in ancestor_product.settings + assert 'volume_statistics' not in ancestor_product.settings + for ancestor_product in task.ancestors[1].products: + assert ancestor_product.attributes['short_name'] == 'volcello' + assert ancestor_product.attributes['mip'] == 'Omon' + assert ancestor_product.attributes['exp'] == 'historical' + assert 'annual_statistics' in ancestor_product.settings + assert 'volume_statistics' not in ancestor_product.settings + assert len(task.products) == 1 + product = task.products.pop() + assert product.attributes['short_name'] == 'tos' + assert product.files + assert 'volume_statistics' in product.settings + + +def test_fx_vars_dicts_activity_preproc_cmip6(tmp_path, patched_datafinder, + config_user): + content = dedent(""" + preprocessors: + preproc: + custom_order: true + extract_volume: + z_min: 0 + z_max: 100 + annual_statistics: + operator: mean + volume_statistics: + operator: mean + fx_files: [{short_name: areacello, + mip: Ofx, exp: piControl, + activity: CMIP}] + + diagnostics: + diagnostic_name: + variables: + tos: + preprocessor: preproc + project: CMIP6 + mip: Omon + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f2 + grid: gn + additional_datasets: + - {dataset: UKESM1-0-LL, exp: ssp585} + - {dataset: UKESM1-0-LL, exp: ssp119} + scripts: null + """) + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 1 + task = recipe.tasks.pop() + assert task.name == 'diagnostic_name' + TASKSEP + 'tos' + assert len(task.ancestors) == 1 + assert len(task.ancestors[0].ancestors) == 0 + assert 'diagnostic_name' + TASKSEP + 'fx_area-volume_stats_areacello' in [ + t.name for t in task.ancestors + ] + product_files = [ + anc_product.filename for anc_product + in task.ancestors[0].products + ] + anc_products = task.ancestors[0].products + assert len(anc_products) == 1 + assert len(product_files) == 1 + assert os.path.basename(product_files[0]) == \ + 'CMIP6_UKESM1-0-LL_Ofx_piControl_r1i1p1f2_areacello_2000-2005.nc' + for product in task.products: + assert 'volume_statistics' in product.settings + assert product.attributes['short_name'] == 'tos' + ancestor_product = list(anc_products)[0] + assert ancestor_product.attributes['short_name'] == 'areacello' + assert ancestor_product.attributes['mip'] == 'Ofx' + assert ancestor_product.attributes['exp'] == 'piControl' + assert ancestor_product.attributes['activity'] == 'CMIP' + assert 'annual_statistics' not in ancestor_product.settings + assert 'volume_statistics' not in ancestor_product.settings + + def test_fx_vars_volcello_in_omon_cmip6(tmp_path, patched_failing_datafinder, config_user): content = dedent(""" @@ -2097,33 +2533,103 @@ def test_invalid_fx_var_cmip6(tmp_path, patched_datafinder, config_user): assert msg in str(rec_err_exp.value) -def test_fx_var_invalid_project(tmp_path, patched_datafinder, config_user): +def test_fx_vars_duplicate_files(tmp_path, patched_datafinder, + config_user): content = dedent(""" preprocessors: preproc: - area_statistics: - operator: mean - fx_files: ['areacella'] + custom_order: True + annual_statistics: + operator: mean + extract_region: + start_longitude: -80. + end_longitude: 30. + start_latitude: -80. + end_latitude: 80. + area_statistics: + operator: mean + fx_files: [{short_name: areacello, mip: Ofx, + exp: piControl, activity: CMIP}] + regrid_time: + frequency: yr diagnostics: - diagnostic_name: + diag_437: variables: - tas: + tos: preprocessor: preproc - project: EMAC - mip: Amon - exp: historical + project: CMIP6 + mip: Omon start_year: 2000 end_year: 2005 - ensemble: r1i1p1f1 + ensemble: r1i1p1f2 grid: gn additional_datasets: - - {dataset: CanESM5} + - {dataset: UKESM1-0-LL, exp: ssp585} + - {dataset: UKESM1-0-LL, exp: ssp119} + scripts: null + diag_439: + variables: + sst: + short_name: tos + preprocessor: preproc + project: CMIP6 + mip: Omon + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f2 + grid: gn + additional_datasets: + - {dataset: UKESM1-0-LL, exp: ssp585} + sss: + short_name: tos + preprocessor: preproc + project: CMIP6 + mip: Omon + start_year: 2000 + end_year: 2005 + ensemble: r1i1p1f2 + grid: gn + additional_datasets: + - {dataset: UKESM1-0-LL, exp: ssp585} scripts: null """) - msg = ( - "Unable to load CMOR table (project) 'EMAC' for variable 'areacella' " - "with mip 'Amon'") - with pytest.raises(RecipeError) as rec_err_exp: - get_recipe(tmp_path, content, config_user) - assert str(rec_err_exp.value) == msg + recipe = get_recipe(tmp_path, content, config_user) + + # Check generated tasks + assert len(recipe.tasks) == 3 + all_task_names = [ + 'diag_439/sst', 'diag_437/tos', 'diag_439/sss' + ] + task_name = 'diag_437' + TASKSEP + 'tos' + task = [ + elem for elem in recipe.tasks if elem.name == task_name + ][0] + assert task.name in all_task_names + assert len(task.ancestors) == 1 + assert len(task.ancestors[0].ancestors) == 0 + anc_name = 'diag_437' + TASKSEP + 'fx_area-volume_stats_areacello' + assert anc_name in [ + t.name for t in task.ancestors + ] + for product in task.products: + assert 'regrid_time' in product.settings + assert product.attributes['short_name'] == 'tos' + for anc_product in task.ancestors[0].products: + assert anc_product.attributes['short_name'] == 'areacello' + + # identify tasks + task_name_a = 'diag_439' + TASKSEP + 'sss' + task_a = [ + elem for elem in recipe.tasks if elem.name == task_name_a + ] + assert task_a + task_a = task_a[0] + task_name_b = 'diag_439' + TASKSEP + 'sst' + task_b = [ + elem for elem in recipe.tasks if elem.name == task_name_b + ] + assert task_b + task_b = task_b[0] + assert len(task_a.ancestors) == 1 + assert len(task_b.ancestors) == 1