diff --git a/airflow/providers/airbyte/CHANGELOG.rst b/airflow/providers/airbyte/CHANGELOG.rst index 854d5b1cc739..15cc647ec7da 100644 --- a/airflow/providers/airbyte/CHANGELOG.rst +++ b/airflow/providers/airbyte/CHANGELOG.rst @@ -26,6 +26,21 @@ Changelog --------- +4.0.0 +..... + +Breaking changes +~~~~~~~~~~~~~~~~ + +.. note:: + This version introduces a new way of handling the connection to Airbyte, using ``client_id`` and ``client_secret`` instead of ``login`` and ``password``. + You can obtain them by accessing the Airbyte UI and creating a new Application in the Settings page. + + Connection creation has been refactored substantially. + You must specify the Fully Qualified Domain Name in the ``host`` parameter, e.g. ``https://my.company:8000/airbyte/v1/``. + The ``token_url`` parameter is optional; it is used to create the access token and defaults to ``v1/applications/token``, the value used by Airbyte Cloud. + You must remove the ``api_type`` parameter from your DAGs; it is no longer required. + 3.9.0 ..... diff --git a/airflow/providers/airbyte/hooks/airbyte.py b/airflow/providers/airbyte/hooks/airbyte.py index 1cecb5b031ec..d530f3e6685e 100644 --- a/airflow/providers/airbyte/hooks/airbyte.py +++ b/airflow/providers/airbyte/hooks/airbyte.py @@ -17,32 +17,26 @@ # under the License. from __future__ import annotations -import base64 -import json import time -from typing import TYPE_CHECKING, Any, Literal, TypeVar +from typing import Any, TypeVar -import aiohttp -from aiohttp import ClientResponseError -from asgiref.sync import sync_to_async +from airbyte_api import AirbyteAPI +from airbyte_api.api import CancelJobRequest, GetJobRequest +from airbyte_api.models import JobCreateRequest, JobStatusEnum, JobTypeEnum, SchemeClientCredentials, Security from airflow.exceptions import AirflowException -from airflow.providers.http.hooks.http import HttpHook - -if TYPE_CHECKING: - from airflow.models import Connection +from airflow.hooks.base import BaseHook T = TypeVar("T", bound=Any) -class AirbyteHook(HttpHook): +class AirbyteHook(BaseHook): """ Hook for Airbyte API. :param airbyte_conn_id: Optional. The name of the Airflow connection to get connection information for Airbyte. Defaults to "airbyte_default". :param api_version: Optional. Airbyte API version. Defaults to "v1". - :param api_type: Optional. The type of Airbyte API to use. Either "config" or "cloud". Defaults to "config".
""" conn_name_attr = "airbyte_conn_id" @@ -50,93 +44,78 @@ class AirbyteHook(HttpHook): conn_type = "airbyte" hook_name = "Airbyte" - RUNNING = "running" - SUCCEEDED = "succeeded" - CANCELLED = "cancelled" - PENDING = "pending" - FAILED = "failed" - ERROR = "error" - INCOMPLETE = "incomplete" - def __init__( self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1", - api_type: Literal["config", "cloud"] = "config", ) -> None: - super().__init__(http_conn_id=airbyte_conn_id) + super().__init__() self.api_version: str = api_version - self.api_type: str = api_type - - async def get_headers_tenants_from_connection(self) -> tuple[dict[str, Any], str]: - """Get Headers, tenants from the connection details.""" - connection: Connection = await sync_to_async(self.get_connection)(self.http_conn_id) - # schema defaults to HTTP - schema = connection.schema if connection.schema else "http" - base_url = f"{schema}://{connection.host}" - - if connection.port: - base_url += f":{connection.port}" - - if self.api_type == "config": - credentials = f"{connection.login}:{connection.password}" - credentials_base64 = base64.b64encode(credentials.encode("utf-8")).decode("utf-8") - authorized_headers = { - "accept": "application/json", - "content-type": "application/json", - "authorization": f"Basic {credentials_base64}", - } - else: - authorized_headers = { - "accept": "application/json", - "content-type": "application/json", - "authorization": f"Bearer {connection.password}", - } - - return authorized_headers, base_url - - async def get_job_details(self, job_id: int) -> Any: + self.airbyte_conn_id = airbyte_conn_id + self.conn = self.get_conn_params(self.airbyte_conn_id) + self.airbyte_api = self.create_api_session() + + def get_conn_params(self, conn_id: str) -> Any: + conn = self.get_connection(conn_id) + + conn_params: dict = {} + conn_params["host"] = conn.host + conn_params["client_id"] = conn.login + conn_params["client_secret"] = conn.password + conn_params["token_url"] = conn.schema or "v1/applications/token" + + return conn_params + + def create_api_session(self) -> AirbyteAPI: + """Create Airbyte API session.""" + credentials = SchemeClientCredentials( + client_id=self.conn["client_id"], + client_secret=self.conn["client_secret"], + TOKEN_URL=self.conn["token_url"], + ) + + return AirbyteAPI( + server_url=self.conn["host"], + security=Security(client_credentials=credentials), + ) + + @classmethod + def get_ui_field_behaviour(cls) -> dict[str, Any]: + """Return custom field behaviour.""" + return { + "hidden_fields": [ + "extra", + "port", + ], + "relabeling": {"login": "Client ID", "password": "Client Secret", "schema": "Token URL"}, + "placeholders": {}, + } + + def get_job_details(self, job_id: int) -> Any: """ Use Http async call to retrieve metadata for a specific job of an Airbyte Sync. :param job_id: The ID of an Airbyte Sync Job. 
""" - headers, base_url = await self.get_headers_tenants_from_connection() - if self.api_type == "config": - url = f"{base_url}/api/{self.api_version}/jobs/get" - self.log.info("URL for api request: %s", url) - async with aiohttp.ClientSession(headers=headers) as session: - async with session.post(url=url, data=json.dumps({"id": job_id})) as response: - try: - response.raise_for_status() - return await response.json() - except ClientResponseError as e: - msg = f"{e.status}: {e.message} - {e.request_info}" - raise AirflowException(msg) - else: - url = f"{base_url}/{self.api_version}/jobs/{job_id}" - self.log.info("URL for api request: %s", url) - async with aiohttp.ClientSession(headers=headers) as session: - async with session.get(url=url) as response: - try: - response.raise_for_status() - return await response.json() - except ClientResponseError as e: - msg = f"{e.status}: {e.message} - {e.request_info}" - raise AirflowException(msg) + try: + get_job_res = self.airbyte_api.jobs.get_job( + request=GetJobRequest( + job_id=job_id, + ) + ) + return get_job_res.job_response + except Exception as e: + raise AirflowException(e) - async def get_job_status(self, job_id: int) -> str: + def get_job_status(self, job_id: int) -> str: """ Retrieve the status for a specific job of an Airbyte Sync. :param job_id: The ID of an Airbyte Sync Job. """ self.log.info("Getting the status of job run %s.", job_id) - response = await self.get_job_details(job_id=job_id) - if self.api_type == "config": - return str(response["job"]["status"]) - else: - return str(response["status"]) + response = self.get_job_details(job_id=job_id) + return response.status def wait_for_job(self, job_id: str | int, wait_seconds: float = 3, timeout: float | None = 3600) -> None: """ @@ -155,69 +134,35 @@ def wait_for_job(self, job_id: str | int, wait_seconds: float = 3, timeout: floa raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s") time.sleep(wait_seconds) try: - job = self.get_job(job_id=(int(job_id))) - if self.api_type == "config": - state = job.json()["job"]["status"] - else: - state = job.json()["status"] + job = self.get_job_details(job_id=(int(job_id))) + state = job.status + except AirflowException as err: self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err) continue - if state in (self.RUNNING, self.PENDING, self.INCOMPLETE): + if state in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE): continue - if state == self.SUCCEEDED: + if state == JobStatusEnum.SUCCEEDED: break - if state == self.ERROR: + if state == JobStatusEnum.FAILED: raise AirflowException(f"Job failed:\n{job}") - elif state == self.CANCELLED: + elif state == JobStatusEnum.CANCELLED: raise AirflowException(f"Job was cancelled:\n{job}") else: raise AirflowException(f"Encountered unexpected state `{state}` for job_id `{job_id}`") def submit_sync_connection(self, connection_id: str) -> Any: - """ - Submit a job to a Airbyte server. - - :param connection_id: Required. The ConnectionId of the Airbyte Connection. 
- """ - if self.api_type == "config": - return self.run( - endpoint=f"api/{self.api_version}/connections/sync", - json={"connectionId": connection_id}, - headers={"accept": "application/json"}, - ) - else: - conn = self.get_connection(self.http_conn_id) - self.method = "POST" - return self.run( - endpoint=f"{self.api_version}/jobs", - headers={"accept": "application/json", "authorization": f"Bearer {conn.password}"}, - json={ - "jobType": "sync", - "connectionId": connection_id, - }, # TODO: add an option to pass jobType = reset - ) - - def get_job(self, job_id: int) -> Any: - """ - Get the resource representation for a job in Airbyte. - - :param job_id: Required. Id of the Airbyte job - """ - if self.api_type == "config": - return self.run( - endpoint=f"api/{self.api_version}/jobs/get", - json={"id": job_id}, - headers={"accept": "application/json"}, - ) - else: - self.method = "GET" - conn = self.get_connection(self.http_conn_id) - return self.run( - endpoint=f"{self.api_version}/jobs/{job_id}", - headers={"accept": "application/json", "authorization": f"Bearer {conn.password}"}, + try: + res = self.airbyte_api.jobs.create_job( + request=JobCreateRequest( + connection_id=connection_id, + job_type=JobTypeEnum.SYNC, + ) ) + return res.job_response + except Exception as e: + raise AirflowException(e) def cancel_job(self, job_id: int) -> Any: """ @@ -225,35 +170,23 @@ def cancel_job(self, job_id: int) -> Any: :param job_id: Required. Id of the Airbyte job """ - if self.api_type == "config": - return self.run( - endpoint=f"api/{self.api_version}/jobs/cancel", - json={"id": job_id}, - headers={"accept": "application/json"}, - ) - else: - self.method = "DELETE" - conn = self.get_connection(self.http_conn_id) - return self.run( - endpoint=f"{self.api_version}/jobs/{job_id}", - headers={"accept": "application/json", "authorization": f"Bearer {conn.password}"}, + try: + cancel_job_res = self.airbyte_api.jobs.cancel_job( + request=CancelJobRequest( + job_id=job_id, + ) ) + return cancel_job_res.job_response + except Exception as e: + raise AirflowException(e) def test_connection(self): """Tests the Airbyte connection by hitting the health API.""" - self.method = "GET" try: - res = self.run( - endpoint=f"api/{self.api_version}/health", - headers={"accept": "application/json"}, - extra_options={"check_response": False}, - ) - - if res.status_code == 200: + health_check = self.airbyte_api.health.get_health_check() + if health_check.status_code == 200: return True, "Connection successfully tested" else: - return False, res.text + return False, str(health_check.raw_response) except Exception as e: return False, str(e) - finally: - self.method = "POST" diff --git a/airflow/providers/airbyte/operators/airbyte.py b/airflow/providers/airbyte/operators/airbyte.py index 0dd82d468375..51764029dfd6 100644 --- a/airflow/providers/airbyte/operators/airbyte.py +++ b/airflow/providers/airbyte/operators/airbyte.py @@ -18,7 +18,9 @@ from __future__ import annotations import time -from typing import TYPE_CHECKING, Any, Literal, Sequence +from typing import TYPE_CHECKING, Any, Sequence + +from airbyte_api.models import JobStatusEnum from airflow.configuration import conf from airflow.exceptions import AirflowException @@ -46,7 +48,6 @@ class AirbyteTriggerSyncOperator(BaseOperator): waiting on them asynchronously using the AirbyteJobSensor. Defaults to False. :param deferrable: Run operator in the deferrable mode. :param api_version: Optional. Airbyte API version. Defaults to "v1". - :param api_type: Optional. 
The type of Airbyte API to use. Either "config" or "cloud". Defaults to "config". :param wait_seconds: Optional. Number of seconds between checks. Only used when ``asynchronous`` is False. Defaults to 3 seconds. :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. @@ -63,7 +64,6 @@ def __init__( self, connection_id: str, airbyte_conn_id: str = "airbyte_default", asynchronous: bool = False, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), api_version: str = "v1", - api_type: Literal["config", "cloud"] = "config", wait_seconds: float = 3, timeout: float = 3600, **kwargs, ) -> None: @@ -73,55 +73,47 @@ def __init__( self.connection_id = connection_id self.timeout = timeout self.api_version = api_version - self.api_type = api_type self.wait_seconds = wait_seconds self.asynchronous = asynchronous self.deferrable = deferrable def execute(self, context: Context) -> None: """Create Airbyte Job and wait to finish.""" - hook = AirbyteHook( - airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version, api_type=self.api_type - ) + hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version) job_object = hook.submit_sync_connection(connection_id=self.connection_id) - if self.api_type == "config": - self.job_id = job_object.json()["job"]["id"] - state = job_object.json()["job"]["status"] - else: - self.job_id = job_object.json()["jobId"] - state = job_object.json()["status"] + self.job_id = job_object.job_id + state = job_object.status end_time = time.time() + self.timeout self.log.info("Job %s was submitted to Airbyte Server", self.job_id) - if not self.asynchronous: - self.log.info("Waiting for job %s to complete", self.job_id) - if self.deferrable: - if state in (hook.RUNNING, hook.PENDING, hook.INCOMPLETE): - self.defer( - timeout=self.execution_timeout, - trigger=AirbyteSyncTrigger( - conn_id=self.airbyte_conn_id, - api_type=self.api_type, - job_id=self.job_id, - end_time=end_time, - poll_interval=60, - ), - method_name="execute_complete", - ) - elif state == hook.SUCCEEDED: - self.log.info("Job %s completed successfully", self.job_id) - return - elif state == hook.ERROR: - raise AirflowException(f"Job failed:\n{self.job_id}") - elif state == hook.CANCELLED: - raise AirflowException(f"Job was cancelled:\n{self.job_id}") - else: - raise AirflowException( - f"Encountered unexpected state `{state}` for job_id `{self.job_id}" - ) + + if self.asynchronous: + self.log.info("Async Task returning job_id %s", self.job_id) + return self.job_id + + if not self.deferrable: + hook.wait_for_job(job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout) + else: + if state in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE): + self.defer( + timeout=self.execution_timeout, + trigger=AirbyteSyncTrigger( + conn_id=self.airbyte_conn_id, + job_id=self.job_id, + end_time=end_time, + poll_interval=60, + ), + method_name="execute_complete", + ) + elif state == JobStatusEnum.SUCCEEDED: + self.log.info("Job %s completed successfully", self.job_id) + return + elif state == JobStatusEnum.FAILED: + raise AirflowException(f"Job failed:\n{self.job_id}") + elif state == JobStatusEnum.CANCELLED: + raise AirflowException(f"Job was cancelled:\n{self.job_id}") else: - hook.wait_for_job(job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout) - self.log.info("Job %s completed successfully", self.job_id) + raise AirflowException(f"Encountered unexpected state `{state}` for job_id `{self.job_id}`") return self.job_id diff --git 
a/airflow/providers/airbyte/provider.yaml b/airflow/providers/airbyte/provider.yaml index d799066fe25b..ce1e84cbb465 100644 --- a/airflow/providers/airbyte/provider.yaml +++ b/airflow/providers/airbyte/provider.yaml @@ -25,6 +25,7 @@ state: ready source-date-epoch: 1723968864 # note that those versions are maintained by release manager - do not update them manually versions: + - 4.0.0 - 3.9.0 - 3.8.1 - 3.8.0 @@ -50,7 +51,7 @@ versions: dependencies: - apache-airflow>=2.8.0 - - apache-airflow-providers-http + - airbyte-api>=0.51.0 integrations: - integration-name: Airbyte diff --git a/airflow/providers/airbyte/sensors/airbyte.py b/airflow/providers/airbyte/sensors/airbyte.py index 36d772d53130..1dc5a805e936 100644 --- a/airflow/providers/airbyte/sensors/airbyte.py +++ b/airflow/providers/airbyte/sensors/airbyte.py @@ -21,7 +21,9 @@ import time import warnings -from typing import TYPE_CHECKING, Any, Literal, Sequence +from typing import TYPE_CHECKING, Any, Sequence + +from airbyte_api.models import JobStatusEnum from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning @@ -42,7 +44,6 @@ class AirbyteJobSensor(BaseSensorOperator): :param deferrable: Run sensor in the deferrable mode. :param airbyte_conn_id: Optional. The name of the Airflow connection to get connection information for Airbyte. Defaults to "airbyte_default". :param api_version: Optional. Airbyte API version. Defaults to "v1". - :param api_type: Optional. The type of Airbyte API to use. Either "config" or "cloud". Defaults to "config". """ template_fields: Sequence[str] = ("airbyte_job_id",) @@ -55,7 +56,6 @@ def __init__( self, airbyte_job_id: str, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), airbyte_conn_id: str = "airbyte_default", api_version: str = "v1", - api_type: Literal["config", "cloud"] = "config", **kwargs, ) -> None: if deferrable: @@ -80,29 +80,21 @@ def __init__( self.airbyte_conn_id = airbyte_conn_id self.airbyte_job_id = airbyte_job_id self.api_version = api_version - self.api_type = api_type def poke(self, context: Context) -> bool: - hook = AirbyteHook( - airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version, api_type=self.api_type - ) - job = hook.get_job(job_id=self.airbyte_job_id) - if self.api_type == "config": - status = job.json()["job"]["status"] - else: - status = job.json()["status"] + hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version) + job = hook.get_job_details(job_id=self.airbyte_job_id) + status = job.status - if status == hook.FAILED: + if status == JobStatusEnum.FAILED: message = f"Job failed: \n{job}" raise AirflowException(message) - elif status == hook.CANCELLED: + elif status == JobStatusEnum.CANCELLED: message = f"Job was cancelled: \n{job}" raise AirflowException(message) - elif status == hook.SUCCEEDED: + elif status == JobStatusEnum.SUCCEEDED: self.log.info("Job %s completed successfully.", self.airbyte_job_id) return True - elif status == hook.ERROR: - self.log.info("Job %s attempt has failed.", self.airbyte_job_id) self.log.info("Waiting for job %s to complete.", self.airbyte_job_id) return False @@ -112,23 +104,17 @@ def execute(self, context: Context) -> Any: if not self.deferrable: super().execute(context) else: - hook = AirbyteHook( - airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version, api_type=self.api_type - ) - job = hook.get_job(job_id=(int(self.airbyte_job_id))) - if self.api_type == "config": - state = job.json()["job"]["status"] - else: - state = job.json()["status"] + hook = 
AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version) + job = hook.get_job_details(job_id=(int(self.airbyte_job_id))) + state = job.status end_time = time.time() + self.timeout self.log.info("Airbyte Job Id: Job %s", self.airbyte_job_id) - if state in (hook.RUNNING, hook.PENDING, hook.INCOMPLETE): + if state in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE): self.defer( timeout=self.execution_timeout, trigger=AirbyteSyncTrigger( - api_type=self.api_type, conn_id=self.airbyte_conn_id, job_id=self.airbyte_job_id, end_time=end_time, @@ -136,12 +122,12 @@ def execute(self, context: Context) -> Any: ), method_name="execute_complete", ) - elif state == hook.SUCCEEDED: + elif state == JobStatusEnum.SUCCEEDED: self.log.info("%s completed successfully.", self.task_id) return - elif state == hook.ERROR: + elif state == JobStatusEnum.FAILED: raise AirflowException(f"Job failed:\n{job}") - elif state == hook.CANCELLED: + elif state == JobStatusEnum.CANCELLED: raise AirflowException(f"Job was cancelled:\n{job}") else: raise AirflowException( diff --git a/airflow/providers/airbyte/triggers/airbyte.py b/airflow/providers/airbyte/triggers/airbyte.py index 38581583eff8..1184b42d812e 100644 --- a/airflow/providers/airbyte/triggers/airbyte.py +++ b/airflow/providers/airbyte/triggers/airbyte.py @@ -18,7 +18,9 @@ import asyncio import time -from typing import Any, AsyncIterator, Literal +from typing import Any, AsyncIterator + +from airbyte_api.models import JobStatusEnum from airflow.providers.airbyte.hooks.airbyte import AirbyteHook from airflow.triggers.base import BaseTrigger, TriggerEvent @@ -32,7 +34,6 @@ class AirbyteSyncTrigger(BaseTrigger): makes use of asynchronous communication to check the progress of a job run over time. :param conn_id: The connection identifier for connecting to Airbyte. - :param api_type: The type of Airbyte API to use. Either "config" or "cloud". :param job_id: The ID of an Airbyte Sync job. :param end_time: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days. :param poll_interval: polling period in seconds to check for the status. 
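For illustration, after this change the trigger is constructed without ``api_type``; a minimal sketch (the job id and the one-hour deadline below are placeholder values, not taken from this diff):

.. code-block:: python

    import time

    from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger

    # Placeholder values: poll every 60 seconds until the job reaches a
    # terminal status or the one-hour deadline passes.
    trigger = AirbyteSyncTrigger(
        job_id=1234,
        conn_id="airbyte_default",
        end_time=time.time() + 3600,
        poll_interval=60,
    )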
@@ -42,14 +43,12 @@ def __init__( self, job_id: int, conn_id: str, - api_type: Literal["config", "cloud"], end_time: float, poll_interval: float, ): super().__init__() self.job_id = job_id self.conn_id = conn_id - self.api_type = api_type self.end_time = end_time self.poll_interval = poll_interval @@ -60,7 +59,6 @@ def serialize(self) -> tuple[str, dict[str, Any]]: { "job_id": self.job_id, "conn_id": self.conn_id, - "api_type": self.api_type, "end_time": self.end_time, "poll_interval": self.poll_interval, }, @@ -68,7 +66,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: async def run(self) -> AsyncIterator[TriggerEvent]: """Make async connection to Airbyte, polls for the pipeline run status.""" - hook = AirbyteHook(airbyte_conn_id=self.conn_id, api_type=self.api_type) + hook = AirbyteHook(airbyte_conn_id=self.conn_id) try: while await self.is_still_running(hook): if self.end_time < time.time(): @@ -82,8 +80,8 @@ async def run(self) -> AsyncIterator[TriggerEvent]: ) return await asyncio.sleep(self.poll_interval) - job_run_status = await hook.get_job_status(self.job_id) - if job_run_status == hook.SUCCEEDED: + job_run_status = hook.get_job_status(self.job_id) + if job_run_status == JobStatusEnum.SUCCEEDED: yield TriggerEvent( { "status": "success", @@ -91,7 +89,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: "job_id": self.job_id, } ) - elif job_run_status == hook.CANCELLED: + elif job_run_status == JobStatusEnum.CANCELLED: yield TriggerEvent( { "status": "cancelled", @@ -116,7 +114,7 @@ async def is_still_running(self, hook: AirbyteHook) -> bool: If job is in running state returns True if it is still running else return False """ - job_run_status = await hook.get_job_status(self.job_id) - if job_run_status in (AirbyteHook.RUNNING, AirbyteHook.PENDING, AirbyteHook.INCOMPLETE): + job_run_status = hook.get_job_status(self.job_id) + if job_run_status in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE): return True return False diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py index 09327e18efdc..d9fc95ef0b1e 100644 --- a/dev/breeze/tests/test_selective_checks.py +++ b/dev/breeze/tests/test_selective_checks.py @@ -421,7 +421,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "tests/providers/http/file.py", ), { - "affected-providers-list-as-string": "airbyte amazon apache.livy " + "affected-providers-list-as-string": "amazon apache.livy " "dbt.cloud dingding discord http", "all-python-versions": "['3.8']", "all-python-versions-list-as-string": "3.8", @@ -437,9 +437,9 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "run-kubernetes-tests": "true", "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": "Always " - "Providers[airbyte,apache.livy,dbt.cloud,dingding,discord,http] Providers[amazon]", - "providers-test-types-list-as-string": "Providers[airbyte,apache.livy,dbt.cloud,dingding,discord,http] Providers[amazon]", - "separate-test-types-list-as-string": "Always Providers[airbyte] Providers[amazon] " + "Providers[amazon] Providers[apache.livy,dbt.cloud,dingding,discord,http]", + "providers-test-types-list-as-string": "Providers[amazon] Providers[apache.livy,dbt.cloud,dingding,discord,http]", + "separate-test-types-list-as-string": "Always Providers[amazon] " "Providers[apache.livy] Providers[dbt.cloud] " "Providers[dingding] Providers[discord] Providers[http]", "needs-mypy": "true", @@ -457,7 +457,7 @@ def 
assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "tests/providers/airbyte/file.py", ), { - "affected-providers-list-as-string": "airbyte http", + "affected-providers-list-as-string": "airbyte", "all-python-versions": "['3.8']", "all-python-versions-list-as-string": "3.8", "python-versions": "['3.8']", @@ -471,12 +471,12 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers,ts-compile-format-lint-www", "run-kubernetes-tests": "true", "upgrade-to-newer-dependencies": "false", - "parallel-test-types-list-as-string": "Always Providers[airbyte,http]", - "providers-test-types-list-as-string": "Providers[airbyte,http]", + "parallel-test-types-list-as-string": "Always Providers[airbyte]", + "providers-test-types-list-as-string": "Providers[airbyte]", "needs-mypy": "true", "mypy-folders": "['providers']", }, - id="Helm tests, airbyte/http providers, kubernetes tests and " + id="Helm tests, airbyte providers, kubernetes tests and " "docs should run even if unimportant files were added", ) ), @@ -595,7 +595,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): pytest.param( ("tests/providers/airbyte/__init__.py",), { - "affected-providers-list-as-string": "airbyte http", + "affected-providers-list-as-string": "airbyte", "all-python-versions": "['3.8']", "all-python-versions-list-as-string": "3.8", "python-versions": "['3.8']", @@ -609,7 +609,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "skip-pre-commits": "identity,lint-helm-chart,mypy-airflow,mypy-dev,mypy-docs,mypy-providers,ts-compile-format-lint-www", "run-kubernetes-tests": "false", "upgrade-to-newer-dependencies": "false", - "parallel-test-types-list-as-string": "Always Providers[airbyte,http]", + "parallel-test-types-list-as-string": "Always Providers[airbyte]", "needs-mypy": "true", "mypy-folders": "['providers']", }, @@ -1532,9 +1532,9 @@ def test_no_commit_provided_trigger_full_build_for_any_event_type(github_event): "run-tests": "true", "docs-build": "true", "skip-pre-commits": "identity,mypy-airflow,mypy-dev,mypy-docs,mypy-providers", - "upgrade-to-newer-dependencies": "true" - if github_event in [GithubEvents.PUSH, GithubEvents.SCHEDULE] - else "false", + "upgrade-to-newer-dependencies": ( + "true" if github_event in [GithubEvents.PUSH, GithubEvents.SCHEDULE] else "false" + ), "parallel-test-types-list-as-string": ALL_CI_SELECTIVE_TEST_TYPES, "needs-mypy": "true", "mypy-folders": "['airflow', 'providers', 'docs', 'dev']", @@ -1636,14 +1636,14 @@ def test_upgrade_to_newer_dependencies( pytest.param( ("docs/apache-airflow-providers-airbyte/docs.rst",), { - "docs-list-as-string": "airbyte http", + "docs-list-as-string": "airbyte", }, id="Airbyte provider docs changed", ), pytest.param( ("docs/apache-airflow-providers-airbyte/docs.rst", "docs/apache-airflow/docs.rst"), { - "docs-list-as-string": "apache-airflow airbyte http", + "docs-list-as-string": "apache-airflow airbyte", }, id="Airbyte provider and airflow core docs changed", ), @@ -1654,7 +1654,7 @@ def test_upgrade_to_newer_dependencies( "docs/apache-airflow-providers/docs.rst", ), { - "docs-list-as-string": "apache-airflow apache-airflow-providers airbyte http", + "docs-list-as-string": "apache-airflow apache-airflow-providers airbyte", }, id="Airbyte provider and airflow core and common provider docs changed", ), diff --git a/docs/apache-airflow-providers-airbyte/connections.rst 
b/docs/apache-airflow-providers-airbyte/connections.rst index 28f6a3a74717..f0c14000aa61 100644 --- a/docs/apache-airflow-providers-airbyte/connections.rst +++ b/docs/apache-airflow-providers-airbyte/connections.rst @@ -19,26 +19,24 @@ Airbyte Connection ================== -The Airbyte connection type use the HTTP protocol. +The Airbyte connection type uses the Airbyte API Python SDK to authenticate with the server. -Configuring the Connection - Config API ---------------------------------------- Host(required) - The host to connect to the Airbyte server. - -Port (required) - The port for the Airbyte server. - -Login (optional) - Specify the user name to connect. - -Password (optional) - Specify the password to connect. - -Configuring the Connection - Cloud API -------------------------------------- -Host(required) - The host to connect to the Airbyte Cloud. (Typically ``https://api.airbyte.com``) - -Password (required) - Cloud API Key obtained from https://portal.airbyte.com/ + The fully qualified host domain used to connect to the Airbyte server. + If you are using Airbyte Cloud: ``https://api.airbyte.com/v1/`` + If you are using Airbyte OSS: ``http://localhost:8000/api/public/v1/`` + Be aware: if you change the API path, you must update this value accordingly. + +Token URL (optional) + The URL prefix used to create the access token. + If you are using Airbyte Cloud: ``v1/applications/token`` (default value) + If you are using Airbyte OSS: ``/api/public/v1/applications/token`` + Be aware: if you change the API path, you must update this value accordingly. + +Client ID (required) + The Client ID to connect to the Airbyte server. + You can find this information on the Settings / Applications page of the Airbyte UI. + +Client Secret (required) + The Client Secret to connect to the Airbyte server. + You can find this information on the Settings / Applications page of the Airbyte UI. diff --git a/docs/apache-airflow-providers-airbyte/operators/airbyte.rst b/docs/apache-airflow-providers-airbyte/operators/airbyte.rst index 55eb1101633e..d07301dc42b8 100644 --- a/docs/apache-airflow-providers-airbyte/operators/airbyte.rst +++ b/docs/apache-airflow-providers-airbyte/operators/airbyte.rst @@ -38,10 +38,8 @@ create in Airbyte between a source and destination synchronization job. Use the ``airbyte_conn_id`` parameter to specify the Airbyte connection to use to connect to your account. -Airbyte currently supports two different API's. The first one is the `Config API `_ -which is specifically used for Open Source Airbyte Instances. The second is the `Cloud API `_ -which is used for the Airbyte Cloud Service. If you are using Airbyte's Cloud service, -then you will need to specify ``api_type="cloud"`` as part of the Operator's parameters. +Airbyte offers a single authentication method for both Cloud and OSS users. +You need to provide the ``client_id`` and ``client_secret`` to authenticate with the Airbyte server. You can trigger a synchronization job in Airflow in two ways with the Operator. The first one is a synchronous process. This Operator will initiate the Airbyte job, and the Operator manages the job status. 
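For example, a minimal synchronous use of the Operator could look like the sketch below; the ``connection_id`` value is a placeholder, not a real Airbyte connection:

.. code-block:: python

    from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

    # Sketch: submits the sync and blocks until the job reaches a terminal state.
    sync_source_destination = AirbyteTriggerSyncOperator(
        task_id="airbyte_sync_source_dest",
        airbyte_conn_id="airbyte_default",
        connection_id="15bc3800-82e4-48c3-a32d-620661273f28",  # placeholder UUID
        asynchronous=False,
        wait_seconds=3,
        timeout=3600,
    )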
Another way is to use the flag diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 3d0e7841c528..ed4d60e75f63 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -1,14 +1,12 @@ { "airbyte": { "deps": [ - "apache-airflow-providers-http", + "airbyte-api>=0.51.0", "apache-airflow>=2.8.0" ], "devel-deps": [], "plugins": [], - "cross-providers-deps": [ - "http" - ], + "cross-providers-deps": [], "excluded-python-versions": [], "state": "ready" }, diff --git a/tests/providers/airbyte/hooks/test_airbyte.py b/tests/providers/airbyte/hooks/test_airbyte.py index e91227a3a090..2bc27f391d13 100644 --- a/tests/providers/airbyte/hooks/test_airbyte.py +++ b/tests/providers/airbyte/hooks/test_airbyte.py @@ -20,6 +20,8 @@ from unittest import mock import pytest +from airbyte_api.api import CancelJobRequest, GetJobRequest +from airbyte_api.models import JobResponse, JobStatusEnum, JobTypeEnum from airflow.exceptions import AirflowException from airflow.models import Connection @@ -30,6 +32,7 @@ pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] +@pytest.mark.db_test class TestAirbyteHook: """ Test all functions from Airbyte Hook @@ -50,137 +53,151 @@ class TestAirbyteHook: def setup_method(self): db.merge_conn( Connection( - conn_id="airbyte_conn_id_test", conn_type="airbyte", host="http://test-airbyte", port=8001 + conn_id="airbyte_conn_id_test", + conn_type="airbyte", + host="http://test-airbyte:8000/public/v1/api/", + port=8001, ) ) self.hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id) def return_value_get_job(self, status): response = mock.Mock() - response.json.return_value = {"job": {"status": status}} + response.job_response = JobResponse( + connection_id="connection-mock", + job_id=self.job_id, + start_time="today", + job_type=JobTypeEnum.SYNC, + status=status, + ) return response - def test_submit_sync_connection(self, requests_mock): - requests_mock.post( - self.sync_connection_endpoint, status_code=200, json=self._mock_sync_conn_success_response_body - ) + @mock.patch("airbyte_api.jobs.Jobs.create_job") + def test_submit_sync_connection(self, create_job_mock): + mock_response = mock.Mock() + mock_response.job_response = self._mock_sync_conn_success_response_body + create_job_mock.return_value = mock_response + resp = self.hook.submit_sync_connection(connection_id=self.connection_id) - assert resp.status_code == 200 - assert resp.json() == self._mock_sync_conn_success_response_body - - @pytest.mark.asyncio - @pytest.mark.parametrize( - "host, port, schema, expected_base_url, description", - [ - ("test-airbyte", 8001, "http", "http://test-airbyte:8001", "uri_with_port_and_schema"), - ("test-airbyte", None, "https", "https://test-airbyte", "uri_with_schema"), - ("test-airbyte", None, None, "http://test-airbyte", "uri_without_port_and_schema"), - ], - ) - async def test_get_base_url(self, host, port, schema, expected_base_url, description): - conn_id = f"test_conn_{description}" - conn = Connection(conn_id=conn_id, conn_type="airbyte", host=host, port=port, schema=schema) - hook = AirbyteHook(airbyte_conn_id=conn_id) - db.merge_conn(conn) - _, base_url = await hook.get_headers_tenants_from_connection() - assert base_url == expected_base_url - - def test_get_job_status(self, requests_mock): - requests_mock.post( - self.get_job_endpoint, status_code=200, json=self._mock_job_status_success_response_body + assert resp == self._mock_sync_conn_success_response_body + + 
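At the hook level, the flow these tests exercise is roughly the following sketch (assuming a configured ``airbyte_default`` Airflow connection; the ``connection_id`` value is a placeholder):

.. code-block:: python

    from airflow.providers.airbyte.hooks.airbyte import AirbyteHook

    # Sketch: submit a sync through the SDK-backed hook, then poll to completion.
    hook = AirbyteHook(airbyte_conn_id="airbyte_default")
    job = hook.submit_sync_connection(
        connection_id="15bc3800-82e4-48c3-a32d-620661273f28"  # placeholder UUID
    )  # returns a JobResponse
    hook.wait_for_job(job_id=job.job_id, wait_seconds=3, timeout=3600)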
@mock.patch("airbyte_api.jobs.Jobs.get_job") + def test_get_job_status(self, get_job_mock): + mock_response = mock.AsyncMock() + mock_response.job_response = JobResponse( + connection_id="connection-mock", + job_id="1", + start_time="today", + job_type=JobTypeEnum.SYNC, + status=JobStatusEnum.RUNNING, ) - resp = self.hook.get_job(job_id=self.job_id) - assert resp.status_code == 200 - assert resp.json() == self._mock_job_status_success_response_body - - def test_cancel_job(self, requests_mock): - requests_mock.post( - self.cancel_job_endpoint, status_code=200, json=self._mock_job_status_success_response_body + get_job_mock.return_value = mock_response + resp = self.hook.get_job_status(job_id=self.job_id) + assert resp == JobStatusEnum.RUNNING + + @mock.patch("airbyte_api.jobs.Jobs.cancel_job") + def test_cancel_job(self, cancel_job_mock): + mock_response = mock.Mock() + mock_response.job_response = JobResponse( + connection_id="connection-mock", + job_id="1", + start_time="today", + job_type=JobTypeEnum.SYNC, + status=JobStatusEnum.CANCELLED, ) + cancel_job_mock.return_value = mock_response + resp = self.hook.cancel_job(job_id=self.job_id) - assert resp.status_code == 200 + assert resp.status == JobStatusEnum.CANCELLED - @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job") + @mock.patch("airbyte_api.jobs.Jobs.get_job") def test_wait_for_job_succeeded(self, mock_get_job): - mock_get_job.side_effect = [self.return_value_get_job(self.hook.SUCCEEDED)] + mock_get_job.side_effect = [self.return_value_get_job(JobStatusEnum.SUCCEEDED)] self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0) - mock_get_job.assert_called_once_with(job_id=self.job_id) + mock_get_job.assert_called_once_with(request=GetJobRequest(self.job_id)) - @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job") + @mock.patch("airbyte_api.jobs.Jobs.get_job") def test_wait_for_job_error(self, mock_get_job): mock_get_job.side_effect = [ - self.return_value_get_job(self.hook.RUNNING), - self.return_value_get_job(self.hook.ERROR), + self.return_value_get_job(JobStatusEnum.RUNNING), + self.return_value_get_job(JobStatusEnum.FAILED), ] with pytest.raises(AirflowException, match="Job failed"): self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0) - calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)] + calls = [mock.call(request=GetJobRequest(self.job_id)), mock.call(request=GetJobRequest(self.job_id))] mock_get_job.assert_has_calls(calls) - @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job") + @mock.patch("airbyte_api.jobs.Jobs.get_job") def test_wait_for_job_incomplete_succeeded(self, mock_get_job): mock_get_job.side_effect = [ - self.return_value_get_job(self.hook.INCOMPLETE), - self.return_value_get_job(self.hook.SUCCEEDED), + self.return_value_get_job(JobStatusEnum.INCOMPLETE), + self.return_value_get_job(JobStatusEnum.SUCCEEDED), ] self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0) - calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)] + calls = [mock.call(request=GetJobRequest(self.job_id)), mock.call(request=GetJobRequest(self.job_id))] mock_get_job.assert_has_calls(calls) - @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job") - @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.cancel_job") + @mock.patch("airbyte_api.jobs.Jobs.get_job") + @mock.patch("airbyte_api.jobs.Jobs.cancel_job") def test_wait_for_job_timeout(self, mock_cancel_job, mock_get_job): mock_get_job.side_effect = [ 
- self.return_value_get_job(self.hook.PENDING), - self.return_value_get_job(self.hook.RUNNING), + self.return_value_get_job(JobStatusEnum.PENDING), + self.return_value_get_job(JobStatusEnum.RUNNING), ] with pytest.raises(AirflowException, match="Timeout"): self.hook.wait_for_job(job_id=self.job_id, wait_seconds=2, timeout=1) - calls = [mock.call(job_id=self.job_id)] - mock_get_job.assert_has_calls(calls) - mock_cancel_job.assert_has_calls(calls) - assert mock_get_job.mock_calls == calls - assert mock_cancel_job.mock_calls == calls + get_calls = [ + mock.call(request=GetJobRequest(self.job_id)), + ] + cancel_calls = [mock.call(request=CancelJobRequest(self.job_id))] + mock_get_job.assert_has_calls(get_calls) + mock_cancel_job.assert_has_calls(cancel_calls) + assert mock_get_job.mock_calls == get_calls + assert mock_cancel_job.mock_calls == cancel_calls - @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job") + @mock.patch("airbyte_api.jobs.Jobs.get_job") def test_wait_for_job_state_unrecognized(self, mock_get_job): mock_get_job.side_effect = [ - self.return_value_get_job(self.hook.RUNNING), + self.return_value_get_job(JobStatusEnum.RUNNING), self.return_value_get_job("UNRECOGNIZED"), ] with pytest.raises(AirflowException, match="unexpected state"): self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0) - calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)] + calls = [mock.call(request=GetJobRequest(self.job_id)), mock.call(request=GetJobRequest(self.job_id))] mock_get_job.assert_has_calls(calls) - @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job") + @mock.patch("airbyte_api.jobs.Jobs.get_job") def test_wait_for_job_cancelled(self, mock_get_job): mock_get_job.side_effect = [ - self.return_value_get_job(self.hook.RUNNING), - self.return_value_get_job(self.hook.CANCELLED), + self.return_value_get_job(JobStatusEnum.RUNNING), + self.return_value_get_job(JobStatusEnum.CANCELLED), ] with pytest.raises(AirflowException, match="Job was cancelled"): self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0) - calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)] + calls = [mock.call(request=GetJobRequest(self.job_id)), mock.call(request=GetJobRequest(self.job_id))] mock_get_job.assert_has_calls(calls) - def test_connection_success(self, requests_mock): - requests_mock.get( - self.health_endpoint, - status_code=200, - ) + @mock.patch("airbyte_api.health.Health.get_health_check") + def test_connection_success(self, mock_get_health_check): + mock_response = mock.Mock() + mock_response.status_code = 200 + mock_get_health_check.return_value = mock_response status, msg = self.hook.test_connection() assert status is True assert msg == "Connection successfully tested" - def test_connection_failure(self, requests_mock): - requests_mock.get(self.health_endpoint, status_code=500, json={"message": "internal server error"}) + @mock.patch("airbyte_api.health.Health.get_health_check") + def test_connection_failure(self, mock_get_health_check): + mock_response = mock.Mock() + mock_response.status_code = 502 + mock_response.raw_response = '{"message": "internal server error"}' + mock_get_health_check.return_value = mock_response status, msg = self.hook.test_connection() assert status is False diff --git a/tests/providers/airbyte/operators/test_airbyte.py b/tests/providers/airbyte/operators/test_airbyte.py index 2c0085f53db3..ffbddcd7e5bf 100644 --- a/tests/providers/airbyte/operators/test_airbyte.py +++ 
b/tests/providers/airbyte/operators/test_airbyte.py @@ -19,9 +19,15 @@ from unittest import mock +import pytest +from airbyte_api.models import JobCreateRequest, JobResponse, JobStatusEnum, JobTypeEnum + +from airflow.models import Connection from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator +from airflow.utils import db +@pytest.mark.db_test class TestAirbyteTriggerSyncOp: """ Test execute function from Airbyte Operator @@ -33,12 +39,20 @@ class TestAirbyteTriggerSyncOp: wait_seconds = 0 timeout = 360 - @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.submit_sync_connection") + @mock.patch("airbyte_api.jobs.Jobs.create_job") @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.wait_for_job", return_value=None) def test_execute(self, mock_wait_for_job, mock_submit_sync_connection): - mock_submit_sync_connection.return_value = mock.Mock( - **{"json.return_value": {"job": {"id": self.job_id, "status": "running"}}} + conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com") + db.merge_conn(conn) + mock_response = mock.Mock() + mock_response.job_response = JobResponse( + connection_id="connection-mock", + job_id=1, + start_time="today", + job_type=JobTypeEnum.SYNC, + status=JobStatusEnum.RUNNING, ) + mock_submit_sync_connection.return_value = mock_response op = AirbyteTriggerSyncOperator( task_id="test_Airbyte_op", @@ -49,7 +63,9 @@ def test_execute(self, mock_wait_for_job, mock_submit_sync_connection): ) op.execute({}) - mock_submit_sync_connection.assert_called_once_with(connection_id=self.connection_id) + mock_submit_sync_connection.assert_called_once_with( + request=JobCreateRequest(connection_id=self.connection_id, job_type=JobTypeEnum.SYNC) + ) mock_wait_for_job.assert_called_once_with( job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout ) diff --git a/tests/providers/airbyte/sensors/test_airbyte.py b/tests/providers/airbyte/sensors/test_airbyte.py index 80a7151a6f09..96a06ab0c6bb 100644 --- a/tests/providers/airbyte/sensors/test_airbyte.py +++ b/tests/providers/airbyte/sensors/test_airbyte.py @@ -19,11 +19,16 @@ from unittest import mock import pytest +from airbyte_api.api import GetJobRequest +from airbyte_api.models import JobResponse, JobStatusEnum, JobTypeEnum from airflow.exceptions import AirflowException +from airflow.models import Connection from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor +from airflow.utils import db +@pytest.mark.db_test class TestAirbyteJobSensor: task_id = "task-id" airbyte_conn_id = "airbyte-conn-test" @@ -32,12 +37,23 @@ class TestAirbyteJobSensor: def get_job(self, status): response = mock.Mock() - response.json.return_value = {"job": {"status": status}} + response.job_response = JobResponse( + connection_id="connection-mock", + job_id=self.job_id, + start_time="today", + job_type=JobTypeEnum.SYNC, + status=status, + ) return response - @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job") + def setup_method(self): + db.merge_conn( + Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="http://test-airbyte") + ) + + @mock.patch("airbyte_api.jobs.Jobs.get_job") def test_done(self, mock_get_job): - mock_get_job.return_value = self.get_job("succeeded") + mock_get_job.return_value = self.get_job(JobStatusEnum.SUCCEEDED) sensor = AirbyteJobSensor( task_id=self.task_id, @@ -45,12 +61,12 @@ def test_done(self, mock_get_job): airbyte_conn_id=self.airbyte_conn_id, ) ret = sensor.poke(context={}) 
- mock_get_job.assert_called_once_with(job_id=self.job_id) + mock_get_job.assert_called_once_with(request=GetJobRequest(job_id=self.job_id)) assert ret - @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job") + @mock.patch("airbyte_api.jobs.Jobs.get_job") def test_failed(self, mock_get_job): - mock_get_job.return_value = self.get_job("failed") + mock_get_job.return_value = self.get_job(JobStatusEnum.FAILED) sensor = AirbyteJobSensor( task_id=self.task_id, @@ -60,11 +76,11 @@ def test_failed(self, mock_get_job): with pytest.raises(AirflowException, match="Job failed"): sensor.poke(context={}) - mock_get_job.assert_called_once_with(job_id=self.job_id) + mock_get_job.assert_called_once_with(request=GetJobRequest(job_id=self.job_id)) - @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job") + @mock.patch("airbyte_api.jobs.Jobs.get_job") def test_running(self, mock_get_job): - mock_get_job.return_value = self.get_job("running") + mock_get_job.return_value = self.get_job(JobStatusEnum.RUNNING) sensor = AirbyteJobSensor( task_id=self.task_id, @@ -73,13 +89,13 @@ def test_running(self, mock_get_job): ) ret = sensor.poke(context={}) - mock_get_job.assert_called_once_with(job_id=self.job_id) + mock_get_job.assert_called_once_with(request=GetJobRequest(job_id=self.job_id)) assert not ret - @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job") + @mock.patch("airbyte_api.jobs.Jobs.get_job") def test_cancelled(self, mock_get_job): - mock_get_job.return_value = self.get_job("cancelled") + mock_get_job.return_value = self.get_job(JobStatusEnum.CANCELLED) sensor = AirbyteJobSensor( task_id=self.task_id, @@ -89,4 +105,4 @@ def test_cancelled(self, mock_get_job): with pytest.raises(AirflowException, match="Job was cancelled"): sensor.poke(context={}) - mock_get_job.assert_called_once_with(job_id=self.job_id) + mock_get_job.assert_called_once_with(request=GetJobRequest(job_id=self.job_id)) diff --git a/tests/providers/airbyte/triggers/test_airbyte.py b/tests/providers/airbyte/triggers/test_airbyte.py index 4de34f50536d..1fcbd68459cc 100644 --- a/tests/providers/airbyte/triggers/test_airbyte.py +++ b/tests/providers/airbyte/triggers/test_airbyte.py @@ -21,25 +21,30 @@ from unittest import mock import pytest +from airbyte_api.models import JobStatusEnum +from airflow.models import Connection from airflow.providers.airbyte.hooks.airbyte import AirbyteHook from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger from airflow.triggers.base import TriggerEvent +from airflow.utils import db +@pytest.mark.db_test class TestAirbyteSyncTrigger: DAG_ID = "airbyte_sync_run" TASK_ID = "airbyte_sync_run_task_op" JOB_ID = 1234 - API_TYPE = "config" CONN_ID = "airbyte_default" END_TIME = time.time() + 60 * 60 * 24 * 7 POLL_INTERVAL = 3.0 + def setup_method(self): + db.merge_conn(Connection(conn_id=self.CONN_ID, conn_type="airbyte", host="http://test-airbyte")) + def test_serialization(self): """Assert TestAirbyteSyncTrigger correctly serializes its arguments and classpath.""" trigger = AirbyteSyncTrigger( - api_type=self.API_TYPE, conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL, end_time=self.END_TIME, @@ -50,7 +55,6 @@ def test_serialization(self): assert kwargs == { "job_id": self.JOB_ID, "conn_id": self.CONN_ID, - "api_type": self.API_TYPE, "end_time": self.END_TIME, "poll_interval": self.POLL_INTERVAL, } @@ -61,7 +65,6 @@ async def test_airbyte_run_sync_trigger(self, mocked_is_still_running): """Test AirbyteSyncTrigger is triggered with 
mocked details and run successfully.""" mocked_is_still_running.return_value = True trigger = AirbyteSyncTrigger( - api_type=self.API_TYPE, conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL, end_time=self.END_TIME, @@ -78,7 +81,7 @@ async def test_airbyte_run_sync_trigger(self, mocked_is_still_running): @pytest.mark.parametrize( "mock_value, mock_status, mock_message", [ - (AirbyteHook.SUCCEEDED, "success", "Job run 1234 has completed successfully."), + (JobStatusEnum.SUCCEEDED, "success", "Job run 1234 has completed successfully."), ], ) @mock.patch("airflow.providers.airbyte.triggers.airbyte.AirbyteSyncTrigger.is_still_running") @@ -90,7 +93,6 @@ async def test_airbyte_job_for_terminal_status_success( mocked_is_still_running.return_value = False mock_get_job_status.return_value = mock_value trigger = AirbyteSyncTrigger( - api_type=self.API_TYPE, conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL, end_time=self.END_TIME, @@ -110,7 +112,7 @@ async def test_airbyte_job_for_terminal_status_success( @pytest.mark.parametrize( "mock_value, mock_status, mock_message", [ - (AirbyteHook.CANCELLED, "cancelled", "Job run 1234 has been cancelled."), + (JobStatusEnum.CANCELLED, "cancelled", "Job run 1234 has been cancelled."), ], ) @mock.patch("airflow.providers.airbyte.triggers.airbyte.AirbyteSyncTrigger.is_still_running") @@ -122,7 +124,6 @@ async def test_airbyte_job_for_terminal_status_cancelled( mocked_is_still_running.return_value = False mock_get_job_status.return_value = mock_value trigger = AirbyteSyncTrigger( - api_type=self.API_TYPE, conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL, end_time=self.END_TIME, @@ -142,7 +143,7 @@ async def test_airbyte_job_for_terminal_status_cancelled( @pytest.mark.parametrize( "mock_value, mock_status, mock_message", [ - (AirbyteHook.ERROR, "error", "Job run 1234 has failed."), + (JobStatusEnum.FAILED, "error", "Job run 1234 has failed."), ], ) @mock.patch("airflow.providers.airbyte.triggers.airbyte.AirbyteSyncTrigger.is_still_running") @@ -154,7 +155,6 @@ async def test_airbyte_job_for_terminal_status_error( mocked_is_still_running.return_value = False mock_get_job_status.return_value = mock_value trigger = AirbyteSyncTrigger( - api_type=self.API_TYPE, conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL, end_time=self.END_TIME, @@ -178,7 +178,6 @@ async def test_airbyte_job_exception(self, mock_get_job_status, mocked_is_still_ mocked_is_still_running.return_value = False mock_get_job_status.side_effect = Exception("Test exception") trigger = AirbyteSyncTrigger( - api_type=self.API_TYPE, conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL, end_time=self.END_TIME, @@ -204,7 +203,6 @@ async def test_airbyte_job_timeout(self, mock_get_job_status, mocked_is_still_ru mock_get_job_status.side_effect = Exception("Test exception") end_time = time.time() trigger = AirbyteSyncTrigger( - api_type=self.API_TYPE, conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL, end_time=end_time, @@ -226,7 +224,7 @@ async def test_airbyte_job_timeout(self, mock_get_job_status, mocked_is_still_ru @pytest.mark.parametrize( "mock_response, expected_status", [ - (AirbyteHook.SUCCEEDED, False), + (JobStatusEnum.SUCCEEDED, False), ], ) @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job_status") @@ -238,7 +236,6 @@ async def test_airbyte_job_is_still_running_success( hook = mock.AsyncMock(AirbyteHook) hook.get_job_status.return_value = mock_response trigger = AirbyteSyncTrigger( - api_type=self.API_TYPE, conn_id=self.CONN_ID, 
poll_interval=self.POLL_INTERVAL, end_time=self.END_TIME, @@ -251,7 +248,7 @@ async def test_airbyte_job_is_still_running_success( @pytest.mark.parametrize( "mock_response, expected_status", [ - (AirbyteHook.RUNNING, True), + (JobStatusEnum.RUNNING, True), ], ) @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job_status") @@ -263,7 +260,6 @@ async def test_airbyte_sync_run_is_still_running( airbyte_hook = mock.AsyncMock(AirbyteHook) airbyte_hook.get_job_status.return_value = mock_response trigger = AirbyteSyncTrigger( - api_type=self.API_TYPE, conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL, end_time=self.END_TIME,
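For reference, a connection matching the new field mapping used by ``get_conn_params`` (``login`` → Client ID, ``password`` → Client Secret, ``schema`` → Token URL) can be created programmatically, as the tests above do; a sketch with placeholder credentials:

.. code-block:: python

    from airflow.models import Connection
    from airflow.utils import db

    # Sketch: placeholder credentials; the host must include the full API path.
    db.merge_conn(
        Connection(
            conn_id="airbyte_default",
            conn_type="airbyte",
            host="https://api.airbyte.com/v1/",
            login="<client_id>",
            password="<client_secret>",
            schema="v1/applications/token",  # token URL prefix (Cloud default)
        )
    )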