#1584 #1585

Merged: 2 commits, Apr 17, 2024
2 changes: 1 addition & 1 deletion opteryx/__version__.py
```diff
@@ -1,4 +1,4 @@
-__build__ = 426
+__build__ = 427
 
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
```
22 changes: 10 additions & 12 deletions opteryx/connectors/aws_s3_connector.py
```diff
@@ -77,7 +77,9 @@ def get_list_of_blob_names(self, *, prefix: str) -> List[str]:
             blob for blob in blobs if ("." + blob.split(".")[-1].lower()) in VALID_EXTENSIONS
         )
 
-    def read_dataset(self, columns: list = None, **kwargs) -> pyarrow.Table:
+    def read_dataset(
+        self, columns: list = None, just_schema: bool = False, **kwargs
+    ) -> pyarrow.Table:
         blob_names = self.partition_scheme.get_blobs_in_partition(
             start_date=self.start_date,
             end_date=self.end_date,
@@ -95,7 +97,11 @@ def read_dataset(self, columns: list = None, **kwargs) -> pyarrow.Table:
             try:
                 decoder = get_decoder(blob_name)
                 blob_bytes = self.read_blob(blob_name=blob_name, statistics=self.statistics)
-                yield decoder(blob_bytes, projection=columns)
+                decoded = decoder(blob_bytes, projection=columns, just_schema=just_schema)
+                if not just_schema:
+                    num_rows, num_columns, decoded = decoded
+                    self.statistics.rows_seen += num_rows
+                yield decoded
             except UnsupportedFileTypeError:
                 pass
@@ -106,19 +112,11 @@ def get_dataset_schema(self) -> RelationSchema:
             return self.schema
 
         # Read first blob for schema inference and cache it
-        record = next(self.read_dataset(), None)
-        self.cached_first_blob = record
+        self.schema = next(self.read_dataset(just_schema=True), None)
 
-        if record is None:
+        if self.schema is None:
             raise DatasetNotFoundError(dataset=self.dataset)
 
-        arrow_schema = record.schema
-
-        self.schema = RelationSchema(
-            name=self.dataset,
-            columns=[FlatColumn.from_arrow(field) for field in arrow_schema],
-        )
-
         return self.schema
 
     def read_blob(self, *, blob_name, **kwargs):
```
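The pattern in `read_dataset` above recurs across the connectors in this PR: unless `just_schema` is requested, decoders now return a `(num_rows, num_columns, table)` tuple, and the connector unpacks it to feed the `rows_seen` statistic before yielding the table. A minimal sketch of that contract, with stand-in names (`toy_decoder`, `ToyStatistics`) rather than the real opteryx classes:

```python
# Sketch of the decoder/connector contract introduced here; names are
# illustrative stand-ins, not the opteryx API.
import pyarrow


def toy_decoder(table: pyarrow.Table, just_schema: bool = False):
    if just_schema:
        return table.schema          # schema requests skip the tuple entirely
    num_rows, num_columns = table.shape
    return num_rows, num_columns, table


class ToyStatistics:
    rows_seen = 0


def read_dataset(tables, statistics, just_schema: bool = False):
    for table in tables:
        decoded = toy_decoder(table, just_schema=just_schema)
        if not just_schema:
            num_rows, _num_columns, decoded = decoded
            statistics.rows_seen += num_rows   # rows scanned, tracked centrally
        yield decoded


stats = ToyStatistics()
list(read_dataset([pyarrow.table({"a": [1, 2, 3]})], stats))
assert stats.rows_seen == 3
```

This is also why `get_dataset_schema` shrinks: with `just_schema=True` the first blob yields a schema object directly, so the connector no longer has to cache a sample record and assemble the schema itself.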
10 changes: 8 additions & 2 deletions opteryx/connectors/disk_connector.py
```diff
@@ -91,7 +91,9 @@ def read_blob(self, *, blob_name, **kwargs) -> bytes:
         """
         file_descriptor = os.open(blob_name, os.O_RDONLY | os.O_BINARY)
         try:
-            return os.read(file_descriptor, os.path.getsize(blob_name))
+            size = os.path.getsize(blob_name)
+            self.statistics.bytes_read += size
+            return os.read(file_descriptor, size)
         finally:
             os.close(file_descriptor)
@@ -134,9 +136,13 @@ def read_dataset(
             try:
                 decoder = get_decoder(blob_name)
                 blob_bytes = self.read_blob(blob_name=blob_name, statistics=self.statistics)
-                yield decoder(
+                decoded = decoder(
                     blob_bytes, projection=columns, selection=predicates, just_schema=just_schema
                 )
+                if not just_schema:
+                    num_rows, num_columns, decoded = decoded
+                    self.statistics.rows_seen += num_rows
+                yield decoded
             except UnsupportedFileTypeError:
                 pass  # Skip unsupported file types
             except pyarrow.ArrowInvalid:
```
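The disk connector change also starts counting bytes: sizing the file first means the read and the accounting use the same number. A self-contained sketch of that pattern (`os.O_BINARY` from the diff is Windows-only and is omitted here; `Stats` is a stand-in for the connector's statistics object):

```python
import os


class Stats:
    bytes_read = 0  # stand-in for self.statistics.bytes_read


def read_blob(path: str, stats: Stats) -> bytes:
    fd = os.open(path, os.O_RDONLY)  # the diff also ORs in os.O_BINARY on Windows
    try:
        size = os.path.getsize(path)
        stats.bytes_read += size     # account with the same size passed to os.read
        return os.read(fd, size)
    finally:
        os.close(fd)
```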
6 changes: 5 additions & 1 deletion opteryx/connectors/file_connector.py
```diff
@@ -81,7 +81,11 @@ def read_dataset(
             An iterator containing a single decoded pyarrow.Table.
         """
         self._read_file()
-        return iter([self.decoder(self._byte_array, projection=columns, selection=predicates)])
+        num_rows, num_columns, decoded = self.decoder(
+            self._byte_array, projection=columns, selection=predicates
+        )
+        self.statistics.rows_seen += num_rows
+        yield decoded
 
     def get_dataset_schema(self) -> RelationSchema:
         """
```
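A subtle side effect of this hunk: replacing `return iter([...])` with `yield` makes `read_dataset` a generator function, so `self._read_file()` and the decode no longer run at call time but on first iteration. A toy illustration:

```python
def eager_read():
    print("file read now")        # runs as soon as the function is called
    return iter(["table"])


def lazy_read():
    print("file read deferred")   # runs only when iteration starts
    yield "table"


eager_read()        # prints immediately
gen = lazy_read()   # prints nothing yet
next(gen)           # prints here, on first iteration
```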
10 changes: 8 additions & 2 deletions opteryx/connectors/gcp_cloudstorage_connector.py
```diff
@@ -120,7 +120,9 @@ def read_blob(self, *, blob_name, **kwargs):
         if response.status_code != 200:
             raise DatasetReadError(f"Unable to read '{blob_name}' - {response.status_code}")
 
-        return response.content
+        content = response.content
+        self.statistics.bytes_read += len(content)
+        return content
 
     @single_item_cache
     def get_list_of_blob_names(self, *, prefix: str) -> List[str]:
@@ -172,9 +174,13 @@ def read_dataset(
             try:
                 decoder = get_decoder(blob_name)
                 blob_bytes = self.read_blob(blob_name=blob_name, statistics=self.statistics)
-                yield decoder(
+                decoded = decoder(
                     blob_bytes, projection=columns, selection=predicates, just_schema=just_schema
                 )
+                if not just_schema:
+                    num_rows, num_columns, decoded = decoded
+                    self.statistics.rows_seen += num_rows
+                yield decoded
             except UnsupportedFileTypeError:  # pragma: no cover
                 pass
```
2 changes: 2 additions & 0 deletions opteryx/connectors/sql_connector.py
```diff
@@ -174,7 +174,9 @@ def read_dataset(  # type:ignore
         )
 
         while True:
+            t = time.monotonic_ns()
             batch_rows = result.fetchmany(self.chunk_size)
+            self.statistics.time_waiting_sql = time.monotonic_ns() - t
             if not batch_rows:
                 break
```
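The SQL connector brackets the blocking `fetchmany` call with `time.monotonic_ns()` so time spent waiting on the upstream database shows up in the query statistics. The same pattern in isolation (note the diff assigns per batch; summing with `+=` would total the waits instead):

```python
import time


def fetch_batches(result, chunk_size: int, statistics):
    """Sketch: attribute time spent waiting on the database to a statistics object."""
    while True:
        t = time.monotonic_ns()
        batch_rows = result.fetchmany(chunk_size)
        statistics.time_waiting_sql = time.monotonic_ns() - t  # per-batch, as in the diff
        if not batch_rows:
            return
        yield batch_rows
```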
4 changes: 2 additions & 2 deletions opteryx/operators/morsel_defragment_node.py
```diff
@@ -72,7 +72,7 @@ def execute(self) -> Generator:  # pragma: no cover
            start = time.monotonic_ns()
            # add what we've collected before to the table
            if collected_rows:  # pragma: no cover
-                self.statistics.chunk_merges += 1
+                self.statistics.morsel_merges += 1
                morsel = pyarrow.concat_tables([collected_rows, morsel], promote_options="none")
                collected_rows = None
            self.statistics.time_defragmenting += time.monotonic_ns() - start
@@ -93,7 +93,7 @@
                    int(MORSEL_SIZE_BYTES / average_record_size), MORSEL_SIZE_COUNT
                )
                row_counter += new_row_count
-                self.statistics.chunk_splits += 1
+                self.statistics.morsel_splits += 1
                new_morsel = morsel.slice(offset=0, length=new_row_count)
                at_least_one_morsel = True
                collected_rows = morsel.slice(offset=new_row_count)
```
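Beyond the rename from `chunk_*` to `morsel_*` statistics, the split arithmetic is worth seeing in isolation: the target row count is the byte budget divided by the average record size, capped by a row-count ceiling. A runnable sketch (the two constants are assumptions for illustration; the real values live with the node):

```python
import pyarrow

MORSEL_SIZE_BYTES = 64 * 1024 * 1024  # assumed 64 MiB budget for illustration
MORSEL_SIZE_COUNT = 500_000           # assumed row-count cap for illustration

table = pyarrow.table({"x": list(range(1_000_000))})
average_record_size = table.nbytes / table.num_rows

# rows that fit the byte budget, but never more than the row cap
new_row_count = min(int(MORSEL_SIZE_BYTES / average_record_size), MORSEL_SIZE_COUNT)

morsel = table.slice(offset=0, length=new_row_count)   # emit this morsel
collected_rows = table.slice(offset=new_row_count)     # carry the rest forward
```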
30 changes: 20 additions & 10 deletions opteryx/utils/file_decoders.py
```diff
@@ -135,9 +135,9 @@ def parquet_decoder(buffer, projection: List = None, selection=None, just_schema: bool = False):
     else:
         stream = io.BytesIO(buffer)
 
+    # Open the parquet file only once
+    parquet_file = parquet.ParquetFile(stream)
     if projection or just_schema:
-        # Open the parquet file only once
-        parquet_file = parquet.ParquetFile(stream)
 
         # Return just the schema if that's all that's needed
         if just_schema:
@@ -161,8 +161,12 @@
     if selection:
         table = filter_records(selection, table)
     if projection == []:
-        return pyarrow.Table.from_pydict({"_": numpy.full(table.num_rows, True, dtype=numpy.bool_)})
-    return table
+        return (
+            parquet_file.metadata.num_rows,
+            parquet_file.metadata.num_columns,
+            pyarrow.Table.from_pydict({"_": numpy.full(table.num_rows, True, dtype=numpy.bool_)}),
+        )
+    return (parquet_file.metadata.num_rows, parquet_file.metadata.num_columns, table)
 
 
 def orc_decoder(buffer, projection: List = None, selection=None, just_schema: bool = False):
@@ -183,11 +187,12 @@ def orc_decoder(buffer, projection: List = None, selection=None, just_schema: bool = False):
         return convert_arrow_schema_to_orso_schema(orc_schema)
 
     table = orc_file.read()
+    full_shape = table.shape
     if selection:
         table = filter_records(selection, table)
     if projection:
         table = post_read_projector(table, projection)
-    return table
+    return *full_shape, table
 
 
 def jsonl_decoder(buffer, projection: List = None, selection=None, just_schema: bool = False):
@@ -206,12 +211,13 @@ def jsonl_decoder(buffer, projection: List = None, selection=None, just_schema: bool = False):
     if just_schema:
         return convert_arrow_schema_to_orso_schema(schema)
 
+    full_shape = table.shape
     if selection:
         table = filter_records(selection, table)
     if projection:
         table = post_read_projector(table, projection)
 
-    return table
+    return *full_shape, table
 
 
 def csv_decoder(
@@ -231,12 +237,13 @@
     if just_schema:
         return convert_arrow_schema_to_orso_schema(schema)
 
+    full_shape = table.shape
     if selection:
         table = filter_records(selection, table)
     if projection:
         table = post_read_projector(table, projection)
 
-    return table
+    return *full_shape, table
 
 
 def tsv_decoder(buffer, projection: List = None, selection=None, just_schema: bool = False):
@@ -262,12 +269,13 @@ def arrow_decoder(buffer, projection: List = None, selection=None, just_schema: bool = False):
     if just_schema:
         return convert_arrow_schema_to_orso_schema(schema)
 
+    full_shape = table.shape
     if selection:
         table = filter_records(selection, table)
     if projection:
         table = post_read_projector(table, projection)
 
-    return table
+    return *full_shape, table
 
 
 def avro_decoder(buffer, projection: List = None, selection=None, just_schema: bool = False):
@@ -293,12 +301,13 @@ def avro_decoder(buffer, projection: List = None, selection=None, just_schema: bool = False):
     if just_schema:
         return convert_arrow_schema_to_orso_schema(schema)
 
+    full_shape = table.shape
     if selection:
         table = filter_records(selection, table)
     if projection:
         table = post_read_projector(table, projection)
 
-    return table
+    return *full_shape, table
 
 
 def ipc_decoder(buffer, projection: List = None, selection=None, just_schema: bool = False):
@@ -323,12 +332,13 @@ def ipc_decoder(buffer, projection: List = None, selection=None, just_schema: bool = False):
         return convert_arrow_schema_to_orso_schema(schema)
 
     table = pyarrow.Table.from_batches([batch for batch in chain([batch_one], reader)])
+    full_shape = table.shape
     if selection:
         table = filter_records(selection, table)
     if projection:
         table = post_read_projector(table, projection)
 
-    return table
+    return *full_shape, table
 
 
 # for types we know about, set up how we handle them
```
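Each decoder now snapshots `table.shape` before selection and projection run, so the reported `(num_rows, num_columns)` reflect what was scanned from the file rather than what survived the filters; parquet gets the same numbers from file metadata without materialising anything extra. A condensed sketch of that shared tail, with pyarrow built-ins standing in for `filter_records` and `post_read_projector`:

```python
import pyarrow


def decoder_tail(table: pyarrow.Table, selection=None, projection=None):
    full_shape = table.shape                # (rows, columns) before any filtering
    if selection is not None:
        table = table.filter(selection)     # stand-in for filter_records
    if projection:
        table = table.select(projection)    # stand-in for post_read_projector
    return *full_shape, table


t = pyarrow.table({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
mask = pyarrow.array([True, False, True])
num_rows, num_columns, out = decoder_tail(t, selection=mask, projection=["a"])
assert (num_rows, num_columns) == (3, 2)    # pre-filter shape feeds rows_seen
assert out.num_rows == 2                    # filtered result is what flows on
```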