
Commit

fix: use correct arguments when reading single tag and argument name cleanup

mortendaehli committed Jul 6, 2023
1 parent bf7e1e4 commit 34b956e
Showing 20 changed files with 710 additions and 727 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -31,7 +31,7 @@ repos:
       ]
 
   - repo: https://github.com/pre-commit/mirrors-mypy
-    rev: v1.3.0
+    rev: v1.4.1
     hooks:
       - id: mypy
         args: [--strict, --ignore-missing-imports]
4 changes: 2 additions & 2 deletions docs/manual.md
@@ -28,8 +28,8 @@ Tagreader is intended to be easy to use, and present the same interface to the u
   - [Caching results](#caching-results)
   - [Time zones](#time-zones)
   - [Fetching metadata](#fetching-metadata)
-    - [get_units()](#get_units)
-    - [get_description()](#get_description)
+    - [get_units()](#getunits)
+    - [get_description()](#getdescription)
   - [Performing raw queries](#performing-raw-queries)
 
 # Requirements
158 changes: 78 additions & 80 deletions tagreader/cache.py
@@ -104,44 +104,44 @@ def delete_metadata(self, key: str) -> None:
 
 
 class BucketCache(BaseCache):
+    @staticmethod
     def _key_path(
-        self,
         tagname: str,
-        readtype: ReaderType,
+        read_type: ReaderType,
         ts: Union[int, timedelta],
         stepped: bool,
-        status: bool,
-        start_time: Optional[datetime] = None,
-        end_time: Optional[datetime] = None,
+        get_status: bool,
+        start: Optional[datetime] = None,
+        end: Optional[datetime] = None,
     ) -> str:
         """Return a string on the form
-        $tagname$readtype[$sample_time][$stepped][$status]$_start_time_end_time
+        $tagname$read_type[$sample_time][$stepped][$get_status]$_start_end
         tagname: safe tagname
         sample_time: integer value. Empty for RAW.
         stepped: "stepped" if value was read as stepped. Empty if not.
-        status: "status" if value contains status. Empty if not.
-        start_time: The start_time of the query that created the bucket.
-        end_time: The end_time of the query that created the bucket.
+        get_status: "status" if value contains status. Empty if not.
+        start: The start of the query that created the bucket.
+        end: The end of the query that created the bucket.
         """
         tagname = safe_tagname(tagname)
 
         ts = (
             int(ts.total_seconds())
-            if readtype != ReaderType.RAW and isinstance(ts, timedelta)
+            if read_type != ReaderType.RAW and isinstance(ts, timedelta)
             else ts
         )
         timespan = ""
-        if start_time is not None:
-            start_time_epoch = timestamp_to_epoch(start_time)
-            end_time_epoch = timestamp_to_epoch(end_time) if end_time else end_time
-            timespan = f"$_{start_time_epoch}_{end_time_epoch}"
+        if start is not None:
+            start_epoch = timestamp_to_epoch(start)
+            end_epoch = timestamp_to_epoch(end) if end else end
+            timespan = f"$_{start_epoch}_{end_epoch}"
 
         keyval = (
             f"${tagname}"
-            f"${readtype.name}"
-            f"{(ts is not None and readtype != ReaderType.RAW) * f'$s{ts}'}"
+            f"${read_type.name}"
+            f"{(ts is not None and read_type != ReaderType.RAW) * f'$s{ts}'}"
             f"{stepped * '$stepped'}"
-            f"{status * '$status'}"
+            f"{get_status * '$get_status'}"
             f"{timespan}"
         )
         return keyval
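As a quick illustration of the renamed key builder, here is a minimal sketch of calling the now-static `_key_path` with the new argument names. The tag name and query window are made up, and the exact tag escaping depends on `safe_tagname`; the `ReaderType` import path is assumed:

```python
from datetime import datetime, timedelta, timezone

from tagreader.cache import BucketCache
from tagreader.utils import ReaderType

# Hypothetical tag and query window, for illustration only.
key = BucketCache._key_path(
    tagname="FC-101",
    read_type=ReaderType.INT,  # was readtype=
    ts=timedelta(seconds=60),
    stepped=False,
    get_status=False,          # was status=
    start=datetime(2023, 7, 1, tzinfo=timezone.utc),  # was start_time=
    end=datetime(2023, 7, 2, tzinfo=timezone.utc),    # was end_time=
)
# Roughly "$FC_101$INT$s60$_1688169600_1688256000", assuming safe_tagname
# maps "-" to "_" and timestamp_to_epoch returns integer epoch seconds.
```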
@@ -150,42 +150,42 @@ def store(
         self,
         df: pd.DataFrame,
         tagname: str,
-        readtype: ReaderType,
+        read_type: ReaderType,
         ts: timedelta,
         stepped: bool,
-        status: bool,
-        start_time: datetime,
-        end_time: datetime,
+        get_status: bool,
+        start: datetime,
+        end: datetime,
     ) -> None:
         if df.empty:
             return
 
         intersecting = self.get_intersecting_datasets(
             tagname=tagname,
-            readtype=readtype,
+            read_type=read_type,
             ts=ts,
             stepped=stepped,
-            status=status,
-            start_time=start_time,
-            end_time=end_time,
+            get_status=get_status,
+            start=start,
+            end=end,
         )
         if len(intersecting) > 0:
             for dataset in intersecting:
                 this_start, this_end = self._get_intervals_from_dataset_name(dataset)
-                start_time = min(start_time, this_start if this_start else start_time)
-                end_time = max(end_time, this_end if this_end else end_time)
+                start = min(start, this_start if this_start else start)
+                end = max(end, this_end if this_end else end)
                 df2 = self.get(dataset)
                 if df2 is not None:
                     df = pd.concat([df, df2], axis=0)
                 self.delete(dataset)
         key = self._key_path(
             tagname=tagname,
-            readtype=readtype,
+            read_type=read_type,
             ts=ts,
             stepped=stepped,
-            status=status,
-            start_time=start_time,
-            end_time=end_time,
+            get_status=get_status,
+            start=start,
+            end=end,
         )
         self.put(key=key, value=clean_dataframe(df))
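For callers, a hedged sketch of `store` with the renamed keyword arguments; the cache construction and the data are assumptions, not part of this commit:

```python
from datetime import datetime, timedelta, timezone

import pandas as pd

from tagreader.cache import BucketCache
from tagreader.utils import ReaderType

start = datetime(2023, 7, 1, tzinfo=timezone.utc)
end = datetime(2023, 7, 2, tzinfo=timezone.utc)
df = pd.DataFrame(
    {"FC-101": [1.0, 2.0]},
    index=pd.to_datetime([start, end], utc=True),
)

cache = BucketCache("tagreader_cache")  # constructor argument is an assumption
cache.store(
    df=df,
    tagname="FC-101",
    read_type=ReaderType.INT,  # was readtype=
    ts=timedelta(seconds=60),
    stepped=False,
    get_status=False,          # was status=
    start=start,               # was start_time=
    end=end,                   # was end_time=
)
```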

@@ -195,63 +195,61 @@ def _get_intervals_from_dataset_name(
     ) -> Tuple[datetime, datetime]:
         name_with_times = name.split("$")[-1]
         if not name_with_times.count("_") == 2:
-            return (None, None)  # type: ignore[return-value]
-        _, start_time_epoch, end_time_epoch = name_with_times.split("_")
-        start_time = pd.to_datetime(int(start_time_epoch), unit="s").tz_localize("UTC")
-        end_time = pd.to_datetime(int(end_time_epoch), unit="s").tz_localize("UTC")
-        return start_time, end_time
+            return None, None  # type: ignore[return-value]
+        _, start_epoch, end_epoch = name_with_times.split("_")
+        start = pd.to_datetime(int(start_epoch), unit="s").tz_localize("UTC")
+        end = pd.to_datetime(int(end_epoch), unit="s").tz_localize("UTC")
+        return start, end
 
     def get_intersecting_datasets(
         self,
         tagname: str,
-        readtype: ReaderType,
+        read_type: ReaderType,
         ts: Union[int, timedelta],
         stepped: bool,
-        status: bool,
-        start_time: datetime,
-        end_time: datetime,
+        get_status: bool,
+        start: datetime,
+        end: datetime,
     ) -> List[str]:
         if not len(self) > 0:
             return []
         intersecting_datasets = []
         for dataset in self.iterkeys():
             target_key = self._key_path(
                 tagname=tagname,
-                readtype=readtype,
-                start_time=None,
-                end_time=None,
+                read_type=read_type,
+                start=None,
+                end=None,
                 ts=ts,
                 stepped=stepped,
-                status=status,
+                get_status=get_status,
             )
             if target_key in dataset:
-                start_time_ds, end_time_ds = self._get_intervals_from_dataset_name(
-                    dataset
-                )
-                if end_time_ds >= start_time and end_time >= start_time_ds:
+                start_ds, end_ds = self._get_intervals_from_dataset_name(dataset)
+                if end_ds >= start and end >= start_ds:
                     intersecting_datasets.append(dataset)
         return intersecting_datasets
 
     def get_missing_intervals(
         self,
         tagname: str,
-        readtype: ReaderType,
+        read_type: ReaderType,
         ts: Union[int, timedelta],
         stepped: bool,
-        status: bool,
-        start_time: datetime,
-        end_time: datetime,
+        get_status: bool,
+        start: datetime,
+        end: datetime,
     ) -> List[Tuple[datetime, datetime]]:
         datasets = self.get_intersecting_datasets(
             tagname=tagname,
-            readtype=readtype,
+            read_type=read_type,
             ts=ts,
             stepped=stepped,
-            status=status,
-            start_time=start_time,
-            end_time=end_time,
+            get_status=get_status,
+            start=start,
+            end=end,
         )
-        missing_intervals = [(start_time, end_time)]
+        missing_intervals = [(start, end)]
         for dataset in datasets:
             b = self._get_intervals_from_dataset_name(dataset)
             for _ in range(0, len(missing_intervals)):
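The interval-splitting loop body is collapsed in this view, but the seeding above shows the idea: the requested span starts out entirely missing, and each cached bucket carves a piece out of it. Continuing the hypothetical `cache`, `start`, and `end` from the earlier sketch:

```python
# Illustrative only: with one cached bucket covering the first half of the
# window, roughly the second half should come back as still missing.
missing = cache.get_missing_intervals(
    tagname="FC-101",
    read_type=ReaderType.INT,
    ts=timedelta(seconds=60),
    stepped=False,
    get_status=False,
    start=start,
    end=end,
)
# e.g. [(<end of cached bucket>, end)]
```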
@@ -277,12 +275,12 @@ def get_missing_intervals(
     def fetch(
         self,
         tagname: str,
-        readtype: ReaderType,
+        read_type: ReaderType,
         ts: Union[int, timedelta],
         stepped: bool,
-        status: bool,
-        start_time: datetime,
-        end_time: datetime,
+        get_status: bool,
+        start: datetime,
+        end: datetime,
     ) -> pd.DataFrame:
         df = pd.DataFrame()
         if not len(self) > 0:
@@ -293,18 +291,18 @@
 
         datasets = self.get_intersecting_datasets(
             tagname=tagname,
-            readtype=readtype,
+            read_type=read_type,
             ts=ts,
             stepped=stepped,
-            status=status,
-            start_time=start_time,
-            end_time=end_time,
+            get_status=get_status,
+            start=start,
+            end=end,
         )
 
         for dataset in datasets:
             df2 = self.get(dataset)
             if df2 is not None:
-                df = pd.concat([df, df2.loc[start_time:end_time]], axis=0)  # type: ignore[call-overload, misc]
+                df = pd.concat([df, df2.loc[start:end]], axis=0)  # type: ignore[call-overload, misc]
 
         return clean_dataframe(df)
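Reading back through the renamed `fetch`, again continuing the hypothetical sketch; rows outside `[start, end]` are sliced away before `clean_dataframe` runs:

```python
cached = cache.fetch(
    tagname="FC-101",
    read_type=ReaderType.INT,
    ts=timedelta(seconds=60),
    stepped=False,
    get_status=False,
    start=start,  # was start_time=
    end=end,      # was end_time=
)
```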

@@ -313,7 +311,7 @@ class SmartCache(BaseCache):
     @staticmethod
     def key_path(
         df: Union[str, pd.DataFrame],
-        readtype: ReaderType,
+        read_type: ReaderType,
         ts: Optional[Union[int, timedelta]] = None,
     ) -> str:
         """Return a string on the form
@@ -323,7 +321,7 @@ def key_path(
         name = str(list(df)[0]) if isinstance(df, pd.DataFrame) else df
         name = safe_tagname(name)
         ts = int(ts.total_seconds()) if isinstance(ts, timedelta) else ts
-        if readtype != ReaderType.RAW:
+        if read_type != ReaderType.RAW:
             if ts is None:
                 # Determine sample time by reading interval between first two
                 # samples of dataframe.
@@ -333,17 +331,17 @@ def key_path(
                     raise TypeError
             else:
                 interval = int(ts)
-            return f"{readtype.name}$s{interval}${name}"
+            return f"{read_type.name}$s{interval}${name}"
         else:
-            return f"{readtype.name}${name}"
+            return f"{read_type.name}${name}"
 
     def store(
         self,
         df: pd.DataFrame,
-        readtype: ReaderType,
+        read_type: ReaderType,
         ts: Optional[Union[int, timedelta]] = None,
     ) -> None:
-        key = self.key_path(df=df, readtype=readtype, ts=ts)
+        key = self.key_path(df=df, read_type=read_type, ts=ts)
         if df.empty:
             return  # Weirdness ensues when using empty df in select statement below
         if key in self:
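The two key shapes `SmartCache.key_path` produces after the rename, sketched with a made-up tag (RAW keys carry no sample time; exact tag escaping again depends on `safe_tagname`):

```python
from tagreader.cache import SmartCache
from tagreader.utils import ReaderType

SmartCache.key_path(df="FC-101", read_type=ReaderType.INT, ts=60)
# -> roughly "INT$s60$FC_101"
SmartCache.key_path(df="FC-101", read_type=ReaderType.RAW)
# -> roughly "RAW$FC_101"
```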
@@ -361,17 +359,17 @@ def store(
     def fetch(
         self,
         tagname: str,
-        readtype: ReaderType,
+        read_type: ReaderType,
         ts: Optional[Union[int, timedelta]] = None,
-        start_time: Optional[datetime] = None,
-        stop_time: Optional[datetime] = None,
+        start: Optional[datetime] = None,
+        end: Optional[datetime] = None,
     ) -> pd.DataFrame:
-        key = self.key_path(df=tagname, readtype=readtype, ts=ts)
+        key = self.key_path(df=tagname, read_type=read_type, ts=ts)
         df = cast(Optional[pd.DataFrame], self.get(key=key))
         if df is None:
             return pd.DataFrame()
-        if start_time is not None:
-            df = df.loc[df.index >= start_time]
-        if stop_time is not None:
-            df = df.loc[df.index <= stop_time]
+        if start is not None:
+            df = df.loc[df.index >= start]
+        if end is not None:
+            df = df.loc[df.index <= end]
         return df
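And a matching sketch of `SmartCache.fetch`, whose `start_time`/`stop_time` keywords became `start`/`end`; instance construction is an assumption, and `df`, `start`, and `end` continue the earlier sketch:

```python
from tagreader.cache import SmartCache
from tagreader.utils import ReaderType

sc = SmartCache("tagreader_cache")  # constructor argument is an assumption
sc.store(df=df, read_type=ReaderType.INT, ts=60)
window = sc.fetch(
    tagname="FC-101",
    read_type=ReaderType.INT,
    ts=60,
    start=start,  # was start_time=
    end=end,      # was stop_time=
)
```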
