From fbf6b152036455fc6531df62bc762c6ce6334b6e Mon Sep 17 00:00:00 2001 From: Rishi Kulkarni Date: Thu, 20 Jul 2023 21:32:38 -0400 Subject: [PATCH] FIX: deferrable operators now use AioCredentials Fixes airflow.providers.amazong.aws.hooks.base_aws.BaseSessionFactory feeds synchronous credentials to aiobotocore when using `assume_role` #32732 --- .../providers/amazon/aws/hooks/base_aws.py | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/base_aws.py b/airflow/providers/amazon/aws/hooks/base_aws.py index aaec87dd0911..98a0e9ffc8cb 100644 --- a/airflow/providers/amazon/aws/hooks/base_aws.py +++ b/airflow/providers/amazon/aws/hooks/base_aws.py @@ -201,18 +201,38 @@ def _create_session_with_assume_role( if self.conn.assume_role_method == "assume_role_with_web_identity": # Deferred credentials have no initial credentials credential_fetcher = self._get_web_identity_credential_fetcher() - credentials = botocore.credentials.DeferredRefreshableCredentials( + + if deferrable: + from aiobotocore.credentials import AioDeferredRefreshableCredentials + + credentials = AioDeferredRefreshableCredentials( method="assume-role-with-web-identity", refresh_using=credential_fetcher.fetch_credentials, time_fetcher=lambda: datetime.datetime.now(tz=tzlocal()), ) + else: + credentials = botocore.credentials.DeferredRefreshableCredentials( + method="assume-role-with-web-identity", + refresh_using=credential_fetcher.fetch_credentials, + time_fetcher=lambda: datetime.datetime.now(tz=tzlocal()), + ) else: # Refreshable credentials do have initial credentials - credentials = botocore.credentials.RefreshableCredentials.create_from_metadata( - metadata=self._refresh_credentials(), - refresh_using=self._refresh_credentials, - method="sts-assume-role", - ) + + if deferrable: + from aiobotocore.credentials import AioRefreshableCredentials + + credentials = AioRefreshableCredentials.create_from_metadata( + metadata=self._refresh_credentials(), + refresh_using=self._refresh_credentials, + method="sts-assume-role", + ) + else: + credentials = botocore.credentials.RefreshableCredentials.create_from_metadata( + metadata=self._refresh_credentials(), + refresh_using=self._refresh_credentials, + method="sts-assume-role", + ) if deferrable: from aiobotocore.session import get_session as async_get_session