Skip to content

Commit

Permalink
STY: use pytest.raises context manager (io.sql) (pandas-dev#25597)
Browse files Browse the repository at this point in the history
  • Loading branch information
simonjayhawkins authored and jreback committed Mar 9, 2019
1 parent 9352e69 commit 976a2db
Showing 1 changed file with 60 additions and 60 deletions.
120 changes: 60 additions & 60 deletions pandas/tests/io/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import pytest

import pandas.compat as compat
from pandas.compat import PY36, lrange, range, string_types
from pandas.compat import PY2, PY36, lrange, range, string_types

from pandas.core.dtypes.common import (
is_datetime64_dtype, is_datetime64tz_dtype)
Expand Down Expand Up @@ -400,8 +400,10 @@ def _to_sql_fail(self):
self.test_frame1, 'test_frame1', if_exists='fail')
assert self.pandasSQL.has_table('test_frame1')

pytest.raises(ValueError, self.pandasSQL.to_sql,
self.test_frame1, 'test_frame1', if_exists='fail')
msg = "Table 'test_frame1' already exists"
with pytest.raises(ValueError, match=msg):
self.pandasSQL.to_sql(
self.test_frame1, 'test_frame1', if_exists='fail')

self.drop_table('test_frame1')

Expand Down Expand Up @@ -563,8 +565,10 @@ def test_to_sql_fail(self):
self.conn, if_exists='fail')
assert sql.has_table('test_frame2', self.conn)

pytest.raises(ValueError, sql.to_sql, self.test_frame1,
'test_frame2', self.conn, if_exists='fail')
msg = "Table 'test_frame2' already exists"
with pytest.raises(ValueError, match=msg):
sql.to_sql(self.test_frame1, 'test_frame2',
self.conn, if_exists='fail')

def test_to_sql_replace(self):
sql.to_sql(self.test_frame1, 'test_frame3',
Expand Down Expand Up @@ -699,10 +703,11 @@ def test_timedelta(self):
result = sql.read_sql_query('SELECT * FROM test_timedelta', self.conn)
tm.assert_series_equal(result['foo'], df['foo'].astype('int64'))

def test_complex(self):
def test_complex_raises(self):
df = DataFrame({'a': [1 + 1j, 2j]})
# Complex data type should raise error
pytest.raises(ValueError, df.to_sql, 'test_complex', self.conn)
msg = "Complex datatypes not supported"
with pytest.raises(ValueError, match=msg):
df.to_sql('test_complex', self.conn)

@pytest.mark.parametrize("index_name,index_label,expected", [
# no index name, defaults to 'index'
Expand Down Expand Up @@ -758,10 +763,11 @@ def test_to_sql_index_label_multiindex(self):
frame = sql.read_sql_query('SELECT * FROM test_index_label', self.conn)
assert frame.columns[:2].tolist() == ['C', 'D']

# wrong length of index_label
pytest.raises(ValueError, sql.to_sql, temp_frame,
'test_index_label', self.conn, if_exists='replace',
index_label='C')
msg = ("Length of 'index_label' should match number of levels, which"
" is 2")
with pytest.raises(ValueError, match=msg):
sql.to_sql(temp_frame, 'test_index_label', self.conn,
if_exists='replace', index_label='C')

def test_multiindex_roundtrip(self):
df = DataFrame.from_records([(1, 2.1, 'line1'), (2, 1.5, 'line2')],
Expand Down Expand Up @@ -866,6 +872,8 @@ def test_escaped_table_name(self):


@pytest.mark.single
@pytest.mark.skipif(
not SQLALCHEMY_INSTALLED, reason='SQLAlchemy not installed')
class TestSQLApi(SQLAlchemyMixIn, _TestSQLApi):
"""
Test the public API as it would be used directly
Expand All @@ -878,10 +886,7 @@ class TestSQLApi(SQLAlchemyMixIn, _TestSQLApi):
mode = 'sqlalchemy'

def connect(self):
if SQLALCHEMY_INSTALLED:
return sqlalchemy.create_engine('sqlite:///:memory:')
else:
pytest.skip('SQLAlchemy not installed')
return sqlalchemy.create_engine('sqlite:///:memory:')

def test_read_table_columns(self):
# test columns argument in read_table
Expand Down Expand Up @@ -1091,20 +1096,21 @@ def test_sql_open_close(self):

tm.assert_frame_equal(self.test_frame3, result)

@pytest.mark.skipif(SQLALCHEMY_INSTALLED, reason='SQLAlchemy is installed')
def test_con_string_import_error(self):
if not SQLALCHEMY_INSTALLED:
conn = 'mysql://root@localhost/pandas_nosetest'
pytest.raises(ImportError, sql.read_sql, "SELECT * FROM iris",
conn)
else:
pytest.skip('SQLAlchemy is installed')
conn = 'mysql://root@localhost/pandas_nosetest'
msg = "Using URI string without sqlalchemy installed"
with pytest.raises(ImportError, match=msg):
sql.read_sql("SELECT * FROM iris", conn)

def test_read_sql_delegate(self):
iris_frame1 = sql.read_sql_query("SELECT * FROM iris", self.conn)
iris_frame2 = sql.read_sql("SELECT * FROM iris", self.conn)
tm.assert_frame_equal(iris_frame1, iris_frame2)

pytest.raises(sql.DatabaseError, sql.read_sql, 'iris', self.conn)
msg = "Execution failed on sql 'iris': near \"iris\": syntax error"
with pytest.raises(sql.DatabaseError, match=msg):
sql.read_sql('iris', self.conn)

def test_safe_names_warning(self):
# GH 6798
Expand Down Expand Up @@ -1260,9 +1266,10 @@ def test_read_table_columns(self):
tm.equalContents(
iris_frame.columns.values, ['SepalLength', 'SepalLength'])

def test_read_table_absent(self):
pytest.raises(
ValueError, sql.read_sql_table, "this_doesnt_exist", con=self.conn)
def test_read_table_absent_raises(self):
msg = "Table this_doesnt_exist not found"
with pytest.raises(ValueError, match=msg):
sql.read_sql_table("this_doesnt_exist", con=self.conn)

def test_default_type_conversion(self):
df = sql.read_sql_table("types_test_data", self.conn)
Expand Down Expand Up @@ -1601,8 +1608,9 @@ def test_dtype(self):
meta.reflect()
sqltype = meta.tables['dtype_test2'].columns['B'].type
assert isinstance(sqltype, sqlalchemy.TEXT)
pytest.raises(ValueError, df.to_sql,
'error', self.conn, dtype={'B': str})
msg = "The type of B is not a SQLAlchemy type"
with pytest.raises(ValueError, match=msg):
df.to_sql('error', self.conn, dtype={'B': str})

# GH9083
df.to_sql('dtype_test3', self.conn, dtype={'B': sqlalchemy.String(10)})
Expand Down Expand Up @@ -1887,8 +1895,9 @@ def test_schema_support(self):
res4 = sql.read_sql_table('test_schema_other', self.conn,
schema='other')
tm.assert_frame_equal(df, res4)
pytest.raises(ValueError, sql.read_sql_table, 'test_schema_other',
self.conn, schema='public')
msg = "Table test_schema_other not found"
with pytest.raises(ValueError, match=msg):
sql.read_sql_table('test_schema_other', self.conn, schema='public')

# different if_exists options

Expand Down Expand Up @@ -2104,6 +2113,7 @@ def _get_sqlite_column_type(self, table, column):
return ctype
raise ValueError('Table %s, column %s not found' % (table, column))

@pytest.mark.skipif(PY2, reason="pytest.raises match regex fails")
def test_dtype(self):
if self.flavor == 'mysql':
pytest.skip('Not applicable to MySQL legacy')
Expand All @@ -2120,8 +2130,9 @@ def test_dtype(self):

assert self._get_sqlite_column_type(
'dtype_test2', 'B') == 'STRING'
pytest.raises(ValueError, df.to_sql,
'error', self.conn, dtype={'B': bool})
msg = r"B \(<class 'bool'>\) not a string"
with pytest.raises(ValueError, match=msg):
df.to_sql('error', self.conn, dtype={'B': bool})

# single dtype
df.to_sql('single_dtype_test', self.conn, dtype='STRING')
Expand Down Expand Up @@ -2153,8 +2164,9 @@ def test_illegal_names(self):
# For sqlite, these should work fine
df = DataFrame([[1, 2], [3, 4]], columns=['a', 'b'])

# Raise error on blank
pytest.raises(ValueError, df.to_sql, "", self.conn)
msg = "Empty table or column name specified"
with pytest.raises(ValueError, match=msg):
df.to_sql("", self.conn)

for ndx, weird_name in enumerate(
['test_weird_name]', 'test_weird_name[',
Expand Down Expand Up @@ -2383,25 +2395,19 @@ def clean_up(test_table_to_drop):
"""
self.drop_table(test_table_to_drop)

# test if invalid value for if_exists raises appropriate error
pytest.raises(ValueError,
sql.to_sql,
frame=df_if_exists_1,
con=self.conn,
name=table_name,
if_exists='notvalidvalue')
msg = "'notvalidvalue' is not valid for if_exists"
with pytest.raises(ValueError, match=msg):
sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
if_exists='notvalidvalue')
clean_up(table_name)

# test if_exists='fail'
sql.to_sql(frame=df_if_exists_1, con=self.conn,
name=table_name, if_exists='fail')
pytest.raises(ValueError,
sql.to_sql,
frame=df_if_exists_1,
con=self.conn,
name=table_name,
if_exists='fail')

msg = "Table 'table_if_exists' already exists"
with pytest.raises(ValueError, match=msg):
sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
if_exists='fail')
# test if_exists='replace'
sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
if_exists='replace', index=False)
Expand Down Expand Up @@ -2647,23 +2653,17 @@ def clean_up(test_table_to_drop):
self.drop_table(test_table_to_drop)

# test if invalid value for if_exists raises appropriate error
pytest.raises(ValueError,
sql.to_sql,
frame=df_if_exists_1,
con=self.conn,
name=table_name,
if_exists='notvalidvalue')
with pytest.raises(ValueError, match="<insert message here>"):
sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
if_exists='notvalidvalue')
clean_up(table_name)

# test if_exists='fail'
sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
if_exists='fail', index=False)
pytest.raises(ValueError,
sql.to_sql,
frame=df_if_exists_1,
con=self.conn,
name=table_name,
if_exists='fail')
with pytest.raises(ValueError, match="<insert message here>"):
sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
if_exists='fail')

# test if_exists='replace'
sql.to_sql(frame=df_if_exists_1, con=self.conn, name=table_name,
Expand Down

0 comments on commit 976a2db

Please sign in to comment.