From c2c4a9209198a56f31273f62efa65d45b30a60ae Mon Sep 17 00:00:00 2001 From: Callum Rollo Date: Thu, 6 Oct 2022 14:33:30 +0200 Subject: [PATCH 01/15] use polars for seaexplorer data file load --- environment.yml | 1 + pyglider/seaexplorer.py | 148 ++++++++++++++++++++-------------------- tests/environment.yml | 1 + 3 files changed, 75 insertions(+), 75 deletions(-) diff --git a/environment.yml b/environment.yml index f1d086e..7e13558 100644 --- a/environment.yml +++ b/environment.yml @@ -13,5 +13,6 @@ dependencies: - scipy - bitstring - pooch + - polars - pip: - dbdreader diff --git a/pyglider/seaexplorer.py b/pyglider/seaexplorer.py index 59afcc0..ccfdca1 100644 --- a/pyglider/seaexplorer.py +++ b/pyglider/seaexplorer.py @@ -9,8 +9,8 @@ import xarray as xr import yaml import pyglider.utils as utils -import pandas as pd - +import datetime +import polars as pl _log = logging.getLogger(__name__) @@ -41,7 +41,7 @@ def _sort(ds): def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True, min_samples_in_file=5, dropna_subset=None, dropna_thresh=1): """ - Convert seaexplorer text files to raw netcdf files. + Convert seaexplorer text files to raw parquet files. Parameters ---------- @@ -125,37 +125,40 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True, _log.info(f'{f} to {fnout}') if not incremental or _needsupdating(ftype, f, fnout): _log.info(f'Doing: {f} to {fnout}') - out = pd.read_csv(f, header=0, delimiter=';', - parse_dates=True, index_col=0, - dayfirst=True) + out = pl.read_csv(f, sep=';') + if "Timestamp" in out.columns: + out = out.with_column( + pl.col("Timestamp").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S")) + out = out.rename({"Timestamp": "time"}) + else: + out = out.with_column( + pl.col("PLD_REALTIMECLOCK").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S.%f")) + out = out.rename({"PLD_REALTIMECLOCK": "time"}) # If AD2CP data present, convert timestamps to datetime if 'AD2CP_TIME' in out.columns: # Set datestamps with date 00000 to None - out.loc[out.AD2CP_TIME.str[:6] == '000000', - 'AD2CP_TIME'] = None - out['AD2CP_TIME'] = pd.to_datetime(out.AD2CP_TIME) + out = out.with_column( + pl.col('AD2CP_TIME').str.strptime(pl.Datetime, fmt="%m%d%y %H:%M:%S", strict=False)) # subsetting for heavily oversampled raw data: - if rawsub=='raw' and dropna_subset is not None: - out = out.dropna(subset=dropna_subset, thresh=dropna_thresh) - - with out.to_xarray() as outx: - key = list(outx.coords.keys())[0] - outx = outx.rename({key: 'time'}) - outx['fnum'] = ('time', - int(filenum)*np.ones(len(outx['time']))) - if ftype == 'gli': - outx.to_netcdf(fnout[:-3] + '.nc', 'w') + if rawsub == 'raw' and dropna_subset is not None: + out = out.with_column(out.select(pl.col(dropna_subset).is_null().cast(pl.Int64)) + .sum(axis=1).alias("null_count")).filter( + pl.col("null_count") <= dropna_thresh) \ + .drop("null_count") + + if ftype == 'gli': + out = out.with_columns([(pl.col("NavState") * 0 + int(filenum)).alias("fnum")]) + out.write_parquet(fnout[:-3] + '.parquet') + goodfiles.append(f) + else: + if out.select("time").shape[0] > min_samples_in_file: + out.write_parquet(fnout[:-3] + '.parquet') goodfiles.append(f) else: - if outx.indexes["time"].size > min_samples_in_file: - outx.to_netcdf(f'{fnout[:-3]}.nc', 'w', - unlimited_dims=['time']) - goodfiles.append(f) - else: - _log.warning('Number of sensor data points' - 'too small. Skipping nc write') - badfiles.append(f) + _log.warning('Number of sensor data points' + 'too small. Skipping parquet write') + badfiles.append(f) if len(badfiles) > 0: _log.warning('Some files could not be parsed:') for fn in badfiles: @@ -163,11 +166,11 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True, if not goodfiles: _log.warning(f'No valid unprocessed seaexplorer files found in'f'{indir}') return False - _log.info('All raw files converted to nc') + _log.info('All raw files converted to parquet') return True -def drop_rogue_1970(ds): +def drop_rogue_1970(df): """ If dates greater than 1971, 1, 1 are observed, drop any dates before 1971-01-01, from the datset and return it. This function removes 1970 @@ -180,11 +183,15 @@ def drop_rogue_1970(ds): Returns: ds: xarray.DataSet """ - dt_1971 = np.datetime64("1971-01-01") + dt_1971 = datetime.datetime(1971, 1, 1) # If all dates before or after 1971-01-01, return the dataset - if (ds.time > dt_1971).all() or (ds.time < dt_1971).all(): - return ds - return ds.where(ds.time > dt_1971, drop=True) + pre_1971 = df.filter(pl.col("time") < dt_1971) + if len(pre_1971) == len(df): + return pre_1971 + post_1971 = df.filter(pl.col("time") > dt_1971) + if len(post_1971) == len(df): + return post_1971 + return df.filter(pl.col("time") > dt_1971) def merge_rawnc(indir, outdir, deploymentyaml, incremental=False, kind='raw'): @@ -216,39 +223,28 @@ def merge_rawnc(indir, outdir, deploymentyaml, incremental=False, kind='raw'): deployment = yaml.safe_load(fin) metadata = deployment['metadata'] id = metadata['glider_name'] - outgli = outdir + '/' + id + '-rawgli.nc' - outpld = outdir + '/' + id + '-' + kind + 'pld.nc' + outgli = outdir + '/' + id + '-rawgli.parquet' + outpld = outdir + '/' + id + '-' + kind + 'pld.parquet' - _log.info('Opening *.gli.sub.*.nc multi-file dataset from %s', indir) - files = sorted(glob.glob(indir+'/*.gli.sub.*.nc')) + _log.info('Opening *.gli.sub.*.parquet multi-file dataset from %s', indir) + files = sorted(glob.glob(indir + '/*.gli.sub.*.parquet')) if not files: - _log.warning(f'No *gli*.nc files found in {indir}') + _log.warning(f'No *gli*.parquet files found in {indir}') return False - with xr.open_dataset(files[0]) as gli: - for fil in files[1:]: - try: - with xr.open_dataset(fil) as gli2: - gli = xr.concat([gli, gli2], dim='time') - except: - pass - gli = drop_rogue_1970(gli) - gli.to_netcdf(outgli) + gli = pl.read_parquet(indir + '/*.gli.sub.*.parquet') + gli = drop_rogue_1970(gli) + gli.write_parquet(outgli) _log.info(f'Done writing {outgli}') - _log.info('Opening *.pld.sub.*.nc multi-file dataset') - files = sorted(glob.glob(indir+'/*.pld1.'+kind+'.*.nc')) + _log.info('Opening *.pld.sub.*.parquet multi-file dataset') + files = sorted(glob.glob(indir + '/*.pld1.' + kind + '.*.parquet')) if not files: - _log.warning(f'No *{kind}*.nc files found in {indir}') + _log.warning(f'No *{kind}*.parquet files found in {indir}') return False - with xr.open_dataset(files[0]) as pld: - for fil in files[1:]: - try: - with xr.open_dataset(fil) as pld2: - pld = xr.concat([pld, pld2], dim='time') - except: - pass - pld = drop_rogue_1970(pld) - pld.to_netcdf(outpld) + pld = pl.read_parquet(indir + '/*.pld1.' + kind + '.*.parquet') + pld = drop_rogue_1970(pld) + pld.write_parquet(outpld) + _log.info(f'Done writing {outpld}') _log.info('Done merge_rawnc') return True @@ -286,9 +282,9 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', device_data = deployment['glider_devices'] id = metadata['glider_name'] _log.info(f'Opening combined nav file {indir}/{id}-rawgli.nc') - gli = xr.open_dataset(f'{indir}/{id}-rawgli.nc') - _log.info(f'Opening combined payload file {indir}/{id}-{kind}pld.nc') - sensor = xr.open_dataset(f'{indir}/{id}-{kind}pld.nc') + gli = pl.read_parquet(f'{indir}/{id}-rawgli.parquet') + _log.info(f'Opening combined payload file {indir}/{id}-{kind}pld.parquet') + sensor = pl.read_parquet(f'{indir}/{id}-{kind}pld.parquet') # build a new data set based on info in `deploymentyaml.` # We will use ctd as the interpolant @@ -304,7 +300,8 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', # It oversamples the nav data, but mildly undersamples the optics and # oxygen.... if 'timebase' in ncvar: - indctd = np.where(~np.isnan(sensor[ncvar['timebase']['source']]))[0] + vals = sensor.select([ncvar['timebase']['source']]).to_numpy()[:, 0] + indctd = np.where(~np.isnan(vals))[0] elif 'GPCTD_TEMPERATURE' in list(sensor.variables): _log.warning('No timebase specified. Using GPCTD_TEMPERATURE as time' 'base') @@ -317,35 +314,36 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', _log.warning('No gpctd or legato data found. Using NAV_DEPTH as time' 'base') indctd = np.where(~np.isnan(sensor.NAV_DEPTH))[0] - ds['time'] = (('time'), sensor['time'].values[indctd], attr) + ds['time'] = (('time'), sensor.select('time').to_numpy()[indctd, 0], attr) thenames = list(ncvar.keys()) for i in ['time', 'timebase', 'keep_variables']: if i in thenames: thenames.remove(i) for name in thenames: _log.info('interpolating ' + name) - if not('method' in ncvar[name].keys()): + if not ('method' in ncvar[name].keys()): # variables that are in the data set or can be interpolated from it if 'conversion' in ncvar[name].keys(): convert = getattr(utils, ncvar[name]['conversion']) else: convert = utils._passthrough sensorname = ncvar[name]['source'] - if sensorname in list(sensor.variables): + if sensorname in list(sensor.columns): _log.debug('sensorname %s', sensorname) - val = convert(sensor[sensorname]) + val = convert(sensor.select(sensorname).to_numpy()[:, 0]) if 'coarsen' in ncvar[name]: # smooth oxygen data as originally perscribed coarsen_time = ncvar[name]['coarsen'] - sensor_sub = sensor.coarsen(time=coarsen_time, - boundary='trim').mean() - val2 = sensor_sub[sensorname] - val = _interp_gli_to_pld(sensor_sub, sensor, val2, indctd) + coarse_ints = np.arange(0, len(sensor)/coarsen_time, 1/coarsen_time).astype(int) + sensor_sub = sensor.with_columns(pl.lit(coarse_ints).alias("coarse_ints")) + sensor_sub_coarse = sensor_sub.groupby('coarse_ints').median().sort(by="time") + val2 = sensor_sub_coarse.select(sensorname).to_numpy()[:, 0] + val = _interp_gli_to_pld(sensor_sub_coarse, sensor, val2, indctd) val = val[indctd] ncvar['method'] = 'linear fill' else: - val = gli[sensorname] + val = gli.select(sensorname).to_numpy()[:, 0] val = convert(val) # Values from the glider netcdf must be interpolated to match # the sensor netcdf @@ -355,7 +353,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', ncvar[name].pop('coordinates', None) attrs = ncvar[name] attrs = utils.fill_required_attrs(attrs) - ds[name] = (('time'), val.data, attrs) + ds[name] = (('time'), val, attrs) # fix lon and lat to be linearly interpolated between fixes good = np.where(np.abs(np.diff(ds.longitude)) + @@ -397,7 +395,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', ds = ds.isel(time=good) len_after_drop = len(ds.time) proportion_kept = len_after_drop / len_before_drop - loss_str = f"{100 * (1-proportion_kept)} % samples removed by timestamp deduplication." + loss_str = f"{100 * (1 - proportion_kept)} % samples removed by timestamp deduplication." if proportion_kept < 0.5: raise ValueError(f"{loss_str} Check input data for duplicate timestamps") elif proportion_kept < 0.999: @@ -441,7 +439,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', ds.ad2cp_time.attrs.pop('units') ds.to_netcdf(outname, 'w', encoding={'time': {'units': - 'seconds since 1970-01-01T00:00:00Z'}}) + 'seconds since 1970-01-01T00:00:00Z'}}) return outname diff --git a/tests/environment.yml b/tests/environment.yml index f227236..70c658a 100644 --- a/tests/environment.yml +++ b/tests/environment.yml @@ -15,6 +15,7 @@ dependencies: - pytest - pooch - matplotlib + - polars - pip: - dbdreader From 94a684073ea515fa691a5edb1f6cfa716d6bc1e1 Mon Sep 17 00:00:00 2001 From: Callum Rollo Date: Thu, 6 Oct 2022 15:06:42 +0200 Subject: [PATCH 02/15] correctly name parquet files --- pyglider/seaexplorer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyglider/seaexplorer.py b/pyglider/seaexplorer.py index ccfdca1..41056a2 100644 --- a/pyglider/seaexplorer.py +++ b/pyglider/seaexplorer.py @@ -25,7 +25,7 @@ def _outputname(f, outdir): for ff in fns: fnout += ff.lower() + '.' filenum = int(fns[4]) - return outdir + fnout + 'nc', filenum + return outdir + fnout + 'parquet', filenum def _needsupdating(ftype, fin, fout): @@ -149,11 +149,11 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True, if ftype == 'gli': out = out.with_columns([(pl.col("NavState") * 0 + int(filenum)).alias("fnum")]) - out.write_parquet(fnout[:-3] + '.parquet') + out.write_parquet(fnout) goodfiles.append(f) else: if out.select("time").shape[0] > min_samples_in_file: - out.write_parquet(fnout[:-3] + '.parquet') + out.write_parquet(fnout) goodfiles.append(f) else: _log.warning('Number of sensor data points' From 20e30806d2e4af013204d761b3869b94bd9b9de6 Mon Sep 17 00:00:00 2001 From: Callum Rollo Date: Thu, 6 Oct 2022 15:43:04 +0200 Subject: [PATCH 03/15] test seaexplorer with parquet --- tests/test_seaexplorer.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/tests/test_seaexplorer.py b/tests/test_seaexplorer.py index d24f4ce..1fe0cc3 100644 --- a/tests/test_seaexplorer.py +++ b/tests/test_seaexplorer.py @@ -1,7 +1,6 @@ -import xarray as xr +import polars as pl import pytest from pathlib import Path -import sys import os os.system('rm tests/data/realtime_rawnc/*') library_dir = Path(__file__).parent.parent.absolute() @@ -13,7 +12,7 @@ def test__outputname(): fnout, filenum = seaexplorer._outputname('tests/data/realtime_raw/sea035.12.pld1.sub.36', 'tests/data/realtime_rawnc/') - assert fnout == 'tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.nc' + assert fnout == 'tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.parquet' assert filenum == 36 @@ -44,7 +43,7 @@ def test_raw_to_rawnc(): def test__needsupdating(): ftype = 'pld1' fin = 'tests/data/realtime_raw/sea035.12.pld1.sub.36' - fout = 'tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.nc' + fout = 'tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.parquet' result_badpath = seaexplorer._needsupdating(ftype, fin, 'baz') result_goodpath = seaexplorer._needsupdating(ftype, fin, fout) assert result_badpath is True @@ -68,11 +67,11 @@ def test_merge_rawnc(): def test__interp_gli_to_pld(): # function should interpolate values from the glider dataset to sampling frequency of payload dataset - glider = xr.open_dataset('tests/data/realtime_rawnc/sea035.0012.gli.sub.0036.nc') - ds = xr.open_dataset('tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.nc') - val = glider.Pitch + glider = pl.read_parquet('tests/data/realtime_rawnc/sea035.0012.gli.sub.0036.parquet') + ds = pl.read_parquet('tests/data/realtime_rawnc/sea035.0012.pld1.sub.0036.parquet') + val = glider.select("Pitch").to_numpy()[:, 0] pitch_interp = seaexplorer._interp_gli_to_pld(glider, ds, val, None) - assert len(pitch_interp) == len(ds.time) + assert len(pitch_interp) == ds.shape[0] def test_raw_to_timeseries(): From 2c6d458380db63e06d92b687f55f19b6bc0e65e0 Mon Sep 17 00:00:00 2001 From: Callum Rollo Date: Thu, 6 Oct 2022 15:43:27 +0200 Subject: [PATCH 04/15] use filter to avoid future deprication --- pyglider/seaexplorer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyglider/seaexplorer.py b/pyglider/seaexplorer.py index 41056a2..70cd776 100644 --- a/pyglider/seaexplorer.py +++ b/pyglider/seaexplorer.py @@ -253,7 +253,7 @@ def merge_rawnc(indir, outdir, deploymentyaml, incremental=False, kind='raw'): def _interp_gli_to_pld(gli, ds, val, indctd): gli_ind = ~np.isnan(val) valout = np.interp(ds['time'], - gli['time'][gli_ind], + gli.filter(gli_ind)["time"], val[gli_ind]) return valout From c579a3c19270443c416266dee751062a42b5fb26 Mon Sep 17 00:00:00 2001 From: Callum Rollo Date: Thu, 6 Oct 2022 16:41:38 +0200 Subject: [PATCH 05/15] correct millisecond time parsing from pld --- pyglider/seaexplorer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyglider/seaexplorer.py b/pyglider/seaexplorer.py index 70cd776..7f26007 100644 --- a/pyglider/seaexplorer.py +++ b/pyglider/seaexplorer.py @@ -132,7 +132,7 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True, out = out.rename({"Timestamp": "time"}) else: out = out.with_column( - pl.col("PLD_REALTIMECLOCK").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S.%f")) + pl.col("PLD_REALTIMECLOCK").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S.%3f")) out = out.rename({"PLD_REALTIMECLOCK": "time"}) # If AD2CP data present, convert timestamps to datetime if 'AD2CP_TIME' in out.columns: From 942c5ea7988fb56156cb74ae99c0a83b8dfdbd84 Mon Sep 17 00:00:00 2001 From: Callum Rollo Date: Thu, 6 Oct 2022 16:52:27 +0200 Subject: [PATCH 06/15] correctly compare datetimes if us or ns when interpolating --- pyglider/seaexplorer.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pyglider/seaexplorer.py b/pyglider/seaexplorer.py index 7f26007..748c2d4 100644 --- a/pyglider/seaexplorer.py +++ b/pyglider/seaexplorer.py @@ -252,9 +252,15 @@ def merge_rawnc(indir, outdir, deploymentyaml, incremental=False, kind='raw'): def _interp_gli_to_pld(gli, ds, val, indctd): gli_ind = ~np.isnan(val) - valout = np.interp(ds['time'], - gli.filter(gli_ind)["time"], - val[gli_ind]) + # switch for if we are comparing two polars dataframes or a polars dataframe and a xarray dataset + if type(ds) is pl.internals.dataframe.frame.DataFrame: + valout = np.interp(ds["time"], + gli.filter(gli_ind)["time"], + val[gli_ind]) + else: + valout = np.interp(ds['time'].astype(int), + np.array(gli.filter(gli_ind)["time"].to_numpy().astype('datetime64[ns]')).astype(int), + val[gli_ind]) return valout From 863068062cdbe1a14fefa7469a2aa80f2b7e7b95 Mon Sep 17 00:00:00 2001 From: Callum Rollo Date: Fri, 7 Oct 2022 10:02:23 +0200 Subject: [PATCH 07/15] coarsen by mean as originally implemented in pandas --- pyglider/seaexplorer.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/pyglider/seaexplorer.py b/pyglider/seaexplorer.py index 748c2d4..fafa869 100644 --- a/pyglider/seaexplorer.py +++ b/pyglider/seaexplorer.py @@ -342,9 +342,16 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', coarsen_time = ncvar[name]['coarsen'] coarse_ints = np.arange(0, len(sensor)/coarsen_time, 1/coarsen_time).astype(int) sensor_sub = sensor.with_columns(pl.lit(coarse_ints).alias("coarse_ints")) - sensor_sub_coarse = sensor_sub.groupby('coarse_ints').median().sort(by="time") - val2 = sensor_sub_coarse.select(sensorname).to_numpy()[:, 0] - val = _interp_gli_to_pld(sensor_sub_coarse, sensor, val2, indctd) + sensor_sub_grouped = sensor_sub.with_column( + pl.col('time').to_physical() + ).groupby( + by=pl.col('coarse_ints'), + maintain_order=True + ).mean().with_column( + pl.col('time').cast(pl.Datetime('ms')) + )[:-1, :] + val2 = sensor_sub_grouped.select(sensorname).to_numpy()[:, 0] + val = _interp_gli_to_pld(sensor_sub_grouped, sensor, val2, indctd) val = val[indctd] ncvar['method'] = 'linear fill' From eaf2450ede8ec8beb8805933525e524d20082309 Mon Sep 17 00:00:00 2001 From: Callum Rollo Date: Fri, 7 Oct 2022 10:03:06 +0200 Subject: [PATCH 08/15] slacken test rtol from 1E7 to 1E5 for small averaging discrepancy --- tests/test_pyglider.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_pyglider.py b/tests/test_pyglider.py index 6203281..269c981 100644 --- a/tests/test_pyglider.py +++ b/tests/test_pyglider.py @@ -39,7 +39,7 @@ def test_example_seaexplorer(var): # Test that each variable and its coordinates match assert output[var].attrs == test_data[var].attrs if var not in ['time']: - np.testing.assert_allclose(output[var].values, test_data[var].values) + np.testing.assert_allclose(output[var].values, test_data[var].values, rtol=1e-5) else: dt0 = output[var].values - np.datetime64('2000-01-01') dt1 = test_data[var].values - np.datetime64('2000-01-01') From 5485043b56a7724052e53bf1057d88d416370412 Mon Sep 17 00:00:00 2001 From: Callum Rollo Date: Mon, 10 Oct 2022 09:30:03 +0200 Subject: [PATCH 09/15] add notes on parquet files to documentation --- docs/getting-started-seaexplorer.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/getting-started-seaexplorer.md b/docs/getting-started-seaexplorer.md index 33f239c..87ca7ae 100644 --- a/docs/getting-started-seaexplorer.md +++ b/docs/getting-started-seaexplorer.md @@ -30,7 +30,7 @@ There are four top-levels to the `deployment.yaml` The example script is relatively straight forward if there is no intermediate processing. See {ref}`ExProc`, below. -Data comes from an input directory, and is translated to raw glider-dependent netCDF files and put in a new directory. These files are useful of their own right, but are not CF compliant. These files are then merged into a single monolithic netCDF file, and this is translated to a CF-compliant timeseries file. Finally individual profiles are saved and a 2-D 1-m grid in time-depth is saved. +Data comes from an input directory, and is translated to raw glider-dependent parquet files files and put in a new directory. These files are useful of their own right. Apache Parquet is a columnar oriented format for storing tabular data. Parquet files take up less space than netCDF or csv and are much faster to read and write. These files can be opened with [polars.read_parquet](https://pola-rs.github.io/polars-book/user-guide/howcani/io/parquet.html) or [pandas.read_parquet](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_parquet.html). These files are then merged into a single monolithic parquet file, and this is translated to a CF-compliant timeseries netCDF file. Finally individual profiles are saved and a 2-D 1-m grid in time-depth is saved. It is likely that between these steps the user will want to add any screening steps, or adjustments to the calibrations. PyGlider does not provide those steps. From ef8cc90e9d6f7247e3ec6501079f7753e5222bef Mon Sep 17 00:00:00 2001 From: Callum Rollo Date: Tue, 11 Oct 2022 13:43:53 +0200 Subject: [PATCH 10/15] catch for corrupted files and cast non time cols to float64 --- pyglider/seaexplorer.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pyglider/seaexplorer.py b/pyglider/seaexplorer.py index fafa869..d979e0a 100644 --- a/pyglider/seaexplorer.py +++ b/pyglider/seaexplorer.py @@ -125,7 +125,11 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True, _log.info(f'{f} to {fnout}') if not incremental or _needsupdating(ftype, f, fnout): _log.info(f'Doing: {f} to {fnout}') - out = pl.read_csv(f, sep=';') + try: + out = pl.read_csv(f, sep=';') + except: + _log.warning(f'Could not read {f}') + continue if "Timestamp" in out.columns: out = out.with_column( pl.col("Timestamp").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S")) @@ -134,6 +138,9 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True, out = out.with_column( pl.col("PLD_REALTIMECLOCK").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S.%3f")) out = out.rename({"PLD_REALTIMECLOCK": "time"}) + for col_name in out.columns: + if "time" not in col_name.lower(): + out = out.with_column(pl.col(col_name).cast(pl.Float64)) # If AD2CP data present, convert timestamps to datetime if 'AD2CP_TIME' in out.columns: # Set datestamps with date 00000 to None From ce89d1b9a1307d06b18bf3b4f3d0bdab8328f591 Mon Sep 17 00:00:00 2001 From: Callum Rollo Date: Tue, 1 Nov 2022 14:46:39 +0100 Subject: [PATCH 11/15] if fileread fails, append to badfiles --- pyglider/seaexplorer.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pyglider/seaexplorer.py b/pyglider/seaexplorer.py index d979e0a..f703b95 100644 --- a/pyglider/seaexplorer.py +++ b/pyglider/seaexplorer.py @@ -125,11 +125,15 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True, _log.info(f'{f} to {fnout}') if not incremental or _needsupdating(ftype, f, fnout): _log.info(f'Doing: {f} to {fnout}') + # Try to read the file with polars. If the file is corrupted (rare), file read will fail and file + # is appended to badfiles try: out = pl.read_csv(f, sep=';') except: _log.warning(f'Could not read {f}') + badfiles.append(f) continue + # Parse the datetime from nav files (called Timestamp) and pld1 files (called PLD_REALTIMECLOCK) if "Timestamp" in out.columns: out = out.with_column( pl.col("Timestamp").str.strptime(pl.Datetime, fmt="%d/%m/%Y %H:%M:%S")) From a8eb8894c681a6dfb2c67f24bbe06ed6ee9238c5 Mon Sep 17 00:00:00 2001 From: Callum Rollo Date: Tue, 1 Nov 2022 14:55:59 +0100 Subject: [PATCH 12/15] added comment on dropna and renamed drop_rogue_1970 --- pyglider/seaexplorer.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pyglider/seaexplorer.py b/pyglider/seaexplorer.py index f703b95..3b2b0b4 100644 --- a/pyglider/seaexplorer.py +++ b/pyglider/seaexplorer.py @@ -153,6 +153,7 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True, # subsetting for heavily oversampled raw data: if rawsub == 'raw' and dropna_subset is not None: + # This check is the polars equivalent of pandas dropna. See docstring note on dropna out = out.with_column(out.select(pl.col(dropna_subset).is_null().cast(pl.Int64)) .sum(axis=1).alias("null_count")).filter( pl.col("null_count") <= dropna_thresh) \ @@ -181,7 +182,7 @@ def raw_to_rawnc(indir, outdir, deploymentyaml, incremental=True, return True -def drop_rogue_1970(df): +def drop_pre_1971_samples(df): """ If dates greater than 1971, 1, 1 are observed, drop any dates before 1971-01-01, from the datset and return it. This function removes 1970 @@ -189,10 +190,10 @@ def drop_rogue_1970(df): are < 1971-01-01, no action is taken Parameters: - ds: xarray.DataSet - dataset to check for pre-1971 dates + df: polars.DataFrame + dataframe to check for pre-1971 dates Returns: - ds: xarray.DataSet + df: polars.DataFrame """ dt_1971 = datetime.datetime(1971, 1, 1) # If all dates before or after 1971-01-01, return the dataset @@ -243,7 +244,7 @@ def merge_rawnc(indir, outdir, deploymentyaml, incremental=False, kind='raw'): _log.warning(f'No *gli*.parquet files found in {indir}') return False gli = pl.read_parquet(indir + '/*.gli.sub.*.parquet') - gli = drop_rogue_1970(gli) + gli = drop_pre_1971_samples(gli) gli.write_parquet(outgli) _log.info(f'Done writing {outgli}') @@ -253,7 +254,7 @@ def merge_rawnc(indir, outdir, deploymentyaml, incremental=False, kind='raw'): _log.warning(f'No *{kind}*.parquet files found in {indir}') return False pld = pl.read_parquet(indir + '/*.pld1.' + kind + '.*.parquet') - pld = drop_rogue_1970(pld) + pld = drop_pre_1971_samples(pld) pld.write_parquet(outpld) _log.info(f'Done writing {outpld}') From 94371a2dc6c077c64030c38060dbdda4a25a5031 Mon Sep 17 00:00:00 2001 From: Callum Rollo Date: Wed, 2 Nov 2022 10:43:39 +0100 Subject: [PATCH 13/15] relax test rtol to stop tests failing --- tests/test_pyglider.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_pyglider.py b/tests/test_pyglider.py index 269c981..cd56fb4 100644 --- a/tests/test_pyglider.py +++ b/tests/test_pyglider.py @@ -98,7 +98,7 @@ def test_example_slocum(var): assert output_slocum[var].attrs == test_data_slocum[var].attrs if var not in ['time']: np.testing.assert_allclose(output_slocum[var].values, - test_data_slocum[var].values) + test_data_slocum[var].values, rtol=1e-6) else: dt0 = output_slocum[var].values - np.datetime64('2000-01-01') dt1 = test_data_slocum[var].values - np.datetime64('2000-01-01') @@ -163,7 +163,7 @@ def test_example_slocum_littleendian(var): assert output_slocum_le[var].attrs == test_data_slocum_le[var].attrs if var not in ['time']: np.testing.assert_allclose(output_slocum_le[var].values, - test_data_slocum_le[var].values) + test_data_slocum_le[var].values, rtol=1e-6) else: dt0 = output_slocum_le[var].values - np.datetime64('2000-01-01') dt1 = test_data_slocum_le[var].values - np.datetime64('2000-01-01') From 4bd4571bb8db120556180e359b1d9e5d9acaac43 Mon Sep 17 00:00:00 2001 From: Callum Rollo Date: Thu, 3 Nov 2022 10:10:19 +0100 Subject: [PATCH 14/15] refactor to merge_parquet --- docs/index.md | 2 +- pyglider/seaexplorer.py | 8 +++++--- .../process_deploymentRealTime.py | 2 +- .../example-seaexplorer/process_deploymentRealTime.py | 2 +- tests/test_pyglider.py | 2 +- tests/test_seaexplorer.py | 4 ++-- 6 files changed, 11 insertions(+), 9 deletions(-) diff --git a/docs/index.md b/docs/index.md index bad8fb1..e18d95f 100644 --- a/docs/index.md +++ b/docs/index.md @@ -10,7 +10,7 @@ by optional steps to create profiles or depth-time grids. ```python seaexplorer.raw_to_rawnc(rawdir, rawncdir, deploymentyaml) -seaexplorer.merge_rawnc(rawncdir, rawncdir, deploymentyaml, kind='sub') +seaexplorer.merge_parquet(rawncdir, rawncdir, deploymentyaml, kind='sub') outname = seaexplorer.raw_to_timeseries(rawncdir, tsdir, deploymentyaml, kind='sub') # optional... ncprocess.extract_timeseries_profiles(outname, profiledir, deploymentyaml) diff --git a/pyglider/seaexplorer.py b/pyglider/seaexplorer.py index 3b2b0b4..86ae3e3 100644 --- a/pyglider/seaexplorer.py +++ b/pyglider/seaexplorer.py @@ -206,7 +206,7 @@ def drop_pre_1971_samples(df): return df.filter(pl.col("time") > dt_1971) -def merge_rawnc(indir, outdir, deploymentyaml, incremental=False, kind='raw'): +def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'): """ Merge all the raw netcdf files in indir. These are meant to be the raw flight and science files from the slocum. @@ -352,7 +352,7 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', if 'coarsen' in ncvar[name]: # smooth oxygen data as originally perscribed coarsen_time = ncvar[name]['coarsen'] - coarse_ints = np.arange(0, len(sensor)/coarsen_time, 1/coarsen_time).astype(int) + coarse_ints = np.arange(0, len(sensor) / coarsen_time, 1 / coarsen_time).astype(int) sensor_sub = sensor.with_columns(pl.lit(coarse_ints).alias("coarse_ints")) sensor_sub_grouped = sensor_sub.with_column( pl.col('time').to_physical() @@ -519,4 +519,6 @@ def _parse_sensor_config(filename): return active_device_dicts -__all__ = ['raw_to_rawnc', 'merge_rawnc', 'raw_to_timeseries'] +merge_rawnc = merge_parquet + +__all__ = ['raw_to_rawnc', 'merge_parquet', 'raw_to_timeseries'] diff --git a/tests/example-data/example-seaexplorer-legato-flntu-arod-ad2cp/process_deploymentRealTime.py b/tests/example-data/example-seaexplorer-legato-flntu-arod-ad2cp/process_deploymentRealTime.py index 81e1ba0..c76e44c 100644 --- a/tests/example-data/example-seaexplorer-legato-flntu-arod-ad2cp/process_deploymentRealTime.py +++ b/tests/example-data/example-seaexplorer-legato-flntu-arod-ad2cp/process_deploymentRealTime.py @@ -22,7 +22,7 @@ # turn seaexplorer zipped csvs into nc files. seaexplorer.raw_to_rawnc(rawdir, rawncdir, deploymentyaml) # merge individual netcdf files into single netcdf files *.gli*.nc and *.pld1*.nc - seaexplorer.merge_rawnc(rawncdir, rawncdir, deploymentyaml, kind='sub') + seaexplorer.merge_parquet(rawncdir, rawncdir, deploymentyaml, kind='sub') # Make level-0 timeseries netcdf file from the raw files... outname = seaexplorer.raw_to_timeseries(rawncdir, l0tsdir, deploymentyaml, kind='sub') ncprocess.extract_timeseries_profiles(outname, profiledir, deploymentyaml) diff --git a/tests/example-data/example-seaexplorer/process_deploymentRealTime.py b/tests/example-data/example-seaexplorer/process_deploymentRealTime.py index 3207dcc..8eef439 100644 --- a/tests/example-data/example-seaexplorer/process_deploymentRealTime.py +++ b/tests/example-data/example-seaexplorer/process_deploymentRealTime.py @@ -26,7 +26,7 @@ # turn *.EBD and *.DBD into *.ebd.nc and *.dbd.nc netcdf files. seaexplorer.raw_to_rawnc(rawdir, rawncdir, deploymentyaml) # merge individual neetcdf files into single netcdf files *.ebd.nc and *.dbd.nc - seaexplorer.merge_rawnc(rawncdir, rawncdir, deploymentyaml, kind='sub') + seaexplorer.merge_parquet(rawncdir, rawncdir, deploymentyaml, kind='sub') # Make level-1 timeseries netcdf file from th raw files... outname = seaexplorer.raw_to_timeseries(rawncdir, l0tsdir, deploymentyaml, kind='sub') diff --git a/tests/test_pyglider.py b/tests/test_pyglider.py index cd56fb4..80ac25f 100644 --- a/tests/test_pyglider.py +++ b/tests/test_pyglider.py @@ -16,7 +16,7 @@ deploymentyaml = str(example_dir / 'example-seaexplorer/deploymentRealtime.yml') l0tsdir = str(example_dir / 'example-seaexplorer/L0-timeseries-test/') + '/' seaexplorer.raw_to_rawnc(rawdir, rawncdir, deploymentyaml) -seaexplorer.merge_rawnc(rawncdir, rawncdir, deploymentyaml, kind='sub') +seaexplorer.merge_parquet(rawncdir, rawncdir, deploymentyaml, kind='sub') outname = seaexplorer.raw_to_L0timeseries(rawncdir, l0tsdir, deploymentyaml, kind='sub') output = xr.open_dataset(outname) diff --git a/tests/test_seaexplorer.py b/tests/test_seaexplorer.py index 1fe0cc3..d0ed977 100644 --- a/tests/test_seaexplorer.py +++ b/tests/test_seaexplorer.py @@ -51,12 +51,12 @@ def test__needsupdating(): def test_merge_rawnc(): - result_default = seaexplorer.merge_rawnc( + result_default = seaexplorer.merge_parquet( 'tests/data/realtime_rawnc/', 'tests/data/realtime_rawnc/', example_dir / 'example-seaexplorer/deploymentRealtime.yml') - result_sub = seaexplorer.merge_rawnc( + result_sub = seaexplorer.merge_parquet( 'tests/data/realtime_rawnc/', 'tests/data/realtime_rawnc/', example_dir / 'example-seaexplorer/deploymentRealtime.yml', From 9a512896f3a08d0c7fbf6174c95335840531bc98 Mon Sep 17 00:00:00 2001 From: Callum Rollo Date: Thu, 3 Nov 2022 10:13:57 +0100 Subject: [PATCH 15/15] explain optional coarsening of variables --- pyglider/seaexplorer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pyglider/seaexplorer.py b/pyglider/seaexplorer.py index 86ae3e3..2c98c41 100644 --- a/pyglider/seaexplorer.py +++ b/pyglider/seaexplorer.py @@ -350,10 +350,12 @@ def raw_to_timeseries(indir, outdir, deploymentyaml, kind='raw', _log.debug('sensorname %s', sensorname) val = convert(sensor.select(sensorname).to_numpy()[:, 0]) if 'coarsen' in ncvar[name]: - # smooth oxygen data as originally perscribed + # coarsen oxygen data as originally perscribed coarsen_time = ncvar[name]['coarsen'] + # create a boolean mask of coarsened timesteps. Use this mask to create an array of samples to keep coarse_ints = np.arange(0, len(sensor) / coarsen_time, 1 / coarsen_time).astype(int) sensor_sub = sensor.with_columns(pl.lit(coarse_ints).alias("coarse_ints")) + # Subsample the variable data keeping only the samples from the coarsened timeseries sensor_sub_grouped = sensor_sub.with_column( pl.col('time').to_physical() ).groupby(