diff --git a/cads_adaptors/adaptors/mars.py b/cads_adaptors/adaptors/mars.py
index 6d516c63..da35fd63 100644
--- a/cads_adaptors/adaptors/mars.py
+++ b/cads_adaptors/adaptors/mars.py
@@ -1,4 +1,5 @@
 import os
+import pathlib
 from typing import Any, BinaryIO
 
 from cads_adaptors.adaptors import Context, Request, cds
@@ -49,10 +50,11 @@ def get_mars_server_list(config) -> list[str]:
 
 def execute_mars(
     request: dict[str, Any] | list[dict[str, Any]],
-    context: Context,
+    context: Context = Context(),
     config: dict[str, Any] = dict(),
     mapping: dict[str, Any] = dict(),
-    target: str = "data.grib",
+    target_fname: str = "data.grib",
+    target_dir: str | pathlib.Path = "",
 ) -> str:
     from cads_mars_server import client as mars_client
 
@@ -63,6 +65,8 @@ def execute_mars(
     if config.get("embargo") is not None:
         requests, _cacheable = implement_embargo(requests, config["embargo"])
 
+    target = str(pathlib.Path(target_dir) / target_fname)
+
     split_on_keys = ALWAYS_SPLIT_ON + ensure_list(config.get("split_on", []))
     requests = split_requests_on_keys(requests, split_on_keys, context, mapping)
 
@@ -118,7 +122,11 @@ class DirectMarsCdsAdaptor(cds.AbstractCdsAdaptor):
     resources = {"MARS_CLIENT": 1}
 
     def retrieve(self, request: Request) -> BinaryIO:
-        result = execute_mars(request, context=self.context)
+        result = execute_mars(
+            request,
+            context=self.context,
+            target_dir=self.cache_tmp_path,
+        )
 
         return open(result, "rb")
 
@@ -178,22 +186,24 @@ def retrieve_list_of_results(self, request: dict[str, Any]) -> list[str]:
         # Call normalise_request to set self.mapped_requests
         request = self.normalise_request(request)
 
-        result: Any = execute_mars(
+        result = execute_mars(
             self.mapped_requests,
             context=self.context,
             config=self.config,
             mapping=self.mapping,
+            target_dir=self.cache_tmp_path,
         )
 
         with dask.config.set(scheduler="threads"):
-            result = self.post_process(result)
+            results_dict = self.post_process(result)
 
             # TODO?: Generalise format conversion to be a post-processor
             paths = self.convert_format(
-                result,
+                results_dict,
                 self.data_format,
                 context=self.context,
                 config=self.config,
+                target_dir=str(self.cache_tmp_path),
             )
 
         # A check to ensure that if there is more than one path, and download_format
diff --git a/cads_adaptors/adaptors/multi.py b/cads_adaptors/adaptors/multi.py
index 002dcac4..10b3b3f2 100644
--- a/cads_adaptors/adaptors/multi.py
+++ b/cads_adaptors/adaptors/multi.py
@@ -200,11 +200,16 @@ def retrieve_list_of_results(self, request: Request) -> list[str]:
             context=self.context,
             config=self.config,
             mapping=self.mapping,
+            target_dir=self.cache_tmp_path,
         )
 
         with dask.config.set(scheduler="threads"):
             paths = self.convert_format(
-                result, self.data_format, self.context, self.config
+                result,
+                self.data_format,
+                self.context,
+                self.config,
+                target_dir=str(self.cache_tmp_path),
             )
 
         if len(paths) > 1 and self.download_format == "as_source":
diff --git a/cads_adaptors/exceptions.py b/cads_adaptors/exceptions.py
index f39b23f8..7be06df7 100644
--- a/cads_adaptors/exceptions.py
+++ b/cads_adaptors/exceptions.py
@@ -57,6 +57,10 @@ class RoocsValueError(ValueError):
     """Raised when a ROOCS request fails due to a value error."""
 
 
+class CdsFormatConversionError(RuntimeError):
+    """Raised when a CDS post-processing request fails."""
+
+
 class CdsConfigurationError(ValueError):
     """Raised when a CDS request fails due to a configuration error."""
 
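
For orientation before the convertors.py changes: execute_mars now takes target_fname and target_dir in place of the old target argument, and the adaptors point target_dir at their cache_tmp_path. The following is a minimal sketch of the intended path composition only; the helper name build_target and the example paths are invented for illustration and are not part of the patch.

import pathlib


def build_target(target_fname: str = "data.grib", target_dir: str | pathlib.Path = "") -> str:
    # With the default target_dir="" this reduces to the previous behaviour:
    # a bare "data.grib" relative to the current working directory.
    return str(pathlib.Path(target_dir) / target_fname)


assert build_target() == "data.grib"
assert build_target("data.grib", "/cache/tmp") == "/cache/tmp/data.grib"
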
diff --git a/cads_adaptors/tools/convertors.py b/cads_adaptors/tools/convertors.py
index 1756f34c..bb95ba34 100644
--- a/cads_adaptors/tools/convertors.py
+++ b/cads_adaptors/tools/convertors.py
@@ -6,6 +6,7 @@
 import xarray as xr
 
 from cads_adaptors.adaptors import Context
+from cads_adaptors.exceptions import CdsFormatConversionError
 from cads_adaptors.tools import adaptor_tools
 from cads_adaptors.tools.general import ensure_list
 
@@ -35,7 +36,7 @@ def add_user_log_and_raise_error(
     message: str,
     context: Context = Context(),
-    thisError=ValueError,
+    thisError=CdsFormatConversionError,
 ) -> NoReturn:
     context.add_user_visible_error(message)
     raise thisError(message)
 
@@ -46,13 +47,17 @@ def convert_format(
     result: Any,
     target_format: str,
     context: Context = Context(),
     config: dict[str, Any] = {},
+    target_dir: str = ".",
+    **runtime_kwargs: dict[str, dict[str, Any]],
 ) -> list[str]:
     target_format = adaptor_tools.handle_data_format(target_format)
     post_processing_kwargs = config.get("post_processing_kwargs", {})
+    for k, v in runtime_kwargs.items():
+        post_processing_kwargs.setdefault(k, {}).update(v)
+    post_processing_kwargs.setdefault("target_dir", target_dir)
     context.add_stdout(
         f"Converting result ({result}) to {target_format} with kwargs: {post_processing_kwargs}"
     )
-
     convertor: None | Callable = CONVERTORS.get(target_format, None)
 
     if convertor is not None:
@@ -124,7 +129,6 @@ def result_to_grib_files(
     add_user_log_and_raise_error(
         f"Unable to convert result of type {result_type} to grib files. result:\n{result}",
         context=context,
-        thisError=ValueError,
     )
 
 
@@ -171,7 +175,6 @@ def result_to_netcdf_files(
     add_user_log_and_raise_error(
         f"Unable to convert result of type {result_type} to netCDF files. result:\n{result}",
         context=context,
-        thisError=ValueError,
     )
 
 
@@ -179,6 +182,7 @@ def result_to_netcdf_legacy_files(
     result: Any,
     context: Context = Context(),
     to_netcdf_legacy_kwargs: dict[str, Any] = {},
+    target_dir: str = ".",
     **kwargs,
 ) -> list[str]:
     """
@@ -238,7 +242,6 @@ def result_to_netcdf_legacy_files(
         add_user_log_and_raise_error(
             f"Unable to convert result of type {type(result)} to 'netcdf_legacy' files. result:\n{result}",
             context=context,
-            thisError=ValueError,
         )
 
     if filter_rules:
result:\n{result}", context=context, - thisError=ValueError, ) if filter_rules: @@ -265,7 +268,7 @@ def result_to_netcdf_legacy_files( nc_files = [] for out_fname_base, grib_file in result.items(): - out_fname = f"{out_fname_base}.nc" + out_fname = os.path.join(target_dir, f"{out_fname_base}.nc") nc_files.append(out_fname) command = ensure_list(command) os.system(" ".join(command + ["-o", out_fname, grib_file])) @@ -275,7 +278,7 @@ def result_to_netcdf_legacy_files( "We are unable to convert this GRIB data to netCDF, " "please download as GRIB and convert to netCDF locally.\n" ) - add_user_log_and_raise_error(message, context=context, thisError=RuntimeError) + add_user_log_and_raise_error(message, context=context) return nc_files @@ -302,9 +305,7 @@ def unknown_filetype_to_grib_files( ) return [infile] else: - add_user_log_and_raise_error( - f"Unknown file type: {infile}", context=context, thisError=ValueError - ) + add_user_log_and_raise_error(f"Unknown file type: {infile}", context=context) def unknown_filetype_to_netcdf_files( @@ -320,25 +321,21 @@ def unknown_filetype_to_netcdf_files( context.add_stdout(f"Converting {infile} to netCDF files with kwargs: {kwargs}") return grib_to_netcdf_files(infile, context=context, **kwargs) else: - add_user_log_and_raise_error( - f"Unknown file type: {infile}", context=context, thisError=ValueError - ) + add_user_log_and_raise_error(f"Unknown file type: {infile}", context=context) def grib_to_netcdf_files( grib_file: str, open_datasets_kwargs: None | dict[str, Any] | list[dict[str, Any]] = None, post_open_datasets_kwargs: dict[str, Any] = {}, - to_netcdf_kwargs: dict[str, Any] = {}, context: Context = Context(), **kwargs, ): - to_netcdf_kwargs.update(kwargs.pop("to_netcdf_kwargs", {})) grib_file = os.path.realpath(grib_file) context.add_stdout( f"Converting {grib_file} to netCDF files with:\n" - f"to_netcdf_kwargs: {to_netcdf_kwargs}\n" + f"to_netcdf_kwargs: {kwargs}\n" f"open_datasets_kwargs: {open_datasets_kwargs}\n" f"post_open_datasets_kwargs: {post_open_datasets_kwargs}\n" ) @@ -357,11 +354,9 @@ def grib_to_netcdf_files( ) context.add_user_visible_error(message=message) context.add_stderr(message=message) - raise RuntimeError(message) + raise CdsFormatConversionError(message) - out_nc_files = xarray_dict_to_netcdf( - datasets, context=context, to_netcdf_kwargs=to_netcdf_kwargs - ) + out_nc_files = xarray_dict_to_netcdf(datasets, context=context, **kwargs) return out_nc_files @@ -372,12 +367,16 @@ def xarray_dict_to_netcdf( compression_options: str | dict[str, Any] = "default", to_netcdf_kwargs: dict[str, Any] = {}, out_fname_prefix: str = "", + target_dir: str = "", **kwargs, ) -> list[str]: """ Convert a dictionary of xarray datasets to netCDF files, where the key of the dictionary is used in the filename. 
""" + # Untangle any nested kwargs (I don't think this is necessary anymore) + to_netcdf_kwargs.update(kwargs.pop("to_netcdf_kwargs", {})) + # Check if compression_options or out_fname_prefix have been provided in to_netcdf_kwargs compression_options = to_netcdf_kwargs.pop( "compression_options", compression_options @@ -396,7 +395,7 @@ def xarray_dict_to_netcdf( "encoding": {var: compression_options for var in dataset}, } ) - out_fname = f"{out_fname_prefix}{out_fname_base}.nc" + out_fname = os.path.join(target_dir, f"{out_fname_prefix}{out_fname_base}.nc") context.add_stdout(f"Writing {out_fname} with kwargs:\n{to_netcdf_kwargs}") dataset.to_netcdf(out_fname, **to_netcdf_kwargs) out_nc_files.append(out_fname) @@ -435,7 +434,6 @@ def open_result_as_xarray_dictionary( add_user_log_and_raise_error( f"Unable to open result as an xarray dataset: \n{result}", context=context, - thisError=ValueError, ) @@ -457,7 +455,6 @@ def open_file_as_xarray_dictionary( add_user_log_and_raise_error( f"Unable to open file {infile} as an xarray dataset.", context=context, - thisError=ValueError, ) @@ -487,7 +484,7 @@ def safely_rename_variable(dataset: xr.Dataset, rename: dict[str, str]) -> xr.Da if (new_name not in rename_order) or ( rename_order.index(conflict) > rename_order.index(new_name) ): - raise ValueError( + raise CdsFormatConversionError( f"Refusing to to rename to existing variable name: {conflict}->{new_name}" ) diff --git a/tests/test_20_adaptor_multi.py b/tests/test_15_adaptor_multi.py similarity index 82% rename from tests/test_20_adaptor_multi.py rename to tests/test_15_adaptor_multi.py index fdc7857e..ef9a33f9 100644 --- a/tests/test_20_adaptor_multi.py +++ b/tests/test_15_adaptor_multi.py @@ -1,6 +1,14 @@ +import os + +import requests + from cads_adaptors import AbstractAdaptor from cads_adaptors.adaptors import multi +TEST_GRIB_FILE = ( + "https://get.ecmwf.int/repository/test-data/cfgrib/era5-levels-members.grib" +) + FORM = { "level": ["500", "850"], "time": ["12:00", "00:00"], @@ -177,3 +185,38 @@ def test_multi_adaptor_split_adaptors_dont_split_keys(): assert "dont_split" in sub_adaptors["mean"][1].keys() assert "dont_split" not in sub_adaptors["max"][1].keys() assert "area" in sub_adaptors["max"][1].keys() + + +def test_convert_format(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + multi_adaptor = multi.MultiMarsCdsAdaptor({}, {}) + + assert hasattr(multi_adaptor, "convert_format") + + url = TEST_GRIB_FILE + remote_file = requests.get(url) + _, ext = os.path.splitext(url) + + tmp_file = f"test{ext}" + with open(tmp_file, "wb") as f: + f.write(remote_file.content) + + converted_files = multi_adaptor.convert_format( + tmp_file, + "netcdf", + ) + assert isinstance(converted_files, list) + assert len(converted_files) == 1 + _, out_ext = os.path.splitext(converted_files[0]) + assert out_ext == ".nc" + + test_subdir = "./test_subdir" + os.makedirs(test_subdir, exist_ok=True) + converted_files = multi_adaptor.convert_format( + tmp_file, "netcdf", target_dir=test_subdir + ) + assert isinstance(converted_files, list) + assert len(converted_files) == 1 + _, out_ext = os.path.splitext(converted_files[0]) + assert out_ext == ".nc" + assert "/test_subdir/" in converted_files[0] diff --git a/tests/test_15_mars.py b/tests/test_15_mars.py index 4ab2abfc..6587a48a 100644 --- a/tests/test_15_mars.py +++ b/tests/test_15_mars.py @@ -1,7 +1,13 @@ import os +import requests + from cads_adaptors.adaptors import mars +TEST_GRIB_FILE = ( + 
"https://get.ecmwf.int/repository/test-data/cfgrib/era5-levels-members.grib" +) + def test_get_mars_servers(): mars_servers = mars.get_mars_server_list( @@ -24,3 +30,38 @@ def test_get_mars_servers_envvar(): mars_servers = mars.get_mars_server_list({}) assert len(mars_servers) == 1 assert mars_servers[0] == "http://a-test-server.url" + + +def test_convert_format(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + mars_adaptor = mars.MarsCdsAdaptor({}, {}) + + assert hasattr(mars_adaptor, "convert_format") + + url = TEST_GRIB_FILE + remote_file = requests.get(url) + _, ext = os.path.splitext(url) + + tmp_file = f"test{ext}" + with open(tmp_file, "wb") as f: + f.write(remote_file.content) + + converted_files = mars_adaptor.convert_format( + tmp_file, + "netcdf", + ) + assert isinstance(converted_files, list) + assert len(converted_files) == 1 + _, out_ext = os.path.splitext(converted_files[0]) + assert out_ext == ".nc" + + test_subdir = "./test_subdir" + os.makedirs(test_subdir, exist_ok=True) + converted_files = mars_adaptor.convert_format( + tmp_file, "netcdf", target_dir=test_subdir + ) + assert isinstance(converted_files, list) + assert len(converted_files) == 1 + _, out_ext = os.path.splitext(converted_files[0]) + assert out_ext == ".nc" + assert "/test_subdir/" in converted_files[0] diff --git a/tests/test_20_url_tools.py b/tests/test_20_url_tools.py index 74beeba2..dc1e5691 100644 --- a/tests/test_20_url_tools.py +++ b/tests/test_20_url_tools.py @@ -66,7 +66,7 @@ def test_download_with_server_suggested_filename(tmp_path, monkeypatch): False, ), ) -def test_ftp_download(tmp_path, ftpserver, anon): +def test_ftp_download(tmp_path, monkeypatch, ftpserver, anon): local_test_file = os.path.join(tmp_path, "testfile.txt") with open(local_test_file, "w") as f: f.write("This is a test file") @@ -74,7 +74,7 @@ def test_ftp_download(tmp_path, ftpserver, anon): ftp_url = ftpserver.put_files(local_test_file, style="url", anon=anon) work_dir = os.path.join(tmp_path, "work_dir") os.makedirs(work_dir) - os.chdir(work_dir) + monkeypatch.chdir(work_dir) local_test_download = url_tools.try_download(ftp_url, context=url_tools.Context())[ 0 ] diff --git a/tests/test_30_convertors.py b/tests/test_30_convertors.py index f664f427..d607c08d 100644 --- a/tests/test_30_convertors.py +++ b/tests/test_30_convertors.py @@ -74,59 +74,67 @@ def test_open_netcdf(): assert list(xarray_dict)[0] == "test" -def test_open_file_as_xarray_dictionary(): +def test_open_file_as_xarray_dictionary(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) grib_file = requests.get(TEST_GRIB_FILE) - with tempfile.TemporaryDirectory() as tmpdirname: - os.chdir(tmpdirname) - tmp_grib_file = "test.grib" - with open(tmp_grib_file, "wb") as f: - f.write(grib_file.content) - xarray_dict = convertors.open_file_as_xarray_dictionary( - tmp_grib_file, open_datasets_kwargs={"tag": "tag"} - ) - assert isinstance(xarray_dict, dict) - assert len(xarray_dict) == 1 - assert list(xarray_dict)[0] == "test_tag" + tmp_grib_file = "test.grib" + with open(tmp_grib_file, "wb") as f: + f.write(grib_file.content) - xarray_dict = convertors.open_file_as_xarray_dictionary( - tmp_grib_file, open_datasets_kwargs=[{"tag": "tag1"}, {"tag": "tag2"}] - ) - assert isinstance(xarray_dict, dict) - assert len(xarray_dict) == 2 - assert list(xarray_dict)[0] == "test_tag1" - assert list(xarray_dict)[1] == "test_tag2" + xarray_dict = convertors.open_file_as_xarray_dictionary( + tmp_grib_file, open_datasets_kwargs={"tag": "tag"} + ) + assert isinstance(xarray_dict, dict) 
diff --git a/tests/test_30_convertors.py b/tests/test_30_convertors.py
index f664f427..d607c08d 100644
--- a/tests/test_30_convertors.py
+++ b/tests/test_30_convertors.py
@@ -74,59 +74,67 @@ def test_open_netcdf():
     assert list(xarray_dict)[0] == "test"
 
 
-def test_open_file_as_xarray_dictionary():
+def test_open_file_as_xarray_dictionary(tmp_path, monkeypatch):
+    monkeypatch.chdir(tmp_path)
     grib_file = requests.get(TEST_GRIB_FILE)
-    with tempfile.TemporaryDirectory() as tmpdirname:
-        os.chdir(tmpdirname)
-        tmp_grib_file = "test.grib"
-        with open(tmp_grib_file, "wb") as f:
-            f.write(grib_file.content)
-        xarray_dict = convertors.open_file_as_xarray_dictionary(
-            tmp_grib_file, open_datasets_kwargs={"tag": "tag"}
-        )
-        assert isinstance(xarray_dict, dict)
-        assert len(xarray_dict) == 1
-        assert list(xarray_dict)[0] == "test_tag"
+    tmp_grib_file = "test.grib"
+    with open(tmp_grib_file, "wb") as f:
+        f.write(grib_file.content)
 
-        xarray_dict = convertors.open_file_as_xarray_dictionary(
-            tmp_grib_file, open_datasets_kwargs=[{"tag": "tag1"}, {"tag": "tag2"}]
-        )
-        assert isinstance(xarray_dict, dict)
-        assert len(xarray_dict) == 2
-        assert list(xarray_dict)[0] == "test_tag1"
-        assert list(xarray_dict)[1] == "test_tag2"
+    xarray_dict = convertors.open_file_as_xarray_dictionary(
+        tmp_grib_file, open_datasets_kwargs={"tag": "tag"}
+    )
+    assert isinstance(xarray_dict, dict)
+    assert len(xarray_dict) == 1
+    assert list(xarray_dict)[0] == "test_tag"
+
+    xarray_dict = convertors.open_file_as_xarray_dictionary(
+        tmp_grib_file, open_datasets_kwargs=[{"tag": "tag1"}, {"tag": "tag2"}]
+    )
+    assert isinstance(xarray_dict, dict)
+    assert len(xarray_dict) == 2
+    assert list(xarray_dict)[0] == "test_tag1"
+    assert list(xarray_dict)[1] == "test_tag2"
 
 
-def test_grib_to_netcdf():
+def test_grib_to_netcdf(tmp_path, monkeypatch):
+    monkeypatch.chdir(tmp_path)
     grib_file = requests.get(TEST_GRIB_FILE)
-    with tempfile.TemporaryDirectory() as tmpdirname:
-        os.chdir(tmpdirname)
-        tmp_grib_file = "test.grib"
-        with open(tmp_grib_file, "wb") as f:
-            f.write(grib_file.content)
-        netcdf_files = convertors.grib_to_netcdf_files(tmp_grib_file)
-        assert isinstance(netcdf_files, list)
-        assert len(netcdf_files) == 1
+    tmp_grib_file = "test.grib"
+    with open(tmp_grib_file, "wb") as f:
+        f.write(grib_file.content)
 
-        netcdf_files = convertors.grib_to_netcdf_files(
-            tmp_grib_file, compression_options="default"
-        )
-        assert isinstance(netcdf_files, list)
-        assert len(netcdf_files) == 1
+    netcdf_files = convertors.grib_to_netcdf_files(tmp_grib_file)
+    assert isinstance(netcdf_files, list)
+    assert len(netcdf_files) == 1
 
-        netcdf_files = convertors.grib_to_netcdf_files(
-            tmp_grib_file, open_datasets_kwargs={"chunks": {"time": 1}}
-        )
-        assert isinstance(netcdf_files, list)
-        assert len(netcdf_files) == 1
+    os.makedirs("test_subdir", exist_ok=True)
+    netcdf_files = convertors.grib_to_netcdf_files(
+        tmp_grib_file, target_dir="./test_subdir"
+    )
+    assert isinstance(netcdf_files, list)
+    assert "/test_subdir/" in netcdf_files[0]
+    assert len(netcdf_files) == 1
 
-        netcdf_files = convertors.grib_to_netcdf_files(
-            tmp_grib_file, encoding={"time": {"dtype": "int64"}}
-        )
-        assert isinstance(netcdf_files, list)
-        assert len(netcdf_files) == 1
+    netcdf_files = convertors.grib_to_netcdf_files(
+        tmp_grib_file, compression_options="default"
+    )
+    assert isinstance(netcdf_files, list)
+    assert len(netcdf_files) == 1
+
+    netcdf_files = convertors.grib_to_netcdf_files(
+        tmp_grib_file, open_datasets_kwargs={"chunks": {"time": 1}}
+    )
+    assert isinstance(netcdf_files, list)
+    assert len(netcdf_files) == 1
+
+    netcdf_files = convertors.grib_to_netcdf_files(
+        tmp_grib_file, encoding={"time": {"dtype": "int64"}}
+    )
+    assert isinstance(netcdf_files, list)
+    assert len(netcdf_files) == 1
 
 
 EXTENSION_MAPPING = {
@@ -137,62 +145,78 @@ def test_grib_to_netcdf():
 @pytest.mark.parametrize("url", [TEST_GRIB_FILE, TEST_NC_FILE])
-def test_convert_format_to_netcdf(url, target_format="netcdf"):
+def test_convert_format_to_netcdf(tmp_path, monkeypatch, url, target_format="netcdf"):
+    monkeypatch.chdir(tmp_path)
     remote_file = requests.get(url)
     _, ext = os.path.splitext(url)
-    with tempfile.TemporaryDirectory() as tmpdirname:
-        os.chdir(tmpdirname)
-        tmp_file = f"test.{ext}"
-        with open(tmp_file, "wb") as f:
-            f.write(remote_file.content)
-        converted_files = convertors.convert_format(
-            tmp_file, target_format=target_format
-        )
-        assert isinstance(converted_files, list)
-        assert len(converted_files) == 1
-        _, out_ext = os.path.splitext(converted_files[0])
-        assert out_ext == EXTENSION_MAPPING.get(target_format, f".{target_format}")
+    tmp_file = f"test{ext}"
+    with open(tmp_file, "wb") as f:
+        f.write(remote_file.content)
+
+    converted_files = convertors.convert_format(tmp_file, target_format=target_format)
+    assert isinstance(converted_files, list)
+    assert len(converted_files) == 1
+    _, out_ext = os.path.splitext(converted_files[0])
+    assert out_ext == EXTENSION_MAPPING.get(target_format, f".{target_format}")
+
+    os.makedirs("test_subdir", exist_ok=True)
+    converted_files = convertors.convert_format(
+        tmp_file, target_format=target_format, target_dir="./test_subdir"
+    )
+    assert isinstance(converted_files, list)
+    assert len(converted_files) == 1
+    _, out_ext = os.path.splitext(converted_files[0])
+    assert out_ext == EXTENSION_MAPPING.get(target_format, f".{target_format}")
+    if out_ext != ext:  # i.e. if a conversion has taken place
+        assert "/test_subdir/" in converted_files[0]
 
 
 @pytest.mark.parametrize("url", [TEST_GRIB_FILE, TEST_NC_FILE])
-def test_convert_format_to_grib(url, target_format="grib"):
+def test_convert_format_to_grib(tmp_path, monkeypatch, url, target_format="grib"):
+    monkeypatch.chdir(tmp_path)
     remote_file = requests.get(url)
     _, ext = os.path.splitext(url)
-    with tempfile.TemporaryDirectory() as tmpdirname:
-        os.chdir(tmpdirname)
-        tmp_file = f"test.{ext}"
-        with open(tmp_file, "wb") as f:
-            f.write(remote_file.content)
-        converted_files = convertors.convert_format(
-            tmp_file, target_format=target_format
-        )
-        assert isinstance(converted_files, list)
-        assert len(converted_files) == 1
-        # Can't convert from netcdf to grib yet, so ensure in extension is the same as input
-        _, out_ext = os.path.splitext(converted_files[0])
-        assert out_ext == ext
+    tmp_file = f"test.{ext}"
+    with open(tmp_file, "wb") as f:
+        f.write(remote_file.content)
+
+    converted_files = convertors.convert_format(tmp_file, target_format=target_format)
+    assert isinstance(converted_files, list)
+    assert len(converted_files) == 1
+    # Can't convert from netcdf to grib yet, so ensure in extension is the same as input
+    _, out_ext = os.path.splitext(converted_files[0])
+    assert out_ext == ext
 
 
 def test_convert_format_to_netcdf_legacy(
-    url=TEST_GRIB_FILE, target_format="netcdf_legacy"
+    tmp_path, monkeypatch, url=TEST_GRIB_FILE, target_format="netcdf_legacy"
 ):
+    monkeypatch.chdir(tmp_path)
     remote_file = requests.get(url)
     _, ext = os.path.splitext(url)
-    with tempfile.TemporaryDirectory() as tmpdirname:
-        os.chdir(tmpdirname)
-        tmp_file = f"test.{ext}"
-        with open(tmp_file, "wb") as f:
-            f.write(remote_file.content)
-        converted_files = convertors.convert_format(
-            tmp_file, target_format=target_format
-        )
-        assert isinstance(converted_files, list)
-        assert len(converted_files) == 1
-        _, out_ext = os.path.splitext(converted_files[0])
-        assert out_ext == EXTENSION_MAPPING.get(target_format, f".{target_format}")
+    tmp_file = f"test{ext}"
+    with open(tmp_file, "wb") as f:
+        f.write(remote_file.content)
+
+    converted_files = convertors.convert_format(tmp_file, target_format=target_format)
+    assert isinstance(converted_files, list)
+    assert len(converted_files) == 1
+    _, out_ext = os.path.splitext(converted_files[0])
+    assert out_ext == EXTENSION_MAPPING.get(target_format, f".{target_format}")
+
+    os.makedirs("test_subdir", exist_ok=True)
+    converted_files = convertors.convert_format(
+        tmp_file, target_format=target_format, target_dir="./test_subdir"
+    )
+    assert isinstance(converted_files, list)
+    assert len(converted_files) == 1
+    _, out_ext = os.path.splitext(converted_files[0])
+    assert out_ext == EXTENSION_MAPPING.get(target_format, f".{target_format}")
+    if out_ext != ext:  # i.e. if a conversion has taken place
+        assert "/test_subdir/" in converted_files[0]
 
 
 def test_safely_rename_variable():
@@ -244,150 +268,150 @@ def test_safely_expand_dims():
     assert "time" in ds_1.temperature.dims
 
 
-def test_prepare_open_datasets_kwargs_grib_split_on():
+def test_prepare_open_datasets_kwargs_grib_split_on(tmp_path, monkeypatch):
+    monkeypatch.chdir(tmp_path)
     grib_file = requests.get(TEST_GRIB_FILE)
-    with tempfile.TemporaryDirectory() as tmpdirname:
-        os.chdir(tmpdirname)
-        tmp_grib_file = "test.grib"
-        with open(tmp_grib_file, "wb") as f:
-            f.write(grib_file.content)
-        open_ds_kwargs = {
-            "test_kwarg": 1,
-            "tag": "tag",
-            "split_on": ["paramId"],
-        }
-        new_open_ds_kwargs = convertors.prepare_open_datasets_kwargs_grib(
-            tmp_grib_file, open_ds_kwargs
-        )
-        assert isinstance(new_open_ds_kwargs, list)
-        assert len(new_open_ds_kwargs) == 2
-        assert "tag_paramId-130" in [d["tag"] for d in new_open_ds_kwargs]
-        assert "tag_paramId-129" in [d["tag"] for d in new_open_ds_kwargs]
-        assert not any("split_on" in d for d in new_open_ds_kwargs)
-        assert all("test_kwarg" in d for d in new_open_ds_kwargs)
-
-        # Single value to split on
-        open_ds_kwargs = {
-            "test_kwarg": 1,
-            "tag": "tag",
-            "split_on": ["stream"],
-        }
-        new_open_ds_kwargs = convertors.prepare_open_datasets_kwargs_grib(
-            tmp_grib_file, open_ds_kwargs
-        )
-        assert isinstance(new_open_ds_kwargs, list)
-        assert len(new_open_ds_kwargs) == 1
-        assert "tag_stream-enda" in [d["tag"] for d in new_open_ds_kwargs]
-        assert not any("split_on" in d for d in new_open_ds_kwargs)
-        assert all("test_kwarg" in d for d in new_open_ds_kwargs)
-
-        # Key does not exist
-        open_ds_kwargs = {
-            "test_kwarg": 1,
-            "tag": "tag",
-            "split_on": ["kebab"],
-        }
-        new_open_ds_kwargs = convertors.prepare_open_datasets_kwargs_grib(
-            tmp_grib_file, open_ds_kwargs
-        )
-        assert isinstance(new_open_ds_kwargs, list)
-        assert len(new_open_ds_kwargs) == 1
-        assert "tag_kebab-None" in [d["tag"] for d in new_open_ds_kwargs]
-        assert not any("split_on" in d for d in new_open_ds_kwargs)
-        assert all("test_kwarg" in d for d in new_open_ds_kwargs)
+    tmp_grib_file = "test.grib"
+    with open(tmp_grib_file, "wb") as f:
+        f.write(grib_file.content)
+
+    open_ds_kwargs = {
+        "test_kwarg": 1,
+        "tag": "tag",
+        "split_on": ["paramId"],
+    }
+    new_open_ds_kwargs = convertors.prepare_open_datasets_kwargs_grib(
+        tmp_grib_file, open_ds_kwargs
+    )
+    assert isinstance(new_open_ds_kwargs, list)
+    assert len(new_open_ds_kwargs) == 2
+    assert "tag_paramId-130" in [d["tag"] for d in new_open_ds_kwargs]
+    assert "tag_paramId-129" in [d["tag"] for d in new_open_ds_kwargs]
+    assert not any("split_on" in d for d in new_open_ds_kwargs)
+    assert all("test_kwarg" in d for d in new_open_ds_kwargs)
+
+    # Single value to split on
+    open_ds_kwargs = {
+        "test_kwarg": 1,
+        "tag": "tag",
+        "split_on": ["stream"],
+    }
+    new_open_ds_kwargs = convertors.prepare_open_datasets_kwargs_grib(
+        tmp_grib_file, open_ds_kwargs
+    )
+    assert isinstance(new_open_ds_kwargs, list)
+    assert len(new_open_ds_kwargs) == 1
+    assert "tag_stream-enda" in [d["tag"] for d in new_open_ds_kwargs]
+    assert not any("split_on" in d for d in new_open_ds_kwargs)
+    assert all("test_kwarg" in d for d in new_open_ds_kwargs)
+
+    # Key does not exist
+    open_ds_kwargs = {
+        "test_kwarg": 1,
+        "tag": "tag",
+        "split_on": ["kebab"],
+    }
+    new_open_ds_kwargs = convertors.prepare_open_datasets_kwargs_grib(
+        tmp_grib_file, open_ds_kwargs
+    )
+    assert isinstance(new_open_ds_kwargs, list)
+    assert len(new_open_ds_kwargs) == 1
+    assert "tag_kebab-None" in [d["tag"] for d in new_open_ds_kwargs]
+    assert not any("split_on" in d for d in new_open_ds_kwargs)
+    assert all("test_kwarg" in d for d in new_open_ds_kwargs)
 
 
"tag_kebab-None" in [d["tag"] for d in new_open_ds_kwargs] + assert not any("split_on" in d for d in new_open_ds_kwargs) + assert all("test_kwarg" in d for d in new_open_ds_kwargs) -def test_prepare_open_datasets_kwargs_grib_split_on_alias(): +def test_prepare_open_datasets_kwargs_grib_split_on_alias(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) # Test split_on_alias, if differences detected in k, then split on v grib_file_2 = requests.get(TEST_GRIB_FILE_2) - with tempfile.TemporaryDirectory() as tmpdirname: - os.chdir(tmpdirname) - tmp_grib_file = "test2.grib" - with open(tmp_grib_file, "wb") as f: - f.write(grib_file_2.content) - open_ds_kwargs = { - "test_kwarg": 1, - "tag": "tag", - "split_on_alias": {"expver": "stepType"}, - } - new_open_ds_kwargs = convertors.prepare_open_datasets_kwargs_grib( - tmp_grib_file, open_ds_kwargs - ) - assert isinstance(new_open_ds_kwargs, list) - assert len(new_open_ds_kwargs) == 2 - assert "tag_stepType-instant" in [d["tag"] for d in new_open_ds_kwargs] - assert "tag_stepType-accum" in [d["tag"] for d in new_open_ds_kwargs] - assert not any("split_on_alias" in d for d in new_open_ds_kwargs) - assert all("test_kwarg" in d for d in new_open_ds_kwargs) - - # Single k1 value - open_ds_kwargs = { - "test_kwarg": 1, - "tag": "tag", - # "split_on": ["origin"], - "split_on_alias": {"origin": "stepType"}, - } - new_open_ds_kwargs = convertors.prepare_open_datasets_kwargs_grib( - tmp_grib_file, open_ds_kwargs - ) - assert isinstance(new_open_ds_kwargs, list) - assert len(new_open_ds_kwargs) == 1 - assert "tag" in [d["tag"] for d in new_open_ds_kwargs] - assert not any("split_on_alias" in d for d in new_open_ds_kwargs) - assert all("test_kwarg" in d for d in new_open_ds_kwargs) - - # Combined split_on and split_on_alias - open_ds_kwargs = { - "test_kwarg": 1, - "tag": "tag", - "split_on": ["stream"], - "split_on_alias": {"expver": "paramId"}, - } - new_open_ds_kwargs = convertors.prepare_open_datasets_kwargs_grib( - tmp_grib_file, open_ds_kwargs - ) - assert isinstance(new_open_ds_kwargs, list) - assert len(new_open_ds_kwargs) == 6 - for tag in [ - "tag_stream-oper_paramId-167", - "tag_stream-oper_paramId-140232", - "tag_stream-oper_paramId-228", - "tag_stream-wave_paramId-167", - "tag_stream-wave_paramId-140232", - "tag_stream-wave_paramId-228", - ]: - assert tag in [d["tag"] for d in new_open_ds_kwargs] - assert not any("split_on_alias" in d for d in new_open_ds_kwargs) - assert all("test_kwarg" in d for d in new_open_ds_kwargs) - - # k1 does not exist - open_ds_kwargs = { - "test_kwarg": 1, - "tag": "tag", - "split_on_alias": {"kebab": "stepType"}, - } - new_open_ds_kwargs = convertors.prepare_open_datasets_kwargs_grib( - tmp_grib_file, open_ds_kwargs - ) - assert isinstance(new_open_ds_kwargs, list) - assert len(new_open_ds_kwargs) == 1 - assert "tag" in [d["tag"] for d in new_open_ds_kwargs] - assert not any("split_on_alias" in d for d in new_open_ds_kwargs) - assert all("test_kwarg" in d for d in new_open_ds_kwargs) - - # k2 does not exist - open_ds_kwargs = { - "test_kwarg": 1, - "tag": "tag", - "split_on_alias": {"expver": "kebab"}, - } - new_open_ds_kwargs = convertors.prepare_open_datasets_kwargs_grib( - tmp_grib_file, open_ds_kwargs - ) - assert isinstance(new_open_ds_kwargs, list) - assert len(new_open_ds_kwargs) == 1 - assert "tag_kebab-None" in [d["tag"] for d in new_open_ds_kwargs] - assert not any("split_on_alias" in d for d in new_open_ds_kwargs) - assert all("test_kwarg" in d for d in new_open_ds_kwargs) + tmp_grib_file = "test2.grib" + 
diff --git a/tests/test_40_post_processors.py b/tests/test_40_post_processors.py
index 33846a89..0f2e1b84 100644
--- a/tests/test_40_post_processors.py
+++ b/tests/test_40_post_processors.py
@@ -1,5 +1,4 @@
 import os
-import tempfile
 
 import pytest
 import requests
@@ -26,42 +25,42 @@ def test_pp_config_mapping(in_mapping, out_mapping):
     assert post_processors.pp_config_mapping(in_mapping) == out_mapping
 
 
-def test_daily_reduce():
+def test_daily_reduce(tmp_path, monkeypatch):
+    monkeypatch.chdir(tmp_path)
     remote_file = requests.get(TEST_FILE_1)
     _, ext = os.path.splitext(TEST_FILE_1)
-    with tempfile.TemporaryDirectory() as tmpdirname:
-        os.chdir(tmpdirname)
-        tmp_file = f"test{ext}"
-        with open(tmp_file, "wb") as f:
-            f.write(remote_file.content)
-        xarray_dict = convertors.open_grib_file_as_xarray_dictionary(tmp_file)
+    tmp_file = f"test{ext}"
+    with open(tmp_file, "wb") as f:
+        f.write(remote_file.content)
 
-        out_xarray_dict = post_processors.daily_reduce(xarray_dict, how="mean")
-        assert isinstance(out_xarray_dict, dict)
-        assert len(out_xarray_dict) == 1
-        assert list(out_xarray_dict)[0] == "test_0_daily-mean"
-        assert isinstance(out_xarray_dict["test_0_daily-mean"], xr.Dataset)
-        assert isinstance(out_xarray_dict["test_0_daily-mean"].attrs["history"], str)
+    xarray_dict = convertors.open_grib_file_as_xarray_dictionary(tmp_file)
+    out_xarray_dict = post_processors.daily_reduce(xarray_dict, how="mean")
+    assert isinstance(out_xarray_dict, dict)
+    assert len(out_xarray_dict) == 1
+    assert list(out_xarray_dict)[0] == "test_0_daily-mean"
+    assert isinstance(out_xarray_dict["test_0_daily-mean"], xr.Dataset)
+    assert isinstance(out_xarray_dict["test_0_daily-mean"].attrs["history"], str)
 
-def test_monthly_reduce():
+
+def test_monthly_reduce(tmp_path, monkeypatch):
+    monkeypatch.chdir(tmp_path)
     remote_file = requests.get(TEST_FILE_1)
     _, ext = os.path.splitext(TEST_FILE_1)
-    with tempfile.TemporaryDirectory() as tmpdirname:
-        os.chdir(tmpdirname)
-        tmp_file = f"test{ext}"
-        with open(tmp_file, "wb") as f:
-            f.write(remote_file.content)
-
-        xarray_dict = convertors.open_grib_file_as_xarray_dictionary(tmp_file)
-
-        out_xarray_dict = post_processors.monthly_reduce(xarray_dict, how="mean")
-        assert isinstance(out_xarray_dict, dict)
-        assert len(out_xarray_dict) == 1
-        assert list(out_xarray_dict)[0] == "test_0_monthly-mean"
-        assert isinstance(out_xarray_dict["test_0_monthly-mean"], xr.Dataset)
-        assert isinstance(out_xarray_dict["test_0_monthly-mean"].attrs["history"], str)
+
+    tmp_file = f"test{ext}"
+    with open(tmp_file, "wb") as f:
+        f.write(remote_file.content)
+
+    xarray_dict = convertors.open_grib_file_as_xarray_dictionary(tmp_file)
+
+    out_xarray_dict = post_processors.monthly_reduce(xarray_dict, how="mean")
+    assert isinstance(out_xarray_dict, dict)
+    assert len(out_xarray_dict) == 1
+    assert list(out_xarray_dict)[0] == "test_0_monthly-mean"
+    assert isinstance(out_xarray_dict["test_0_monthly-mean"], xr.Dataset)
+    assert isinstance(out_xarray_dict["test_0_monthly-mean"].attrs["history"], str)
 
 
 def test_update_history():
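
The test changes above all follow the same pattern: tempfile.TemporaryDirectory plus os.chdir is replaced by pytest's tmp_path and monkeypatch.chdir fixtures, so the working directory is restored automatically even when a test fails. Below is a minimal sketch of that pattern only; the test name and the file name output.nc are placeholders invented here, not taken from the suite.

import os


def test_writes_relative_to_tmp_path(tmp_path, monkeypatch):
    # monkeypatch.chdir is undone at teardown, unlike a bare os.chdir
    monkeypatch.chdir(tmp_path)
    with open("output.nc", "wb") as f:
        f.write(b"\x00")
    assert os.path.exists(tmp_path / "output.nc")
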