Skip to content

Commit

Permalink
removing deprecation postgres (#39706)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bowrna authored May 22, 2024
1 parent cee4464 commit cb57a67
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
.. _howto/operators:postgres:

How-to Guide for PostgresOperator
=================================
How-to Guide for Postgres using SQLExecuteQueryOperator
=======================================================

Introduction
------------
Expand All @@ -28,16 +28,17 @@ workflow. Airflow is essentially a graph (Directed Acyclic Graph) made up of tas

A task defined or implemented by a operator is a unit of work in your data pipeline.

The purpose of :class:`~airflow.providers.postgres.operators.postgres.PostgresOperator` is to define tasks involving
interactions with a PostgreSQL database.
The purpose of this guide is to define tasks involving interactions with a PostgreSQL database with
the :class:`~airflow.providers.common.sql.operators.SQLExecuteQueryOperator`.

Under the hood, the :class:`~airflow.providers.postgres.operators.postgres.PostgresOperator` delegates its heavy lifting to the :class:`~airflow.providers.postgres.hooks.postgres.PostgresHook`.
.. warning::
Previously, PostgresOperator was used to perform this kind of operation. But at the moment PostgresOperator is deprecated and will be removed in future versions of the provider. Please consider to switch to SQLExecuteQueryOperator as soon as possible.

Common Database Operations with PostgresOperator
------------------------------------------------
Common Database Operations with SQLExecuteQueryOperator
-------------------------------------------------------

To use the PostgresOperator to carry out SQL request, two parameters are required: ``sql`` and ``postgres_conn_id``.
These two parameters are eventually fed to the PostgresHook object that interacts directly with the Postgres database.
To use the SQLExecuteQueryOperator to carry out PostgreSQL request, two parameters are required: ``sql`` and ``conn_id``.
These two parameters are eventually fed to the DbApiHook object that interacts directly with the Postgres database.

Creating a Postgres database table
----------------------------------
Expand All @@ -46,11 +47,11 @@ The code snippets below are based on Airflow-2.0

.. exampleinclude:: /../../tests/system/providers/postgres/example_postgres.py
:language: python
:start-after: [START postgres_operator_howto_guide]
:end-before: [END postgres_operator_howto_guide_create_pet_table]
:start-after: [START postgres_sql_execute_query_operator_howto_guide]
:end-before: [END postgres_sql_execute_query_operator_howto_guide_create_pet_table]


Dumping SQL statements into your PostgresOperator isn't quite appealing and will create maintainability pains somewhere
Dumping SQL statements into your operator isn't quite appealing and will create maintainability pains somewhere
down to the road. To prevent this, Airflow offers an elegant solution. This is how it works: you simply create
a directory inside the DAG folder called ``sql`` and then put all the SQL files containing your SQL queries inside it.

Expand All @@ -71,9 +72,9 @@ Now let's refactor ``create_pet_table`` in our DAG:

.. code-block:: python
create_pet_table = PostgresOperator(
create_pet_table = SQLExecuteQueryOperator(
task_id="create_pet_table",
postgres_conn_id="postgres_default",
conn_id="postgres_default",
sql="sql/pet_schema.sql",
)
Expand All @@ -91,13 +92,13 @@ Let's say we already have the SQL insert statement below in our ``dags/sql/pet_s
INSERT INTO pet VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
INSERT INTO pet VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');

We can then create a PostgresOperator task that populate the ``pet`` table.
We can then create a SQLExecuteQueryOperator task that populate the ``pet`` table.

.. code-block:: python
populate_pet_table = PostgresOperator(
populate_pet_table = SQLExecuteQueryOperator(
task_id="populate_pet_table",
postgres_conn_id="postgres_default",
conn_id="postgres_default",
sql="sql/pet_schema.sql",
)
Expand All @@ -109,29 +110,29 @@ Fetching records from your Postgres database table can be as simple as:

.. code-block:: python
get_all_pets = PostgresOperator(
get_all_pets = SQLExecuteQueryOperator(
task_id="get_all_pets",
postgres_conn_id="postgres_default",
conn_id="postgres_default",
sql="SELECT * FROM pet;",
)
Passing Parameters into PostgresOperator
----------------------------------------
Passing Parameters into SQLExecuteQueryOperator for Postgres
------------------------------------------------------------

PostgresOperator provides ``parameters`` attribute which makes it possible to dynamically inject values into your
SQL requests during runtime. The BaseOperator class has the ``params`` attribute which is available to the PostgresOperator
SQLExecuteQueryOperator provides ``parameters`` attribute which makes it possible to dynamically inject values into your
SQL requests during runtime. The BaseOperator class has the ``params`` attribute which is available to the SQLExecuteQueryOperator
by virtue of inheritance. Both ``parameters`` and ``params`` make it possible to dynamically pass in parameters in many
interesting ways.

To find the owner of the pet called 'Lester':

.. code-block:: python
get_birth_date = PostgresOperator(
get_birth_date = SQLExecuteQueryOperator(
task_id="get_birth_date",
postgres_conn_id="postgres_default",
conn_id="postgres_default",
sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
)
Expand All @@ -149,24 +150,24 @@ class.

.. code-block:: python
get_birth_date = PostgresOperator(
get_birth_date = SQLExecuteQueryOperator(
task_id="get_birth_date",
postgres_conn_id="postgres_default",
conn_id="postgres_default",
sql="sql/birth_date.sql",
params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
)
Passing Server Configuration Parameters into PostgresOperator
-------------------------------------------------------------

PostgresOperator provides ``hook_params`` attribute that allows you to pass add parameters to PostgresHook.
SQLExecuteQueryOperator provides ``hook_params`` attribute that allows you to pass add parameters to DbApiHook.
You can pass ``options`` argument this way so that you specify `command-line options <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNECT-OPTIONS>`_
sent to the server at connection start.

.. exampleinclude:: /../../tests/system/providers/postgres/example_postgres.py
:language: python
:start-after: [START postgres_operator_howto_guide_get_birth_date]
:end-before: [END postgres_operator_howto_guide_get_birth_date]
:start-after: [START postgres_sql_execute_query_operator_howto_guide_get_birth_date]
:end-before: [END postgres_sql_execute_query_operator_howto_guide_get_birth_date]


The complete Postgres Operator DAG
Expand All @@ -176,14 +177,14 @@ When we put everything together, our DAG should look like this:

.. exampleinclude:: /../../tests/system/providers/postgres/example_postgres.py
:language: python
:start-after: [START postgres_operator_howto_guide]
:end-before: [END postgres_operator_howto_guide]
:start-after: [START postgres_sql_execute_query_operator_howto_guide]
:end-before: [END postgres_sql_execute_query_operator_howto_guide]


Conclusion
----------

In this how-to guide we explored the Apache Airflow PostgreOperator. Let's quickly highlight the key takeaways.
In this how-to guide we explored the Apache Airflow SQLExecuteQueryOperator to connect to PostgreSQL Database. Let's quickly highlight the key takeaways.
It is best practice to create subdirectory called ``sql`` in your ``dags`` directory where you can store your sql files.
This will make your code more elegant and more maintainable.
And finally, we looked at the different ways you can dynamically pass parameters into our PostgresOperator
Expand Down
1 change: 0 additions & 1 deletion tests/always/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
"tests/system/providers/apache/drill/example_drill_dag.py",
"tests/system/providers/microsoft/mssql/example_mssql.py",
"tests/system/providers/mysql/example_mysql.py",
"tests/system/providers/postgres/example_postgres.py",
"tests/system/providers/snowflake/example_snowflake.py",
"tests/system/providers/trino/example_trino.py",
)
Expand Down
30 changes: 15 additions & 15 deletions tests/system/providers/postgres/example_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import os

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# [START postgres_operator_howto_guide]
# [START postgres_sql_execute_query_operator_howto_guide]


# create_pet_table, populate_pet_table, get_all_pets, and get_birth_date are examples of tasks created by
Expand All @@ -37,8 +37,8 @@
schedule="@once",
catchup=False,
) as dag:
# [START postgres_operator_howto_guide_create_pet_table]
create_pet_table = PostgresOperator(
# [START postgres_sql_execute_query_operator_howto_guide_create_pet_table]
create_pet_table = SQLExecuteQueryOperator(
task_id="create_pet_table",
sql="""
CREATE TABLE IF NOT EXISTS pet (
Expand All @@ -49,9 +49,9 @@
OWNER VARCHAR NOT NULL);
""",
)
# [END postgres_operator_howto_guide_create_pet_table]
# [START postgres_operator_howto_guide_populate_pet_table]
populate_pet_table = PostgresOperator(
# [END postgres_sql_execute_query_operator_howto_guide_create_pet_table]
# [START postgres_sql_execute_query_operator_howto_guide_populate_pet_table]
populate_pet_table = SQLExecuteQueryOperator(
task_id="populate_pet_table",
sql="""
INSERT INTO pet (name, pet_type, birth_date, OWNER)
Expand All @@ -64,21 +64,21 @@
VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');
""",
)
# [END postgres_operator_howto_guide_populate_pet_table]
# [START postgres_operator_howto_guide_get_all_pets]
get_all_pets = PostgresOperator(task_id="get_all_pets", sql="SELECT * FROM pet;")
# [END postgres_operator_howto_guide_get_all_pets]
# [START postgres_operator_howto_guide_get_birth_date]
get_birth_date = PostgresOperator(
# [END postgres_sql_execute_query_operator_howto_guide_populate_pet_table]
# [START postgres_sql_execute_query_operator_howto_guide_get_all_pets]
get_all_pets = SQLExecuteQueryOperator(task_id="get_all_pets", sql="SELECT * FROM pet;")
# [END postgres_sql_execute_query_operator_howto_guide_get_all_pets]
# [START postgres_sql_execute_query_operator_howto_guide_get_birth_date]
get_birth_date = SQLExecuteQueryOperator(
task_id="get_birth_date",
sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
hook_params={"options": "-c statement_timeout=3000ms"},
)
# [END postgres_operator_howto_guide_get_birth_date]
# [END postgres_sql_execute_query_operator_howto_guide_get_birth_date]

create_pet_table >> populate_pet_table >> get_all_pets >> get_birth_date
# [END postgres_operator_howto_guide]
# [END postgres_sql_execute_query_operator_howto_guide]

from tests.system.utils.watcher import watcher

Expand Down

0 comments on commit cb57a67

Please sign in to comment.