Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: add schema support to sql functions #7952

Merged
merged 2 commits into from
Aug 31, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/requirements-2.6.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pytz==2013b
http://www.crummy.com/software/BeautifulSoup/bs4/download/4.2/beautifulsoup4-4.2.0.tar.gz
html5lib==1.0b2
numexpr==1.4.2
sqlalchemy==0.7.1
sqlalchemy==0.7.4
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leave this for now...can't access server (once @cpcloud puts in the conda stuff then easy to change this)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the problem is that the tests will then always fail on 2.6 (the schema keyword is only added in sqlalchemy 0.7.4)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh ok
only issue is that I can't upload wheels ATM - it will work but will take the build a bit longer - but that's a fast build anyhow

pymysql==0.6.0
psycopg2==2.5
scipy==0.11.0
Expand Down
14 changes: 14 additions & 0 deletions doc/source/io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3320,6 +3320,20 @@ to pass to :func:`pandas.to_datetime`:

You can check if a table exists using :func:`~pandas.io.sql.has_table`

Schema support
~~~~~~~~~~~~~~

.. versionadded:: 0.15.0

Reading from and writing to different schemas is supported through the ``schema``
keyword in the :func:`~pandas.read_sql_table` and :func:`~pandas.DataFrame.to_sql`
functions. Note however that this depends on the database flavor (sqlite does not
have schemas). For example:

.. code-block:: python

df.to_sql('table', engine, schema='other_schema')
pd.read_sql_table('table', engine, schema='other_schema')

Querying
~~~~~~~~
Expand Down
7 changes: 7 additions & 0 deletions doc/source/v0.15.0.txt
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,13 @@ Enhancements

- Added support for a ``chunksize`` parameter to ``to_sql`` function. This allows DataFrame to be written in chunks and avoid packet-size overflow errors (:issue:`8062`)
- Added support for writing ``datetime.date`` and ``datetime.time`` object columns with ``to_sql`` (:issue:`6932`).
- Added support for specifying a ``schema`` to read from/write to with ``read_sql_table`` and ``to_sql`` (:issue:`7441`, :issue:`7952`).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe bigger add code-block example? (like above), link-to-docs

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

For example:

.. code-block:: python

df.to_sql('table', engine, schema='other_schema')
pd.read_sql_table('table', engine, schema='other_schema')

- Added support for bool, uint8, uint16 and uint32 datatypes in ``to_stata`` (:issue:`7097`, :issue:`7365`)

Expand Down
11 changes: 7 additions & 4 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,8 +915,8 @@ def to_msgpack(self, path_or_buf=None, **kwargs):
from pandas.io import packers
return packers.to_msgpack(path_or_buf, self, **kwargs)

def to_sql(self, name, con, flavor='sqlite', if_exists='fail', index=True,
index_label=None, chunksize=None):
def to_sql(self, name, con, flavor='sqlite', schema=None, if_exists='fail',
index=True, index_label=None, chunksize=None):
"""
Write records stored in a DataFrame to a SQL database.

Expand All @@ -932,6 +932,9 @@ def to_sql(self, name, con, flavor='sqlite', if_exists='fail', index=True,
The flavor of SQL to use. Ignored when using SQLAlchemy engine.
'mysql' is deprecated and will be removed in future versions, but it
will be further supported through SQLAlchemy engines.
schema : string, default None
Specify the schema (if database flavor supports this). If None, use
default schema.
if_exists : {'fail', 'replace', 'append'}, default 'fail'
- fail: If table exists, raise a ValueError.
- replace: If table exists, drop it, recreate it, and insert data.
Expand All @@ -949,8 +952,8 @@ def to_sql(self, name, con, flavor='sqlite', if_exists='fail', index=True,
"""
from pandas.io import sql
sql.to_sql(
self, name, con, flavor=flavor, if_exists=if_exists, index=index,
index_label=index_label, chunksize=chunksize)
self, name, con, flavor=flavor, schema=schema, if_exists=if_exists,
index=index, index_label=index_label, chunksize=chunksize)

def to_pickle(self, path):
"""
Expand Down
100 changes: 59 additions & 41 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def _is_sqlalchemy_engine(con):
try:
import sqlalchemy
_SQLALCHEMY_INSTALLED = True

from distutils.version import LooseVersion
ver = LooseVersion(sqlalchemy.__version__)
# For sqlalchemy versions < 0.8.2, the BIGINT type is recognized
Expand All @@ -47,7 +47,7 @@ def _is_sqlalchemy_engine(con):
if ver < '0.8.2':
from sqlalchemy import BigInteger
from sqlalchemy.ext.compiler import compiles

@compiles(BigInteger, 'sqlite')
def compile_big_int_sqlite(type_, compiler, **kw):
return 'INTEGER'
Expand Down Expand Up @@ -145,7 +145,7 @@ def _safe_fetch(cur):
if not isinstance(result, list):
result = list(result)
return result
except Exception as e: # pragma: no cover
except Exception as e: # pragma: no cover
excName = e.__class__.__name__
if excName == 'OperationalError':
return []
Expand Down Expand Up @@ -187,7 +187,7 @@ def tquery(sql, con=None, cur=None, retry=True):
con.commit()
except Exception as e:
excName = e.__class__.__name__
if excName == 'OperationalError': # pragma: no cover
if excName == 'OperationalError': # pragma: no cover
print('Failed to commit, may need to restart interpreter')
else:
raise
Expand All @@ -199,7 +199,7 @@ def tquery(sql, con=None, cur=None, retry=True):
if result and len(result[0]) == 1:
# python 3 compat
result = list(lzip(*result)[0])
elif result is None: # pragma: no cover
elif result is None: # pragma: no cover
result = []

return result
Expand Down Expand Up @@ -253,8 +253,8 @@ def uquery(sql, con=None, cur=None, retry=True, params=None):
#------------------------------------------------------------------------------
#--- Read and write to DataFrames

def read_sql_table(table_name, con, index_col=None, coerce_float=True,
parse_dates=None, columns=None):
def read_sql_table(table_name, con, schema=None, index_col=None,
coerce_float=True, parse_dates=None, columns=None):
"""Read SQL database table into a DataFrame.

Given a table name and an SQLAlchemy engine, returns a DataFrame.
Expand All @@ -266,6 +266,9 @@ def read_sql_table(table_name, con, index_col=None, coerce_float=True,
Name of SQL table in database
con : SQLAlchemy engine
Sqlite DBAPI connection mode not supported
schema : string, default None
Name of SQL schema in database to query (if database flavor supports this).
If None, use default schema (default).
index_col : string, optional
Column to set as index
coerce_float : boolean, default True
Expand Down Expand Up @@ -298,7 +301,7 @@ def read_sql_table(table_name, con, index_col=None, coerce_float=True,
"SQLAlchemy engines.")
import sqlalchemy
from sqlalchemy.schema import MetaData
meta = MetaData(con)
meta = MetaData(con, schema=schema)
try:
meta.reflect(only=[table_name])
except sqlalchemy.exc.InvalidRequestError:
Expand Down Expand Up @@ -437,8 +440,8 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None,
coerce_float=coerce_float, parse_dates=parse_dates)


def to_sql(frame, name, con, flavor='sqlite', if_exists='fail', index=True,
index_label=None, chunksize=None):
def to_sql(frame, name, con, flavor='sqlite', schema=None, if_exists='fail',
index=True, index_label=None, chunksize=None):
"""
Write records stored in a DataFrame to a SQL database.

Expand All @@ -455,6 +458,9 @@ def to_sql(frame, name, con, flavor='sqlite', if_exists='fail', index=True,
The flavor of SQL to use. Ignored when using SQLAlchemy engine.
'mysql' is deprecated and will be removed in future versions, but it
will be further supported through SQLAlchemy engines.
schema : string, default None
Name of SQL schema in database to write to (if database flavor supports
this). If None, use default schema (default).
if_exists : {'fail', 'replace', 'append'}, default 'fail'
- fail: If table exists, raise a ValueError.
- replace: If table exists, drop it, recreate it, and insert data.
Expand All @@ -473,18 +479,19 @@ def to_sql(frame, name, con, flavor='sqlite', if_exists='fail', index=True,
if if_exists not in ('fail', 'replace', 'append'):
raise ValueError("'{0}' is not valid for if_exists".format(if_exists))

pandas_sql = pandasSQL_builder(con, flavor=flavor)
pandas_sql = pandasSQL_builder(con, schema=schema, flavor=flavor)

if isinstance(frame, Series):
frame = frame.to_frame()
elif not isinstance(frame, DataFrame):
raise NotImplementedError

pandas_sql.to_sql(frame, name, if_exists=if_exists, index=index,
index_label=index_label, chunksize=chunksize)
index_label=index_label, schema=schema,
chunksize=chunksize)


def has_table(table_name, con, flavor='sqlite'):
def has_table(table_name, con, flavor='sqlite', schema=None):
"""
Check if DataBase has named table.

Expand All @@ -500,12 +507,15 @@ def has_table(table_name, con, flavor='sqlite'):
The flavor of SQL to use. Ignored when using SQLAlchemy engine.
'mysql' is deprecated and will be removed in future versions, but it
will be further supported through SQLAlchemy engines.
schema : string, default None
Name of SQL schema in database to query (if database flavor supports
this). If None, use default schema (default).

Returns
-------
boolean
"""
pandas_sql = pandasSQL_builder(con, flavor=flavor)
pandas_sql = pandasSQL_builder(con, flavor=flavor, schema=schema)
return pandas_sql.has_table(table_name)

table_exists = has_table
Expand All @@ -515,15 +525,15 @@ def has_table(table_name, con, flavor='sqlite'):
"and will be removed in future versions. "
"MySQL will be further supported with SQLAlchemy engines.")

def pandasSQL_builder(con, flavor=None, meta=None, is_cursor=False):
def pandasSQL_builder(con, flavor=None, schema=None, meta=None, is_cursor=False):
"""
Convenience function to return the correct PandasSQL subclass based on the
provided parameters
"""
# When support for DBAPI connections is removed,
# is_cursor should not be necessary.
if _is_sqlalchemy_engine(con):
return PandasSQLAlchemy(con, meta=meta)
return PandasSQLAlchemy(con, schema=schema, meta=meta)
else:
if flavor == 'mysql':
warnings.warn(_MYSQL_WARNING, FutureWarning)
Expand All @@ -540,24 +550,26 @@ class PandasSQLTable(PandasObject):
"""
# TODO: support for multiIndex
def __init__(self, name, pandas_sql_engine, frame=None, index=True,
if_exists='fail', prefix='pandas', index_label=None):
if_exists='fail', prefix='pandas', index_label=None,
schema=None):
self.name = name
self.pd_sql = pandas_sql_engine
self.prefix = prefix
self.frame = frame
self.index = self._index_name(index, index_label)
self.schema = schema

if frame is not None:
# We want to write a frame
if self.pd_sql.has_table(self.name):
if self.pd_sql.has_table(self.name, self.schema):
if if_exists == 'fail':
raise ValueError("Table '%s' already exists." % name)
elif if_exists == 'replace':
self.pd_sql.drop_table(self.name)
self.pd_sql.drop_table(self.name, self.schema)
self.table = self._create_table_statement()
self.create()
elif if_exists == 'append':
self.table = self.pd_sql.get_table(self.name)
self.table = self.pd_sql.get_table(self.name, self.schema)
if self.table is None:
self.table = self._create_table_statement()
else:
Expand All @@ -568,13 +580,13 @@ def __init__(self, name, pandas_sql_engine, frame=None, index=True,
self.create()
else:
# no data provided, read-only mode
self.table = self.pd_sql.get_table(self.name)
self.table = self.pd_sql.get_table(self.name, self.schema)

if self.table is None:
raise ValueError("Could not init table '%s'" % name)

def exists(self):
return self.pd_sql.has_table(self.name)
return self.pd_sql.has_table(self.name, self.schema)

def sql_schema(self):
from sqlalchemy.schema import CreateTable
Expand Down Expand Up @@ -709,7 +721,7 @@ def _create_table_statement(self):
columns = [Column(name, typ)
for name, typ in column_names_and_types]

return Table(self.name, self.pd_sql.meta, *columns)
return Table(self.name, self.pd_sql.meta, *columns, schema=self.schema)

def _harmonize_columns(self, parse_dates=None):
""" Make a data_frame's column type align with an sql_table
Expand Down Expand Up @@ -830,11 +842,11 @@ class PandasSQLAlchemy(PandasSQL):
using SQLAlchemy to handle DataBase abstraction
"""

def __init__(self, engine, meta=None):
def __init__(self, engine, schema=None, meta=None):
self.engine = engine
if not meta:
from sqlalchemy.schema import MetaData
meta = MetaData(self.engine)
meta = MetaData(self.engine, schema=schema)

self.meta = meta

Expand All @@ -843,9 +855,10 @@ def execute(self, *args, **kwargs):
return self.engine.execute(*args, **kwargs)

def read_table(self, table_name, index_col=None, coerce_float=True,
parse_dates=None, columns=None):
parse_dates=None, columns=None, schema=None):

table = PandasSQLTable(table_name, self, index=index_col)
table = PandasSQLTable(
table_name, self, index=index_col, schema=schema)
return table.read(coerce_float=coerce_float,
parse_dates=parse_dates, columns=columns)

Expand All @@ -868,26 +881,31 @@ def read_sql(self, sql, index_col=None, coerce_float=True,
return data_frame

def to_sql(self, frame, name, if_exists='fail', index=True,
index_label=None, chunksize=None):
index_label=None, schema=None, chunksize=None):
table = PandasSQLTable(
name, self, frame=frame, index=index, if_exists=if_exists,
index_label=index_label)
index_label=index_label, schema=schema)
table.insert(chunksize)

@property
def tables(self):
return self.meta.tables

def has_table(self, name):
return self.engine.has_table(name)
def has_table(self, name, schema=None):
return self.engine.has_table(name, schema or self.meta.schema)

def get_table(self, table_name):
return self.meta.tables.get(table_name)
def get_table(self, table_name, schema=None):
schema = schema or self.meta.schema
if schema:
return self.meta.tables.get('.'.join([schema, table_name]))
else:
return self.meta.tables.get(table_name)

def drop_table(self, table_name):
if self.engine.has_table(table_name):
self.meta.reflect(only=[table_name])
self.get_table(table_name).drop()
def drop_table(self, table_name, schema=None):
schema = schema or self.meta.schema
if self.engine.has_table(table_name, schema):
self.meta.reflect(only=[table_name], schema=schema)
self.get_table(table_name, schema).drop()
self.meta.clear()

def _create_sql_schema(self, frame, table_name):
Expand Down Expand Up @@ -1113,7 +1131,7 @@ def _fetchall_as_list(self, cur):
return result

def to_sql(self, frame, name, if_exists='fail', index=True,
index_label=None, chunksize=None):
index_label=None, schema=None, chunksize=None):
"""
Write records stored in a DataFrame to a SQL database.

Expand All @@ -1133,7 +1151,7 @@ def to_sql(self, frame, name, if_exists='fail', index=True,
index_label=index_label)
table.insert(chunksize)

def has_table(self, name):
def has_table(self, name, schema=None):
flavor_map = {
'sqlite': ("SELECT name FROM sqlite_master "
"WHERE type='table' AND name='%s';") % name,
Expand All @@ -1142,10 +1160,10 @@ def has_table(self, name):

return len(self.execute(query).fetchall()) > 0

def get_table(self, table_name):
def get_table(self, table_name, schema=None):
return None # not supported in Legacy mode

def drop_table(self, name):
def drop_table(self, name, schema=None):
drop_sql = "DROP TABLE %s" % name
self.execute(drop_sql)

Expand Down
Loading