From 5c1af4c21215f8887133233a5978402019c3d3fa Mon Sep 17 00:00:00 2001 From: Dylan Date: Sun, 25 Aug 2024 18:36:35 +0800 Subject: [PATCH] feat(iceberg): use native glue impl for iceberg source (#18106) --- Cargo.lock | 152 +++++++++++++++++++------- Cargo.toml | 5 +- src/connector/Cargo.toml | 1 + src/connector/src/sink/iceberg/mod.rs | 49 ++++++++- 4 files changed, 159 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 93f84a682617..5e8ba1952a80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1295,13 +1295,14 @@ dependencies = [ [[package]] name = "aws-config" -version = "1.0.1" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80c950a809d39bc9480207cb1cfc879ace88ea7e3a4392a8e9999e45d6e5692e" +checksum = "4f4084d18094aec9f79d509f4cb6ccf6b613c5037e32f32e74312e52b836e366" dependencies = [ "aws-credential-types", - "aws-http", "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", "aws-sdk-sts", "aws-smithy-async", "aws-smithy-http", @@ -1312,11 +1313,14 @@ dependencies = [ "aws-types", "bytes", "fastrand 2.0.1", + "hex", "http 0.2.9", "hyper 0.14.27", + "ring 0.17.5", "time", "tokio", "tracing", + "zeroize", ] [[package]] @@ -1333,9 +1337,9 @@ dependencies = [ [[package]] name = "aws-http" -version = "0.60.0" +version = "0.60.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "361c4310fdce94328cc2d1ca0c8a48c13f43009c61d3367585685a50ca8c66b6" +checksum = "30e4199d5d62ab09be6a64650c06cc5c4aa45806fed4c74bc4a5c8eaf039a6fa" dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", @@ -1393,12 +1397,11 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.0.1" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ed7ef604a15fd0d4d9e43701295161ea6b504b63c44990ead352afea2bc15e9" +checksum = "b13dc54b4b49f8288532334bba8f87386a40571c47c37b1304979b556dc613c8" dependencies = [ "aws-credential-types", - "aws-http", "aws-sigv4", "aws-smithy-async", "aws-smithy-eventstream", @@ -1406,9 +1409,12 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", + "bytes", "fastrand 2.0.1", "http 0.2.9", + "http-body 0.4.5", "percent-encoding", + "pin-project-lite", "tracing", "uuid", ] @@ -1438,12 +1444,11 @@ dependencies = [ [[package]] name = "aws-sdk-glue" -version = "1.4.0" +version = "1.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b6c34f6f4b9e8f76274a9b309838d670b3bb69b4be6756394de54718aa2ca0a" +checksum = "144c2f3948ed1884256410282216e8d963bc91d3eb91e5525874e070ff1b1148" dependencies = [ "aws-credential-types", - "aws-http", "aws-runtime", "aws-smithy-async", "aws-smithy-http", @@ -1454,7 +1459,8 @@ dependencies = [ "aws-types", "bytes", "http 0.2.9", - "regex", + "once_cell", + "regex-lite", "tracing", ] @@ -1482,12 +1488,11 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "1.1.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62d240c8c3e3663cd278e47702bbd2566203362d93b51d95575d7b2e0c265e99" +checksum = "93d35d39379445970fc3e4ddf7559fff2c32935ce0b279f9cb27080d6b7c6d94" dependencies = [ "aws-credential-types", - "aws-http", "aws-runtime", "aws-sigv4", "aws-smithy-async", @@ -1505,19 +1510,62 @@ dependencies = [ "http-body 0.4.5", "once_cell", "percent-encoding", - "regex", + "regex-lite", "tracing", "url", ] +[[package]] +name = "aws-sdk-sso" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6729c96a2bc5acdbc0d6f406415678c24de30a9999f33084a34e64fc415cc365" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.9", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ccdd38f35f089c16fe0641cda34f2d06e3ab7b99a884407bce350a9fa70b1a9" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.9", + "once_cell", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-sts" -version = "1.3.1" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "798c8d82203af9e15a8b406574e0b36da91dd6db533028b74676489a1bc8bc7d" +checksum = "396e8064892a3c08b25b60fe3abda7ff5afa74efee500572cae65122ba5afd0d" dependencies = [ "aws-credential-types", - "aws-http", "aws-runtime", "aws-smithy-async", "aws-smithy-http", @@ -1529,15 +1577,16 @@ dependencies = [ "aws-smithy-xml", "aws-types", "http 0.2.9", - "regex", + "once_cell", + "regex-lite", "tracing", ] [[package]] name = "aws-sigv4" -version = "1.1.1" +version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d222297ca90209dc62245f0a490355795f29de362eb5c19caea4f7f55fe69078" +checksum = "5df1b0fa6be58efe9d4ccc257df0a53b89cd8909e86591a13ca54817c87517be" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -1550,6 +1599,7 @@ dependencies = [ "hex", "hmac", "http 0.2.9", + "http 1.1.0", "once_cell", "p256 0.11.1", "percent-encoding", @@ -1574,9 +1624,9 @@ dependencies = [ [[package]] name = "aws-smithy-checksums" -version = "0.60.0" +version = "0.60.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5a373ec01aede3dd066ec018c1bc4e8f5dd11b2c11c59c8eef1a5c68101f397" +checksum = "598b1689d001c4d4dc3cb386adb07d37786783aee3ac4b324bcadac116bf3d23" dependencies = [ "aws-smithy-http", "aws-smithy-types", @@ -1595,9 +1645,9 @@ dependencies = [ [[package]] name = "aws-smithy-eventstream" -version = "0.60.3" +version = "0.60.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "682371561562d08ab437766903c6bc28f4f95d7ab2ecfb389bda7849dd98aefe" +checksum = "e6363078f927f612b970edf9d1903ef5cef9a64d1e8423525ebb1f0a1633c858" dependencies = [ "aws-smithy-types", "bytes", @@ -1606,9 +1656,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.3" +version = "0.60.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "365ca49744b2bda2f1e2dc03b856da3fa5a28ca5b0a41e41d7ff5305a8fae190" +checksum = "d9cd0ae3d97daa0a2bf377a4d8e8e1362cae590c4a1aad0d40058ebca18eb91e" dependencies = [ "aws-smithy-eventstream", "aws-smithy-runtime-api", @@ -1627,18 +1677,18 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.60.0" +version = "0.60.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a46dd338dc9576d6a6a5b5a19bd678dcad018ececee11cf28ecd7588bd1a55c" +checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6" dependencies = [ "aws-smithy-types", ] [[package]] name = "aws-smithy-query" -version = "0.60.0" +version = "0.60.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "feb5b8c7a86d4b6399169670723b7e6f21a39fc833a30f5c5a2f997608178129" +checksum = "f2fbd61ceb3fe8a1cb7352e42689cec5335833cd9f94103a61e98f9bb61c64bb" dependencies = [ "aws-smithy-types", "urlencoding", @@ -1646,9 +1696,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.1.3" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ab9cb6fee50680af8ceaa293ae79eba32095ca117161cb323f9ee30dd87d139" +checksum = "ec81002d883e5a7fd2bb063d6fb51c4999eb55d404f4fff3dd878bf4733b9f01" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1722,24 +1772,23 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.60.0" +version = "0.60.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ec40d74a67fd395bc3f6b4ccbdf1543672622d905ef3f979689aea5b730cb95" +checksum = "d123fbc2a4adc3c301652ba8e149bf4bc1d1725affb9784eb20c953ace06bf55" dependencies = [ "xmlparser", ] [[package]] name = "aws-types" -version = "1.3.0" +version = "1.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02fa328e19c849b20ef7ada4c9b581dd12351ff35ecc7642d06e69de4f98407c" +checksum = "5221b91b3e441e6675310829fd8984801b772cb1546ef6c0e54dec9f1ac13fef" dependencies = [ "aws-credential-types", "aws-smithy-async", "aws-smithy-runtime-api", "aws-smithy-types", - "http 0.2.9", "rustc_version 0.4.0", "tracing", ] @@ -5955,7 +6004,8 @@ dependencies = [ [[package]] name = "iceberg" version = "0.3.0" -source = "git+https://github.com/apache/iceberg-rust.git?rev=4440af69a354d9af56f239a6126a7f4b7945d58b#4440af69a354d9af56f239a6126a7f4b7945d58b" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "651dfca7c429918e164607a549287cfdd1e7814d2e4cb577d0d6dc57fe19b785" dependencies = [ "anyhow", "apache-avro 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5994,10 +6044,29 @@ dependencies = [ "uuid", ] +[[package]] +name = "iceberg-catalog-glue" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ef7c992442a80c46975e08f3862140ca3e1c1c772aa68baaf65bb08f97ff07" +dependencies = [ + "anyhow", + "async-trait", + "aws-config", + "aws-sdk-glue", + "iceberg", + "log", + "serde_json", + "tokio", + "typed-builder 0.19.1", + "uuid", +] + [[package]] name = "iceberg-catalog-rest" version = "0.3.0" -source = "git+https://github.com/apache/iceberg-rust.git?rev=4440af69a354d9af56f239a6126a7f4b7945d58b#4440af69a354d9af56f239a6126a7f4b7945d58b" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f351c7b964fa6f3b4f976f8de3f16f1bf84eea8478606aaebdfd6a871d6b082c" dependencies = [ "async-trait", "chrono", @@ -10551,6 +10620,7 @@ dependencies = [ "google-cloud-pubsub", "http 0.2.9", "iceberg", + "iceberg-catalog-glue", "iceberg-catalog-rest", "icelake", "indexmap 2.2.6", diff --git a/Cargo.toml b/Cargo.toml index d7622bee9bb5..a5da9b82b658 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -142,8 +142,9 @@ arrow-array-iceberg = { package = "arrow-array", version = "52" } arrow-schema-iceberg = { package = "arrow-schema", version = "52" } arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" } arrow-cast-iceberg = { package = "arrow-cast", version = "52" } -iceberg = { git = "https://github.com/apache/iceberg-rust.git", rev = "4440af69a354d9af56f239a6126a7f4b7945d58b" } -iceberg-catalog-rest = { git = "https://github.com/apache/iceberg-rust.git", rev = "4440af69a354d9af56f239a6126a7f4b7945d58b" } +iceberg = "0.3.0" +iceberg-catalog-rest = "0.3.0" +iceberg-catalog-glue = "0.3.0" opendal = "0.47" arrow-array = "50" arrow-arith = "50" diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 38e82ccdf76e..30dbc2a7c721 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -67,6 +67,7 @@ google-cloud-googleapis = { version = "0.15", features = ["pubsub", "bigquery"] google-cloud-pubsub = "0.28" http = "0.2" iceberg = { workspace = true } +iceberg-catalog-glue = { workspace = true } iceberg-catalog-rest = { workspace = true } icelake = { workspace = true } indexmap = { version = "2.2.6", features = ["serde"] } diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 4984fb9efdab..0e2d06d1dcaf 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -32,6 +32,7 @@ use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY use iceberg::spec::TableMetadata; use iceberg::table::Table as TableV2; use iceberg::{Catalog as CatalogV2, TableIdent}; +use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY}; use icelake::catalog::{ load_catalog, load_iceberg_base_catalog_config, BaseCatalogConfig, CatalogRef, CATALOG_NAME, CATALOG_TYPE, @@ -515,7 +516,7 @@ impl IcebergConfig { .map_err(|e| SinkError::Iceberg(anyhow!(e))) } - fn create_catalog_v2(&self) -> ConnectorResult> { + async fn create_catalog_v2(&self) -> ConnectorResult> { match self.catalog_type() { "storage" => { let config = StorageCatalogConfig::builder() @@ -553,15 +554,52 @@ impl IcebergConfig { let catalog = iceberg_catalog_rest::RestCatalog::new(config); Ok(Arc::new(catalog)) } - catalog_type - if catalog_type == "hive" || catalog_type == "jdbc" || catalog_type == "glue" => - { + "glue" => { + let mut iceberg_configs = HashMap::new(); + // glue + if let Some(region) = &self.region { + iceberg_configs.insert(AWS_REGION_NAME.to_string(), region.clone().to_string()); + } + iceberg_configs.insert( + AWS_ACCESS_KEY_ID.to_string(), + self.access_key.clone().to_string(), + ); + iceberg_configs.insert( + AWS_SECRET_ACCESS_KEY.to_string(), + self.secret_key.clone().to_string(), + ); + // s3 + if let Some(region) = &self.region { + iceberg_configs.insert(S3_REGION.to_string(), region.clone().to_string()); + } + if let Some(endpoint) = &self.endpoint { + iceberg_configs.insert(S3_ENDPOINT.to_string(), endpoint.clone().to_string()); + } + iceberg_configs.insert( + S3_ACCESS_KEY_ID.to_string(), + self.access_key.clone().to_string(), + ); + iceberg_configs.insert( + S3_SECRET_ACCESS_KEY.to_string(), + self.secret_key.clone().to_string(), + ); + let config_builder = iceberg_catalog_glue::GlueCatalogConfig::builder() + .warehouse(self.path.clone()) + .props(iceberg_configs); + let config = if let Some(uri) = self.uri.as_deref() { + config_builder.uri(uri.to_string()).build() + } else { + config_builder.build() + }; + let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?; + Ok(Arc::new(catalog)) + } + catalog_type if catalog_type == "hive" || catalog_type == "jdbc" => { // Create java catalog let (base_catalog_config, java_catalog_props) = self.build_jni_catalog_configs()?; let catalog_impl = match catalog_type { "hive" => "org.apache.iceberg.hive.HiveCatalog", "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog", - "glue" => "org.apache.iceberg.aws.glue.GlueCatalog", _ => unreachable!(), }; @@ -584,6 +622,7 @@ impl IcebergConfig { pub async fn load_table_v2(&self) -> ConnectorResult { let catalog = self .create_catalog_v2() + .await .context("Unable to load iceberg catalog")?; let table_id = self