Fix handling of single quotes in RedshiftToS3Operator #35986

Merged: 5 commits, Dec 4, 2023
4 changes: 3 additions & 1 deletion airflow/providers/amazon/aws/transfers/redshift_to_s3.py
@@ -141,8 +141,10 @@ def __init__(
    def _build_unload_query(
        self, credentials_block: str, select_query: str, s3_key: str, unload_options: str
    ) -> str:
+       # Un-escape already escaped queries
+       select_query = select_query.replace("''", "'")
Member commented: Would this break empty string literals…?

Contributor (Author) commented: Ouh! That's a good catch!

Contributor (Author) commented: I addressed it in 7d06f1e259a695fc68e16e50e9562f514dbb757d. I also added a unit test for this use case.

        return f"""
-           UNLOAD ('{select_query}')
+           UNLOAD ($${select_query}$$)
            TO 's3://{self.s3_bucket}/{s3_key}'
            credentials
            '{credentials_block}'
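For context on the hunk above: Redshift's UNLOAD receives its query as a string literal. With the old single-quote wrapping, every single quote inside select_query had to be pre-escaped as '', which is what broke queries containing ordinary quoted strings; dollar-quoting ($$ ... $$) passes single quotes through untouched, and the new replace("''", "'") un-escapes queries that callers had already escaped for the old behavior. The following standalone sketch (illustrative only; the bucket, key, and credentials placeholders are not the operator's real values) shows both behaviors and the empty-string-literal case the reviewer raises:

# Illustrative sketch of the quoting change; plain f-strings stand in for the
# operator's UNLOAD template.

def old_unload(select_query: str) -> str:
    # Old behavior: the query sits inside a single-quoted SQL literal, so the
    # caller had to escape every single quote in the query as ''.
    return f"UNLOAD ('{select_query}') TO 's3://bucket/key' credentials '...'"

def new_unload(select_query: str) -> str:
    # New behavior: dollar-quoting, so single quotes can appear unescaped.
    return f"UNLOAD ($${select_query}$$) TO 's3://bucket/key' credentials '...'"

query = "SELECT 'Single Quotes Break this Operator'"
print(old_unload(query.replace("'", "''")))  # caller had to pre-escape quotes
print(new_unload(query))                     # works as written

# The reviewer's concern: a blanket replace("''", "'") un-escapes previously
# escaped quotes, but it also rewrites a legitimate empty string literal.
already_valid = "SELECT col, '' AS empty_marker FROM t"
print(already_valid.replace("''", "'"))
# -> SELECT col, ' AS empty_marker FROM t   (unbalanced quote, invalid SQL)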
59 changes: 59 additions & 0 deletions tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
@@ -217,6 +217,65 @@ def test_custom_select_query_unloading(
        assert secret_key in unload_query
        assert_equal_ignore_multiple_spaces(mock_run.call_args.args[0], unload_query)

    @pytest.mark.parametrize(
        "table_as_file_name, expected_s3_key, select_query",
        [
            [True, "key/table_", "SELECT 'Single Quotes Break this Operator'"],
            [False, "key", "SELECT 'Single Quotes Break this Operator'"],
            [True, "key/table_", "SELECT ''Single Quotes Break this Operator''"],
            [False, "key", "SELECT ''Single Quotes Break this Operator''"],
        ],
    )
    @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
    @mock.patch("airflow.models.connection.Connection")
    @mock.patch("boto3.session.Session")
    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook.run")
    def test_custom_select_query_unloading_with_single_quotes(
        self,
        mock_run,
        mock_session,
        mock_connection,
        mock_hook,
        table_as_file_name,
        expected_s3_key,
        select_query,
    ):
        access_key = "aws_access_key_id"
        secret_key = "aws_secret_access_key"
        mock_session.return_value = Session(access_key, secret_key)
        mock_session.return_value.access_key = access_key
        mock_session.return_value.secret_key = secret_key
        mock_session.return_value.token = None
        mock_connection.return_value = Connection()
        mock_hook.return_value = Connection()
        s3_bucket = "bucket"
        s3_key = "key"
        unload_options = ["HEADER"]

        op = RedshiftToS3Operator(
            select_query=select_query,
            s3_bucket=s3_bucket,
            s3_key=s3_key,
            unload_options=unload_options,
            include_header=True,
            redshift_conn_id="redshift_conn_id",
            aws_conn_id="aws_conn_id",
            task_id="task_id",
            dag=None,
        )

        op.execute(None)

        unload_options = "\n\t\t\t".join(unload_options)
        credentials_block = build_credentials_block(mock_session.return_value)

        unload_query = op._build_unload_query(credentials_block, select_query, s3_key, unload_options)

        assert mock_run.call_count == 1
        assert access_key in unload_query
        assert secret_key in unload_query
        assert_equal_ignore_multiple_spaces(mock_run.call_args.args[0], unload_query)

    @pytest.mark.parametrize("table_as_file_name, expected_s3_key", [[True, "key/table_"], [False, "key"]])
    @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
    @mock.patch("airflow.models.connection.Connection")
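The parametrized cases in the new test cover both the raw query and the pre-escaped form that the old single-quote wrapping required; after the un-escape step both should yield the same dollar-quoted statement, roughly the following (a sketch using the test's illustrative values, with the credentials line coming from build_credentials_block on the mocked session):

# Approximate statement the test expects _build_unload_query to produce
# (whitespace simplified; values taken from the test's mocks and arguments).
expected_unload = """
    UNLOAD ($$SELECT 'Single Quotes Break this Operator'$$)
    TO 's3://bucket/key'
    credentials
    'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
    HEADER;
"""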