fix(ibis): implement workaround for empty json result #1013

Merged
merged 3 commits on Dec 25, 2024
44 changes: 44 additions & 0 deletions ibis-server/app/model/connector.py
@@ -1,12 +1,17 @@
import base64
import importlib
from functools import cache
from json import loads
from typing import Any

import ibis
import ibis.expr.datatypes as dt
import ibis.expr.schema as sch
import ibis.formats
import pandas as pd
import sqlglot.expressions as sge
from google.cloud import bigquery
from google.oauth2 import service_account
from ibis import BaseBackend
from ibis.backends.sql.compilers.postgres import compiler as postgres_compiler

@@ -23,6 +28,8 @@ def __init__(self, data_source: DataSource, connection_info: ConnectionInfo):
            self._connector = MSSqlConnector(connection_info)
        elif data_source == DataSource.canner:
            self._connector = CannerConnector(connection_info)
        elif data_source == DataSource.bigquery:
            self._connector = BigQueryConnector(connection_info)
        else:
            self._connector = SimpleConnector(data_source, connection_info)

@@ -100,6 +107,43 @@ def _to_ibis_type(type_name: str) -> dt.DataType:
        return postgres_compiler.type_mapper.from_string(type_name)


class BigQueryConnector(SimpleConnector):
    def __init__(self, connection_info: ConnectionInfo):
        super().__init__(DataSource.bigquery, connection_info)
        self.connection_info = connection_info

    def query(self, sql: str, limit: int) -> pd.DataFrame:
        try:
            return super().query(sql, limit)
        except ValueError as e:
            # Import here to avoid overriding the custom datatypes
            import ibis.backends.bigquery

            # Match the Arrow-type error message raised by the google cloud bigquery library.
            # If it matches, fetch the schema from the query result and build an empty pandas
            # DataFrame with the mapped schema.
            #
            # This is a workaround for the ibis library not supporting empty results for some
            # special types (e.g. JSON or Interval). See details:
            # - https://github.com/Canner/wren-engine/issues/909
            # - https://github.com/ibis-project/ibis/issues/10612
            if "Must pass schema" in str(e):
                credits_json = loads(
                    base64.b64decode(
                        self.connection_info.credentials.get_secret_value()
                    ).decode("utf-8")
                )
                credentials = service_account.Credentials.from_service_account_info(
                    credits_json
                )
                client = bigquery.Client(credentials=credentials)
                ibis_schema_mapper = ibis.backends.bigquery.BigQuerySchema()
                bq_fields = client.query(sql).result()
                ibis_fields = ibis_schema_mapper.to_ibis(bq_fields.schema)
                return pd.DataFrame(columns=ibis_fields.names)
            raise


@cache
def _get_pg_type_names(connection: BaseBackend) -> dict[int, str]:
    cur = connection.raw_sql("SELECT oid, typname FROM pg_type")
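For readers who want to see the fallback in isolation, below is a minimal, self-contained sketch of the schema-mapping step the new `query` method performs. The `BigQuerySchema().to_ibis(...)` call and `pd.DataFrame(columns=...)` construction mirror the diff above; the helper name `empty_frame_for` and the ready-made `bigquery.Client` are illustrative assumptions, not part of the PR:

```python
# Sketch of the empty-result fallback, assuming an already-authenticated
# google.cloud.bigquery.Client. Only the BigQuerySchema mapping mirrors
# the PR; the helper name `empty_frame_for` is hypothetical.
import ibis.backends.bigquery
import pandas as pd
from google.cloud import bigquery


def empty_frame_for(client: bigquery.Client, sql: str) -> pd.DataFrame:
    # Even an empty result set carries the column schema
    # (a list of bigquery.SchemaField objects).
    bq_fields = client.query(sql).result()
    # Map the BigQuery schema onto an ibis schema so the column names
    # match what the normal ibis path would have produced.
    ibis_schema = ibis.backends.bigquery.BigQuerySchema().to_ibis(bq_fields.schema)
    # An empty DataFrame whose columns line up with the query output.
    return pd.DataFrame(columns=ibis_schema.names)
```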
62 changes: 62 additions & 0 deletions ibis-server/tests/routers/v2/connector/test_bigquery.py
@@ -198,6 +198,38 @@ async def test_query_values(client, manifest_str):
    assert response.status_code == 204


async def test_query_empty_json(client, manifest_str):
    """Test an empty result that contains a JSON column."""
    response = await client.post(
        url=f"{base_url}/query",
        json={
            "manifestStr": manifest_str,
            "connectionInfo": connection_info,
            "sql": "select json_object('a', 1, 'b', 2) limit 0",
        },
    )
    assert response.status_code == 200
    result = response.json()
    assert len(result["data"]) == 0
    assert result["dtypes"] == {"f0_": "object"}

    # Test the case where only the JSON column is null.
    response = await client.post(
        url=f"{base_url}/query",
        json={
            "manifestStr": manifest_str,
            "connectionInfo": connection_info,
            "sql": "select cast(null as JSON), 1",
        },
    )
    assert response.status_code == 200
    result = response.json()
    assert len(result["data"]) == 1
    assert result["data"][0][0] is None
    assert result["data"][0][1] == 1
    assert result["dtypes"] == {"f0_": "object", "f1_": "int64"}


async def test_interval(client, manifest_str):
    response = await client.post(
        url=f"{base_url}/query",
@@ -228,6 +260,36 @@ async def test_avg_interval(client, manifest_str):
    assert result["dtypes"] == {"col": "object"}


async def test_custom_datatypes_no_overrides(client, manifest_str):
    # Trigger importing the official BigQueryType
    response = await client.post(
        url=f"{base_url}/query",
        json={
            "manifestStr": manifest_str,
            "connectionInfo": connection_info,
            "sql": "select json_object('a', 1, 'b', 2) limit 0",
        },
    )
    assert response.status_code == 200
    result = response.json()
    assert len(result["data"]) == 0
    assert result["dtypes"] == {"f0_": "object"}

    # Subsequent queries should still use the custom BigQueryType
    response = await client.post(
        url=f"{base_url}/query",
        json={
            "connectionInfo": connection_info,
            "manifestStr": manifest_str,
            "sql": "SELECT INTERVAL '1' YEAR + INTERVAL '100' MONTH + INTERVAL '100' DAY + INTERVAL '1' HOUR AS col",
        },
    )
    assert response.status_code == 200
    result = response.json()
    assert result["data"][0] == ["112 months 100 days 3600000000 microseconds"]
    assert result["dtypes"] == {"col": "object"}


async def test_validate_with_unknown_rule(client, manifest_str):
    response = await client.post(
        url=f"{base_url}/validate/unknown_rule",
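As a side note, the "Must pass schema" string the connector matches on appears to originate in pyarrow, which refuses to assemble a Table from zero record batches unless an explicit schema is supplied — the situation an empty JSON result seems to put ibis in. A hedged, minimal reproduction of that underlying error:

```python
# Assumed reproduction of the upstream ValueError the workaround matches on:
# pyarrow cannot infer a schema from an empty list of record batches.
import pyarrow as pa

try:
    pa.Table.from_batches([])  # no batches, no explicit schema
except ValueError as e:
    print(e)  # "Must pass schema, or at least one RecordBatch"
```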