Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

for yyyy_mm_dd metrics, query by fmt_time instead of local_dt #970

Merged
merged 5 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions emission/core/wrapper/localdate.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
import arrow
import emission.core.wrapper.wrapperbase as ecwb

# specify the order of time units, from largest to smallest
DATETIME_UNITS = ['year', 'month', 'day', 'hour', 'minute', 'second']

class LocalDate(ecwb.WrapperBase):
"""
Supporting wrapper class that stores the expansions of the components
Expand Down
11 changes: 3 additions & 8 deletions emission/net/api/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import emission.analysis.result.metrics.simple_metrics as earms
import emission.storage.decorations.analysis_timeseries_queries as esda
import emission.storage.decorations.local_date_queries as esdl
import emission.storage.timeseries.tcquery as esttc
import emission.storage.timeseries.fmt_time_query as estf

import emcommon.metrics.metrics_summaries as emcms

Expand All @@ -25,14 +25,9 @@ def summarize_by_local_date(user_id, start_ld, end_ld, freq_name, metric_list, i
return _call_group_fn(earmt.group_by_local_date, user_id, start_ld, end_ld,
local_freq, metric_list, include_aggregate)

def summarize_by_yyyy_mm_dd(user_id, start_ymd, end_ymd, freq, metric_list, include_agg, app_config):
time_query = esttc.TimeComponentQuery(
"data.start_local_dt",
esdl.yyyy_mm_dd_to_local_date(start_ymd),
esdl.yyyy_mm_dd_to_local_date(end_ymd)
)
def summarize_by_yyyy_mm_dd(user_id, start_ymd, end_ymd, freq, metric_list, include_agg, app_config):
time_query = estf.FmtTimeQuery("data.start_fmt_time", start_ymd, end_ymd)
trips = esda.get_entries(esda.COMPOSITE_TRIP_KEY, None, time_query)
print('found ' + str([e for e in trips]))
return emcms.generate_summaries(metric_list, trips, app_config)

def _call_group_fn(group_fn, user_id, start_time, end_time, freq, metric_list, include_aggregate):
Expand Down
91 changes: 43 additions & 48 deletions emission/storage/decorations/local_date_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,52 +11,47 @@

import emission.core.wrapper.localdate as ecwl

def get_range_query(field_prefix, start_ld, end_ld):
units = [u for u in ecwl.DATETIME_UNITS if u in start_ld and u in end_ld]
logging.debug(f'get_range_query: units = {units}')
try:
gt_query = get_comparison_query(field_prefix, start_ld, end_ld, units, 'gt')
lt_query = get_comparison_query(field_prefix, end_ld, start_ld, units, 'lt')
logging.debug(f'get_range_query: gt_query = {gt_query}, lt_query = {lt_query}')
return { "$and": [gt_query, lt_query] } if gt_query and lt_query else {}
except AssertionError as e:
logging.error(f'Invalid range from {str(start_ld)} to {str(end_ld)}: {str(e)}')
return None

def get_comparison_query(field_prefix, base_ld, limit_ld, units, gt_or_lt):
field_name = lambda i: f'{field_prefix}.{units[i]}'
and_conditions, or_conditions = [], []
tiebreaker_index = -1
for i, unit in enumerate(units):
# the range is inclusive, so if on the last unit we should use $lte / $gte instead of $lt / $gt
op = f'${gt_or_lt}e' if i == len(units)-1 else f'${gt_or_lt}'
if tiebreaker_index >= 0:
tiebreaker_conditions = [{ field_name(j): base_ld[units[j]] } for j in range(tiebreaker_index, i)]
tiebreaker_conditions.append({ field_name(i): { op: base_ld[unit] }})
or_conditions.append({ "$and": tiebreaker_conditions })
elif base_ld[unit] == limit_ld[unit]:
and_conditions.append({field_name(i): base_ld[unit]})
def get_filter_query(field_name, start_local_dt, end_local_dt):
if list(start_local_dt.keys()) != list(end_local_dt.keys()):
raise RuntimeError("start_local_dt.keys() = %s does not match end_local_dt.keys() = %s" %
(list(start_local_dt.keys()), list(end_local_dt.keys())))
query_result = {}
for key in start_local_dt:
curr_field = "%s.%s" % (field_name, key)
gte_lte_query = {}
try:
start_int = int(start_local_dt[key])
except:
logging.info("start_local_dt[%s] = %s, not an integer, skipping" %
(key, start_local_dt[key]))
continue

try:
end_int = int(end_local_dt[key])
except:
logging.info("end_local_dt[%s] = %s, not an integer, skipping" %
(key, end_local_dt[key]))
continue

is_rollover = start_int > end_int

if is_rollover:
gte_lte_query = get_rollover_query(start_int, end_int)
else:
gte_lte_query = get_standard_query(start_int, end_int)

if len(gte_lte_query) > 0:
query_result.update({curr_field: gte_lte_query})
else:
assert (base_ld[unit] < limit_ld[unit]) if gt_or_lt == 'gt' else (base_ld[unit] > limit_ld[unit])
or_conditions.append({field_name(i): { op: base_ld[unit] }})
tiebreaker_index = i
if and_conditions and or_conditions:
return { "$and": and_conditions + [{ "$or": or_conditions }] }
elif and_conditions:
return { "$and": and_conditions }
elif or_conditions:
return { "$or": or_conditions }
else:
return {}

def yyyy_mm_dd_to_local_date(ymd: str) -> ecwl.LocalDate:
return ecwl.LocalDate({
'year': int(ymd[0:4]),
'month': int(ymd[5:7]),
'day': int(ymd[8:10])
})

def get_yyyy_mm_dd_range_query(field_name, start_ymd: str, end_ymd: str) -> dict:
start_local_date = yyyy_mm_dd_to_local_date(start_ymd)
end_local_date = yyyy_mm_dd_to_local_date(end_ymd)
return get_range_query(field_name, start_local_date, end_local_date)
logging.info("key %s exists, skipping because upper AND lower bounds are missing" % key)

logging.debug("In get_filter_query, returning query %s" % query_result)
return query_result

def get_standard_query(start_int, end_int):
assert(start_int <= end_int)
return {'$gte': start_int, '$lte': end_int}

def get_rollover_query(start_int, end_int):
assert(start_int > end_int)
return {'$not': {'$gt': end_int, '$lt': start_int}}
27 changes: 27 additions & 0 deletions emission/storage/timeseries/fmt_time_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from builtins import object

class FmtTimeQuery(object):
"""
Object that encapsulates a query for an inclusive range between two ISO-format strings.
Useful for querying based on the local date/time at which data was collected,
like with timeType of "data.fmt_time" or "data.start_fmt_time".
e.g. FmtTimeQuery("data.fmt_time", "2024-01", "2024-03") # first quarter of 2024
e.g. FmtTimeQuery("data.fmt_time", "2024-05-01", "2024-05-31") # all of May 2024
e.g. FmtTimeQuery("data.fmt_time", "2024-06-03T08:00", "2024-06-03T16:59") # work hours on Jun 3 2024
"""
def __init__(self, timeType: str, startIso: str, endIso: str) -> None:
self.timeType = timeType
self.startIso = startIso
# append 'Z' to make the end range inclusive
# (because Z is greater than any other character that can appear in an ISO string)
self.endIso = endIso + 'Z'

def get_query(self) -> dict:
time_key = self.timeType
ret_query = {time_key: {"$lte": self.endIso}}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we expect that the end will always be present?
An alternate approach would be to assume that the start will always be present, and set the end to today

if (self.startIso is not None):
ret_query[time_key].update({"$gte": self.startIso})
return ret_query

def __repr__(self) -> str:
return f"FmtTimeQuery {self.timeType} with range [{self.startIso}, {self.endIso})"
9 changes: 6 additions & 3 deletions emission/storage/timeseries/tcquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@

class TimeComponentQuery(object):
"""
Object that encapsulates a query for a particular time at the local time in
the timezone where the data was generated.
Object that encapsulates a query for filtering based on localdate objects.
This works as a set of filters for each localdate field, e.g. year, month, day, etc.
Useful for filtering on one or more localdate fields
e.g. TimeComponentQuery("data.start_local_dt", {"weekday": 0}, {"weekday": 4})
For range queries, use FmtTimeQuery instead.
"""
def __init__(self, timeType, startLD, endLD):
self.timeType = timeType
self.startLD = startLD
self.endLD = endLD

def get_query(self):
return esdl.get_range_query(self.timeType, self.startLD, self.endLD)
return esdl.get_filter_query(self.timeType, self.startLD, self.endLD)
26 changes: 18 additions & 8 deletions emission/tests/storageTests/TestLocalDateQueries.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,36 +64,46 @@ def testLocalDateReadWrite(self):
self.assertEqual(ret_entry.data.local_dt.weekday, 2)
self.assertEqual(ret_entry.data.fmt_time, "2016-04-13T15:32:09-07:00")

def testLocalRangeStandardQuery(self):
def testLocalDateFilterStandardQuery(self):
"""
Search for all entries between 8:18 and 8:20 local time, both inclusive
"""
start_local_dt = ecwl.LocalDate({'year': 2015, 'month': 8, 'hour': 8, 'minute': 18})
end_local_dt = ecwl.LocalDate({'year': 2015, 'month': 8, 'hour': 8, 'minute': 20})
final_query = {"user_id": self.testUUID}
final_query.update(esdl.get_range_query("data.local_dt", start_local_dt, end_local_dt))
final_query.update(esdl.get_filter_query("data.local_dt", start_local_dt, end_local_dt))
entriesCnt = edb.get_timeseries_db().count_documents(final_query)
self.assertEqual(15, entriesCnt)

def testLocalRangeRolloverQuery(self):
def testLocalDateFilterRolloverQuery(self):
"""
Search for all entries between 8:18 and 9:08 local time, both inclusive
"""
start_local_dt = ecwl.LocalDate({'year': 2015, 'month': 8, 'hour': 8, 'minute': 18})
end_local_dt = ecwl.LocalDate({'year': 2015, 'month': 8, 'hour': 9, 'minute': 8})
final_query = {"user_id": self.testUUID}
final_query.update(esdl.get_range_query("data.local_dt", start_local_dt, end_local_dt))
entriesCnt = edb.get_timeseries_db().count_documents(final_query)
self.assertEqual(232, entriesCnt)
final_query.update(esdl.get_filter_query("data.local_dt", start_local_dt, end_local_dt))
entries = edb.get_timeseries_db().find(final_query).sort('data.ts', pymongo.ASCENDING)
self.assertEqual(448, edb.get_timeseries_db().count_documents(final_query))

entries_list = list(entries)

# Note that since this is a set of filters, as opposed to a range, this
# returns all entries between 18 and 8 in both hours.
# so 8:18 is valid, but so is 9:57
self.assertEqual(ecwe.Entry(entries_list[0]).data.local_dt.hour, 8)
self.assertEqual(ecwe.Entry(entries_list[0]).data.local_dt.minute, 18)
self.assertEqual(ecwe.Entry(entries_list[-1]).data.local_dt.hour, 9)
self.assertEqual(ecwe.Entry(entries_list[-1]).data.local_dt.minute, 57)

def testLocalMatchingQuery(self):
def testLocalDateFilterMatchingQuery(self):
"""
Search for all entries that occur at minute = 8 from any hour
"""
start_local_dt = ecwl.LocalDate({'minute': 8})
end_local_dt = ecwl.LocalDate({'minute': 8})
final_query = {"user_id": self.testUUID}
final_query.update(esdl.get_range_query("data.local_dt", start_local_dt, end_local_dt))
final_query.update(esdl.get_filter_query("data.local_dt", start_local_dt, end_local_dt))
entries_docs = edb.get_timeseries_db().find(final_query).sort("metadata.write_ts")
self.assertEqual(20, edb.get_timeseries_db().count_documents(final_query))
entries = [ecwe.Entry(doc) for doc in entries_docs]
Expand Down
Loading