Skip to content

Commit

Permalink
chore: iceberg spec (#48559)
Browse files Browse the repository at this point in the history
  • Loading branch information
subodh1810 authored Nov 19, 2024
1 parent cd4393c commit e610db7
Show file tree
Hide file tree
Showing 14 changed files with 287 additions and 37 deletions.
3 changes: 2 additions & 1 deletion airbyte-cdk/bulk/toolkits/load-iceberg-parquet/build.gradle
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
dependencies {
api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-object-storage')
api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-s3')

implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')
api 'org.apache.iceberg:iceberg-core:1.6.1'
api 'org.apache.iceberg:iceberg-api:1.6.1'
api 'org.apache.iceberg:iceberg-parquet:1.6.1'
api 'org.apache.iceberg:iceberg-nessie:1.6.1'

testFixturesImplementation testFixtures(project(":airbyte-cdk:bulk:core:bulk-cdk-core-load"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.command.iceberg.parquet

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.annotation.JsonPropertyDescription
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle

/**
* Interface defining specifications for connecting to a Nessie server. This includes details such
* as server URI, authentication tokens, and repository settings.
*/
interface NessieServerSpecifications {

/**
* The URI of the Nessie server.
*
* This field is required and specifies the base URL used to connect to the Nessie server.
* Example: `https://nessie-server.example.com`
*/
@get:JsonSchemaTitle("Nessie Server URI")
@get:JsonPropertyDescription(
"The URI of the Nessie server, required to establish a connection."
)
@get:JsonProperty("server_uri")
val serverUri: String

/**
* Access token for authenticating with the Nessie server.
*
* This field is optional and can be used for secure authentication. Example:
* `a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY`
*/
@get:JsonSchemaTitle("Nessie Access Token")
@get:JsonPropertyDescription("Optional token for authenticating with the Nessie server.")
@get:JsonProperty("access_token")
@get:JsonSchemaInject(
json =
"""{
"examples": ["a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"],
"airbyte_secret": true
}""",
)
val accessToken: String?

/**
* The warehouse location for the Nessie server.
*
* Specifies the physical or logical location of the data warehouse managed by Nessie. Example:
* `s3://my-bucket/warehouse/`
*/
@get:JsonSchemaTitle("Nessie Warehouse Location")
@get:JsonPropertyDescription(
"The location of the data warehouse associated with the Nessie repository."
)
@get:JsonProperty("warehouse_location")
val warehouseLocation: String

/**
* The name of the main branch in the Nessie repository.
*
* Specifies the default or primary branch name in the Nessie repository. Example: `main`
*/
@get:JsonSchemaTitle("Nessie Main Branch Name")
@get:JsonPropertyDescription("The name of the main branch in the Nessie repository.")
@get:JsonProperty("main_branch_name")
val mainBranchName: String

fun toNessieServerConfiguration(): NessieServerConfiguration {
return NessieServerConfiguration(serverUri, accessToken, warehouseLocation, mainBranchName)
}
}

data class NessieServerConfiguration(
val serverUri: String,
val accessToken: String?,
val warehouseLocation: String,
val mainBranchName: String
)

interface NessieServerConfigurationProvider {
val nessieServerConfiguration: NessieServerConfiguration
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
dockerRepository: airbyte/destination-iceberg-v2
githubIssueLabel: destination-iceberg-v2
icon: s3.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.iceberg_v2
package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.load.check.DestinationChecker
import javax.inject.Singleton
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationConfigurationFactory
import io.airbyte.cdk.load.command.aws.AWSAccessKeyConfiguration
import io.airbyte.cdk.load.command.aws.AWSAccessKeyConfigurationProvider
import io.airbyte.cdk.load.command.iceberg.parquet.NessieServerConfiguration
import io.airbyte.cdk.load.command.iceberg.parquet.NessieServerConfigurationProvider
import io.airbyte.cdk.load.command.s3.S3BucketConfiguration
import io.airbyte.cdk.load.command.s3.S3BucketConfigurationProvider
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton

data class IcebergV2Configuration(
override val awsAccessKeyConfiguration: AWSAccessKeyConfiguration,
override val nessieServerConfiguration: NessieServerConfiguration,
override val s3BucketConfiguration: S3BucketConfiguration
) :
DestinationConfiguration(),
AWSAccessKeyConfigurationProvider,
NessieServerConfigurationProvider,
S3BucketConfigurationProvider

@Singleton
class IcebergV2ConfigurationFactory :
DestinationConfigurationFactory<IcebergV2Specification, IcebergV2Configuration> {
override fun makeWithoutExceptionHandling(
pojo: IcebergV2Specification
): IcebergV2Configuration {
return IcebergV2Configuration(
awsAccessKeyConfiguration = pojo.toAWSAccessKeyConfiguration(),
s3BucketConfiguration = pojo.toS3BucketConfiguration(),
nessieServerConfiguration = pojo.toNessieServerConfiguration(),
)
}
}

@Suppress("UNCHECKED_CAST")
@Factory
class IcebergV2ConfigurationProvider(private val config: DestinationConfiguration) {
@Singleton
fun get(): IcebergV2Configuration {
return config as IcebergV2Configuration
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,31 @@ package io.airbyte.integrations.destination.iceberg.v2

import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.load.command.aws.AWSAccessKeySpecification
import io.airbyte.cdk.load.command.iceberg.parquet.NessieServerSpecifications
import io.airbyte.cdk.load.command.s3.S3BucketRegion
import io.airbyte.cdk.load.command.s3.S3BucketSpecification
import io.airbyte.cdk.load.spec.DestinationSpecificationExtension
import io.airbyte.protocol.models.v0.DestinationSyncMode
import jakarta.inject.Singleton

@Singleton
@JsonSchemaTitle("Iceberg V2 Destination Spec")
class IcebergV2Specification : ConfigurationSpecification() {}
class IcebergV2Specification :
ConfigurationSpecification(),
AWSAccessKeySpecification,
S3BucketSpecification,
NessieServerSpecifications {
override val accessKeyId: String? = null
override val secretAccessKey: String? = null
override val s3BucketName: String = ""
override val s3BucketRegion: S3BucketRegion = S3BucketRegion.`us-west-1`
override val s3Endpoint: String? = null
override val serverUri: String = ""
override val accessToken: String? = null
override val warehouseLocation: String = ""
override val mainBranchName: String = ""
}

@Singleton
class IcebergV2SpecificationExtension : DestinationSpecificationExtension {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.iceberg_v2
package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.Batch
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.load.check.CheckIntegrationTest
import io.airbyte.cdk.load.check.CheckTestConfig
import java.nio.file.Path
import io.airbyte.integrations.destination.iceberg.v2.IcebergV2TestUtil.PATH

class IcebergV2CheckTest :
CheckIntegrationTest<IcebergV2Specification>(
successConfigFilenames =
listOf(CheckTestConfig(Path.of(IcebergV2TestUtil.SOME_RANDOM_S3_CONFIG))),
successConfigFilenames = listOf(CheckTestConfig(PATH)),
// TODO we maybe should add some configs that are expected to fail `check`
failConfigFilenamesAndFailureReasons = mapOf(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import java.nio.file.Path

object IcebergV2TestUtil {
// TODO this is just here as an example, we should remove it + add real configs
const val SOME_RANDOM_S3_CONFIG = "secrets/s3_dest_v2_minimal_required_config.json"
private val resource =
this::class.java.classLoader.getResource("iceberg_dest_v2_minimal_required_config.json")
?: throw IllegalArgumentException("File not found in resources")
val PATH: Path = Path.of(resource.toURI())

fun getConfig(configPath: String): String = Files.readString(Path.of(configPath))
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.StronglyTyped
import io.airbyte.integrations.destination.iceberg.v2.IcebergV2TestUtil.PATH
import org.junit.jupiter.api.Disabled

abstract class IcebergV2WriteTest(path: String) :
Expand All @@ -30,4 +31,4 @@ abstract class IcebergV2WriteTest(path: String) :

// TODO replace this with a real test class for an actual config
@Disabled("nowhere even close to functional")
class FakeIcebergWriteTest : IcebergV2WriteTest(IcebergV2TestUtil.SOME_RANDOM_S3_CONFIG)
class FakeIcebergWriteTest : IcebergV2WriteTest(PATH.toString())
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,62 @@
"title" : "Iceberg V2 Destination Spec",
"type" : "object",
"additionalProperties" : true,
"properties" : { }
"properties" : {
"access_key_id" : {
"type" : "string",
"description" : "The access key ID to access the S3 bucket. Airbyte requires Read and Write permissions to the given bucket. Read more <a href=\"https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\">here</a>.",
"title" : "S3 Key ID",
"examples" : [ "A012345678910EXAMPLE" ]
},
"secret_access_key" : {
"type" : "string",
"description" : "The corresponding secret to the access key ID. Read more <a href=\"https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\">here</a>",
"title" : "S3 Access Key",
"examples" : [ "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" ]
},
"s3_bucket_name" : {
"type" : "string",
"description" : "The name of the S3 bucket. Read more <a href=\"https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html\">here</a>.",
"title" : "S3 Bucket Name",
"examples" : [ "airbyte_sync" ]
},
"s3_bucket_region" : {
"type" : "string",
"enum" : [ "af-south-1", "ap-east-1", "ap-northeast-1", "ap-northeast-2", "ap-northeast-3", "ap-south-1", "ap-south-2", "ap-southeast-1", "ap-southeast-2", "ap-southeast-3", "ap-southeast-4", "ca-central-1", "ca-west-1", "cn-north-1", "cn-northwest-1", "eu-central-1", "eu-central-2", "eu-north-1", "eu-south-1", "eu-south-2", "eu-west-1", "eu-west-2", "eu-west-3", "il-central-1", "me-central-1", "me-south-1", "sa-east-1", "us-east-1", "us-east-2", "us-gov-east-1", "us-gov-west-1", "us-west-1", "us-west-2" ],
"description" : "The region of the S3 bucket. See <a href=\"https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-available-regions\">here</a> for all region codes.",
"title" : "S3 Bucket Region",
"examples" : [ "us-east-1" ]
},
"s3_endpoint" : {
"type" : "string",
"description" : "Your S3 endpoint url. Read more <a href=\"https://docs.aws.amazon.com/general/latest/gr/s3.html#:~:text=Service%20endpoints-,Amazon%20S3%20endpoints,-When%20you%20use\">here</a>",
"title" : "S3 Endpoint",
"examples" : [ "http://localhost:9000" ]
},
"server_uri" : {
"type" : "string",
"description" : "The URI of the Nessie server, required to establish a connection.",
"title" : "Nessie Server URI"
},
"access_token" : {
"type" : "string",
"description" : "Optional token for authenticating with the Nessie server.",
"title" : "Nessie Access Token",
"examples" : [ "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" ],
"airbyte_secret" : true
},
"warehouse_location" : {
"type" : "string",
"description" : "The location of the data warehouse associated with the Nessie repository.",
"title" : "Nessie Warehouse Location"
},
"main_branch_name" : {
"type" : "string",
"description" : "The name of the main branch in the Nessie repository.",
"title" : "Nessie Main Branch Name"
}
},
"required" : [ "s3_bucket_name", "s3_bucket_region", "server_uri", "warehouse_location", "main_branch_name" ]
},
"supportsIncremental" : true,
"supportsNormalization" : false,
Expand Down
Loading

0 comments on commit e610db7

Please sign in to comment.