Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provider Airbyte: breaking change, update provider to use Airbyte API Python SDK #41122

Merged
merged 28 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c333f10
rename fields login=>client-id and password=>client-secret
marcosmarxm Jul 26, 2024
4c61282
replace HttpHook and use airbyte-api sdk
marcosmarxm Jul 26, 2024
e4ec7f4
fix unit tests for op, hook, sensor and triggers
marcosmarxm Jul 30, 2024
e54ba37
run precommit and fix files
marcosmarxm Jul 30, 2024
3879b25
fix breeze unit test selective checks
marcosmarxm Jul 30, 2024
c5a99c2
remove skipPolicy
marcosmarxm Jul 30, 2024
e9ff731
remove skipPolicy
marcosmarxm Jul 30, 2024
1aa3466
make get_job_status test async
marcosmarxm Jul 30, 2024
f25c2de
add async pytest
marcosmarxm Jul 30, 2024
cc015b7
remove async function
marcosmarxm Jul 31, 2024
93cf69c
make airbyte_api dependency >= 0.51.0 latest version
marcosmarxm Jul 31, 2024
eebf6be
regenerate provider_dependencies.json
marcosmarxm Jul 31, 2024
5a0d00f
remove http provider dependency and update changelog
marcosmarxm Jul 31, 2024
170d303
refactor execute op function
marcosmarxm Aug 10, 2024
32bdd47
Update airflow/providers/airbyte/CHANGELOG.rst
eladkal Aug 10, 2024
31aca8c
make to work with OSS and any generic FQDN addres
marcosmarxm Aug 16, 2024
14003c0
regenerate provider_dependencies.json
marcosmarxm Aug 19, 2024
39764c5
fix tests
marcosmarxm Aug 19, 2024
ef910e3
regenerate provider_dependencies.json
marcosmarxm Jul 31, 2024
5faf216
solve conflict changelog
marcosmarxm Aug 19, 2024
861700f
solve conflict dependencies
marcosmarxm Aug 19, 2024
f0ef5b0
fix conflicts and run-precommit
marcosmarxm Aug 19, 2024
3308a14
rename function get_conn
marcosmarxm Aug 20, 2024
8455e13
readd line in changelog
marcosmarxm Aug 20, 2024
b8d196e
add pytest db_test
marcosmarxm Aug 20, 2024
c99c81b
add db_Test to class level
marcosmarxm Aug 20, 2024
da70f29
add pytest to op test class
marcosmarxm Aug 21, 2024
b3962a3
fix sensor conflict
marcosmarxm Aug 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions airflow/providers/airbyte/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@
Changelog
---------

4.0.0
.....

Breaking changes
~~~~~~~~~~~~~~~~

.. note::
This version introduce a new way to handle the connection to Airbyte using ``client_id`` and ``client_secret`` instead of ``login`` and ``password``.
You can get them accessing the Airbyte UI and creating a new Application in the Settings page.

There is a large refactor to create a connection.
You must specify the Full Qualified Domain Name in the ``host`` parameter, eg: ``https://my.company:8000/airbyte/v1/``.
The ``token_url`` parameter is optional and it is used to create the access token, the default value is ``v1/applications/token`` used by Airbyte Cloud.
You must remove the ``api_type`` parameter from your DAG it isn't required anymore.

3.9.0
.....

Expand Down
241 changes: 87 additions & 154 deletions airflow/providers/airbyte/hooks/airbyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,126 +17,105 @@
# under the License.
from __future__ import annotations

import base64
import json
import time
from typing import TYPE_CHECKING, Any, Literal, TypeVar
from typing import Any, TypeVar

import aiohttp
from aiohttp import ClientResponseError
from asgiref.sync import sync_to_async
from airbyte_api import AirbyteAPI
from airbyte_api.api import CancelJobRequest, GetJobRequest
from airbyte_api.models import JobCreateRequest, JobStatusEnum, JobTypeEnum, SchemeClientCredentials, Security

from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook

if TYPE_CHECKING:
from airflow.models import Connection
from airflow.hooks.base import BaseHook

T = TypeVar("T", bound=Any)


class AirbyteHook(HttpHook):
class AirbyteHook(BaseHook):
"""
Hook for Airbyte API.

:param airbyte_conn_id: Optional. The name of the Airflow connection to get
connection information for Airbyte. Defaults to "airbyte_default".
:param api_version: Optional. Airbyte API version. Defaults to "v1".
:param api_type: Optional. The type of Airbyte API to use. Either "config" or "cloud". Defaults to "config".
"""

conn_name_attr = "airbyte_conn_id"
default_conn_name = "airbyte_default"
conn_type = "airbyte"
hook_name = "Airbyte"

RUNNING = "running"
SUCCEEDED = "succeeded"
CANCELLED = "cancelled"
PENDING = "pending"
FAILED = "failed"
ERROR = "error"
INCOMPLETE = "incomplete"
marcosmarxm marked this conversation as resolved.
Show resolved Hide resolved

def __init__(
self,
airbyte_conn_id: str = "airbyte_default",
api_version: str = "v1",
api_type: Literal["config", "cloud"] = "config",
) -> None:
super().__init__(http_conn_id=airbyte_conn_id)
super().__init__()
self.api_version: str = api_version
self.api_type: str = api_type

async def get_headers_tenants_from_connection(self) -> tuple[dict[str, Any], str]:
"""Get Headers, tenants from the connection details."""
connection: Connection = await sync_to_async(self.get_connection)(self.http_conn_id)
# schema defaults to HTTP
schema = connection.schema if connection.schema else "http"
base_url = f"{schema}://{connection.host}"

if connection.port:
base_url += f":{connection.port}"

if self.api_type == "config":
credentials = f"{connection.login}:{connection.password}"
credentials_base64 = base64.b64encode(credentials.encode("utf-8")).decode("utf-8")
authorized_headers = {
"accept": "application/json",
"content-type": "application/json",
"authorization": f"Basic {credentials_base64}",
}
else:
authorized_headers = {
"accept": "application/json",
"content-type": "application/json",
"authorization": f"Bearer {connection.password}",
}

return authorized_headers, base_url

async def get_job_details(self, job_id: int) -> Any:
self.airbyte_conn_id = airbyte_conn_id
self.conn = self.get_conn_params(self.airbyte_conn_id)
self.airbyte_api = self.create_api_session()

def get_conn_params(self, conn_id: str) -> Any:
conn = self.get_connection(conn_id)

conn_params: dict = {}
conn_params["host"] = conn.host
conn_params["client_id"] = conn.login
conn_params["client_secret"] = conn.password
conn_params["token_url"] = conn.schema or "v1/applications/token"

return conn_params

def create_api_session(self) -> AirbyteAPI:
"""Create Airbyte API session."""
credentials = SchemeClientCredentials(
client_id=self.conn["client_id"],
client_secret=self.conn["client_secret"],
TOKEN_URL=self.conn["token_url"],
)

return AirbyteAPI(
server_url=self.conn["host"],
security=Security(client_credentials=credentials),
)

@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""Return custom field behaviour."""
return {
"hidden_fields": [
"extra",
"port",
],
"relabeling": {"login": "Client ID", "password": "Client Secret", "schema": "Token URL"},
"placeholders": {},
}

def get_job_details(self, job_id: int) -> Any:
"""
Use Http async call to retrieve metadata for a specific job of an Airbyte Sync.

:param job_id: The ID of an Airbyte Sync Job.
"""
headers, base_url = await self.get_headers_tenants_from_connection()
if self.api_type == "config":
url = f"{base_url}/api/{self.api_version}/jobs/get"
self.log.info("URL for api request: %s", url)
async with aiohttp.ClientSession(headers=headers) as session:
async with session.post(url=url, data=json.dumps({"id": job_id})) as response:
try:
response.raise_for_status()
return await response.json()
except ClientResponseError as e:
msg = f"{e.status}: {e.message} - {e.request_info}"
raise AirflowException(msg)
else:
url = f"{base_url}/{self.api_version}/jobs/{job_id}"
self.log.info("URL for api request: %s", url)
async with aiohttp.ClientSession(headers=headers) as session:
async with session.get(url=url) as response:
try:
response.raise_for_status()
return await response.json()
except ClientResponseError as e:
msg = f"{e.status}: {e.message} - {e.request_info}"
raise AirflowException(msg)
try:
get_job_res = self.airbyte_api.jobs.get_job(
request=GetJobRequest(
job_id=job_id,
)
)
return get_job_res.job_response
except Exception as e:
raise AirflowException(e)

async def get_job_status(self, job_id: int) -> str:
def get_job_status(self, job_id: int) -> str:
"""
Retrieve the status for a specific job of an Airbyte Sync.

:param job_id: The ID of an Airbyte Sync Job.
"""
self.log.info("Getting the status of job run %s.", job_id)
response = await self.get_job_details(job_id=job_id)
if self.api_type == "config":
return str(response["job"]["status"])
else:
return str(response["status"])
response = self.get_job_details(job_id=job_id)
return response.status

def wait_for_job(self, job_id: str | int, wait_seconds: float = 3, timeout: float | None = 3600) -> None:
"""
Expand All @@ -155,105 +134,59 @@ def wait_for_job(self, job_id: str | int, wait_seconds: float = 3, timeout: floa
raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
time.sleep(wait_seconds)
try:
job = self.get_job(job_id=(int(job_id)))
if self.api_type == "config":
state = job.json()["job"]["status"]
else:
state = job.json()["status"]
job = self.get_job_details(job_id=(int(job_id)))
state = job.status

except AirflowException as err:
self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
continue

if state in (self.RUNNING, self.PENDING, self.INCOMPLETE):
if state in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE):
continue
if state == self.SUCCEEDED:
if state == JobStatusEnum.SUCCEEDED:
break
if state == self.ERROR:
if state == JobStatusEnum.FAILED:
raise AirflowException(f"Job failed:\n{job}")
elif state == self.CANCELLED:
elif state == JobStatusEnum.CANCELLED:
raise AirflowException(f"Job was cancelled:\n{job}")
else:
raise AirflowException(f"Encountered unexpected state `{state}` for job_id `{job_id}`")

def submit_sync_connection(self, connection_id: str) -> Any:
"""
Submit a job to a Airbyte server.

:param connection_id: Required. The ConnectionId of the Airbyte Connection.
"""
if self.api_type == "config":
return self.run(
endpoint=f"api/{self.api_version}/connections/sync",
json={"connectionId": connection_id},
headers={"accept": "application/json"},
)
else:
conn = self.get_connection(self.http_conn_id)
self.method = "POST"
return self.run(
endpoint=f"{self.api_version}/jobs",
headers={"accept": "application/json", "authorization": f"Bearer {conn.password}"},
json={
"jobType": "sync",
"connectionId": connection_id,
}, # TODO: add an option to pass jobType = reset
)

def get_job(self, job_id: int) -> Any:
"""
Get the resource representation for a job in Airbyte.

:param job_id: Required. Id of the Airbyte job
"""
if self.api_type == "config":
return self.run(
endpoint=f"api/{self.api_version}/jobs/get",
json={"id": job_id},
headers={"accept": "application/json"},
)
else:
self.method = "GET"
conn = self.get_connection(self.http_conn_id)
return self.run(
endpoint=f"{self.api_version}/jobs/{job_id}",
headers={"accept": "application/json", "authorization": f"Bearer {conn.password}"},
try:
res = self.airbyte_api.jobs.create_job(
request=JobCreateRequest(
connection_id=connection_id,
job_type=JobTypeEnum.SYNC,
)
)
return res.job_response
except Exception as e:
raise AirflowException(e)

def cancel_job(self, job_id: int) -> Any:
"""
Cancel the job when task is cancelled.

:param job_id: Required. Id of the Airbyte job
"""
if self.api_type == "config":
return self.run(
endpoint=f"api/{self.api_version}/jobs/cancel",
json={"id": job_id},
headers={"accept": "application/json"},
)
else:
self.method = "DELETE"
conn = self.get_connection(self.http_conn_id)
return self.run(
endpoint=f"{self.api_version}/jobs/{job_id}",
headers={"accept": "application/json", "authorization": f"Bearer {conn.password}"},
try:
cancel_job_res = self.airbyte_api.jobs.cancel_job(
request=CancelJobRequest(
job_id=job_id,
)
)
return cancel_job_res.job_response
except Exception as e:
raise AirflowException(e)

def test_connection(self):
"""Tests the Airbyte connection by hitting the health API."""
self.method = "GET"
try:
res = self.run(
endpoint=f"api/{self.api_version}/health",
headers={"accept": "application/json"},
extra_options={"check_response": False},
)

if res.status_code == 200:
health_check = self.airbyte_api.health.get_health_check()
if health_check.status_code == 200:
return True, "Connection successfully tested"
else:
return False, res.text
return False, str(health_check.raw_response)
except Exception as e:
return False, str(e)
finally:
self.method = "POST"
Loading