From 619101f9c6999a1ef31193f132f9ec0ebcd3cb65 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Thu, 15 Jul 2021 16:07:42 +0200 Subject: [PATCH] Buffer overflow in cursor create_name_map (#1) Function create_name_map used a static buffer `szName[300]` for the column name in call to `SQLDescribeColW`. That function would truncate the column name to the size of the provided buffer, but return the actual length of the column name in the `&cchName` return parameter. That `cchName` output was used to compute `cbName` parameter for the call to `TextBufferToObject(enc, szName, cbName)`. When the actual column name was longer than the buffer, cbName was to big, and was causing a buffer overflow, resulting in crashes, invalid data, or errors. Fix this by making szName a dynamic buffer. This buffer overflow is very similar to the one fixed in https://github.com/mkleehammer/pyodbc/pull/881. I check through the rest of the code base, and I don't see more places with static buffers that could have a similar issue. Upstream PR: https://github.com/mkleehammer/pyodbc/pull/931 --- src/cursor.cpp | 20 +++++++++++++++++--- tests2/accesstests.py | 16 +++++++++++++++- tests2/informixtests.py | 16 +++++++++++++++- tests2/sqldwtests.py | 14 ++++++++++++++ tests2/sqlitetests.py | 16 +++++++++++++++- tests3/accesstests.py | 16 +++++++++++++++- tests3/informixtests.py | 16 +++++++++++++++- tests3/sqldwtests.py | 14 ++++++++++++++ tests3/sqlitetests.py | 16 +++++++++++++++- 9 files changed, 135 insertions(+), 9 deletions(-) diff --git a/src/cursor.cpp b/src/cursor.cpp index 98983bfd..7928e6d6 100644 --- a/src/cursor.cpp +++ b/src/cursor.cpp @@ -131,6 +131,8 @@ static bool create_name_map(Cursor* cur, SQLSMALLINT field_count, bool lower) bool success = false; PyObject *desc = 0, *colmap = 0, *colinfo = 0, *type = 0, *index = 0, *nullable_obj=0; + SQLSMALLINT nameLen = 300; + ODBCCHAR *szName = NULL; SQLRETURN ret; I(cur->hstmt != SQL_NULL_HANDLE && cur->colinfos != 0); @@ -148,20 +150,21 @@ static bool create_name_map(Cursor* cur, SQLSMALLINT field_count, bool lower) desc = PyTuple_New((Py_ssize_t)field_count); colmap = PyDict_New(); - if (!desc || !colmap) + szName = (ODBCCHAR*) pyodbc_malloc((nameLen + 1) * sizeof(ODBCCHAR)); + if (!desc || !colmap || !szName) goto done; for (int i = 0; i < field_count; i++) { - ODBCCHAR szName[300]; SQLSMALLINT cchName; SQLSMALLINT nDataType; SQLULEN nColSize; // precision SQLSMALLINT cDecimalDigits; // scale SQLSMALLINT nullable; + retry: Py_BEGIN_ALLOW_THREADS - ret = SQLDescribeColW(cur->hstmt, (SQLUSMALLINT)(i + 1), (SQLWCHAR*)szName, _countof(szName), &cchName, &nDataType, &nColSize, &cDecimalDigits, &nullable); + ret = SQLDescribeColW(cur->hstmt, (SQLUSMALLINT)(i + 1), (SQLWCHAR*)szName, nameLen, &cchName, &nDataType, &nColSize, &cDecimalDigits, &nullable); Py_END_ALLOW_THREADS if (cur->cnxn->hdbc == SQL_NULL_HANDLE) @@ -177,6 +180,16 @@ static bool create_name_map(Cursor* cur, SQLSMALLINT field_count, bool lower) goto done; } + // If needed, allocate a bigger column name message buffer and retry. + if (cchName > nameLen - 1) { + nameLen = cchName + 1; + if (!pyodbc_realloc((BYTE**) &szName, (nameLen + 1) * sizeof(ODBCCHAR))) { + PyErr_NoMemory(); + goto done; + } + goto retry; + } + const TextEnc& enc = cur->cnxn->metadata_enc; // HACK: I don't know the exact issue, but iODBC + Teradata results in either UCS4 data @@ -289,6 +302,7 @@ static bool create_name_map(Cursor* cur, SQLSMALLINT field_count, bool lower) Py_XDECREF(colmap); Py_XDECREF(index); Py_XDECREF(colinfo); + pyodbc_free(szName); return success; } diff --git a/tests2/accesstests.py b/tests2/accesstests.py index 2cd0fc55..7e423e4e 100755 --- a/tests2/accesstests.py +++ b/tests2/accesstests.py @@ -521,7 +521,21 @@ def test_lower_case(self): # Put it back so other tests don't fail. pyodbc.lowercase = False - + + def test_long_column_name(self): + "ensure super long column names are handled correctly." + c1 = 'abcdefghij' * 50 + c2 = 'klmnopqrst' * 60 + self.cursor = self.cnxn.cursor() + + self.cursor.execute("create table t1({} int, {} int)".format(c1, c2)) + self.cursor.execute("select * from t1") + + names = [ t[0] for t in self.cursor.description ] + names.sort() + + self.assertEqual(names, [ c1, c2 ]) + def test_row_description(self): """ Ensure Cursor.description is accessible as Row.cursor_description. diff --git a/tests2/informixtests.py b/tests2/informixtests.py index 36525e90..0f740167 100755 --- a/tests2/informixtests.py +++ b/tests2/informixtests.py @@ -791,7 +791,21 @@ def test_lower_case(self): # Put it back so other tests don't fail. pyodbc.lowercase = False - + + def test_long_column_name(self): + "ensure super long column names are handled correctly." + c1 = 'abcdefghij' * 50 + c2 = 'klmnopqrst' * 60 + self.cursor = self.cnxn.cursor() + + self.cursor.execute("create table t1({} int, {} int)".format(c1, c2)) + self.cursor.execute("select * from t1") + + names = [ t[0] for t in self.cursor.description ] + names.sort() + + self.assertEqual(names, [ c1, c2 ]) + def test_row_description(self): """ Ensure Cursor.description is accessible as Row.cursor_description. diff --git a/tests2/sqldwtests.py b/tests2/sqldwtests.py index 95b50300..5f169938 100644 --- a/tests2/sqldwtests.py +++ b/tests2/sqldwtests.py @@ -945,6 +945,20 @@ def test_lower_case(self): # Put it back so other tests don't fail. pyodbc.lowercase = False + def test_long_column_name(self): + "ensure super long column names are handled correctly." + c1 = 'abcdefghij' * 50 + c2 = 'klmnopqrst' * 60 + self.cursor = self.cnxn.cursor() + + self.cursor.execute("create table t1({} int, {} int)".format(c1, c2)) + self.cursor.execute("select * from t1") + + names = [ t[0] for t in self.cursor.description ] + names.sort() + + self.assertEqual(names, [ c1, c2 ]) + def test_row_description(self): """ Ensure Cursor.description is accessible as Row.cursor_description. diff --git a/tests2/sqlitetests.py b/tests2/sqlitetests.py index b402d19e..3512095a 100755 --- a/tests2/sqlitetests.py +++ b/tests2/sqlitetests.py @@ -401,7 +401,21 @@ def test_lower_case(self): # Put it back so other tests don't fail. pyodbc.lowercase = False - + + def test_long_column_name(self): + "ensure super long column names are handled correctly." + c1 = 'abcdefghij' * 50 + c2 = 'klmnopqrst' * 60 + self.cursor = self.cnxn.cursor() + + self.cursor.execute("create table t1({} int, {} int)".format(c1, c2)) + self.cursor.execute("select * from t1") + + names = [ t[0] for t in self.cursor.description ] + names.sort() + + self.assertEqual(names, [ c1, c2 ]) + def test_row_description(self): """ Ensure Cursor.description is accessible as Row.cursor_description. diff --git a/tests3/accesstests.py b/tests3/accesstests.py index d33c54ea..c7cf546e 100644 --- a/tests3/accesstests.py +++ b/tests3/accesstests.py @@ -482,7 +482,21 @@ def test_lower_case(self): # Put it back so other tests don't fail. pyodbc.lowercase = False - + + def test_long_column_name(self): + "ensure super long column names are handled correctly." + c1 = 'abcdefghij' * 50 + c2 = 'klmnopqrst' * 60 + self.cursor = self.cnxn.cursor() + + self.cursor.execute("create table t1({} int, {} int)".format(c1, c2)) + self.cursor.execute("select * from t1") + + names = [ t[0] for t in self.cursor.description ] + names.sort() + + self.assertEqual(names, [ c1, c2 ]) + def test_row_description(self): """ Ensure Cursor.description is accessible as Row.cursor_description. diff --git a/tests3/informixtests.py b/tests3/informixtests.py index 62aabfec..1ef66f65 100644 --- a/tests3/informixtests.py +++ b/tests3/informixtests.py @@ -791,7 +791,21 @@ def test_lower_case(self): # Put it back so other tests don't fail. pyodbc.lowercase = False - + + def test_long_column_name(self): + "ensure super long column names are handled correctly." + c1 = 'abcdefghij' * 50 + c2 = 'klmnopqrst' * 60 + self.cursor = self.cnxn.cursor() + + self.cursor.execute("create table t1({} int, {} int)".format(c1, c2)) + self.cursor.execute("select * from t1") + + names = [ t[0] for t in self.cursor.description ] + names.sort() + + self.assertEqual(names, [ c1, c2 ]) + def test_row_description(self): """ Ensure Cursor.description is accessible as Row.cursor_description. diff --git a/tests3/sqldwtests.py b/tests3/sqldwtests.py index 14405046..a0e78783 100644 --- a/tests3/sqldwtests.py +++ b/tests3/sqldwtests.py @@ -899,6 +899,20 @@ def test_lower_case(self): # Put it back so other tests don't fail. pyodbc.lowercase = False + def test_long_column_name(self): + "ensure super long column names are handled correctly." + c1 = 'abcdefghij' * 50 + c2 = 'klmnopqrst' * 60 + self.cursor = self.cnxn.cursor() + + self.cursor.execute("create table t1({} int, {} int)".format(c1, c2)) + self.cursor.execute("select * from t1") + + names = [ t[0] for t in self.cursor.description ] + names.sort() + + self.assertEqual(names, [ c1, c2 ]) + def test_row_description(self): """ Ensure Cursor.description is accessible as Row.cursor_description. diff --git a/tests3/sqlitetests.py b/tests3/sqlitetests.py index 1978269f..21a2157d 100644 --- a/tests3/sqlitetests.py +++ b/tests3/sqlitetests.py @@ -383,7 +383,21 @@ def test_lower_case(self): # Put it back so other tests don't fail. pyodbc.lowercase = False - + + def test_long_column_name(self): + "ensure super long column names are handled correctly." + c1 = 'abcdefghij' * 50 + c2 = 'klmnopqrst' * 60 + self.cursor = self.cnxn.cursor() + + self.cursor.execute("create table t1({} int, {} int)".format(c1, c2)) + self.cursor.execute("select * from t1") + + names = [ t[0] for t in self.cursor.description ] + names.sort() + + self.assertEqual(names, [ c1, c2 ]) + def test_row_description(self): """ Ensure Cursor.description is accessible as Row.cursor_description.