Skip to content

Commit

Permalink
Buffer overflow in cursor create_name_map (mkleehammer#1)
Browse files Browse the repository at this point in the history
Function create_name_map used a static buffer `szName[300]` for the column name in call to `SQLDescribeColW`. That function would truncate the column name to the size of the provided buffer, but return the actual length of the column name in the `&cchName` return parameter.
That `cchName` output was used to compute `cbName` parameter for the call to `TextBufferToObject(enc, szName, cbName)`. When the actual column name was longer than the buffer, cbName was to big, and was causing a buffer overflow, resulting in crashes, invalid data, or errors.

Fix this by making szName a dynamic buffer. This buffer overflow is very similar to the one fixed in mkleehammer#881. I check through the rest of the code base, and I don't see more places with static buffers that could have a similar issue.

Upstream PR: mkleehammer#931
  • Loading branch information
juliuszsompolski authored Jul 15, 2021
1 parent 7afa066 commit 619101f
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 9 deletions.
20 changes: 17 additions & 3 deletions src/cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ static bool create_name_map(Cursor* cur, SQLSMALLINT field_count, bool lower)

bool success = false;
PyObject *desc = 0, *colmap = 0, *colinfo = 0, *type = 0, *index = 0, *nullable_obj=0;
SQLSMALLINT nameLen = 300;
ODBCCHAR *szName = NULL;
SQLRETURN ret;

I(cur->hstmt != SQL_NULL_HANDLE && cur->colinfos != 0);
Expand All @@ -148,20 +150,21 @@ static bool create_name_map(Cursor* cur, SQLSMALLINT field_count, bool lower)

desc = PyTuple_New((Py_ssize_t)field_count);
colmap = PyDict_New();
if (!desc || !colmap)
szName = (ODBCCHAR*) pyodbc_malloc((nameLen + 1) * sizeof(ODBCCHAR));
if (!desc || !colmap || !szName)
goto done;

for (int i = 0; i < field_count; i++)
{
ODBCCHAR szName[300];
SQLSMALLINT cchName;
SQLSMALLINT nDataType;
SQLULEN nColSize; // precision
SQLSMALLINT cDecimalDigits; // scale
SQLSMALLINT nullable;

retry:
Py_BEGIN_ALLOW_THREADS
ret = SQLDescribeColW(cur->hstmt, (SQLUSMALLINT)(i + 1), (SQLWCHAR*)szName, _countof(szName), &cchName, &nDataType, &nColSize, &cDecimalDigits, &nullable);
ret = SQLDescribeColW(cur->hstmt, (SQLUSMALLINT)(i + 1), (SQLWCHAR*)szName, nameLen, &cchName, &nDataType, &nColSize, &cDecimalDigits, &nullable);
Py_END_ALLOW_THREADS

if (cur->cnxn->hdbc == SQL_NULL_HANDLE)
Expand All @@ -177,6 +180,16 @@ static bool create_name_map(Cursor* cur, SQLSMALLINT field_count, bool lower)
goto done;
}

// If needed, allocate a bigger column name message buffer and retry.
if (cchName > nameLen - 1) {
nameLen = cchName + 1;
if (!pyodbc_realloc((BYTE**) &szName, (nameLen + 1) * sizeof(ODBCCHAR))) {
PyErr_NoMemory();
goto done;
}
goto retry;
}

const TextEnc& enc = cur->cnxn->metadata_enc;

// HACK: I don't know the exact issue, but iODBC + Teradata results in either UCS4 data
Expand Down Expand Up @@ -289,6 +302,7 @@ static bool create_name_map(Cursor* cur, SQLSMALLINT field_count, bool lower)
Py_XDECREF(colmap);
Py_XDECREF(index);
Py_XDECREF(colinfo);
pyodbc_free(szName);

return success;
}
Expand Down
16 changes: 15 additions & 1 deletion tests2/accesstests.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,21 @@ def test_lower_case(self):

# Put it back so other tests don't fail.
pyodbc.lowercase = False


def test_long_column_name(self):
"ensure super long column names are handled correctly."
c1 = 'abcdefghij' * 50
c2 = 'klmnopqrst' * 60
self.cursor = self.cnxn.cursor()

self.cursor.execute("create table t1({} int, {} int)".format(c1, c2))
self.cursor.execute("select * from t1")

names = [ t[0] for t in self.cursor.description ]
names.sort()

self.assertEqual(names, [ c1, c2 ])

def test_row_description(self):
"""
Ensure Cursor.description is accessible as Row.cursor_description.
Expand Down
16 changes: 15 additions & 1 deletion tests2/informixtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,21 @@ def test_lower_case(self):

# Put it back so other tests don't fail.
pyodbc.lowercase = False


def test_long_column_name(self):
"ensure super long column names are handled correctly."
c1 = 'abcdefghij' * 50
c2 = 'klmnopqrst' * 60
self.cursor = self.cnxn.cursor()

self.cursor.execute("create table t1({} int, {} int)".format(c1, c2))
self.cursor.execute("select * from t1")

names = [ t[0] for t in self.cursor.description ]
names.sort()

self.assertEqual(names, [ c1, c2 ])

def test_row_description(self):
"""
Ensure Cursor.description is accessible as Row.cursor_description.
Expand Down
14 changes: 14 additions & 0 deletions tests2/sqldwtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,20 @@ def test_lower_case(self):
# Put it back so other tests don't fail.
pyodbc.lowercase = False

def test_long_column_name(self):
"ensure super long column names are handled correctly."
c1 = 'abcdefghij' * 50
c2 = 'klmnopqrst' * 60
self.cursor = self.cnxn.cursor()

self.cursor.execute("create table t1({} int, {} int)".format(c1, c2))
self.cursor.execute("select * from t1")

names = [ t[0] for t in self.cursor.description ]
names.sort()

self.assertEqual(names, [ c1, c2 ])

def test_row_description(self):
"""
Ensure Cursor.description is accessible as Row.cursor_description.
Expand Down
16 changes: 15 additions & 1 deletion tests2/sqlitetests.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,21 @@ def test_lower_case(self):

# Put it back so other tests don't fail.
pyodbc.lowercase = False


def test_long_column_name(self):
"ensure super long column names are handled correctly."
c1 = 'abcdefghij' * 50
c2 = 'klmnopqrst' * 60
self.cursor = self.cnxn.cursor()

self.cursor.execute("create table t1({} int, {} int)".format(c1, c2))
self.cursor.execute("select * from t1")

names = [ t[0] for t in self.cursor.description ]
names.sort()

self.assertEqual(names, [ c1, c2 ])

def test_row_description(self):
"""
Ensure Cursor.description is accessible as Row.cursor_description.
Expand Down
16 changes: 15 additions & 1 deletion tests3/accesstests.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,21 @@ def test_lower_case(self):

# Put it back so other tests don't fail.
pyodbc.lowercase = False


def test_long_column_name(self):
"ensure super long column names are handled correctly."
c1 = 'abcdefghij' * 50
c2 = 'klmnopqrst' * 60
self.cursor = self.cnxn.cursor()

self.cursor.execute("create table t1({} int, {} int)".format(c1, c2))
self.cursor.execute("select * from t1")

names = [ t[0] for t in self.cursor.description ]
names.sort()

self.assertEqual(names, [ c1, c2 ])

def test_row_description(self):
"""
Ensure Cursor.description is accessible as Row.cursor_description.
Expand Down
16 changes: 15 additions & 1 deletion tests3/informixtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,21 @@ def test_lower_case(self):

# Put it back so other tests don't fail.
pyodbc.lowercase = False


def test_long_column_name(self):
"ensure super long column names are handled correctly."
c1 = 'abcdefghij' * 50
c2 = 'klmnopqrst' * 60
self.cursor = self.cnxn.cursor()

self.cursor.execute("create table t1({} int, {} int)".format(c1, c2))
self.cursor.execute("select * from t1")

names = [ t[0] for t in self.cursor.description ]
names.sort()

self.assertEqual(names, [ c1, c2 ])

def test_row_description(self):
"""
Ensure Cursor.description is accessible as Row.cursor_description.
Expand Down
14 changes: 14 additions & 0 deletions tests3/sqldwtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,20 @@ def test_lower_case(self):
# Put it back so other tests don't fail.
pyodbc.lowercase = False

def test_long_column_name(self):
"ensure super long column names are handled correctly."
c1 = 'abcdefghij' * 50
c2 = 'klmnopqrst' * 60
self.cursor = self.cnxn.cursor()

self.cursor.execute("create table t1({} int, {} int)".format(c1, c2))
self.cursor.execute("select * from t1")

names = [ t[0] for t in self.cursor.description ]
names.sort()

self.assertEqual(names, [ c1, c2 ])

def test_row_description(self):
"""
Ensure Cursor.description is accessible as Row.cursor_description.
Expand Down
16 changes: 15 additions & 1 deletion tests3/sqlitetests.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,21 @@ def test_lower_case(self):

# Put it back so other tests don't fail.
pyodbc.lowercase = False


def test_long_column_name(self):
"ensure super long column names are handled correctly."
c1 = 'abcdefghij' * 50
c2 = 'klmnopqrst' * 60
self.cursor = self.cnxn.cursor()

self.cursor.execute("create table t1({} int, {} int)".format(c1, c2))
self.cursor.execute("select * from t1")

names = [ t[0] for t in self.cursor.description ]
names.sort()

self.assertEqual(names, [ c1, c2 ])

def test_row_description(self):
"""
Ensure Cursor.description is accessible as Row.cursor_description.
Expand Down

0 comments on commit 619101f

Please sign in to comment.