From 2009c865e7066eeadfa736bc9354606076594dbd Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 18 Dec 2019 11:46:25 -0800
Subject: [PATCH] fix(bigquerystorage): to_dataframe on an arrow stream uses 2x faster to_arrow + to_pandas, internally

Towards https://issuetracker.google.com/140579733
---
 .../cloud/bigquery_storage_v1beta1/reader.py | 16 ++++
 bigquery_storage/tests/unit/test_reader.py   | 88 +++++++++++++++++--
 2 files changed, 96 insertions(+), 8 deletions(-)

diff --git a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py
index 138fae4110eb0..b2c4a704afe39 100644
--- a/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py
+++ b/bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py
@@ -311,6 +311,21 @@ def to_dataframe(self, dtypes=None):
         if pandas is None:
             raise ImportError(_PANDAS_REQUIRED)
 
+        if dtypes is None:
+            dtypes = {}
+
+        # If it's an Arrow stream, calling to_arrow, then converting to a
+        # pandas dataframe is about 2x faster. This is because pandas.concat is
+        # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
+        # usually no-copy.
+        schema_type = self._read_session.WhichOneof("schema")
+        if schema_type == "arrow_schema":
+            record_batch = self.to_arrow()
+            df = record_batch.to_pandas()
+            for column in dtypes:
+                df[column] = pandas.Series(df[column], dtype=dtypes[column])
+            return df
+
         frames = []
         for page in self.pages:
             frames.append(page.to_dataframe(dtypes=dtypes))
@@ -403,6 +418,7 @@ def to_dataframe(self, dtypes=None):
         """
         if pandas is None:
             raise ImportError(_PANDAS_REQUIRED)
+
         return self._stream_parser.to_dataframe(self._message, dtypes=dtypes)
 
 
diff --git a/bigquery_storage/tests/unit/test_reader.py b/bigquery_storage/tests/unit/test_reader.py
index 748a45608f3a2..09d2a6b695032 100644
--- a/bigquery_storage/tests/unit/test_reader.py
+++ b/bigquery_storage/tests/unit/test_reader.py
@@ -173,9 +173,9 @@ def _bq_to_arrow_batches(bq_blocks, arrow_schema):
     return arrow_batches
 
 
-def _avro_blocks_w_unavailable(avro_blocks):
-    for block in avro_blocks:
-        yield block
+def _pages_w_unavailable(pages):
+    for page in pages:
+        yield page
     raise google.api_core.exceptions.ServiceUnavailable("test: please reconnect")
 
 
@@ -371,9 +371,7 @@ def test_rows_w_reconnect(class_under_test, mock_client):
         [{"int_col": 123}, {"int_col": 234}],
         [{"int_col": 345}, {"int_col": 456}],
     ]
-    avro_blocks_1 = _avro_blocks_w_unavailable(
-        _bq_to_avro_blocks(bq_blocks_1, avro_schema)
-    )
+    avro_blocks_1 = _pages_w_unavailable(_bq_to_avro_blocks(bq_blocks_1, avro_schema))
 
     bq_blocks_2 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
     avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)
@@ -433,7 +431,7 @@ def test_rows_w_reconnect_by_page(class_under_test, mock_client):
     )
 
     reader = class_under_test(
-        _avro_blocks_w_unavailable(avro_blocks_1),
+        _pages_w_unavailable(avro_blocks_1),
         mock_client,
         stream_position,
         {"metadata": {"test-key": "test-value"}},
@@ -680,7 +678,7 @@ def test_to_dataframe_by_page(class_under_test, mock_client):
     )
 
     reader = class_under_test(
-        _avro_blocks_w_unavailable(avro_blocks_1),
+        _pages_w_unavailable(avro_blocks_1),
         mock_client,
         stream_position,
         {"metadata": {"test-key": "test-value"}},
@@ -721,6 +719,80 @@ def test_to_dataframe_by_page(class_under_test, mock_client):
     )
 
 
+def test_to_dataframe_by_page_arrow(class_under_test, mock_client):
+    bq_columns = [
+        {"name": "int_col", "type": "int64"},
+        {"name": "bool_col", "type": "bool"},
+    ]
+    arrow_schema = _bq_to_arrow_schema(bq_columns)
+    read_session = _generate_arrow_read_session(arrow_schema)
+
+    bq_block_1 = [
+        {"int_col": 123, "bool_col": True},
+        {"int_col": 234, "bool_col": False},
+    ]
+    bq_block_2 = [
+        {"int_col": 345, "bool_col": True},
+        {"int_col": 456, "bool_col": False},
+    ]
+    bq_block_3 = [
+        {"int_col": 567, "bool_col": True},
+        {"int_col": 789, "bool_col": False},
+    ]
+    bq_block_4 = [{"int_col": 890, "bool_col": True}]
+    # Break blocks into two groups to test that iteration continues across
+    # reconnection.
+    bq_blocks_1 = [bq_block_1, bq_block_2]
+    bq_blocks_2 = [bq_block_3, bq_block_4]
+    batch_1 = _bq_to_arrow_batches(bq_blocks_1, arrow_schema)
+    batch_2 = _bq_to_arrow_batches(bq_blocks_2, arrow_schema)
+
+    mock_client.read_rows.return_value = batch_2
+
+    reader = class_under_test(
+        _pages_w_unavailable(batch_1),
+        mock_client,
+        bigquery_storage_v1beta1.types.StreamPosition(),
+        {},
+    )
+    got = reader.rows(read_session)
+    pages = iter(got.pages)
+
+    page_1 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_1.to_dataframe(
+            dtypes={"int_col": "int64", "bool_col": "bool"}
+        ).reset_index(drop=True),
+        pandas.DataFrame(bq_block_1, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_2 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_2.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(bq_block_2, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_3 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_3.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(bq_block_3, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+    page_4 = next(pages)
+    pandas.testing.assert_frame_equal(
+        page_4.to_dataframe().reset_index(drop=True),
+        pandas.DataFrame(bq_block_4, columns=["int_col", "bool_col"]).reset_index(
+            drop=True
+        ),
+    )
+
+
 def test_copy_stream_position(mut):
     read_position = bigquery_storage_v1beta1.types.StreamPosition(
         stream={"name": "test"}, offset=41
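
Editor's note: the following is a minimal standalone sketch, not part of the patch above. It illustrates the two conversion strategies the reader.py comment contrasts: converting each record batch to pandas and using pandas.concat (the old path) versus assembling one pyarrow.Table with Table.from_batches and converting once (the new, typically faster path). The column names and sample values here are made up for illustration.

# Illustrative sketch only -- not part of the patch above.
import pandas
import pyarrow

# Two small Arrow record batches standing in for pages of a read stream.
batches = [
    pyarrow.RecordBatch.from_arrays(
        [pyarrow.array([123, 234]), pyarrow.array([True, False])],
        names=["int_col", "bool_col"],
    ),
    pyarrow.RecordBatch.from_arrays(
        [pyarrow.array([345, 456]), pyarrow.array([True, False])],
        names=["int_col", "bool_col"],
    ),
]

# Old path: convert each batch to a DataFrame, then concatenate.
# pandas.concat generally copies the per-batch frames into a new one.
df_concat = pandas.concat(
    [batch.to_pandas() for batch in batches]
).reset_index(drop=True)

# New path: build a single Table from the batches, then convert once.
df_table = pyarrow.Table.from_batches(batches).to_pandas()

# Both paths yield the same frame; the Table-based path avoids the extra copy.
pandas.testing.assert_frame_equal(df_concat, df_table)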