New option to control Cache expiration time #262

Merged
16 commits merged on May 2, 2023
11 changes: 8 additions & 3 deletions argopy/options.py
@@ -22,7 +22,8 @@
FTP = "ftp"
ERDDAP = 'erddap'
DATASET = "dataset"
DATA_CACHE = "cachedir"
CACHE_FOLDER = "cachedir"
CACHE_EXPIRATION = "cache_expiration"
USER_LEVEL = "mode"
API_TIMEOUT = "api_timeout"
TRUST_ENV = "trust_env"
@@ -36,7 +37,8 @@
FTP: "https://data-argo.ifremer.fr",
ERDDAP: "https://erddap.ifremer.fr/erddap",
DATASET: "phy",
DATA_CACHE: os.path.expanduser(os.path.sep.join(["~", ".cache", "argopy"])),
CACHE_FOLDER: os.path.expanduser(os.path.sep.join(["~", ".cache", "argopy"])),
CACHE_EXPIRATION: 86400,
USER_LEVEL: "standard",
API_TIMEOUT: 60,
TRUST_ENV: False,
@@ -77,7 +79,8 @@ def validate_http(this_path):
FTP: validate_ftp,
ERDDAP: validate_http,
DATASET: _DATASET_LIST.__contains__,
DATA_CACHE: os.path.exists,
CACHE_FOLDER: os.path.exists,
CACHE_EXPIRATION: lambda x: isinstance(x, int) and x > 0,
USER_LEVEL: _USER_LEVEL_LIST.__contains__,
API_TIMEOUT: lambda x: isinstance(x, int) and x > 0,
TRUST_ENV: lambda x: isinstance(x, bool),
@@ -102,6 +105,8 @@ class set_options:
Default: None
- ``cachedir``: Absolute path to a local cache directory.
Default: ``~/.cache/argopy``
- ``cache_expiration``: Expiration delay of cache files in seconds.
Default: 86400
- ``mode``: User mode.
Default: ``standard``.
Possible values: ``standard`` or ``expert``.
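The new cache_expiration option is set like the other options documented above; a minimal usage sketch (the option name and its 86400-second default come from this diff, the one-hour value is only an illustration):

import argopy

# Refresh cached files older than one hour instead of the default
# 86400 seconds (one day); the validator added above only accepts a
# strictly positive integer.
argopy.set_options(cache_expiration=3600)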
45 changes: 40 additions & 5 deletions argopy/stores/argo_index_pa.py
@@ -12,8 +12,8 @@
import gzip
from packaging import version

from ..errors import DataNotFound
from ..utilities import check_index_cols, is_indexbox, check_wmo, check_cyc, doc_inherit
from ..errors import DataNotFound, InvalidDatasetStructure
from ..utilities import check_index_cols, is_indexbox, check_wmo, check_cyc, doc_inherit, to_list
from .argo_index_proto import ArgoIndexStoreProto
try:
import pyarrow.csv as csv # noqa: F401
@@ -89,11 +89,10 @@ def read_csv(input_file, nrows=None):
return this_table

def csv2index(obj, origin):
index_file = origin.split(self.fs['src'].fs.sep)[-1]
index = read_csv(obj, nrows=nrows)
check_index_cols(
index.column_names,
convention=index_file.split(".")[0],
convention=self.convention,
)
log.debug("Argo index file loaded with pyarrow read_csv. src='%s'" % origin)
return index
@@ -244,6 +243,20 @@ def fct(row):
wmo = [int(w) for w in wmo]
return wmo

def read_params(self, index=False):
if self.convention not in ["argo_bio-profile_index", "argo_synthetic-profile_index"]:
raise InvalidDatasetStructure("Cannot list parameters in this index (not a BGC profile index)")
if hasattr(self, "search") and not index:
df = pa.compute.split_pattern(self.search["parameters"], pattern=" ").to_pandas()
else:
df = pa.compute.split_pattern(self.index["parameters"], pattern=" ").to_pandas()
plist = set(df[0])
def fct(row):
[plist.add(v) for v in row]
return len(row)
df.map(fct)
return sorted(list(plist))

def records_per_wmo(self, index=False):
""" Return the number of records per unique WMOs in search results

@@ -402,6 +415,25 @@ def search_lat_lon_tim(self, BOX, nrows=None):
self.run(nrows=nrows)
return self

def search_params(self, PARAMs, nrows=None):
if self.convention not in ["argo_bio-profile_index", "argo_synthetic-profile_index"]:
raise InvalidDatasetStructure("Cannot search for parameters in this index (not a BGC profile index)")
log.debug("Argo index searching for parameters in PARAM=%s ..." % PARAMs)
PARAMs = to_list(PARAMs) # Make sure we deal with a list
self.load()
self.search_type = {"PARAM": PARAMs}
filt = []
for param in PARAMs:
pattern = "%s" % param
filt.append(
pa.compute.match_substring_regex(
self.index["parameters"], pattern=pattern
)
)
self.search_filter = self._reduce_a_filter_list(filt, op='and')
self.run(nrows=nrows)
return self

def to_indexfile(self, file):
"""Save search results on file, following the Argo standard index formats

@@ -425,7 +457,10 @@ def convert_a_date(row):

s = self.search
s = s.set_column(1, "date", new_date)
s = s.set_column(7, "date_update", new_date_update)
if self.convention == "ar_index_global_prof":
s = s.set_column(7, "date_update", new_date_update)
elif self.convention in ["argo_bio-profile_index", "argo_synthetic-profile_index"]:
s = s.set_column(9, "date_update", new_date_update)

write_options = csv.WriteOptions(delimiter=",", include_header=False, quoting_style="none")
csv.write_csv(s, file, write_options=write_options)
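A hypothetical usage sketch of the BGC-aware methods added to the pyarrow index store above; idx stands for an instance of this store already pointed at a BGC profile index such as "argo_bio-profile_index" (an assumption for illustration, not code from the PR):

# `idx` is assumed to be an instance of the pyarrow index store above,
# loaded with a BGC profile index ("argo_bio-profile_index").
idx.search_params(["DOXY", "NITRATE"])      # keep profiles reporting both parameters
print(idx.read_params())                    # unique parameters found in the search results
idx.to_indexfile("doxy_nitrate_index.txt")  # save results in the Argo standard index format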
48 changes: 42 additions & 6 deletions argopy/stores/argo_index_pd.py
@@ -9,8 +9,8 @@
import logging
import gzip

from ..errors import DataNotFound
from ..utilities import check_index_cols, is_indexbox, check_wmo, check_cyc, doc_inherit
from ..errors import DataNotFound, InvalidDatasetStructure
from ..utilities import check_index_cols, is_indexbox, check_wmo, check_cyc, doc_inherit, to_list
from .argo_index_proto import ArgoIndexStoreProto


@@ -46,11 +46,10 @@ def read_csv(input_file, nrows=None):
return this_table

def csv2index(obj, origin):
index_file = origin.split(self.fs['src'].fs.sep)[-1]
index = read_csv(obj, nrows=nrows)
check_index_cols(
index.columns.to_list(),
convention=index_file.split(".")[0],
convention=self.convention,
)
log.debug("Argo index file loaded with pandas read_csv. src='%s'" % origin)
return index
@@ -191,6 +190,21 @@ def read_wmo(self, index=False):
wmo = np.unique(results)
return wmo

def read_params(self, index=False):
if self.convention not in ["argo_bio-profile_index", "argo_synthetic-profile_index"]:
raise InvalidDatasetStructure("Cannot list parameters in this index (not a BGC profile index)")
if hasattr(self, "search") and not index:
df = self.search['parameters']
else:
df = self.index['parameters']
plist = set(df[0].split(" "))
def fct(row):
row = row.split(" ")
[plist.add(v) for v in row]
return len(row)
df.map(fct)
return sorted(list(plist))

def records_per_wmo(self, index=False):
""" Return the number of records per unique WMOs in search results

@@ -325,6 +339,22 @@ def search_lat_lon_tim(self, BOX, nrows=None):
self.run(nrows=nrows)
return self

def search_params(self, PARAMs, nrows=None):
if self.convention not in ["argo_bio-profile_index", "argo_synthetic-profile_index"]:
raise InvalidDatasetStructure("Cannot search for parameters in this index (not a BGC profile index)")
log.debug("Argo index searching for parameters in PARAM=%s ..." % PARAMs)
PARAMs = to_list(PARAMs) # Make sure we deal with a list
self.load()
self.search_type = {"PARAM": PARAMs}
filt = []
for param in PARAMs:
filt.append(
self.index["parameters"].str.contains("%s" % param, regex=True, case=False)
)
self.search_filter = np.logical_and.reduce(filt)
self.run(nrows=nrows)
return self

def to_indexfile(self, outputfile):
"""Save search results on file, following the Argo standard index formats

@@ -338,10 +368,16 @@ def to_indexfile(self, outputfile):
str
"""

if self.convention == "ar_index_global_prof":
columns = ['file', 'date', 'latitude', 'longitude', 'ocean', 'profiler_type', 'institution',
'date_update']
elif self.convention in ["argo_bio-profile_index", "argo_synthetic-profile_index"]:
columns = ['file', 'date', 'latitude', 'longitude', 'ocean', 'profiler_type', 'institution',
'parameters', 'parameter_data_mode', 'date_update']

self.search.to_csv(outputfile, sep=',', index=False, index_label=False,
header=False,
columns=['file', 'date', 'latitude', 'longitude', 'ocean', 'profiler_type', 'institution',
'date_update'])
columns=columns)
outputfile = self._insert_header(outputfile)

return outputfile
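For reference, a standalone sketch (made-up data, not argopy code) of the filtering pattern used by the pandas search_params above: one case-insensitive substring match per requested parameter, combined with a logical AND.

import numpy as np
import pandas as pd

# Made-up "parameters" column, mimicking the space-separated parameter
# lists found in a BGC profile index file.
parameters = pd.Series([
    "PRES TEMP PSAL DOXY",
    "PRES TEMP PSAL NITRATE",
    "PRES TEMP PSAL DOXY NITRATE",
])

PARAMs = ["DOXY", "NITRATE"]
filt = [parameters.str.contains(p, regex=True, case=False) for p in PARAMs]
mask = np.logical_and.reduce(filt)
print(parameters[mask])  # only the last row lists both DOXY and NITRATE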