From 2737af31b7467fde3a8152922311e0136924d58e Mon Sep 17 00:00:00 2001 From: Erwan Simon Date: Mon, 15 Apr 2024 10:01:27 +0200 Subject: [PATCH 1/2] fix: prevent athena.to_iceberg overwrite from deleting the table, in order to preserve Iceberg transaction history --- awswrangler/athena/_write_iceberg.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 5d3d45d8c..8c46ded1c 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -383,11 +383,6 @@ def to_iceberg( glue_table_settings if glue_table_settings else {}, ) - if mode == "overwrite": - catalog.delete_table_if_exists( - database=database, table=table, catalog_id=catalog_id, boto3_session=boto3_session - ) - try: # Create Iceberg table if it doesn't exist if not catalog.does_table_exist( @@ -469,6 +464,21 @@ def to_iceberg( s3_additional_kwargs=s3_additional_kwargs, catalog_id=catalog_id, ) + # if mode == "overwrite", delete whole data from table (but not table itself) + elif mode == "overwrite": + delete_sql_statement = f"DELETE FROM {table}" + delete_query_execution_id: str = _start_query_execution( + sql=delete_sql_statement, + workgroup=workgroup, + wg_config=wg_config, + database=database, + data_source=data_source, + encryption=encryption, + kms_key=kms_key, + boto3_session=boto3_session, + ) + wait_query(query_execution_id=delete_query_execution_id, + boto3_session=boto3_session) # Create temporary external table, write the results s3.to_parquet( From 409b6b777e06bcd6c79a8c129cc72b3101a6217f Mon Sep 17 00:00:00 2001 From: Erwan Simon Date: Mon, 15 Apr 2024 11:14:10 +0200 Subject: [PATCH 2/2] fix: ruff format --- awswrangler/athena/_write_iceberg.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 8c46ded1c..28eae686b 100644 --- 
a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -477,8 +477,7 @@ def to_iceberg( kms_key=kms_key, boto3_session=boto3_session, ) - wait_query(query_execution_id=delete_query_execution_id, - boto3_session=boto3_session) + wait_query(query_execution_id=delete_query_execution_id, boto3_session=boto3_session) # Create temporary external table, write the results s3.to_parquet(