Commit
Merge pull request #1647 from mabel-dev/#1643
Opteryx Version 0.15.0-beta.488
joocer authored May 7, 2024
2 parents 61707b5 + 45685f2 commit ef2350e
Showing 10 changed files with 66 additions and 49 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
@@ -1,4 +1,4 @@
-__build__ = 487
+__build__ = 489
 
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
106 changes: 58 additions & 48 deletions opteryx/managers/schemes/mabel_partitions.py
@@ -19,6 +19,7 @@
 
 from opteryx.exceptions import DataError
 from opteryx.managers.schemes import BasePartitionScheme
+from opteryx.utils.dates import date_range
 from opteryx.utils.file_decoders import DATA_EXTENSIONS
 
 OS_SEP = os.sep
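
The newly imported date_range is called below with both "1d" and "1h" intervals. Its implementation is not part of this diff; a minimal stand-in consistent with those call sites (the name date_range_sketch is invented here, and inclusive-end behaviour is an assumption) would be:

from datetime import datetime, timedelta

def date_range_sketch(start, end, interval):
    # Stand-in for opteryx.utils.dates.date_range, inferred from its call
    # sites in this diff: yields timestamps from start to end, inclusive.
    step = {"1h": timedelta(hours=1), "1d": timedelta(days=1)}[interval]
    current = start
    while current <= end:
        yield current
        current += step

# Three hourly points: 00:00, 01:00, 02:00
list(date_range_sketch(datetime(2024, 1, 1, 0), datetime(2024, 1, 1, 2), "1h"))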
@@ -36,12 +37,8 @@ def __init__(self, dataset: str, segments: set = None):
 
 def extract_prefix(path, prefix):
     start_index = path.find(prefix)
-    if start_index == -1:
-        return None
-    end_index = path.find(OS_SEP, start_index)
-    if end_index == -1:
-        return None
-    return path[start_index:end_index]
+    end_index = path.find(OS_SEP, start_index) if start_index != -1 else -1
+    return path[start_index:end_index] if end_index != -1 else None
 
 
 def is_complete_and_not_invalid(blobs, as_at):
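
The extract_prefix rewrite collapses two early returns into conditional expressions without changing behaviour: it still returns the path segment that begins with the prefix, or None when the prefix is missing or no separator follows it. A quick check (paths invented, OS_SEP fixed to "/" for the sketch):

OS_SEP = "/"

def extract_prefix(path, prefix):
    start_index = path.find(prefix)
    end_index = path.find(OS_SEP, start_index) if start_index != -1 else -1
    return path[start_index:end_index] if end_index != -1 else None

assert extract_prefix("bucket/dataset/as_at_20240101/data.parquet", "as_at_") == "as_at_20240101"
assert extract_prefix("bucket/dataset/data.parquet", "as_at_") is None    # prefix absent
assert extract_prefix("bucket/dataset/as_at_20240101", "as_at_") is None  # no trailing separator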
@@ -87,9 +84,28 @@ def get_blobs_in_partition(
         by_label = f"{OS_SEP}by_"
         as_at_label = f"{OS_SEP}as_at"
 
-        def _inner(*, timestamp):
-            date_path = f"{prefix}{OS_SEP}year_{timestamp.year:04d}{OS_SEP}month_{timestamp.month:02d}{OS_SEP}day_{timestamp.day:02d}"
-            hour_label = f"{OS_SEP}by_hour{OS_SEP}hour={timestamp.hour:02d}/"
+        def process_as_ats(blobs, as_at_label, control_blobs):
+            as_ats = sorted(
+                {extract_prefix(blob, "as_at_") for blob in blobs if as_at_label in blob}
+            )
+            if len(as_ats) == 0:
+                return blobs
+            valid_blobs = []
+            while as_ats:
+                as_at = as_ats.pop()
+                if as_at is None:
+                    continue
+                if is_complete_and_not_invalid(control_blobs, as_at):
+                    valid_blobs = [blob for blob in blobs if as_at in blob]
+                    break
+                else:
+                    blobs = [blob for blob in blobs if as_at not in blob]
+            return valid_blobs
+
+        def _inner(*, date, start, end):
+            date_path = f"{prefix}{OS_SEP}year_{date.year:04d}{OS_SEP}month_{date.month:02d}{OS_SEP}day_{date.day:02d}"
+            by_label = f"{OS_SEP}by_"
+            as_at_label = f"{OS_SEP}as_at"
 
             # Call your method to get the list of blob names
             blob_names = blob_list_getter(prefix=date_path)
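
The new process_as_ats helper factors out the frame-selection loop that previously lived inline in _inner: it walks as_at frames from newest to oldest, drops every frame the control blobs mark as incomplete or invalidated, and keeps the blobs of the first usable frame. A self-contained sketch (frame names invented, completeness check stubbed, OS_SEP fixed to "/"):

def extract_prefix(path, prefix):
    start_index = path.find(prefix)
    end_index = path.find("/", start_index) if start_index != -1 else -1
    return path[start_index:end_index] if end_index != -1 else None

def is_complete_and_not_invalid(control_blobs, as_at):
    # Stub for illustration: the real check inspects the control files for the frame.
    return any(as_at in blob and blob.endswith("frame.complete") for blob in control_blobs)

def process_as_ats(blobs, as_at_label, control_blobs):
    # Copied from the diff above.
    as_ats = sorted({extract_prefix(blob, "as_at_") for blob in blobs if as_at_label in blob})
    if len(as_ats) == 0:
        return blobs
    valid_blobs = []
    while as_ats:
        as_at = as_ats.pop()
        if as_at is None:
            continue
        if is_complete_and_not_invalid(control_blobs, as_at):
            valid_blobs = [blob for blob in blobs if as_at in blob]
            break
        else:
            blobs = [blob for blob in blobs if as_at not in blob]
    return valid_blobs

data_blobs = [
    "ds/as_at_20240101/data.parquet",
    "ds/as_at_20240102/data.parquet",  # newer, but never marked complete
]
control_blobs = ["ds/as_at_20240101/frame.complete"]

# The incomplete 20240102 frame is discarded; the 20240101 frame is selected.
assert process_as_ats(data_blobs, "/as_at", control_blobs) == ["ds/as_at_20240101/data.parquet"]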
@@ -98,59 +114,53 @@ def _inner(*, timestamp):
 
             control_blobs: List[str] = []
             data_blobs: List[str] = []
-            as_ats = set()
-            hour_blobs: List[str] = []
 
             for blob in blob_names:
-                extension = os.path.splitext(blob)[1]
-                if extension not in DATA_EXTENSIONS:
+                ext = os.path.splitext(blob)[1]
+                if ext not in DATA_EXTENSIONS:
                     control_blobs.append(blob)
                 else:
                     data_blobs.append(blob)
-                    # Collect hour specific blobs, but only data blobs
-                    if hour_label in blob:
-                        hour_blobs.append(blob)
-
-            if hour_blobs:
-                data_blobs = hour_blobs
-
-            for blob in blob_names:
-                # Collect segments
-                if by_label in blob:
-                    segment = extract_prefix(blob, "by_")
-                    if segment != "by_hour":
-                        raise UnsupportedSegementationError(dataset=prefix, segments=[segment])
-
-                if as_at_label in blob:
-                    as_ats.add(extract_prefix(blob, "as_at_"))
-
-            as_at = None
-            as_at_list = sorted(as_ats)
-
-            # Keep popping from as_ats until a valid frame is found
-            while as_at_list:
-                as_at = as_at_list.pop()
-                if as_at is None:
-                    continue
-                if is_complete_and_not_invalid(control_blobs, as_at):
-                    data_blobs = [blob for blob in data_blobs if as_at in blob]
-                    break
-                data_blobs = [blob for blob in data_blobs if as_at not in blob]
-                as_at = None
-
-            return data_blobs
+                if by_label in blob:
+                    segment = extract_prefix(blob, "by_")
+                    if segment != "by_hour":
+                        raise UnsupportedSegementationError(dataset=prefix, segments=[segment])
+
+            if any(f"{OS_SEP}by_hour{OS_SEP}" in blob_name for blob_name in data_blobs):
+
+                if start > date:
+                    start = date
+                if end < date:
+                    end = date
+
+                selected_blobs = []
+
+                for hour in date_range(start, end, "1h"):
+                    hour_label = f"{OS_SEP}by_hour{OS_SEP}hour={hour.hour:02d}/"
+                    # Filter for the specific hour, if hour folders exist
+                    if any(hour_label in blob_name for blob_name in data_blobs):
+                        hours_blobs = [
+                            blob_name for blob_name in data_blobs if hour_label in blob_name
+                        ]
+                        selected_blobs.extend(
+                            process_as_ats(hours_blobs, as_at_label, control_blobs)
+                        )
+            else:
+                selected_blobs = process_as_ats(data_blobs, "as_at_", control_blobs)
+
+            return selected_blobs
 
         start_date = start_date or midnight
         end_date = end_date or midnight.replace(hour=23, minute=59)
 
         found = set()
 
         # Use a ThreadPoolExecutor to parallelize fetching blobs for each hour
-        with concurrent.futures.ThreadPoolExecutor() as executor:
+        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
             # Prepare a list of future tasks
             futures = [
-                executor.submit(_inner, **{"timestamp": ts})
-                for ts in self.hourly_timestamps(start_date, end_date)
+                executor.submit(_inner, **{"date": date, "start": start_date, "end": end_date})
+                for date in date_range(start_date, end_date, "1d")
             ]
             # Wait for all futures to complete and collect results
             for future in concurrent.futures.as_completed(futures):
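
Taken together, the rewrite changes the fan-out unit from one task per hour to one task per day, with hour-level filtering now done inside _inner against by_hour/hour=NN folder names. A rough sketch of the daily fan-out (_inner_sketch and the partition strings are invented stand-ins for the real listing logic):

import concurrent.futures
from datetime import datetime, timedelta

def _inner_sketch(*, date, start, end):
    # Stand-in for the real _inner: pretend each day has 24 hourly partitions
    # and keep only the hours that fall inside the requested range.
    return [
        f"day_{date.day:02d}/by_hour/hour={h:02d}"
        for h in range(24)
        if start <= date.replace(hour=h) <= end
    ]

start_date = datetime(2024, 1, 1, 1)
end_date = datetime(2024, 1, 2, 2)
days = [
    start_date.replace(hour=0) + timedelta(days=n)
    for n in range((end_date.date() - start_date.date()).days + 1)
]

# One listing task per day, mirroring the new get_blobs_in_partition:
found = []
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(_inner_sketch, date=day, start=start_date, end=end_date)
        for day in days
    ]
    for future in concurrent.futures.as_completed(futures):
        found.extend(future.result())

print(sorted(found))  # hours 01..23 of day 1, then hours 00..02 of day 2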
1 change: 1 addition & 0 deletions (new test data file; path not shown)
@@ -0,0 +1 @@
+{"at_at":0, "hour":0}

1 change: 1 addition & 0 deletions (new test data file; path not shown)
@@ -0,0 +1 @@
+{"at_at":3, "hour":0}

1 change: 1 addition & 0 deletions (new test data file; path not shown)
@@ -0,0 +1 @@
+{"at_at":1, "hour":1}

1 change: 1 addition & 0 deletions (new test data file; path not shown)
@@ -0,0 +1 @@
+{"at_at":2, "hour":1}
3 changes: 3 additions & 0 deletions tests/sql_battery/test_shapes_and_errors_battery.py
@@ -1205,6 +1205,9 @@
 
     ("SELECT * FROM $missions WHERE MATCH (Location) AGAINST ('Florida USA')", 911, 8, None),
 
+    ("SELECT * FROM testdata.partitioned.hourly FOR '2024-01-01 01:00'", 1, 2, None),
+    ("SELECT * FROM testdata.partitioned.hourly FOR '2024-01-01'", 2, 2, None),
+
     # 10-way join
     ("SELECT p1.name AS planet1_name, p2.name AS planet2_name, p3.name AS planet3_name, p4.name AS planet4_name, p5.name AS planet5_name, p6.name AS planet6_name, p7.name AS planet7_name, p8.name AS planet8_name, p9.name AS planet9_name, p10.name AS planet10_name, p1.diameter AS planet1_diameter, p2.gravity AS planet2_gravity, p3.orbitalPeriod AS planet3_orbitalPeriod, p4.numberOfMoons AS planet4_numberOfMoons, p5.meanTemperature AS planet5_meanTemperature FROM $planets p1 JOIN $planets p2 ON p1.id = p2.id JOIN $planets p3 ON p1.id = p3.id JOIN $planets p4 ON p1.id = p4.id JOIN $planets p5 ON p1.id = p5.id JOIN $planets p6 ON p1.id = p6.id JOIN $planets p7 ON p1.id = p7.id JOIN $planets p8 ON p1.id = p8.id JOIN $planets p9 ON p1.id = p9.id JOIN $planets p10 ON p1.id = p10.id WHERE p1.diameter > 10000 ORDER BY p1.name, p2.name, p3.name, p4.name, p5.name;", 6, 15, None),
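
Each battery entry appears to be (statement, expected_rows, expected_columns, expected_error), so the two new cases pin down the hourly behaviour: FOR a specific hour returns one row, FOR the whole day returns two. A hand-run equivalent, assuming the public opteryx.query API and its rowcount attribute, and the test data added above:

import opteryx

# Hour-scoped read: only the hour=01 partition should be scanned.
hour_result = opteryx.query("SELECT * FROM testdata.partitioned.hourly FOR '2024-01-01 01:00'")
assert hour_result.rowcount == 1

# Day-scoped read: each hour's latest complete as_at frame contributes one row.
day_result = opteryx.query("SELECT * FROM testdata.partitioned.hourly FOR '2024-01-01'")
assert day_result.rowcount == 2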
