refactor(bigquerystorage): to_dataframe on an arrow stream uses faster to_arrow + to_pandas, internally #9997

Merged · 1 commit · Jan 15, 2020
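For context, a minimal usage sketch of the code path this PR speeds up, assuming a read session created with the Arrow data format via the v1beta1 API; the project, dataset, and table identifiers are placeholders, not taken from this PR.

# Sketch only: Arrow-format read session read into a DataFrame (identifiers are hypothetical).
from google.cloud import bigquery_storage_v1beta1

client = bigquery_storage_v1beta1.BigQueryStorageClient()
table_ref = bigquery_storage_v1beta1.types.TableReference(
    project_id="my-project",  # placeholder
    dataset_id="my_dataset",  # placeholder
    table_id="my_table",  # placeholder
)
session = client.create_read_session(
    table_ref,
    "projects/my-project",  # billing project, placeholder
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
)
position = bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0])

# With this change, to_dataframe() on an Arrow stream builds one pyarrow.Table
# via to_arrow() and converts it with to_pandas(), instead of concatenating
# per-page DataFrames.
df = client.read_rows(position).rows(session).to_dataframe(dtypes={"int_col": "int64"})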
16 changes: 16 additions & 0 deletions bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py
@@ -311,6 +311,21 @@ def to_dataframe(self, dtypes=None):
         if pandas is None:
             raise ImportError(_PANDAS_REQUIRED)
 
+        if dtypes is None:
+            dtypes = {}
+
+        # If it's an Arrow stream, calling to_arrow, then converting to a
+        # pandas dataframe is about 2x faster. This is because pandas.concat is
+        # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
+        # usually no-copy.
+        schema_type = self._read_session.WhichOneof("schema")
+        if schema_type == "arrow_schema":
+            record_batch = self.to_arrow()
+            df = record_batch.to_pandas()
+            for column in dtypes:
+                df[column] = pandas.Series(df[column], dtype=dtypes[column])
+            return df
+
         frames = []
         for page in self.pages:
             frames.append(page.to_dataframe(dtypes=dtypes))
@@ -403,6 +418,7 @@ def to_dataframe(self, dtypes=None):
         """
         if pandas is None:
             raise ImportError(_PANDAS_REQUIRED)
+
         return self._stream_parser.to_dataframe(self._message, dtypes=dtypes)
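The comment in the first hunk above is the heart of the change: pandas.concat of per-page DataFrames usually copies, while assembling a single pyarrow.Table with Table.from_batches and calling to_pandas can often avoid an extra copy. A small illustrative sketch of the two paths, with made-up record batches rather than data from this PR:

# Illustrative only: compares the old concat path with the new Arrow path.
import pandas
import pyarrow

batches = [
    pyarrow.RecordBatch.from_arrays(
        [pyarrow.array(list(range(start, start + 3)))], names=["int_col"]
    )
    for start in (0, 3, 6)
]

# Old path: one DataFrame per batch, then pandas.concat, which typically copies.
df_concat = pandas.concat([batch.to_pandas() for batch in batches], ignore_index=True)

# New path: build a single pyarrow.Table first; to_pandas() converts it
# column by column and can often skip an extra whole-frame copy.
df_arrow = pyarrow.Table.from_batches(batches).to_pandas()

assert df_concat["int_col"].tolist() == df_arrow["int_col"].tolist()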


88 changes: 80 additions & 8 deletions bigquery_storage/tests/unit/test_reader.py
@@ -173,9 +173,9 @@ def _bq_to_arrow_batches(bq_blocks, arrow_schema):
     return arrow_batches
 
 
-def _avro_blocks_w_unavailable(avro_blocks):
-    for block in avro_blocks:
-        yield block
+def _pages_w_unavailable(pages):
+    for page in pages:
+        yield page
     raise google.api_core.exceptions.ServiceUnavailable("test: please reconnect")


@@ -371,9 +371,7 @@ def test_rows_w_reconnect(class_under_test, mock_client):
         [{"int_col": 123}, {"int_col": 234}],
         [{"int_col": 345}, {"int_col": 456}],
     ]
-    avro_blocks_1 = _avro_blocks_w_unavailable(
-        _bq_to_avro_blocks(bq_blocks_1, avro_schema)
-    )
+    avro_blocks_1 = _pages_w_unavailable(_bq_to_avro_blocks(bq_blocks_1, avro_schema))
     bq_blocks_2 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
     avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)

@@ -433,7 +431,7 @@ def test_rows_w_reconnect_by_page(class_under_test, mock_client):
     )
 
     reader = class_under_test(
-        _avro_blocks_w_unavailable(avro_blocks_1),
+        _pages_w_unavailable(avro_blocks_1),
         mock_client,
         stream_position,
         {"metadata": {"test-key": "test-value"}},
@@ -680,7 +678,7 @@ def test_to_dataframe_by_page(class_under_test, mock_client):
     )
 
     reader = class_under_test(
-        _avro_blocks_w_unavailable(avro_blocks_1),
+        _pages_w_unavailable(avro_blocks_1),
         mock_client,
         stream_position,
         {"metadata": {"test-key": "test-value"}},
@@ -721,6 +719,80 @@ def test_to_dataframe_by_page(class_under_test, mock_client):
     )
 
 
+def test_to_dataframe_by_page_arrow(class_under_test, mock_client):
+    bq_columns = [
+        {"name": "int_col", "type": "int64"},
+        {"name": "bool_col", "type": "bool"},
+    ]
+    arrow_schema = _bq_to_arrow_schema(bq_columns)
+    read_session = _generate_arrow_read_session(arrow_schema)
+
+    bq_block_1 = [
+        {"int_col": 123, "bool_col": True},
+        {"int_col": 234, "bool_col": False},
+    ]
+    bq_block_2 = [
+        {"int_col": 345, "bool_col": True},
+        {"int_col": 456, "bool_col": False},
+    ]
+    bq_block_3 = [
+        {"int_col": 567, "bool_col": True},
+        {"int_col": 789, "bool_col": False},
+    ]
+    bq_block_4 = [{"int_col": 890, "bool_col": True}]
+    # Break blocks into two groups to test that iteration continues across
+    # reconnection.
+    bq_blocks_1 = [bq_block_1, bq_block_2]
+    bq_blocks_2 = [bq_block_3, bq_block_4]
+    batch_1 = _bq_to_arrow_batches(bq_blocks_1, arrow_schema)
+    batch_2 = _bq_to_arrow_batches(bq_blocks_2, arrow_schema)
+
+    mock_client.read_rows.return_value = batch_2
+
+    reader = class_under_test(
+        _pages_w_unavailable(batch_1),
+        mock_client,
+        bigquery_storage_v1beta1.types.StreamPosition(),
+        {},
+    )
+    got = reader.rows(read_session)
+    pages = iter(got.pages)
+
+    page_1 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_1.to_dataframe(
+            dtypes={"int_col": "int64", "bool_col": "bool"}
+        ).reset_index(drop=True),
+        pandas.DataFrame(bq_block_1, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_2 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_2.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(bq_block_2, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_3 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_3.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(bq_block_3, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_4 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_4.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(bq_block_4, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )


 def test_copy_stream_position(mut):
     read_position = bigquery_storage_v1beta1.types.StreamPosition(
         stream={"name": "test"}, offset=41