diff --git a/ci/deps/actions-310.yaml b/ci/deps/actions-310.yaml index 94652e8586d77..f03ee2d321529 100644 --- a/ci/deps/actions-310.yaml +++ b/ci/deps/actions-310.yaml @@ -56,5 +56,7 @@ dependencies: - zstandard>=0.19.0 - pip: + - adbc-driver-postgresql>=0.8.0 + - adbc-driver-sqlite>=0.8.0 - pyqt5>=5.15.8 - tzdata>=2022.7 diff --git a/ci/deps/actions-311-downstream_compat.yaml b/ci/deps/actions-311-downstream_compat.yaml index bf47bfe3a83ec..7c1173a9c4d83 100644 --- a/ci/deps/actions-311-downstream_compat.yaml +++ b/ci/deps/actions-311-downstream_compat.yaml @@ -70,6 +70,8 @@ dependencies: - pyyaml - py - pip: + - adbc-driver-postgresql>=0.8.0 + - adbc-driver-sqlite>=0.8.0 - dataframe-api-compat>=0.1.7 - pyqt5>=5.15.8 - tzdata>=2022.7 diff --git a/ci/deps/actions-311.yaml b/ci/deps/actions-311.yaml index bd9e2059ef477..da73169513c0d 100644 --- a/ci/deps/actions-311.yaml +++ b/ci/deps/actions-311.yaml @@ -56,5 +56,7 @@ dependencies: - zstandard>=0.19.0 - pip: + - adbc-driver-postgresql>=0.8.0 + - adbc-driver-sqlite>=0.8.0 - pyqt5>=5.15.8 - tzdata>=2022.7 diff --git a/ci/deps/actions-39-minimum_versions.yaml b/ci/deps/actions-39-minimum_versions.yaml index aa8597978ecf7..42729046c8d7f 100644 --- a/ci/deps/actions-39-minimum_versions.yaml +++ b/ci/deps/actions-39-minimum_versions.yaml @@ -58,6 +58,8 @@ dependencies: - zstandard=0.19.0 - pip: + - adbc-driver-postgresql==0.8.0 + - adbc-driver-sqlite==0.8.0 - dataframe-api-compat==0.1.7 - pyqt5==5.15.8 - tzdata==2022.7 diff --git a/ci/deps/actions-39.yaml b/ci/deps/actions-39.yaml index cf4087a3e4670..85bac1da098b8 100644 --- a/ci/deps/actions-39.yaml +++ b/ci/deps/actions-39.yaml @@ -56,5 +56,7 @@ dependencies: - zstandard>=0.19.0 - pip: + - adbc-driver-postgresql>=0.8.0 + - adbc-driver-sqlite>=0.8.0 - pyqt5>=5.15.8 - tzdata>=2022.7 diff --git a/ci/deps/circle-310-arm64.yaml b/ci/deps/circle-310-arm64.yaml index abe6145d077ed..8f45666d3b124 100644 --- a/ci/deps/circle-310-arm64.yaml +++ b/ci/deps/circle-310-arm64.yaml @@ -54,3 +54,6 @@ dependencies: - xlrd>=2.0.1 - xlsxwriter>=3.0.5 - zstandard>=0.19.0 + - pip: + - adbc-driver-postgresql>=0.8.0 + - adbc-driver-sqlite>=0.8.0 diff --git a/doc/source/getting_started/install.rst b/doc/source/getting_started/install.rst index 2af66da6b8baf..27131dd113f1f 100644 --- a/doc/source/getting_started/install.rst +++ b/doc/source/getting_started/install.rst @@ -335,7 +335,7 @@ lxml 4.9.2 xml XML parser for read SQL databases ^^^^^^^^^^^^^ -Installable with ``pip install "pandas[postgresql, mysql, sql-other]"``. +Traditional drivers are installable with ``pip install "pandas[postgresql, mysql, sql-other]"`` ========================= ================== =============== ============================================================= Dependency Minimum Version pip extra Notes @@ -345,6 +345,8 @@ SQLAlchemy 2.0.0 postgresql, SQL support for dat sql-other psycopg2 2.9.6 postgresql PostgreSQL engine for sqlalchemy pymysql 1.0.2 mysql MySQL engine for sqlalchemy +adbc-driver-postgresql 0.8.0 postgresql ADBC Driver for PostgreSQL +adbc-driver-sqlite 0.8.0 sql-other ADBC Driver for SQLite ========================= ================== =============== ============================================================= Other data sources diff --git a/doc/source/user_guide/io.rst b/doc/source/user_guide/io.rst index 73ec09cdd12f1..21e07e8d00ad6 100644 --- a/doc/source/user_guide/io.rst +++ b/doc/source/user_guide/io.rst @@ -5565,9 +5565,23 @@ SQL queries ----------- The :mod:`pandas.io.sql` module provides a collection of query wrappers to both -facilitate data retrieval and to reduce dependency on DB-specific API. Database abstraction -is provided by SQLAlchemy if installed. In addition you will need a driver library for -your database. Examples of such drivers are `psycopg2 `__ +facilitate data retrieval and to reduce dependency on DB-specific API. + +Where available, users may first want to opt for `Apache Arrow ADBC +`_ drivers. These drivers +should provide the best performance, null handling, and type detection. + + .. versionadded:: 2.2.0 + + Added native support for ADBC drivers + +For a full list of ADBC drivers and their development status, see the `ADBC Driver +Implementation Status `_ +documentation. + +Where an ADBC driver is not available or may be missing functionality, +users should opt for installing SQLAlchemy alongside their database driver library. +Examples of such drivers are `psycopg2 `__ for PostgreSQL or `pymysql `__ for MySQL. For `SQLite `__ this is included in Python's standard library by default. @@ -5600,6 +5614,18 @@ In the following example, we use the `SQlite engine. You can use a temporary SQLite database where data are stored in "memory". +To connect using an ADBC driver you will want to install the ``adbc_driver_sqlite`` using your +package manager. Once installed, you can use the DBAPI interface provided by the ADBC driver +to connect to your database. + +.. code-block:: python + + import adbc_driver_sqlite.dbapi as sqlite_dbapi + + # Create the connection + with sqlite_dbapi.connect("sqlite:///:memory:") as conn: + df = pd.read_sql_table("data", conn) + To connect with SQLAlchemy you use the :func:`create_engine` function to create an engine object from database URI. You only need to create the engine once per database you are connecting to. @@ -5675,9 +5701,74 @@ writes ``data`` to the database in batches of 1000 rows at a time: SQL data types ++++++++++++++ -:func:`~pandas.DataFrame.to_sql` will try to map your data to an appropriate -SQL data type based on the dtype of the data. When you have columns of dtype -``object``, pandas will try to infer the data type. +Ensuring consistent data type management across SQL databases is challenging. +Not every SQL database offers the same types, and even when they do the implementation +of a given type can vary in ways that have subtle effects on how types can be +preserved. + +For the best odds at preserving database types users are advised to use +ADBC drivers when available. The Arrow type system offers a wider array of +types that more closely match database types than the historical pandas/NumPy +type system. To illustrate, note this (non-exhaustive) listing of types +available in different databases and pandas backends: + ++-----------------+-----------------------+----------------+---------+ +|numpy/pandas |arrow |postgres |sqlite | ++=================+=======================+================+=========+ +|int16/Int16 |int16 |SMALLINT |INTEGER | ++-----------------+-----------------------+----------------+---------+ +|int32/Int32 |int32 |INTEGER |INTEGER | ++-----------------+-----------------------+----------------+---------+ +|int64/Int64 |int64 |BIGINT |INTEGER | ++-----------------+-----------------------+----------------+---------+ +|float32 |float32 |REAL |REAL | ++-----------------+-----------------------+----------------+---------+ +|float64 |float64 |DOUBLE PRECISION|REAL | ++-----------------+-----------------------+----------------+---------+ +|object |string |TEXT |TEXT | ++-----------------+-----------------------+----------------+---------+ +|bool |``bool_`` |BOOLEAN | | ++-----------------+-----------------------+----------------+---------+ +|datetime64[ns] |timestamp(us) |TIMESTAMP | | ++-----------------+-----------------------+----------------+---------+ +|datetime64[ns,tz]|timestamp(us,tz) |TIMESTAMPTZ | | ++-----------------+-----------------------+----------------+---------+ +| |date32 |DATE | | ++-----------------+-----------------------+----------------+---------+ +| |month_day_nano_interval|INTERVAL | | ++-----------------+-----------------------+----------------+---------+ +| |binary |BINARY |BLOB | ++-----------------+-----------------------+----------------+---------+ +| |decimal128 |DECIMAL [#f1]_ | | ++-----------------+-----------------------+----------------+---------+ +| |list |ARRAY [#f1]_ | | ++-----------------+-----------------------+----------------+---------+ +| |struct |COMPOSITE TYPE | | +| | | [#f1]_ | | ++-----------------+-----------------------+----------------+---------+ + +.. rubric:: Footnotes + +.. [#f1] Not implemented as of writing, but theoretically possible + +If you are interested in preserving database types as best as possible +throughout the lifecycle of your DataFrame, users are encouraged to +leverage the ``dtype_backend="pyarrow"`` argument of :func:`~pandas.read_sql` + +.. code-block:: ipython + + # for roundtripping + with pg_dbapi.connect(uri) as conn: + df2 = pd.read_sql("pandas_table", conn, dtype_backend="pyarrow") + +This will prevent your data from being converted to the traditional pandas/NumPy +type system, which often converts SQL types in ways that make them impossible to +round-trip. + +In case an ADBC driver is not available, :func:`~pandas.DataFrame.to_sql` +will try to map your data to an appropriate SQL data type based on the dtype of +the data. When you have columns of dtype ``object``, pandas will try to infer +the data type. You can always override the default type by specifying the desired SQL type of any of the columns by using the ``dtype`` argument. This argument needs a @@ -5696,7 +5787,9 @@ default ``Text`` type for string columns: Due to the limited support for timedelta's in the different database flavors, columns with type ``timedelta64`` will be written as integer - values as nanoseconds to the database and a warning will be raised. + values as nanoseconds to the database and a warning will be raised. The only + exception to this is when using the ADBC PostgreSQL driver in which case a + timedelta will be written to the database as an ``INTERVAL`` .. note:: @@ -5711,7 +5804,7 @@ default ``Text`` type for string columns: Datetime data types ''''''''''''''''''' -Using SQLAlchemy, :func:`~pandas.DataFrame.to_sql` is capable of writing +Using ADBC or SQLAlchemy, :func:`~pandas.DataFrame.to_sql` is capable of writing datetime data that is timezone naive or timezone aware. However, the resulting data stored in the database ultimately depends on the supported data type for datetime data of the database system being used. @@ -5802,7 +5895,7 @@ table name and optionally a subset of columns to read. .. note:: In order to use :func:`~pandas.read_sql_table`, you **must** have the - SQLAlchemy optional dependency installed. + ADBC driver or SQLAlchemy optional dependency installed. .. ipython:: python @@ -5810,7 +5903,8 @@ table name and optionally a subset of columns to read. .. note:: - Note that pandas infers column dtypes from query outputs, and not by looking + ADBC drivers will map database types directly back to arrow types. For other drivers + note that pandas infers column dtypes from query outputs, and not by looking up data types in the physical database schema. For example, assume ``userid`` is an integer column in a table. Then, intuitively, ``select userid ...`` will return integer-valued series, while ``select cast(userid as text) ...`` will diff --git a/doc/source/whatsnew/v2.2.0.rst b/doc/source/whatsnew/v2.2.0.rst index 170526c13f05d..b1ac5a0b15a5e 100644 --- a/doc/source/whatsnew/v2.2.0.rst +++ b/doc/source/whatsnew/v2.2.0.rst @@ -89,6 +89,97 @@ a Series. (:issue:`55323`) ) series.list[0] +.. _whatsnew_220.enhancements.adbc_support: + +ADBC Driver support in to_sql and read_sql +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +:func:`read_sql` and :meth:`~DataFrame.to_sql` now work with `Apache Arrow ADBC +`_ drivers. Compared to +traditional drivers used via SQLAlchemy, ADBC drivers should provide +significant performance improvements, better type support and cleaner +nullability handling. + +.. code-block:: ipython + + import adbc_driver_postgresql.dbapi as pg_dbapi + + df = pd.DataFrame( + [ + [1, 2, 3], + [4, 5, 6], + ], + columns=['a', 'b', 'c'] + ) + uri = "postgresql://postgres:postgres@localhost/postgres" + with pg_dbapi.connect(uri) as conn: + df.to_sql("pandas_table", conn, index=False) + + # for roundtripping + with pg_dbapi.connect(uri) as conn: + df2 = pd.read_sql("pandas_table", conn) + +The Arrow type system offers a wider array of types that can more closely match +what databases like PostgreSQL can offer. To illustrate, note this (non-exhaustive) +listing of types available in different databases and pandas backends: + ++-----------------+-----------------------+----------------+---------+ +|numpy/pandas |arrow |postgres |sqlite | ++=================+=======================+================+=========+ +|int16/Int16 |int16 |SMALLINT |INTEGER | ++-----------------+-----------------------+----------------+---------+ +|int32/Int32 |int32 |INTEGER |INTEGER | ++-----------------+-----------------------+----------------+---------+ +|int64/Int64 |int64 |BIGINT |INTEGER | ++-----------------+-----------------------+----------------+---------+ +|float32 |float32 |REAL |REAL | ++-----------------+-----------------------+----------------+---------+ +|float64 |float64 |DOUBLE PRECISION|REAL | ++-----------------+-----------------------+----------------+---------+ +|object |string |TEXT |TEXT | ++-----------------+-----------------------+----------------+---------+ +|bool |``bool_`` |BOOLEAN | | ++-----------------+-----------------------+----------------+---------+ +|datetime64[ns] |timestamp(us) |TIMESTAMP | | ++-----------------+-----------------------+----------------+---------+ +|datetime64[ns,tz]|timestamp(us,tz) |TIMESTAMPTZ | | ++-----------------+-----------------------+----------------+---------+ +| |date32 |DATE | | ++-----------------+-----------------------+----------------+---------+ +| |month_day_nano_interval|INTERVAL | | ++-----------------+-----------------------+----------------+---------+ +| |binary |BINARY |BLOB | ++-----------------+-----------------------+----------------+---------+ +| |decimal128 |DECIMAL [#f1]_ | | ++-----------------+-----------------------+----------------+---------+ +| |list |ARRAY [#f1]_ | | ++-----------------+-----------------------+----------------+---------+ +| |struct |COMPOSITE TYPE | | +| | | [#f1]_ | | ++-----------------+-----------------------+----------------+---------+ + +.. rubric:: Footnotes + +.. [#f1] Not implemented as of writing, but theoretically possible + +If you are interested in preserving database types as best as possible +throughout the lifecycle of your DataFrame, users are encouraged to +leverage the ``dtype_backend="pyarrow"`` argument of :func:`~pandas.read_sql` + +.. code-block:: ipython + + # for roundtripping + with pg_dbapi.connect(uri) as conn: + df2 = pd.read_sql("pandas_table", conn, dtype_backend="pyarrow") + +This will prevent your data from being converted to the traditional pandas/NumPy +type system, which often converts SQL types in ways that make them impossible to +round-trip. + +For a full list of ADBC drivers and their development status, see the `ADBC Driver +Implementation Status `_ +documentation. + .. _whatsnew_220.enhancements.other: Other enhancements diff --git a/environment.yml b/environment.yml index f87338e54f563..7558b8cf59a9f 100644 --- a/environment.yml +++ b/environment.yml @@ -113,6 +113,8 @@ dependencies: - pygments # Code highlighting - pip: + - adbc-driver-postgresql>=0.8.0 + - adbc-driver-sqlite>=0.8.0 - dataframe-api-compat>=0.1.7 - sphinx-toggleprompt # conda-forge version has stricter pins on jinja2 - typing_extensions; python_version<"3.11" diff --git a/pandas/compat/_optional.py b/pandas/compat/_optional.py index c45c2ff6eb6df..84cf7af0fe7a6 100644 --- a/pandas/compat/_optional.py +++ b/pandas/compat/_optional.py @@ -15,6 +15,8 @@ # Update install.rst & setup.cfg when updating versions! VERSIONS = { + "adbc-driver-postgresql": "0.8.0", + "adbc-driver-sqlite": "0.8.0", "bs4": "4.11.2", "blosc": "1.21.3", "bottleneck": "1.3.6", diff --git a/pandas/io/sql.py b/pandas/io/sql.py index d854ea09a5335..d4b6602d9f0eb 100644 --- a/pandas/io/sql.py +++ b/pandas/io/sql.py @@ -32,6 +32,8 @@ import numpy as np +from pandas._config import using_pyarrow_string_dtype + from pandas._libs import lib from pandas.compat._optional import import_optional_dependency from pandas.errors import ( @@ -45,7 +47,10 @@ is_dict_like, is_list_like, ) -from pandas.core.dtypes.dtypes import DatetimeTZDtype +from pandas.core.dtypes.dtypes import ( + ArrowDtype, + DatetimeTZDtype, +) from pandas.core.dtypes.missing import isna from pandas import get_option @@ -56,6 +61,7 @@ from pandas.core.arrays import ArrowExtensionArray from pandas.core.base import PandasObject import pandas.core.common as com +from pandas.core.common import maybe_make_list from pandas.core.internals.construction import convert_object_array from pandas.core.tools.datetimes import to_datetime @@ -186,7 +192,7 @@ def _wrap_result( dtype: DtypeArg | None = None, dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", ): - """Wrap result set of query in a DataFrame.""" + """Wrap result set of a SQLAlchemy query in a DataFrame.""" frame = _convert_arrays_to_dataframe(data, columns, coerce_float, dtype_backend) if dtype: @@ -200,6 +206,26 @@ def _wrap_result( return frame +def _wrap_result_adbc( + df: DataFrame, + *, + index_col=None, + parse_dates=None, + dtype: DtypeArg | None = None, + dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", +) -> DataFrame: + """Wrap result set of a SQLAlchemy query in a DataFrame.""" + if dtype: + df = df.astype(dtype) + + df = _parse_date_columns(df, parse_dates) + + if index_col is not None: + df = df.set_index(index_col) + + return df + + def execute(sql, con, params=None): """ Execute the given SQL query using the provided connection object. @@ -559,11 +585,12 @@ def read_sql( ---------- sql : str or SQLAlchemy Selectable (select or text object) SQL query to be executed or a table name. - con : SQLAlchemy connectable, str, or sqlite3 connection + con : ADBC Connection, SQLAlchemy connectable, str, or sqlite3 connection + ADBC provides high performance I/O with native type support, where available. Using SQLAlchemy makes it possible to use any DB supported by that library. If a DBAPI2 object, only sqlite3 is supported. The user is responsible - for engine disposal and connection closure for the SQLAlchemy connectable; str - connections are closed automatically. See + for engine disposal and connection closure for the ADBC connection and + SQLAlchemy connectable; str connections are closed automatically. See `here `_. index_col : str or list of str, optional, default: None Column(s) to set as index(MultiIndex). @@ -648,6 +675,17 @@ def read_sql( int_column date_column 0 0 2012-11-10 1 1 2010-11-12 + + .. versionadded:: 2.2.0 + + pandas now supports reading via ADBC drivers + + >>> from adbc_driver_postgresql import dbapi + >>> with dbapi.connect('postgres:///db_name') as conn: # doctest:+SKIP + ... pd.read_sql('SELECT int_column FROM test_data', conn) + int_column + 0 0 + 1 1 """ check_dtype_backend(dtype_backend) @@ -719,8 +757,9 @@ def to_sql( frame : DataFrame, Series name : str Name of SQL table. - con : SQLAlchemy connectable(engine/connection) or database string URI + con : ADBC Connection, SQLAlchemy connectable, str, or sqlite3 connection or sqlite3 DBAPI2 connection + ADBC provides high performance I/O with native type support, where available. Using SQLAlchemy makes it possible to use any DB supported by that library. If a DBAPI2 object, only sqlite3 is supported. @@ -775,7 +814,8 @@ def to_sql( Notes ----- The returned rows affected is the sum of the ``rowcount`` attribute of ``sqlite3.Cursor`` - or SQLAlchemy connectable. The returned value may not reflect the exact number of written + or SQLAlchemy connectable. If using ADBC the returned rows are the result + of ``Cursor.adbc_ingest``. The returned value may not reflect the exact number of written rows as stipulated in the `sqlite3 `__ or `SQLAlchemy `__ @@ -814,7 +854,8 @@ def has_table(table_name: str, con, schema: str | None = None) -> bool: ---------- table_name: string Name of SQL table. - con: SQLAlchemy connectable(engine/connection) or sqlite3 DBAPI2 connection + con: ADBC Connection, SQLAlchemy connectable, str, or sqlite3 connection + ADBC provides high performance I/O with native type support, where available. Using SQLAlchemy makes it possible to use any DB supported by that library. If a DBAPI2 object, only sqlite3 is supported. @@ -856,6 +897,10 @@ def pandasSQL_builder( if sqlalchemy is not None and isinstance(con, (str, sqlalchemy.engine.Connectable)): return SQLDatabase(con, schema, need_transaction) + adbc = import_optional_dependency("adbc_driver_manager.dbapi", errors="ignore") + if adbc and isinstance(con, adbc.Connection): + return ADBCDatabase(con) + warnings.warn( "pandas only supports SQLAlchemy connectable (engine/connection) or " "database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 " @@ -2034,6 +2079,356 @@ def _create_sql_schema( # ---- SQL without SQLAlchemy --- + + +class ADBCDatabase(PandasSQL): + """ + This class enables conversion between DataFrame and SQL databases + using ADBC to handle DataBase abstraction. + + Parameters + ---------- + con : adbc_driver_manager.dbapi.Connection + """ + + def __init__(self, con) -> None: + self.con = con + + @contextmanager + def run_transaction(self): + with self.con.cursor() as cur: + try: + yield cur + except Exception: + self.con.rollback() + raise + self.con.commit() + + def execute(self, sql: str | Select | TextClause, params=None): + if not isinstance(sql, str): + raise TypeError("Query must be a string unless using sqlalchemy.") + args = [] if params is None else [params] + cur = self.con.cursor() + try: + cur.execute(sql, *args) + return cur + except Exception as exc: + try: + self.con.rollback() + except Exception as inner_exc: # pragma: no cover + ex = DatabaseError( + f"Execution failed on sql: {sql}\n{exc}\nunable to rollback" + ) + raise ex from inner_exc + + ex = DatabaseError(f"Execution failed on sql '{sql}': {exc}") + raise ex from exc + + def read_table( + self, + table_name: str, + index_col: str | list[str] | None = None, + coerce_float: bool = True, + parse_dates=None, + columns=None, + schema: str | None = None, + chunksize: int | None = None, + dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", + ) -> DataFrame | Iterator[DataFrame]: + """ + Read SQL database table into a DataFrame. + + Parameters + ---------- + table_name : str + Name of SQL table in database. + coerce_float : bool, default True + Raises NotImplementedError + parse_dates : list or dict, default: None + - List of column names to parse as dates. + - Dict of ``{column_name: format string}`` where format string is + strftime compatible in case of parsing string times, or is one of + (D, s, ns, ms, us) in case of parsing integer timestamps. + - Dict of ``{column_name: arg}``, where the arg corresponds + to the keyword arguments of :func:`pandas.to_datetime`. + Especially useful with databases without native Datetime support, + such as SQLite. + columns : list, default: None + List of column names to select from SQL table. + schema : string, default None + Name of SQL schema in database to query (if database flavor + supports this). If specified, this overwrites the default + schema of the SQL database object. + chunksize : int, default None + Raises NotImplementedError + dtype_backend : {'numpy_nullable', 'pyarrow'}, default 'numpy_nullable' + Back-end data type applied to the resultant :class:`DataFrame` + (still experimental). Behaviour is as follows: + + * ``"numpy_nullable"``: returns nullable-dtype-backed :class:`DataFrame` + (default). + * ``"pyarrow"``: returns pyarrow-backed nullable :class:`ArrowDtype` + DataFrame. + + .. versionadded:: 2.0 + + Returns + ------- + DataFrame + + See Also + -------- + pandas.read_sql_table + SQLDatabase.read_query + + """ + if coerce_float is not True: + raise NotImplementedError( + "'coerce_float' is not implemented for ADBC drivers" + ) + if chunksize: + raise NotImplementedError("'chunksize' is not implemented for ADBC drivers") + + if columns: + if index_col: + index_select = maybe_make_list(index_col) + else: + index_select = [] + to_select = index_select + columns + select_list = ", ".join(f'"{x}"' for x in to_select) + else: + select_list = "*" + if schema: + stmt = f"SELECT {select_list} FROM {schema}.{table_name}" + else: + stmt = f"SELECT {select_list} FROM {table_name}" + + mapping: type[ArrowDtype] | None | Callable + if dtype_backend == "pyarrow": + mapping = ArrowDtype + elif dtype_backend == "numpy_nullable": + from pandas.io._util import _arrow_dtype_mapping + + mapping = _arrow_dtype_mapping().get + elif using_pyarrow_string_dtype(): + from pandas.io._util import arrow_string_types_mapper + + arrow_string_types_mapper() + else: + mapping = None + + with self.con.cursor() as cur: + cur.execute(stmt) + df = cur.fetch_arrow_table().to_pandas(types_mapper=mapping) + + return _wrap_result_adbc( + df, + index_col=index_col, + parse_dates=parse_dates, + ) + + def read_query( + self, + sql: str, + index_col: str | list[str] | None = None, + coerce_float: bool = True, + parse_dates=None, + params=None, + chunksize: int | None = None, + dtype: DtypeArg | None = None, + dtype_backend: DtypeBackend | Literal["numpy"] = "numpy", + ) -> DataFrame | Iterator[DataFrame]: + """ + Read SQL query into a DataFrame. + + Parameters + ---------- + sql : str + SQL query to be executed. + index_col : string, optional, default: None + Column name to use as index for the returned DataFrame object. + coerce_float : bool, default True + Raises NotImplementedError + params : list, tuple or dict, optional, default: None + Raises NotImplementedError + parse_dates : list or dict, default: None + - List of column names to parse as dates. + - Dict of ``{column_name: format string}`` where format string is + strftime compatible in case of parsing string times, or is one of + (D, s, ns, ms, us) in case of parsing integer timestamps. + - Dict of ``{column_name: arg dict}``, where the arg dict + corresponds to the keyword arguments of + :func:`pandas.to_datetime` Especially useful with databases + without native Datetime support, such as SQLite. + chunksize : int, default None + Raises NotImplementedError + dtype : Type name or dict of columns + Data type for data or columns. E.g. np.float64 or + {'a': np.float64, 'b': np.int32, 'c': 'Int64'} + + .. versionadded:: 1.3.0 + + Returns + ------- + DataFrame + + See Also + -------- + read_sql_table : Read SQL database table into a DataFrame. + read_sql + + """ + if coerce_float is not True: + raise NotImplementedError( + "'coerce_float' is not implemented for ADBC drivers" + ) + if params: + raise NotImplementedError("'params' is not implemented for ADBC drivers") + if chunksize: + raise NotImplementedError("'chunksize' is not implemented for ADBC drivers") + + mapping: type[ArrowDtype] | None | Callable + if dtype_backend == "pyarrow": + mapping = ArrowDtype + elif dtype_backend == "numpy_nullable": + from pandas.io._util import _arrow_dtype_mapping + + mapping = _arrow_dtype_mapping().get + else: + mapping = None + + with self.con.cursor() as cur: + cur.execute(sql) + df = cur.fetch_arrow_table().to_pandas(types_mapper=mapping) + + return _wrap_result_adbc( + df, + index_col=index_col, + parse_dates=parse_dates, + dtype=dtype, + ) + + read_sql = read_query + + def to_sql( + self, + frame, + name: str, + if_exists: Literal["fail", "replace", "append"] = "fail", + index: bool = True, + index_label=None, + schema: str | None = None, + chunksize: int | None = None, + dtype: DtypeArg | None = None, + method: Literal["multi"] | Callable | None = None, + engine: str = "auto", + **engine_kwargs, + ) -> int | None: + """ + Write records stored in a DataFrame to a SQL database. + + Parameters + ---------- + frame : DataFrame + name : string + Name of SQL table. + if_exists : {'fail', 'replace', 'append'}, default 'fail' + - fail: If table exists, do nothing. + - replace: If table exists, drop it, recreate it, and insert data. + - append: If table exists, insert data. Create if does not exist. + index : boolean, default True + Write DataFrame index as a column. + index_label : string or sequence, default None + Raises NotImplementedError + schema : string, default None + Name of SQL schema in database to write to (if database flavor + supports this). If specified, this overwrites the default + schema of the SQLDatabase object. + chunksize : int, default None + Raises NotImplementedError + dtype : single type or dict of column name to SQL type, default None + Raises NotImplementedError + method : {None', 'multi', callable}, default None + Raises NotImplementedError + engine : {'auto', 'sqlalchemy'}, default 'auto' + Raises NotImplementedError if not set to 'auto' + """ + if index_label: + raise NotImplementedError( + "'index_label' is not implemented for ADBC drivers" + ) + if chunksize: + raise NotImplementedError("'chunksize' is not implemented for ADBC drivers") + if dtype: + raise NotImplementedError("'dtype' is not implemented for ADBC drivers") + if method: + raise NotImplementedError("'method' is not implemented for ADBC drivers") + if engine != "auto": + raise NotImplementedError( + "engine != 'auto' not implemented for ADBC drivers" + ) + + if schema: + table_name = f"{schema}.{name}" + else: + table_name = name + + # pandas if_exists="append" will still create the + # table if it does not exist; ADBC is more explicit with append/create + # as applicable modes, so the semantics get blurred across + # the libraries + mode = "create" + if self.has_table(name, schema): + if if_exists == "fail": + raise ValueError(f"Table '{table_name}' already exists.") + elif if_exists == "replace": + with self.con.cursor() as cur: + cur.execute(f"DROP TABLE {table_name}") + elif if_exists == "append": + mode = "append" + + import pyarrow as pa + + try: + tbl = pa.Table.from_pandas(frame, preserve_index=index) + except pa.ArrowNotImplementedError as exc: + raise ValueError("datatypes not supported") from exc + + with self.con.cursor() as cur: + total_inserted = cur.adbc_ingest(table_name, tbl, mode=mode) + + self.con.commit() + return total_inserted + + def has_table(self, name: str, schema: str | None = None) -> bool: + meta = self.con.adbc_get_objects( + db_schema_filter=schema, table_name_filter=name + ).read_all() + + for catalog_schema in meta["catalog_db_schemas"].to_pylist(): + if not catalog_schema: + continue + for schema_record in catalog_schema: + if not schema_record: + continue + + for table_record in schema_record["db_schema_tables"]: + if table_record["table_name"] == name: + return True + + return False + + def _create_sql_schema( + self, + frame: DataFrame, + table_name: str, + keys: list[str] | None = None, + dtype: DtypeArg | None = None, + schema: str | None = None, + ): + raise NotImplementedError("not implemented for adbc") + + # sqlite-specific sql strings and handler class # dictionary used for readability purposes _SQL_TYPES = { @@ -2507,9 +2902,10 @@ def get_schema( name of SQL table keys : string or sequence, default: None columns to use a primary key - con: an open SQL database connection object or a SQLAlchemy connectable + con: ADBC Connection, SQLAlchemy connectable, sqlite3 connection, default: None + ADBC provides high performance I/O with native type support, where available. Using SQLAlchemy makes it possible to use any DB supported by that - library, default: None + library If a DBAPI2 object, only sqlite3 is supported. dtype : dict of column name to SQL type, default None Optional specifying the datatype for columns. The SQL type should diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index 9ac20774b8c93..144210166d1a6 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -19,6 +19,11 @@ import pytest from pandas._libs import lib +from pandas.compat import ( + pa_version_under13p0, + pa_version_under14p1, +) +from pandas.compat._optional import import_optional_dependency import pandas.util._test_decorators as td import pandas as pd @@ -56,6 +61,11 @@ import sqlalchemy +pytestmark = pytest.mark.filterwarnings( + "ignore:Passing a BlockManager to DataFrame:DeprecationWarning" +) + + @pytest.fixture def sql_strings(): return { @@ -110,8 +120,7 @@ def iris_table_metadata(): return iris -def create_and_load_iris_sqlite3(conn: sqlite3.Connection, iris_file: Path): - cur = conn.cursor() +def create_and_load_iris_sqlite3(conn, iris_file: Path): stmt = """CREATE TABLE iris ( "SepalLength" REAL, "SepalWidth" REAL, @@ -119,17 +128,65 @@ def create_and_load_iris_sqlite3(conn: sqlite3.Connection, iris_file: Path): "PetalWidth" REAL, "Name" TEXT )""" + + cur = conn.cursor() cur.execute(stmt) with iris_file.open(newline=None, encoding="utf-8") as csvfile: reader = csv.reader(csvfile) next(reader) stmt = "INSERT INTO iris VALUES(?, ?, ?, ?, ?)" - cur.executemany(stmt, reader) + # ADBC requires explicit types - no implicit str -> float conversion + records = [] + records = [ + ( + float(row[0]), + float(row[1]), + float(row[2]), + float(row[3]), + row[4], + ) + for row in reader + ] + + cur.executemany(stmt, records) + cur.close() + + conn.commit() + + +def create_and_load_iris_postgresql(conn, iris_file: Path): + stmt = """CREATE TABLE iris ( + "SepalLength" DOUBLE PRECISION, + "SepalWidth" DOUBLE PRECISION, + "PetalLength" DOUBLE PRECISION, + "PetalWidth" DOUBLE PRECISION, + "Name" TEXT + )""" + with conn.cursor() as cur: + cur.execute(stmt) + with iris_file.open(newline=None, encoding="utf-8") as csvfile: + reader = csv.reader(csvfile) + next(reader) + stmt = "INSERT INTO iris VALUES($1, $2, $3, $4, $5)" + # ADBC requires explicit types - no implicit str -> float conversion + records = [ + ( + float(row[0]), + float(row[1]), + float(row[2]), + float(row[3]), + row[4], + ) + for row in reader + ] + + cur.executemany(stmt, records) + + conn.commit() def create_and_load_iris(conn, iris_file: Path): from sqlalchemy import insert - from sqlalchemy.engine import Engine iris = iris_table_metadata() @@ -138,17 +195,10 @@ def create_and_load_iris(conn, iris_file: Path): header = next(reader) params = [dict(zip(header, row)) for row in reader] stmt = insert(iris).values(params) - if isinstance(conn, Engine): - with conn.connect() as conn: - with conn.begin(): - iris.drop(conn, checkfirst=True) - iris.create(bind=conn) - conn.execute(stmt) - else: - with conn.begin(): - iris.drop(conn, checkfirst=True) - iris.create(bind=conn) - conn.execute(stmt) + with conn.begin() as con: + iris.drop(con, checkfirst=True) + iris.create(bind=con) + con.execute(stmt) def create_and_load_iris_view(conn): @@ -157,17 +207,17 @@ def create_and_load_iris_view(conn): cur = conn.cursor() cur.execute(stmt) else: - from sqlalchemy import text - from sqlalchemy.engine import Engine - - stmt = text(stmt) - if isinstance(conn, Engine): - with conn.connect() as conn: - with conn.begin(): - conn.execute(stmt) + adbc = import_optional_dependency("adbc_driver_manager.dbapi", errors="ignore") + if adbc and isinstance(conn, adbc.Connection): + with conn.cursor() as cur: + cur.execute(stmt) + conn.commit() else: - with conn.begin(): - conn.execute(stmt) + from sqlalchemy import text + + stmt = text(stmt) + with conn.begin() as con: + con.execute(stmt) def types_table_metadata(dialect: str): @@ -201,8 +251,7 @@ def types_table_metadata(dialect: str): return types -def create_and_load_types_sqlite3(conn: sqlite3.Connection, types_data: list[dict]): - cur = conn.cursor() +def create_and_load_types_sqlite3(conn, types_data: list[dict]): stmt = """CREATE TABLE types ( "TextCol" TEXT, "DateCol" TEXT, @@ -214,13 +263,47 @@ def create_and_load_types_sqlite3(conn: sqlite3.Connection, types_data: list[dic "IntColWithNull" INTEGER, "BoolColWithNull" INTEGER )""" - cur.execute(stmt) - stmt = """ - INSERT INTO types - VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?) - """ - cur.executemany(stmt, types_data) + ins_stmt = """ + INSERT INTO types + VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?) + """ + + if isinstance(conn, sqlite3.Connection): + cur = conn.cursor() + cur.execute(stmt) + cur.executemany(ins_stmt, types_data) + else: + with conn.cursor() as cur: + cur.execute(stmt) + cur.executemany(ins_stmt, types_data) + + conn.commit() + + +def create_and_load_types_postgresql(conn, types_data: list[dict]): + with conn.cursor() as cur: + stmt = """CREATE TABLE types ( + "TextCol" TEXT, + "DateCol" TIMESTAMP, + "IntDateCol" INTEGER, + "IntDateOnlyCol" INTEGER, + "FloatCol" DOUBLE PRECISION, + "IntCol" INTEGER, + "BoolCol" BOOLEAN, + "IntColWithNull" INTEGER, + "BoolColWithNull" BOOLEAN + )""" + cur.execute(stmt) + + stmt = """ + INSERT INTO types + VALUES($1, $2::timestamp, $3, $4, $5, $6, $7, $8, $9) + """ + + cur.executemany(stmt, types_data) + + conn.commit() def create_and_load_types(conn, types_data: list[dict], dialect: str): @@ -298,9 +381,14 @@ def check_iris_frame(frame: DataFrame): def count_rows(conn, table_name: str): stmt = f"SELECT count(*) AS count_1 FROM {table_name}" + adbc = import_optional_dependency("adbc_driver_manager.dbapi", errors="ignore") if isinstance(conn, sqlite3.Connection): cur = conn.cursor() return cur.execute(stmt).fetchone()[0] + elif adbc and isinstance(conn, adbc.Connection): + with conn.cursor() as cur: + cur.execute(stmt) + return cur.fetchone()[0] else: from sqlalchemy import create_engine from sqlalchemy.engine import Engine @@ -423,9 +511,24 @@ def get_all_views(conn): c = conn.execute("SELECT name FROM sqlite_master WHERE type='view'") return [view[0] for view in c.fetchall()] else: - from sqlalchemy import inspect + adbc = import_optional_dependency("adbc_driver_manager.dbapi", errors="ignore") + if adbc and isinstance(conn, adbc.Connection): + results = [] + info = conn.adbc_get_objects().read_all().to_pylist() + for catalog in info: + catalog["catalog_name"] + for schema in catalog["catalog_db_schemas"]: + schema["db_schema_name"] + for table in schema["db_schema_tables"]: + if table["table_type"] == "view": + view_name = table["table_name"] + results.append(view_name) + + return results + else: + from sqlalchemy import inspect - return inspect(conn).get_view_names() + return inspect(conn).get_view_names() def get_all_tables(conn): @@ -433,9 +536,23 @@ def get_all_tables(conn): c = conn.execute("SELECT name FROM sqlite_master WHERE type='table'") return [table[0] for table in c.fetchall()] else: - from sqlalchemy import inspect + adbc = import_optional_dependency("adbc_driver_manager.dbapi", errors="ignore") + + if adbc and isinstance(conn, adbc.Connection): + results = [] + info = conn.adbc_get_objects().read_all().to_pylist() + for catalog in info: + for schema in catalog["catalog_db_schemas"]: + for table in schema["db_schema_tables"]: + if table["table_type"] == "table": + table_name = table["table_name"] + results.append(table_name) + + return results + else: + from sqlalchemy import inspect - return inspect(conn).get_table_names() + return inspect(conn).get_table_names() def drop_table( @@ -445,10 +562,16 @@ def drop_table( if isinstance(conn, sqlite3.Connection): conn.execute(f"DROP TABLE IF EXISTS {sql._get_valid_sqlite_name(table_name)}") conn.commit() + else: - with conn.begin() as con: - with sql.SQLDatabase(con) as db: - db.drop_table(table_name) + adbc = import_optional_dependency("adbc_driver_manager.dbapi", errors="ignore") + if adbc and isinstance(conn, adbc.Connection): + with conn.cursor() as cur: + cur.execute(f'DROP TABLE IF EXISTS "{table_name}"') + else: + with conn.begin() as con: + with sql.SQLDatabase(con) as db: + db.drop_table(table_name) def drop_view( @@ -461,12 +584,17 @@ def drop_view( conn.execute(f"DROP VIEW IF EXISTS {sql._get_valid_sqlite_name(view_name)}") conn.commit() else: - quoted_view = conn.engine.dialect.identifier_preparer.quote_identifier( - view_name - ) - stmt = sqlalchemy.text(f"DROP VIEW IF EXISTS {quoted_view}") - with conn.begin() as con: - con.execute(stmt) # type: ignore[union-attr] + adbc = import_optional_dependency("adbc_driver_manager.dbapi", errors="ignore") + if adbc and isinstance(conn, adbc.Connection): + with conn.cursor() as cur: + cur.execute(f'DROP VIEW IF EXISTS "{view_name}"') + else: + quoted_view = conn.engine.dialect.identifier_preparer.quote_identifier( + view_name + ) + stmt = sqlalchemy.text(f"DROP VIEW IF EXISTS {quoted_view}") + with conn.begin() as con: + con.execute(stmt) # type: ignore[union-attr] @pytest.fixture @@ -552,6 +680,57 @@ def postgresql_psycopg2_conn(postgresql_psycopg2_engine): yield conn +@pytest.fixture +def postgresql_adbc_conn(): + pytest.importorskip("adbc_driver_postgresql") + from adbc_driver_postgresql import dbapi + + uri = "postgresql://postgres:postgres@localhost:5432/pandas" + with dbapi.connect(uri) as conn: + yield conn + for view in get_all_views(conn): + drop_view(view, conn) + for tbl in get_all_tables(conn): + drop_table(tbl, conn) + conn.commit() + + +@pytest.fixture +def postgresql_adbc_iris(postgresql_adbc_conn, iris_path): + import adbc_driver_manager as mgr + + conn = postgresql_adbc_conn + + try: + conn.adbc_get_table_schema("iris") + except mgr.ProgrammingError: + conn.rollback() + create_and_load_iris_postgresql(conn, iris_path) + try: + conn.adbc_get_table_schema("iris_view") + except mgr.ProgrammingError: # note arrow-adbc issue 1022 + conn.rollback() + create_and_load_iris_view(conn) + yield conn + + +@pytest.fixture +def postgresql_adbc_types(postgresql_adbc_conn, types_data): + import adbc_driver_manager as mgr + + conn = postgresql_adbc_conn + + try: + conn.adbc_get_table_schema("types") + except mgr.ProgrammingError: + conn.rollback() + new_data = [tuple(entry.values()) for entry in types_data] + + create_and_load_types_postgresql(conn, new_data) + + yield conn + + @pytest.fixture def postgresql_psycopg2_conn_iris(postgresql_psycopg2_engine_iris): with postgresql_psycopg2_engine_iris.connect() as conn: @@ -633,6 +812,62 @@ def sqlite_conn_types(sqlite_engine_types): yield conn +@pytest.fixture +def sqlite_adbc_conn(): + pytest.importorskip("adbc_driver_sqlite") + from adbc_driver_sqlite import dbapi + + with tm.ensure_clean() as name: + uri = f"file:{name}" + with dbapi.connect(uri) as conn: + yield conn + for view in get_all_views(conn): + drop_view(view, conn) + for tbl in get_all_tables(conn): + drop_table(tbl, conn) + conn.commit() + + +@pytest.fixture +def sqlite_adbc_iris(sqlite_adbc_conn, iris_path): + import adbc_driver_manager as mgr + + conn = sqlite_adbc_conn + try: + conn.adbc_get_table_schema("iris") + except mgr.ProgrammingError: + conn.rollback() + create_and_load_iris_sqlite3(conn, iris_path) + try: + conn.adbc_get_table_schema("iris_view") + except mgr.ProgrammingError: + conn.rollback() + create_and_load_iris_view(conn) + yield conn + + +@pytest.fixture +def sqlite_adbc_types(sqlite_adbc_conn, types_data): + import adbc_driver_manager as mgr + + conn = sqlite_adbc_conn + try: + conn.adbc_get_table_schema("types") + except mgr.ProgrammingError: + conn.rollback() + new_data = [] + for entry in types_data: + entry["BoolCol"] = int(entry["BoolCol"]) + if entry["BoolColWithNull"] is not None: + entry["BoolColWithNull"] = int(entry["BoolColWithNull"]) + new_data.append(tuple(entry.values())) + + create_and_load_types_sqlite3(conn, new_data) + conn.commit() + + yield conn + + @pytest.fixture def sqlite_buildin(): with contextlib.closing(sqlite3.connect(":memory:")) as closing_conn: @@ -712,11 +947,31 @@ def sqlite_buildin_types(sqlite_buildin, types_data): mysql_connectable_types + postgresql_connectable_types + sqlite_connectable_types ) -all_connectable = sqlalchemy_connectable + ["sqlite_buildin"] +adbc_connectable = [ + "sqlite_adbc_conn", + pytest.param("postgresql_adbc_conn", marks=pytest.mark.db), +] + +adbc_connectable_iris = [ + pytest.param("postgresql_adbc_iris", marks=pytest.mark.db), + pytest.param("sqlite_adbc_iris", marks=pytest.mark.db), +] + +adbc_connectable_types = [ + pytest.param("postgresql_adbc_types", marks=pytest.mark.db), + pytest.param("sqlite_adbc_types", marks=pytest.mark.db), +] + + +all_connectable = sqlalchemy_connectable + ["sqlite_buildin"] + adbc_connectable -all_connectable_iris = sqlalchemy_connectable_iris + ["sqlite_buildin_iris"] +all_connectable_iris = ( + sqlalchemy_connectable_iris + ["sqlite_buildin_iris"] + adbc_connectable_iris +) -all_connectable_types = sqlalchemy_connectable_types + ["sqlite_buildin_types"] +all_connectable_types = ( + sqlalchemy_connectable_types + ["sqlite_buildin_types"] + adbc_connectable_types +) @pytest.mark.parametrize("conn", all_connectable) @@ -728,6 +983,13 @@ def test_dataframe_to_sql(conn, test_frame1, request): @pytest.mark.parametrize("conn", all_connectable) def test_dataframe_to_sql_empty(conn, test_frame1, request): + if conn == "postgresql_adbc_conn": + request.node.add_marker( + pytest.mark.xfail( + reason="postgres ADBC driver cannot insert index with null type", + strict=True, + ) + ) # GH 51086 if conn is sqlite_engine conn = request.getfixturevalue(conn) empty_df = test_frame1.iloc[:0] @@ -749,8 +1011,22 @@ def test_dataframe_to_sql_arrow_dtypes(conn, request): "string": pd.array(["a"], dtype="string[pyarrow]"), } ) + + if "adbc" in conn: + if conn == "sqlite_adbc_conn": + df = df.drop(columns=["timedelta"]) + if pa_version_under14p1: + exp_warning = FutureWarning + msg = "is_sparse is deprecated" + else: + exp_warning = None + msg = "" + else: + exp_warning = UserWarning + msg = "the 'timedelta'" + conn = request.getfixturevalue(conn) - with tm.assert_produces_warning(UserWarning, match="the 'timedelta'"): + with tm.assert_produces_warning(exp_warning, match=msg, check_stacklevel=False): df.to_sql(name="test_arrow", con=conn, if_exists="replace", index=False) @@ -772,6 +1048,13 @@ def test_dataframe_to_sql_arrow_dtypes_missing(conn, request, nulls_fixture): @pytest.mark.parametrize("conn", all_connectable) @pytest.mark.parametrize("method", [None, "multi"]) def test_to_sql(conn, method, test_frame1, request): + if method == "multi" and "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail( + reason="'method' not implemented for ADBC drivers", strict=True + ) + ) + conn = request.getfixturevalue(conn) with pandasSQL_builder(conn, need_transaction=True) as pandasSQL: pandasSQL.to_sql(test_frame1, "test_frame", method=method) @@ -816,6 +1099,13 @@ def test_read_iris_query(conn, request): @pytest.mark.parametrize("conn", all_connectable_iris) def test_read_iris_query_chunksize(conn, request): + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail( + reason="'chunksize' not implemented for ADBC drivers", + strict=True, + ) + ) conn = request.getfixturevalue(conn) iris_frame = concat(read_sql_query("SELECT * FROM iris", conn, chunksize=7)) check_iris_frame(iris_frame) @@ -828,6 +1118,13 @@ def test_read_iris_query_chunksize(conn, request): @pytest.mark.parametrize("conn", sqlalchemy_connectable_iris) def test_read_iris_query_expression_with_parameter(conn, request): + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail( + reason="'chunksize' not implemented for ADBC drivers", + strict=True, + ) + ) conn = request.getfixturevalue(conn) from sqlalchemy import ( MetaData, @@ -849,6 +1146,14 @@ def test_read_iris_query_expression_with_parameter(conn, request): @pytest.mark.parametrize("conn", all_connectable_iris) def test_read_iris_query_string_with_parameter(conn, request, sql_strings): + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail( + reason="'chunksize' not implemented for ADBC drivers", + strict=True, + ) + ) + for db, query in sql_strings["read_parameters"].items(): if db in conn: break @@ -871,6 +1176,10 @@ def test_read_iris_table(conn, request): @pytest.mark.parametrize("conn", sqlalchemy_connectable_iris) def test_read_iris_table_chunksize(conn, request): + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail(reason="chunksize argument NotImplemented with ADBC") + ) conn = request.getfixturevalue(conn) iris_frame = concat(read_sql_table("iris", conn, chunksize=7)) check_iris_frame(iris_frame) @@ -1209,6 +1518,13 @@ def flavor(conn_name): @pytest.mark.parametrize("conn", all_connectable_iris) def test_read_sql_iris_parameter(conn, request, sql_strings): + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail( + reason="'params' not implemented for ADBC drivers", + strict=True, + ) + ) conn_name = conn conn = request.getfixturevalue(conn) query = sql_strings["read_parameters"][flavor(conn_name)] @@ -1221,6 +1537,14 @@ def test_read_sql_iris_parameter(conn, request, sql_strings): @pytest.mark.parametrize("conn", all_connectable_iris) def test_read_sql_iris_named_parameter(conn, request, sql_strings): + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail( + reason="'params' not implemented for ADBC drivers", + strict=True, + ) + ) + conn_name = conn conn = request.getfixturevalue(conn) query = sql_strings["read_named_parameters"][flavor(conn_name)] @@ -1233,7 +1557,7 @@ def test_read_sql_iris_named_parameter(conn, request, sql_strings): @pytest.mark.parametrize("conn", all_connectable_iris) def test_read_sql_iris_no_parameter_with_percent(conn, request, sql_strings): - if "mysql" in conn or "postgresql" in conn: + if "mysql" in conn or ("postgresql" in conn and "adbc" not in conn): request.applymarker(pytest.mark.xfail(reason="broken test")) conn_name = conn @@ -1259,6 +1583,10 @@ def test_api_read_sql_view(conn, request): @pytest.mark.parametrize("conn", all_connectable_iris) def test_api_read_sql_with_chunksize_no_result(conn, request): + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail(reason="chunksize argument NotImplemented with ADBC") + ) conn = request.getfixturevalue(conn) query = 'SELECT * FROM iris_view WHERE "SepalLength" < 0.0' with_batch = sql.read_sql_query(query, conn, chunksize=5) @@ -1357,6 +1685,7 @@ def test_api_to_sql_series(conn, request): @pytest.mark.parametrize("conn", all_connectable) def test_api_roundtrip(conn, request, test_frame1): + conn_name = conn conn = request.getfixturevalue(conn) if sql.has_table("test_frame_roundtrip", conn): with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL: @@ -1366,6 +1695,8 @@ def test_api_roundtrip(conn, request, test_frame1): result = sql.read_sql_query("SELECT * FROM test_frame_roundtrip", con=conn) # HACK! + if "adbc" in conn_name: + result = result.rename(columns={"__index_level_0__": "level_0"}) result.index = test_frame1.index result.set_index("level_0", inplace=True) result.index.astype(int) @@ -1375,6 +1706,10 @@ def test_api_roundtrip(conn, request, test_frame1): @pytest.mark.parametrize("conn", all_connectable) def test_api_roundtrip_chunksize(conn, request, test_frame1): + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail(reason="chunksize argument NotImplemented with ADBC") + ) conn = request.getfixturevalue(conn) if sql.has_table("test_frame_roundtrip", conn): with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL: @@ -1398,6 +1733,7 @@ def test_api_execute_sql(conn, request): with sql.pandasSQL_builder(conn) as pandas_sql: iris_results = pandas_sql.execute("SELECT * FROM iris") row = iris_results.fetchone() + iris_results.close() tm.equalContents(row, [5.1, 3.5, 1.4, 0.2, "Iris-setosa"]) @@ -1496,6 +1832,19 @@ def test_api_custom_dateparsing_error( result["BoolCol"] = result["BoolCol"].astype(int) result["BoolColWithNull"] = result["BoolColWithNull"].astype(float) + if conn_name == "postgresql_adbc_types": + expected = expected.astype( + { + "IntDateCol": "int32", + "IntDateOnlyCol": "int32", + "IntCol": "int32", + } + ) + + if not pa_version_under13p0: + # TODO: is this astype safe? + expected["DateCol"] = expected["DateCol"].astype("datetime64[us]") + tm.assert_frame_equal(result, expected) @@ -1517,24 +1866,60 @@ def test_api_date_and_index(conn, request): @pytest.mark.parametrize("conn", all_connectable) def test_api_timedelta(conn, request): # see #6921 + conn_name = conn conn = request.getfixturevalue(conn) if sql.has_table("test_timedelta", conn): with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL: pandasSQL.drop_table("test_timedelta") df = to_timedelta(Series(["00:00:01", "00:00:03"], name="foo")).to_frame() - with tm.assert_produces_warning(UserWarning): + + if conn_name == "sqlite_adbc_conn": + request.node.add_marker( + pytest.mark.xfail( + reason="sqlite ADBC driver doesn't implement timedelta", + ) + ) + + if "adbc" in conn_name: + if pa_version_under14p1: + exp_warning = FutureWarning + else: + exp_warning = None + else: + exp_warning = UserWarning + + with tm.assert_produces_warning(exp_warning, check_stacklevel=False): result_count = df.to_sql(name="test_timedelta", con=conn) assert result_count == 2 result = sql.read_sql_query("SELECT * FROM test_timedelta", conn) - tm.assert_series_equal(result["foo"], df["foo"].view("int64")) + + if conn_name == "postgresql_adbc_conn": + # TODO: Postgres stores an INTERVAL, which ADBC reads as a Month-Day-Nano + # Interval; the default pandas type mapper maps this to a DateOffset + # but maybe we should try and restore the timedelta here? + expected = Series( + [ + pd.DateOffset(months=0, days=0, microseconds=1000000, nanoseconds=0), + pd.DateOffset(months=0, days=0, microseconds=3000000, nanoseconds=0), + ], + name="foo", + ) + else: + expected = df["foo"].view("int64") + tm.assert_series_equal(result["foo"], expected) @pytest.mark.parametrize("conn", all_connectable) def test_api_complex_raises(conn, request): + conn_name = conn conn = request.getfixturevalue(conn) df = DataFrame({"a": [1 + 1j, 2j]}) - msg = "Complex datatypes not supported" + + if "adbc" in conn_name: + msg = "datatypes not supported" + else: + msg = "Complex datatypes not supported" with pytest.raises(ValueError, match=msg): assert df.to_sql("test_complex", con=conn) is None @@ -1558,6 +1943,10 @@ def test_api_complex_raises(conn, request): ], ) def test_api_to_sql_index_label(conn, request, index_name, index_label, expected): + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail(reason="index_label argument NotImplemented with ADBC") + ) conn = request.getfixturevalue(conn) if sql.has_table("test_index_label", conn): with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL: @@ -1580,6 +1969,10 @@ def test_api_to_sql_index_label_multiindex(conn, request): reason="MySQL can fail using TEXT without length as key", strict=False ) ) + elif "adbc" in conn_name: + request.node.add_marker( + pytest.mark.xfail(reason="index_label argument NotImplemented with ADBC") + ) conn = request.getfixturevalue(conn) if sql.has_table("test_index_label", conn): @@ -1702,6 +2095,13 @@ def test_api_integer_col_names(conn, request): @pytest.mark.parametrize("conn", all_connectable) def test_api_get_schema(conn, request, test_frame1): + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail( + reason="'get_schema' not implemented for ADBC drivers", + strict=True, + ) + ) conn = request.getfixturevalue(conn) create_sql = sql.get_schema(test_frame1, "test", con=conn) assert "CREATE" in create_sql @@ -1710,6 +2110,13 @@ def test_api_get_schema(conn, request, test_frame1): @pytest.mark.parametrize("conn", all_connectable) def test_api_get_schema_with_schema(conn, request, test_frame1): # GH28486 + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail( + reason="'get_schema' not implemented for ADBC drivers", + strict=True, + ) + ) conn = request.getfixturevalue(conn) create_sql = sql.get_schema(test_frame1, "test", con=conn, schema="pypi") assert "CREATE TABLE pypi." in create_sql @@ -1717,6 +2124,13 @@ def test_api_get_schema_with_schema(conn, request, test_frame1): @pytest.mark.parametrize("conn", all_connectable) def test_api_get_schema_dtypes(conn, request): + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail( + reason="'get_schema' not implemented for ADBC drivers", + strict=True, + ) + ) conn_name = conn conn = request.getfixturevalue(conn) float_frame = DataFrame({"a": [1.1, 1.2], "b": [2.1, 2.2]}) @@ -1734,6 +2148,13 @@ def test_api_get_schema_dtypes(conn, request): @pytest.mark.parametrize("conn", all_connectable) def test_api_get_schema_keys(conn, request, test_frame1): + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail( + reason="'get_schema' not implemented for ADBC drivers", + strict=True, + ) + ) conn_name = conn conn = request.getfixturevalue(conn) frame = DataFrame({"Col1": [1.1, 1.2], "Col2": [2.1, 2.2]}) @@ -1756,6 +2177,10 @@ def test_api_get_schema_keys(conn, request, test_frame1): @pytest.mark.parametrize("conn", all_connectable) def test_api_chunksize_read(conn, request): + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail(reason="chunksize argument NotImplemented with ADBC") + ) conn_name = conn conn = request.getfixturevalue(conn) if sql.has_table("test_chunksize", conn): @@ -1801,6 +2226,13 @@ def test_api_chunksize_read(conn, request): @pytest.mark.parametrize("conn", all_connectable) def test_api_categorical(conn, request): + if conn == "postgresql_adbc_conn": + request.node.add_marker( + pytest.mark.xfail( + reason="categorical dtype not implemented for ADBC postgres driver", + strict=True, + ) + ) # GH8624 # test that categorical gets written correctly as dense column conn = request.getfixturevalue(conn) @@ -1859,6 +2291,10 @@ def test_api_escaped_table_name(conn, request): @pytest.mark.parametrize("conn", all_connectable) def test_api_read_sql_duplicate_columns(conn, request): # GH#53117 + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail(reason="pyarrow->pandas throws ValueError", strict=True) + ) conn = request.getfixturevalue(conn) if sql.has_table("test_table", conn): with sql.SQLDatabase(conn, need_transaction=True) as pandasSQL: @@ -1867,7 +2303,7 @@ def test_api_read_sql_duplicate_columns(conn, request): df = DataFrame({"a": [1, 2, 3], "b": [0.1, 0.2, 0.3], "c": 1}) df.to_sql(name="test_table", con=conn, index=False) - result = pd.read_sql("SELECT a, b, a +1 as a, c FROM test_table;", conn) + result = pd.read_sql("SELECT a, b, a +1 as a, c FROM test_table", conn) expected = DataFrame( [[1, 0.1, 2, 1], [2, 0.2, 3, 1], [3, 0.3, 4, 1]], columns=["a", "b", "a", "c"], @@ -1961,7 +2397,7 @@ def test_not_reflect_all_tables(sqlite_conn): @pytest.mark.parametrize("conn", all_connectable) def test_warning_case_insensitive_table_name(conn, request, test_frame1): conn_name = conn - if conn_name == "sqlite_buildin": + if conn_name == "sqlite_buildin" or "adbc" in conn_name: request.applymarker(pytest.mark.xfail(reason="Does not raise warning")) conn = request.getfixturevalue(conn) @@ -2249,12 +2685,15 @@ def test_roundtrip(conn, request, test_frame1): if conn == "sqlite_str": pytest.skip("sqlite_str has no inspection system") + conn_name = conn conn = request.getfixturevalue(conn) pandasSQL = pandasSQL_builder(conn) with pandasSQL.run_transaction(): assert pandasSQL.to_sql(test_frame1, "test_frame_roundtrip") == 4 result = pandasSQL.read_query("SELECT * FROM test_frame_roundtrip") + if "adbc" in conn_name: + result = result.rename(columns={"__index_level_0__": "level_0"}) result.set_index("level_0", inplace=True) # result.index.astype(int) @@ -2270,6 +2709,7 @@ def test_execute_sql(conn, request): with pandasSQL.run_transaction(): iris_results = pandasSQL.execute("SELECT * FROM iris") row = iris_results.fetchone() + iris_results.close() tm.equalContents(row, [5.1, 3.5, 1.4, 0.2, "Iris-setosa"]) @@ -2629,6 +3069,12 @@ def test_nan_string(conn, request): @pytest.mark.parametrize("conn", all_connectable) def test_to_sql_save_index(conn, request): + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail( + reason="ADBC implementation does not create index", strict=True + ) + ) conn_name = conn conn = request.getfixturevalue(conn) df = DataFrame.from_records( @@ -2667,7 +3113,7 @@ def test_transactions(conn, request): conn = request.getfixturevalue(conn) stmt = "CREATE TABLE test_trans (A INT, B TEXT)" - if conn_name != "sqlite_buildin": + if conn_name != "sqlite_buildin" and "adbc" not in conn_name: from sqlalchemy import text stmt = text(stmt) @@ -2679,11 +3125,12 @@ def test_transactions(conn, request): @pytest.mark.parametrize("conn", all_connectable) def test_transaction_rollback(conn, request): + conn_name = conn conn = request.getfixturevalue(conn) with pandasSQL_builder(conn) as pandasSQL: with pandasSQL.run_transaction() as trans: stmt = "CREATE TABLE test_trans (A INT, B TEXT)" - if isinstance(pandasSQL, SQLiteDatabase): + if "adbc" in conn_name or isinstance(pandasSQL, SQLiteDatabase): trans.execute(stmt) else: from sqlalchemy import text @@ -2987,9 +3434,11 @@ class Temporary(Base): @pytest.mark.parametrize("conn", all_connectable) def test_invalid_engine(conn, request, test_frame1): - if conn == "sqlite_buildin": + if conn == "sqlite_buildin" or "adbc" in conn: request.applymarker( - pytest.mark.xfail(reason="SQLiteDatabase does not raise for bad engine") + pytest.mark.xfail( + reason="SQLiteDatabase/ADBCDatabase does not raise for bad engine" + ) ) conn = request.getfixturevalue(conn) @@ -3089,6 +3538,12 @@ def test_read_sql_dtype_backend( expected = dtype_backend_expected(string_storage, dtype_backend, conn_name) tm.assert_frame_equal(result, expected) + if "adbc" in conn_name: + # adbc does not support chunksize argument + request.applymarker( + pytest.mark.xfail(reason="adbc does not support chunksize argument") + ) + with pd.option_context("mode.string_storage", string_storage): iterator = getattr(pd, func)( f"Select * from {table}", @@ -3112,7 +3567,7 @@ def test_read_sql_dtype_backend_table( dtype_backend_data, dtype_backend_expected, ): - if "sqlite" in conn: + if "sqlite" in conn and "adbc" not in conn: request.applymarker( pytest.mark.xfail( reason=( @@ -3133,6 +3588,10 @@ def test_read_sql_dtype_backend_table( expected = dtype_backend_expected(string_storage, dtype_backend, conn_name) tm.assert_frame_equal(result, expected) + if "adbc" in conn_name: + # adbc does not support chunksize argument + return + with pd.option_context("mode.string_storage", string_storage): iterator = getattr(pd, func)( table, @@ -3229,6 +3688,10 @@ def func(storage, dtype_backend, conn_name) -> DataFrame: @pytest.mark.parametrize("conn", all_connectable) def test_chunksize_empty_dtypes(conn, request): # GH#50245 + if "adbc" in conn: + request.node.add_marker( + pytest.mark.xfail(reason="chunksize argument NotImplemented with ADBC") + ) conn = request.getfixturevalue(conn) dtypes = {"a": "int64", "b": "object"} df = DataFrame(columns=["a", "b"]).astype(dtypes) diff --git a/pyproject.toml b/pyproject.toml index 38a22b2e7f90d..c9eb8d9ac4e40 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,9 +76,9 @@ hdf5 = [# blosc only available on conda (https://github.com/Blosc/python-blosc/i #'blosc>=1.20.1', 'tables>=3.8.0'] spss = ['pyreadstat>=1.2.0'] -postgresql = ['SQLAlchemy>=2.0.0', 'psycopg2>=2.9.6'] +postgresql = ['SQLAlchemy>=2.0.0', 'psycopg2>=2.9.6', 'adbc-driver-postgresql>=0.8.0'] mysql = ['SQLAlchemy>=2.0.0', 'pymysql>=1.0.2'] -sql-other = ['SQLAlchemy>=2.0.0'] +sql-other = ['SQLAlchemy>=2.0.0', 'adbc-driver-postgresql>=0.8.0', 'adbc-driver-sqlite>=0.8.0'] html = ['beautifulsoup4>=4.11.2', 'html5lib>=1.1', 'lxml>=4.9.2'] xml = ['lxml>=4.9.2'] plot = ['matplotlib>=3.6.3'] @@ -86,7 +86,9 @@ output-formatting = ['jinja2>=3.1.2', 'tabulate>=0.9.0'] clipboard = ['PyQt5>=5.15.8', 'qtpy>=2.3.0'] compression = ['zstandard>=0.19.0'] consortium-standard = ['dataframe-api-compat>=0.1.7'] -all = ['beautifulsoup4>=4.11.2', +all = ['adbc-driver-postgresql>=0.8.0', + 'adbc-driver-sqlite>=0.8.0', + 'beautifulsoup4>=4.11.2', # blosc only available on conda (https://github.com/Blosc/python-blosc/issues/297) #'blosc>=1.21.3', 'bottleneck>=1.3.6', diff --git a/requirements-dev.txt b/requirements-dev.txt index 23a8112bdb344..d074949b22df1 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -81,6 +81,8 @@ feedparser pyyaml requests pygments +adbc-driver-postgresql>=0.8.0 +adbc-driver-sqlite>=0.8.0 dataframe-api-compat>=0.1.7 sphinx-toggleprompt typing_extensions; python_version<"3.11"