From 525898cf2355ca55e1c70fe37b0c6b962d097328 Mon Sep 17 00:00:00 2001
From: Dalei Li
Date: Wed, 27 Jul 2022 14:37:21 +0200
Subject: [PATCH 1/9] Fix BigQueryInsertJobOperator cancel_on_kill

---
 .../providers/google/cloud/hooks/bigquery.py  |  2 +-
 .../google/cloud/operators/bigquery.py        | 68 +++++++++----------
 2 files changed, 35 insertions(+), 35 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 0049143aea461..712a18bfd9a7c 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -1415,7 +1415,7 @@ def cancel_job(
         location: Optional[str] = None,
     ) -> None:
         """
-        Cancels a job an wait for cancellation to complete
+        Cancels a job and wait for cancellation to complete
 
         :param job_id: id of the job.
         :param project_id: Google Cloud Project where the job is running
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 550c3174068ec..71e0052f81993 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -2139,7 +2139,7 @@ def _submit_job(
         hook: BigQueryHook,
         job_id: str,
     ) -> BigQueryJob:
-        # Submit a new job and wait for it to complete and get the result.
+        # Submit a new job without waiting for it to complete.
         return hook.insert_job(
             configuration=self.configuration,
             project_id=self.project_id,
@@ -2147,6 +2147,7 @@
             job_id=job_id,
             timeout=self.result_timeout,
             retry=self.result_retry,
+            nowait=True,
         )
 
     @staticmethod
@@ -2155,6 +2156,32 @@ def _handle_job_error(job: BigQueryJob) -> None:
             raise AirflowException(f"BigQuery job {job.job_id} failed: {job.error_result}")
 
     def execute(self, context: Any):
+        job_types = {
+            LoadJob._JOB_TYPE: ["sourceTable", "destinationTable"],
+            CopyJob._JOB_TYPE: ["sourceTable", "destinationTable"],
+            ExtractJob._JOB_TYPE: ["sourceTable"],
+            QueryJob._JOB_TYPE: ["destinationTable"],
+        }
+
+        if self.project_id:
+            for job_type, tables_prop in job_types.items():
+                job_configuration = job.to_api_repr()["configuration"]
+                if job_type in job_configuration:
+                    for table_prop in tables_prop:
+                        if table_prop in job_configuration[job_type]:
+                            table = job_configuration[job_type][table_prop]
+                            persist_kwargs = {
+                                "context": context,
+                                "task_instance": self,
+                                "project_id": self.project_id,
+                                "table_id": table,
+                            }
+                            if not isinstance(table, str):
+                                persist_kwargs["table_id"] = table["tableId"]
+                                persist_kwargs["dataset_id"] = table["datasetId"]
+
+                            BigQueryTableLink.persist(**persist_kwargs)
+
         hook = BigQueryHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -2174,7 +2201,6 @@ def execute(self, context: Any):
         try:
             self.log.info("Executing: %s'", self.configuration)
             job = self._submit_job(hook, job_id)
-            self._handle_job_error(job)
         except Conflict:
             # If the job already exists retrieve it
             job = hook.get_job(
@@ -2182,11 +2208,7 @@ def execute(self, context: Any):
                 location=self.location,
                 job_id=job_id,
             )
-            if job.state in self.reattach_states:
-                # We are reattaching to a job
-                job.result(timeout=self.result_timeout, retry=self.result_retry)
-                self._handle_job_error(job)
-            else:
+            if job.state not in self.reattach_states:
                 # Same job configuration so we need force_rerun
                 raise AirflowException(
                     f"Job with id: {job_id} already exists and is in {job.state} state. If you "
                     f"want to force rerun it consider setting `force_rerun=True`."
                     f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
                 )
 
-        job_types = {
-            LoadJob._JOB_TYPE: ["sourceTable", "destinationTable"],
-            CopyJob._JOB_TYPE: ["sourceTable", "destinationTable"],
-            ExtractJob._JOB_TYPE: ["sourceTable"],
-            QueryJob._JOB_TYPE: ["destinationTable"],
-        }
-
-        if self.project_id:
-            for job_type, tables_prop in job_types.items():
-                job_configuration = job.to_api_repr()["configuration"]
-                if job_type in job_configuration:
-                    for table_prop in tables_prop:
-                        if table_prop in job_configuration[job_type]:
-                            table = job_configuration[job_type][table_prop]
-                            persist_kwargs = {
-                                "context": context,
-                                "task_instance": self,
-                                "project_id": self.project_id,
-                                "table_id": table,
-                            }
-                            if not isinstance(table, str):
-                                persist_kwargs["table_id"] = table["tableId"]
-                                persist_kwargs["dataset_id"] = table["datasetId"]
-
-                            BigQueryTableLink.persist(**persist_kwargs)
-
         self.job_id = job.job_id
-        return job.job_id
+        # Wait for the job to complete
+        job.result(timeout=self.result_timeout, retry=self.result_retry)
+        self._handle_job_error(job)
+        
+        return self.job_id
 
     def on_kill(self) -> None:
         if self.job_id and self.cancel_on_kill:

From 5c4550d7e89dc297eb839e384e0e9c5c8b9371c4 Mon Sep 17 00:00:00 2001
From: Dalei Li
Date: Wed, 27 Jul 2022 15:56:28 +0200
Subject: [PATCH 2/9] Add test case on_kill_after_execution_timeout

---
 .../google/cloud/operators/test_bigquery.py  | 26 +++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 7be855a9a0e6f..cb17127e1bbc9 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -912,6 +912,32 @@ def test_on_kill(self, mock_hook):
             location=TEST_DATASET_LOCATION,
             project_id=TEST_GCP_PROJECT_ID,
         )
+    
+    @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
+    def test_on_kill_after_execution_timeout(self, mock_hook):
+        job_id = "123456"
+        hash_ = "hash"
+        real_job_id = f"{job_id}_{hash_}"
+        mock_job = MagicMock(job_id=real_job_id, error_result=False)
+        mock_job.result.side_effect = AirflowTaskTimeout()
+        mock_hook.return_value.insert_job.return_value = mock_job
+        mock_hook.return_value.generate_job_id.return_value = real_job_id
+        
+        op = BigQueryInsertJobOperator(
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID,
+            cancel_on_kill=False,
+        )
+        with pytest.raises(AirflowTaskTimeout):
+            op.execute(context=self.mock_context)
+
+        op.on_kill()
+        mock_hook.return_value.cancel_job.assert_called_once_with(
+            project_id=GCP_PROJECT, region=GCP_LOCATION, job_id=job_id
+        )
 
     @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
     def test_execute_failure(self, mock_hook):

From 51c58c37c66fe90d8d906cd61e4ad68f35e21a99 Mon Sep 17 00:00:00 2001
From: Dalei Li
Date: Wed, 27 Jul 2022 15:58:31 +0200
Subject: [PATCH 3/9] Cancels -> Cancel

---
 airflow/providers/google/cloud/hooks/bigquery.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 712a18bfd9a7c..1e9848d7670a5 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -1415,7 +1415,7 @@ def cancel_job(
         location: Optional[str] = None,
     ) -> None:
         """
-        Cancels a job and wait for cancellation to complete
+        Cancel a job and wait for cancellation to complete
 
         :param job_id: id of the job.
         :param project_id: Google Cloud Project where the job is running

From 7f6aeec77e541bff122cec7413daaffccead0b93 Mon Sep 17 00:00:00 2001
From: Dalei Li
Date: Thu, 28 Jul 2022 18:45:15 +0200
Subject: [PATCH 4/9] move BigQueryTableLink back, fix test

---
 .../google/cloud/operators/bigquery.py       | 54 +++++++++----------
 .../google/cloud/operators/test_bigquery.py  | 29 +++++++---
 2 files changed, 48 insertions(+), 35 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 71e0052f81993..58e72cf058ae6 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -2156,32 +2156,6 @@ def _handle_job_error(job: BigQueryJob) -> None:
             raise AirflowException(f"BigQuery job {job.job_id} failed: {job.error_result}")
 
     def execute(self, context: Any):
-        job_types = {
-            LoadJob._JOB_TYPE: ["sourceTable", "destinationTable"],
-            CopyJob._JOB_TYPE: ["sourceTable", "destinationTable"],
-            ExtractJob._JOB_TYPE: ["sourceTable"],
-            QueryJob._JOB_TYPE: ["destinationTable"],
-        }
-
-        if self.project_id:
-            for job_type, tables_prop in job_types.items():
-                job_configuration = job.to_api_repr()["configuration"]
-                if job_type in job_configuration:
-                    for table_prop in tables_prop:
-                        if table_prop in job_configuration[job_type]:
-                            table = job_configuration[job_type][table_prop]
-                            persist_kwargs = {
-                                "context": context,
-                                "task_instance": self,
-                                "project_id": self.project_id,
-                                "table_id": table,
-                            }
-                            if not isinstance(table, str):
-                                persist_kwargs["table_id"] = table["tableId"]
-                                persist_kwargs["dataset_id"] = table["datasetId"]
-
-                            BigQueryTableLink.persist(**persist_kwargs)
-
         hook = BigQueryHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
@@ -2216,11 +2190,37 @@ def execute(self, context: Any):
                     f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
                 )
 
+        job_types = {
+            LoadJob._JOB_TYPE: ["sourceTable", "destinationTable"],
+            CopyJob._JOB_TYPE: ["sourceTable", "destinationTable"],
+            ExtractJob._JOB_TYPE: ["sourceTable"],
+            QueryJob._JOB_TYPE: ["destinationTable"],
+        }
+
+        if self.project_id:
+            for job_type, tables_prop in job_types.items():
+                job_configuration = job.to_api_repr()["configuration"]
+                if job_type in job_configuration:
+                    for table_prop in tables_prop:
+                        if table_prop in job_configuration[job_type]:
+                            table = job_configuration[job_type][table_prop]
+                            persist_kwargs = {
+                                "context": context,
+                                "task_instance": self,
+                                "project_id": self.project_id,
+                                "table_id": table,
+                            }
+                            if not isinstance(table, str):
+                                persist_kwargs["table_id"] = table["tableId"]
+                                persist_kwargs["dataset_id"] = table["datasetId"]
+
+                            BigQueryTableLink.persist(**persist_kwargs)
+
         self.job_id = job.job_id
         # Wait for the job to complete
         job.result(timeout=self.result_timeout, retry=self.result_retry)
         self._handle_job_error(job)
-        
+
         return self.job_id
 
     def on_kill(self) -> None:
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index cb17127e1bbc9..47dc7434c34df 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -23,7 +23,7 @@
 from google.cloud.bigquery import DEFAULT_RETRY
 from google.cloud.exceptions import Conflict
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
 from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryCheckOperator,
     BigQueryConsoleIndexableLink,
@@ -912,31 +912,44 @@ def test_on_kill(self, mock_hook):
         location=TEST_DATASET_LOCATION,
         project_id=TEST_GCP_PROJECT_ID,
     )
-    
+
     @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
-    def test_on_kill_after_execution_timeout(self, mock_hook):
+    @mock.patch('airflow.providers.google.cloud.hooks.bigquery.BigQueryJob')
+    def test_on_kill_after_execution_timeout(self, mock_job, mock_hook):
         job_id = "123456"
         hash_ = "hash"
         real_job_id = f"{job_id}_{hash_}"
-        mock_job = MagicMock(job_id=real_job_id, error_result=False)
+
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
+        }
+
+        mock_job.job_id = real_job_id
+        mock_job.error_result = False
         mock_job.result.side_effect = AirflowTaskTimeout()
+
         mock_hook.return_value.insert_job.return_value = mock_job
         mock_hook.return_value.generate_job_id.return_value = real_job_id
-        
+
         op = BigQueryInsertJobOperator(
             task_id="insert_query_job",
             configuration=configuration,
             location=TEST_DATASET_LOCATION,
             job_id=job_id,
             project_id=TEST_GCP_PROJECT_ID,
-            cancel_on_kill=False,
+            cancel_on_kill=True,
         )
         with pytest.raises(AirflowTaskTimeout):
-            op.execute(context=self.mock_context)
+            op.execute(context=MagicMock())
 
         op.on_kill()
         mock_hook.return_value.cancel_job.assert_called_once_with(
-            project_id=GCP_PROJECT, region=GCP_LOCATION, job_id=job_id
+            job_id=real_job_id,
+            location=TEST_DATASET_LOCATION,
+            project_id=TEST_GCP_PROJECT_ID,
         )
 
     @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')

From 8bd6310aacdb1903eb174caa5ca16b9e4467a7a4 Mon Sep 17 00:00:00 2001
From: Dalei Li
Date: Fri, 29 Jul 2022 09:39:08 +0200
Subject: [PATCH 5/9] fix existing test

---
 tests/providers/google/cloud/operators/test_bigquery.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 47dc7434c34df..ebb6333856119 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -830,6 +830,7 @@ def test_execute_query_success(self, mock_hook):
             configuration=configuration,
             location=TEST_DATASET_LOCATION,
             job_id=real_job_id,
+            nowait=True,
             project_id=TEST_GCP_PROJECT_ID,
             retry=DEFAULT_RETRY,
             timeout=None,
@@ -870,6 +871,7 @@ def test_execute_copy_success(self, mock_hook):
             configuration=configuration,
             location=TEST_DATASET_LOCATION,
             job_id=real_job_id,
+            nowait=True,
             project_id=TEST_GCP_PROJECT_ID,
             retry=DEFAULT_RETRY,
             timeout=None,
@@ -1057,6 +1059,7 @@ def test_execute_force_rerun(self, mock_hook):
             configuration=configuration,
             location=TEST_DATASET_LOCATION,
             job_id=real_job_id,
+            nowait=True,
             project_id=TEST_GCP_PROJECT_ID,
             retry=DEFAULT_RETRY,
             timeout=None,

From 8b3d6736b4a72f0f8624adb25585f8e23d9fc539 Mon Sep 17 00:00:00 2001
From: Dalei Li
Date: Fri, 29 Jul 2022 09:44:30 +0200
Subject: [PATCH 6/9] log a message when not cancelling a job

---
 airflow/providers/google/cloud/operators/bigquery.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 58e72cf058ae6..2d33b22674417 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -2228,3 +2228,5 @@ def on_kill(self) -> None:
             self.hook.cancel_job(  # type: ignore[union-attr]
                 job_id=self.job_id, project_id=self.project_id, location=self.location
             )
+        else:
+            self.log.info(f'Skipping to cancel job: {self.project_id}:{self.location}.{self.job_id}')

From dc0674f6d552f45b1a2bbd24ffcd3dfabc7bd48b Mon Sep 17 00:00:00 2001
From: Dalei Li
Date: Fri, 29 Jul 2022 16:49:11 +0200
Subject: [PATCH 7/9] fix test case test_execute_no_force_rerun

---
 tests/providers/google/cloud/operators/test_bigquery.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index ebb6333856119..494749282b6a9 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -1080,7 +1080,7 @@ def test_execute_no_force_rerun(self, mock_hook):
             }
         }
 
-        mock_hook.return_value.insert_job.return_value.result.side_effect = Conflict("any")
+        mock_hook.return_value.insert_job.side_effect = Conflict("any")
         mock_hook.return_value.generate_job_id.return_value = real_job_id
         job = MagicMock(
             job_id=real_job_id,

From 5344c66be960ced9016b830226d0c154d4467122 Mon Sep 17 00:00:00 2001
From: Dalei Li
Date: Fri, 29 Jul 2022 16:57:03 +0200
Subject: [PATCH 8/9] use printf-style string format in logging

---
 airflow/providers/google/cloud/operators/bigquery.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 2d33b22674417..b9429fd761100 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -2229,4 +2229,6 @@ def on_kill(self) -> None:
                 job_id=self.job_id, project_id=self.project_id, location=self.location
             )
         else:
-            self.log.info(f'Skipping to cancel job: {self.project_id}:{self.location}.{self.job_id}')
+            self.log.info(
+                'Skipping to cancel job: %s:%s.%s', self.project_id, self.location, self.job_id
+            )

From 3476c94212bdf682dba3789f5e227134e7f8b78e Mon Sep 17 00:00:00 2001
From: Dalei Li
Date: Mon, 1 Aug 2022 22:45:30 +0200
Subject: [PATCH 9/9] resolve back lint issue

---
 airflow/providers/google/cloud/operators/bigquery.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index b9429fd761100..e86c19e83b404 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -2229,6 +2229,4 @@ def on_kill(self) -> None:
                 job_id=self.job_id, project_id=self.project_id, location=self.location
             )
         else:
-            self.log.info(
-                'Skipping to cancel job: %s:%s.%s', self.project_id, self.location, self.job_id
-            )
+            self.log.info('Skipping to cancel job: %s:%s.%s', self.project_id, self.location, self.job_id)
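
Note: the series boils down to an ordering guarantee in execute(): submit the job without
blocking (insert_job(..., nowait=True)), record job_id on the task instance, and only then
wait on job.result(...). Because job_id is set before the blocking call, on_kill() can
cancel the server-side job when the task dies mid-wait, for example when an
execution_timeout fires. Below is a distilled sketch of that control flow; Job and
Operator are stand-in classes written for illustration, not the real Airflow or
google-cloud-bigquery APIs.

    import time


    class Job:
        """Stand-in for a server-side BigQuery job handle."""

        def __init__(self, job_id: str) -> None:
            self.job_id = job_id

        def result(self, timeout: float) -> None:
            # Simulate a job that is still running when the task's timeout fires.
            time.sleep(timeout)
            raise TimeoutError(f"job {self.job_id} did not finish within {timeout}s")


    class Operator:
        """Stand-in for the execute()/on_kill() contract the patches establish."""

        def __init__(self, cancel_on_kill: bool = True) -> None:
            self.cancel_on_kill = cancel_on_kill
            self.job_id = None

        def execute(self) -> None:
            job = Job("job_123")      # like hook.insert_job(..., nowait=True): submit, don't block
            self.job_id = job.job_id  # record the handle *before* blocking
            job.result(timeout=0.1)   # only now wait for completion

        def on_kill(self) -> None:
            if self.job_id and self.cancel_on_kill:
                print(f"cancelling {self.job_id}")  # like hook.cancel_job(job_id=..., ...)
            else:
                print(f"skipping cancel of job: {self.job_id}")


    op = Operator()
    try:
        op.execute()
    except TimeoutError:
        op.on_kill()  # job_id was set before the wait, so the job can be cancelled

Before the fix, the operator only learned the job id after insert_job() had already waited
for completion, so a task killed during that wait left the BigQuery job running with nothing
for on_kill() to cancel.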
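With the series applied, a DAG author can pair cancel_on_kill with execution_timeout so a
timed-out task also cancels its server-side job. A minimal usage sketch follows; the DAG id,
schedule, start date, location, project, and SQL are illustrative placeholders, not values
taken from the patches.

    import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

    with DAG(
        dag_id="bq_insert_job_cancel_on_kill",  # placeholder DAG id
        start_date=datetime.datetime(2022, 8, 1),
        schedule_interval=None,
    ) as dag:
        insert_query_job = BigQueryInsertJobOperator(
            task_id="insert_query_job",
            configuration={
                "query": {
                    "query": "SELECT * FROM `my-project.my_dataset.my_table`",  # placeholder SQL
                    "useLegacySql": False,
                }
            },
            location="EU",            # placeholder location
            project_id="my-project",  # placeholder project
            # If the task is killed (for example when execution_timeout below fires),
            # on_kill() now cancels the server-side BigQuery job instead of leaking it.
            cancel_on_kill=True,
            execution_timeout=datetime.timedelta(minutes=10),
        )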