Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STY: use pytest.raises context manager (io.sql) #25597

Merged
merged 3 commits into from
Mar 9, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 60 additions & 60 deletions pandas/tests/io/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import pytest

import pandas.compat as compat
from pandas.compat import PY36, lrange, range, string_types
from pandas.compat import PY2, PY36, lrange, range, string_types

from pandas.core.dtypes.common import (
is_datetime64_dtype, is_datetime64tz_dtype)
Expand Down Expand Up @@ -400,8 +400,10 @@ def _to_sql_fail(self):
self.test_frame1, 'test_frame1', if_exists='fail')
assert self.pandasSQL.has_table('test_frame1')

pytest.raises(ValueError, self.pandasSQL.to_sql,
self.test_frame1, 'test_frame1', if_exists='fail')
msg = "Table 'test_frame1' already exists"
with pytest.raises(ValueError, match=msg):
self.pandasSQL.to_sql(
self.test_frame1, 'test_frame1', if_exists='fail')

self.drop_table('test_frame1')

Expand Down Expand Up @@ -563,8 +565,10 @@ def test_to_sql_fail(self):
self.conn, if_exists='fail')
assert sql.has_table('test_frame2', self.conn)

pytest.raises(ValueError, sql.to_sql, self.test_frame1,
'test_frame2', self.conn, if_exists='fail')
msg = "Table 'test_frame2' already exists"
with pytest.raises(ValueError, match=msg):
sql.to_sql(self.test_frame1, 'test_frame2',
self.conn, if_exists='fail')

def test_to_sql_replace(self):
sql.to_sql(self.test_frame1, 'test_frame3',
Expand Down Expand Up @@ -699,10 +703,11 @@ def test_timedelta(self):
result = sql.read_sql_query('SELECT * FROM test_timedelta', self.conn)
tm.assert_series_equal(result['foo'], df['foo'].astype('int64'))

def test_complex(self):
def test_complex_raises(self):
df = DataFrame({'a': [1 + 1j, 2j]})
# Complex data type should raise error
pytest.raises(ValueError, df.to_sql, 'test_complex', self.conn)
msg = "Complex datatypes not supported"
with pytest.raises(ValueError, match=msg):
df.to_sql('test_complex', self.conn)

def test_to_sql_index_label(self):
temp_frame = DataFrame({'col1': range(4)})
Expand Down Expand Up @@ -774,10 +779,11 @@ def test_to_sql_index_label_multiindex(self):
frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn)
assert frame.columns[:2].tolist() == ['C', 'D']

# wrong length of index_label
pytest.raises(ValueError, sql.to_sql, temp_frame,
'test_index_label', self.conn, if_exists='replace',
index_label='C')
msg = ("Length of 'index_label' should match number of levels, which"
" is 2")
with pytest.raises(ValueError, match=msg):
sql.to_sql(temp_frame, 'test_index_label', self.conn,
if_exists='replace', index_label='C')

def test_multiindex_roundtrip(self):
df = DataFrame.from_records([(1, 2.1, 'line1'), (2, 1.5, 'line2')],
Expand Down Expand Up @@ -882,6 +888,8 @@ def test_escaped_table_name(self):


@pytest.mark.single
@pytest.mark.skipif(
not SQLALCHEMY_INSTALLED, reason='SQLAlchemy not installed')
class TestSQLApi(SQLAlchemyMixIn, _TestSQLApi):
"""
Test the public API as it would be used directly
Expand All @@ -894,10 +902,7 @@ class TestSQLApi(SQLAlchemyMixIn, _TestSQLApi):
mode = 'sqlalchemy'

def connect(self):
if SQLALCHEMY_INSTALLED:
return sqlalchemy.create_engine('sqlite:///:memory:')
else:
pytest.skip('SQLAlchemy not installed')
return sqlalchemy.create_engine('sqlite:///:memory:')

def test_read_table_columns(self):
# test columns argument in read_table
Expand Down Expand Up @@ -1107,20 +1112,21 @@ def test_sql_open_close(self):

tm.assert_frame_equal(self.test_frame3, result)

@pytest.mark.skipif(SQLALCHEMY_INSTALLED, reason='SQLAlchemy is installed')
def test_con_string_import_error(self):
if not SQLALCHEMY_INSTALLED:
conn = 'mysql://root@localhost/pandas_nosetest'
pytest.raises(ImportError, sql.read_sql, "SELECT * FROM iris",
conn)
else:
pytest.skip('SQLAlchemy is installed')
conn = 'mysql://root@localhost/pandas_nosetest'
msg = "Using URI string without sqlalchemy installed"
with pytest.raises(ImportError, match=msg):
sql.read_sql("SELECT * FROM iris", conn)

def test_read_sql_delegate(self):
iris_frame1 = sql.read_sql_query("SELECT * FROM iris", self.conn)
iris_frame2 = sql.read_sql("SELECT * FROM iris", self.conn)
tm.assert_frame_equal(iris_frame1, iris_frame2)

pytest.raises(sql.DatabaseError, sql.read_sql, 'iris', self.conn)
msg = "Execution failed on sql 'iris': near \"iris\": syntax error"
with pytest.raises(sql.DatabaseError, match=msg):
sql.read_sql('iris', self.conn)

def test_safe_names_warning(self):
# GH 6798
Expand Down Expand Up @@ -1276,9 +1282,10 @@ def test_read_table_columns(self):
tm.equalContents(
iris_frame.columns.values, ['SepalLength', 'SepalLength'])

def test_read_table_absent(self):
pytest.raises(
ValueError, sql.read_sql_table, "this_doesnt_exist", con=self.conn)
def test_read_table_absent_raises(self):
msg = "Table this_doesnt_exist not found"
with pytest.raises(ValueError, match=msg):
sql.read_sql_table("this_doesnt_exist", con=self.conn)

def test_default_type_conversion(self):
df = sql.read_sql_table("types_test_data", self.conn)
Expand Down Expand Up @@ -1617,8 +1624,9 @@ def test_dtype(self):
meta.reflect()
sqltype = meta.tables['dtype_test2'].columns['B'].type
assert isinstance(sqltype, sqlalchemy.TEXT)
pytest.raises(ValueError, df.to_sql,
'error', self.conn, dtype={'B': str})
msg = "The type of B is not a SQLAlchemy type"
with pytest.raises(ValueError, match=msg):
df.to_sql('error', self.conn, dtype={'B': str})

# GH9083
df.to_sql('dtype_test3', self.conn, dtype={'B': sqlalchemy.String(10)})
Expand Down Expand Up @@ -1903,8 +1911,9 @@ def test_schema_support(self):
res4 = sql.read_sql_table('test_schema_other', self.conn,
schema='other')
tm.assert_frame_equal(df, res4)
pytest.raises(ValueError, sql.read_sql_table, 'test_schema_other',
self.conn, schema='public')
msg = "Table test_schema_other not found"
with pytest.raises(ValueError, match=msg):
sql.read_sql_table('test_schema_other', self.conn, schema='public')

# different if_exists options

Expand Down Expand Up @@ -2120,6 +2129,7 @@ def _get_sqlite_column_type(self, table, column):
return ctype
raise ValueError('Table %s, column %s not found' % (table, column))

@pytest.mark.skipif(PY2, reason="pytest.raises match regex fails")
def test_dtype(self):
if self.flavor == 'mysql':
pytest.skip('Not applicable to MySQL legacy')
Expand All @@ -2136,8 +2146,9 @@ def test_dtype(self):

assert self._get_sqlite_column_type(
'dtype_test2', 'B') == 'STRING'
pytest.raises(ValueError, df.to_sql,
'error', self.conn, dtype={'B': bool})
msg = r"B \(<class 'bool'>\) not a string"
with pytest.raises(ValueError, match=msg):
df.to_sql('error', self.conn, dtype={'B': bool})

# single dtype
df.to_sql('single_dtype_test', self.conn, dtype='STRING')
Expand Down Expand Up @@ -2169,8 +2180,9 @@ def test_illegal_names(self):
# For sqlite, these should work fine
df = DataFrame([[1, 2], [3, 4]], columns=['a', 'b'])

# Raise error on blank
pytest.raises(ValueError, df.to_sql, "", self.conn)
msg = "Empty table or column name specified"
with pytest.raises(ValueError, match=msg):
df.to_sql("", self.conn)

for ndx, weird_name in enumerate(
['test_weird_name]', 'test_weird_name[',
Expand Down Expand Up @@ -2399,25 +2411,19 @@ def clean_up(test_table_to_drop):
"""
self.drop_table(test_table_to_drop)

# test if invalid value for if_exists raises appropriate error
pytest.raises(ValueError,
sql.to_sql,
frame=df_if_exists_1,
con=self.conn,
name=table_name,
if_exists='notvalidvalue')
msg = "'notvalidvalue' is not valid for if_exists"
with pytest.raises(ValueError, match=msg):
sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
if_exists='notvalidvalue')
clean_up(table_name)

# test if_exists='fail'
sql.to_sql(frame=df_if_exists_1, con=self.conn,
name=table_name, if_exists='fail')
pytest.raises(ValueError,
sql.to_sql,
frame=df_if_exists_1,
con=self.conn,
name=table_name,
if_exists='fail')

msg = "Table 'table_if_exists' already exists"
with pytest.raises(ValueError, match=msg):
sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
if_exists='fail')
# test if_exists='replace'
sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
if_exists='replace', index=False)
Expand Down Expand Up @@ -2663,23 +2669,17 @@ def clean_up(test_table_to_drop):
self.drop_table(test_table_to_drop)

# test if invalid value for if_exists raises appropriate error
pytest.raises(ValueError,
sql.to_sql,
frame=df_if_exists_1,
con=self.conn,
name=table_name,
if_exists='notvalidvalue')
with pytest.raises(ValueError, match="<insert message here>"):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dummy message for unconditionally skipped test. xref #20536

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, that makes sense.

sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
if_exists='notvalidvalue')
clean_up(table_name)

# test if_exists='fail'
sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
if_exists='fail', index=False)
pytest.raises(ValueError,
sql.to_sql,
frame=df_if_exists_1,
con=self.conn,
name=table_name,
if_exists='fail')
with pytest.raises(ValueError, match="<insert message here>"):
sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
if_exists='fail')

# test if_exists='replace'
sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
Expand Down