Handle unsupported column type with proper Exception and log
auxten committed Jul 29, 2024
1 parent b70cd12 commit 3e42876
Showing 2 changed files with 106 additions and 7 deletions.
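For context (not part of this commit): a minimal sketch of the failure mode being handled. An object-dtype pandas column holding numpy arrays rather than Unicode strings, queried through chdb's Python(df) table function, now produces a descriptive error instead of failing opaquely. The values below are hypothetical; the real report uses the parquet file downloaded in tests/test_issue251.py further down.

# Hypothetical reproduction sketch; the column values here are made up,
# standing in for the numpy.ndarray cells of the real parquet data.
import numpy as np
import pandas as pd
import chdb

df = pd.DataFrame(
    {
        "id": ["a", "b"],
        "findings": [np.array(["x", "y"]), np.array(["z"])],  # not str
    }
)

try:
    chdb.query("FROM Python(df) SELECT * LIMIT 10")
except Exception as e:
    # With this change the error is expected to mention something like:
    #   Unsupported Python object type numpy.ndarray
    print(e)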
57 changes: 50 additions & 7 deletions src/Processors/Sources/PythonSource.cpp
@@ -1,3 +1,4 @@
+#include <exception>
 #include <Processors/Sources/PythonSource.h>
 #include "base/scope_guard.h"

@@ -106,7 +107,16 @@ void PythonSource::convert_string_array_to_block(
     offsets.reserve(row_count);
     for (size_t i = offset; i < offset + row_count; ++i)
     {
-        FillColumnString(buf[i], string_column);
+        auto * obj = buf[i];
+        if (!PyUnicode_Check(obj))
+        {
+            LOG_ERROR(
+                logger,
+                "Unsupported Python object type {}, Unicode string expected here. Try convert column type to str with `astype(str)`",
+                Py_TYPE(obj)->tp_name);
+            throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unsupported Python object type {}", Py_TYPE(obj)->tp_name);
+        }
+        FillColumnString(obj, string_column);
         // Try to help reserve memory for the string column data every 100 rows to avoid frequent reallocations
         // Check the avg size of the string column data and reserve memory accordingly
         if ((i - offset) % 10 == 9)
@@ -278,11 +288,34 @@ Chunk PythonSource::genChunk(size_t & num_rows, PyObjectVecPtr data)
                 type->getName(),
                 description.sample_block.getByPosition(i).name);
         }
-        catch (const Exception & e)
+        catch (Exception & e)
         {
             destory(data);
+            LOG_ERROR(logger, "Error processing column \"{}\": {}", description.sample_block.getByPosition(i).name, e.what());
+            throw Exception(
+                ErrorCodes::PY_EXCEPTION_OCCURED,
+                "Error processing column \"{}\": {}",
+                description.sample_block.getByPosition(i).name,
+                e.what());
+        }
+        catch (std::exception & e)
+        {
+            destory(data);
+            LOG_ERROR(logger, "Error processing column \"{}\": {}", description.sample_block.getByPosition(i).name, e.what());
+            throw Exception(
+                ErrorCodes::PY_EXCEPTION_OCCURED,
+                "Error processing column \"{}\": {}",
+                description.sample_block.getByPosition(i).name,
+                e.what());
+        }
+        catch (...)
+        {
+            destory(data);
-            LOG_ERROR(logger, "Error processing column {}: {}", i, e.what());
-            throw;
+            LOG_ERROR(logger, "Error processing column \"{}\": unknown exception", description.sample_block.getByPosition(i).name);
+            throw Exception(
+                ErrorCodes::PY_EXCEPTION_OCCURED,
+                "Error processing column \"{}\": unknown exception",
+                description.sample_block.getByPosition(i).name);
         }
     }

@@ -415,10 +448,20 @@ Chunk PythonSource::scanDataToChunk()
                 // LOG_DEBUG(logger, "Column {} data: {}", col.name, ss.str());
             }
         }
-        catch (const Exception & e)
+        catch (Exception & e)
        {
+            LOG_ERROR(logger, "Error processing column \"{}\": {}", col.name, e.what());
+            throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Error processing column \"{}\": {}", col.name, e.what());
+        }
+        catch (std::exception & e)
+        {
+            LOG_ERROR(logger, "Error processing column \"{}\": {}", col.name, e.what());
+            throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Error processing column \"{}\": {}", col.name, e.what());
+        }
+        catch (...)
+        {
-            LOG_ERROR(logger, "Error processing column {}: {}", i, e.what());
-            throw;
+            LOG_ERROR(logger, "Error processing column \"{}\": unknown exception", col.name);
+            throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Error processing column \"{}\": unknown exception", col.name);
         }
     }
     cursor += count;
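The new log message suggests a user-side workaround (`astype(str)`). A hedged sketch of what that looks like, assuming `findings` is the column holding numpy.ndarray values; substitute whichever column the error message names.

# Workaround sketch following the new log hint: cast the offending object
# column to str before handing the DataFrame to chdb. "findings" is an
# assumption here; use the column reported in the error message.
import pandas as pd
import chdb

df = pd.read_parquet(
    "/tmp/issue251/artifacts/create_final_community_reports.parquet"
)
df["findings"] = df["findings"].astype(str)  # ndarray cells become their str() form

print(chdb.query("FROM Python(df) SELECT id, findings LIMIT 3"))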
56 changes: 56 additions & 0 deletions tests/test_issue251.py
@@ -0,0 +1,56 @@
#!python3

import os
import unittest
import zipfile
import urllib.request

import pandas as pd
import chdb


class TestIssue251(unittest.TestCase):
    def setUp(self):
        # if /tmp/issue251/artifacts/create_final_community_reports.parquet not exists,
        # download https://github.com/user-attachments/files/16361689/parquet-test-data.zip
        # and unzip it to /tmp/issue251/
        if not os.path.exists(
            "/tmp/issue251/artifacts/create_final_community_reports.parquet"
        ):
            print("Downloading parquet-test-data.zip")

            url = "https://github.com/user-attachments/files/16361689/parquet-test-data.zip"
            os.makedirs("/tmp/issue251/", exist_ok=True)
            urllib.request.urlretrieve(url, "/tmp/issue251/parquet-test-data.zip")
            with zipfile.ZipFile("/tmp/issue251/parquet-test-data.zip", "r") as zip_ref:
                zip_ref.extractall("/tmp/issue251/")

    def test_issue251(self):
        df = pd.read_parquet(
            "/tmp/issue251/artifacts/create_final_community_reports.parquet",
            columns=[
                "id",
                "community",
                "level",
                "title",
                "summary",
                "findings",
                "rank",
                "rank_explanation",
            ],
        )

        # make pandas show all columns
        pd.set_option("display.max_columns", None)
        print(df.head(2))
        print(df.dtypes)
        try:
            chdb.query("FROM Python(df) SELECT * LIMIT 10")
        except Exception as e:
            self.assertTrue(
                "Unsupported Python object type numpy.ndarray" in str(e)
            )


if __name__ == "__main__":
    unittest.main()
