diff --git a/Cargo.toml b/Cargo.toml index 49e10e052..78763aa8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ [workspace] resolver = "2" -members = ["crates/catalog/*", "crates/iceberg"] +members = ["crates/catalog/*", "crates/iceberg", "crates/test_utils"] [workspace.dependencies] anyhow = "1.0.72" @@ -31,6 +31,7 @@ bitvec = "1.0.1" chrono = "0.4" derive_builder = "0.12.0" either = "1" +env_logger = "0.10.0" futures = "0.3" iceberg = { path = "./crates/iceberg" } iceberg-catalog-rest = { path = "./crates/catalog/rest" } @@ -43,6 +44,7 @@ once_cell = "1" opendal = "0.42" ordered-float = "4.0.0" pretty_assertions = "1.4.0" +port_scanner = "0.1.5" reqwest = { version = "^0.11", features = ["json"] } rust_decimal = "1.31.0" serde = { version = "^1.0", features = ["rc"] } diff --git a/Makefile b/Makefile index 63fcb6473..d846303ff 100644 --- a/Makefile +++ b/Makefile @@ -15,9 +15,13 @@ # specific language governing permissions and limitations # under the License. +.EXPORT_ALL_VARIABLES: + +RUST_LOG = debug + build: cargo build - + check-fmt: cargo fmt --all -- --check diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml index 43e2ae6ff..883f55c02 100644 --- a/crates/catalog/rest/Cargo.toml +++ b/crates/catalog/rest/Cargo.toml @@ -31,6 +31,7 @@ keywords = ["iceberg", "rest", "catalog"] async-trait = { workspace = true } chrono = { workspace = true } iceberg = { workspace = true } +log = "0.4.20" reqwest = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } @@ -40,5 +41,7 @@ urlencoding = { workspace = true } uuid = { workspace = true, features = ["v4"] } [dev-dependencies] +iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } mockito = { workspace = true } +port_scanner = { workspace = true } tokio = { workspace = true } diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 1dfbe79e4..7ccd108b6 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -21,7 +21,7 @@ use std::collections::HashMap; use async_trait::async_trait; use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; -use reqwest::{Client, Request}; +use reqwest::{Client, Request, Response, StatusCode}; use serde::de::DeserializeOwned; use typed_builder::TypedBuilder; use urlencoding::encode; @@ -166,6 +166,7 @@ impl HttpClient { if resp.status().as_u16() == SUCCESS_CODE { Ok(()) } else { + let code = resp.status(); let text = resp.bytes().await?; let e = serde_json::from_slice::(&text).map_err(|e| { Error::new( @@ -173,6 +174,33 @@ impl HttpClient { "Failed to parse response from rest catalog server!", ) .with_context("json", String::from_utf8_lossy(&text)) + .with_context("code", code.to_string()) + .with_source(e) + })?; + Err(e.into()) + } + } + + /// More generic logic handling for special cases like head. + async fn do_execute>( + &self, + request: Request, + handler: impl FnOnce(&Response) -> Option, + ) -> Result { + let resp = self.0.execute(request).await?; + + if let Some(ret) = handler(&resp) { + Ok(ret) + } else { + let code = resp.status(); + let text = resp.bytes().await?; + let e = serde_json::from_slice::(&text).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to parse response from rest catalog server!", + ) + .with_context("code", code.to_string()) + .with_context("json", String::from_utf8_lossy(&text)) .with_source(e) })?; Err(e.into()) @@ -273,9 +301,12 @@ impl Catalog for RestCatalog { .build()?; self.client - .execute::(request) + .do_execute::(request, |resp| match resp.status() { + StatusCode::NO_CONTENT => Some(true), + StatusCode::NOT_FOUND => Some(false), + _ => None, + }) .await - .map(|_| true) } /// Drop a namespace from the catalog. @@ -326,7 +357,7 @@ impl Catalog for RestCatalog { partition_spec: creation.partition_spec, write_order: creation.sort_order, // We don't support stage create yet. - stage_create: None, + stage_create: Some(false), properties: if creation.properties.is_empty() { None } else { @@ -406,9 +437,12 @@ impl Catalog for RestCatalog { .build()?; self.client - .execute::(request) + .do_execute::(request, |resp| match resp.status() { + StatusCode::NO_CONTENT => Some(true), + StatusCode::NOT_FOUND => Some(false), + _ => None, + }) .await - .map(|_| true) } /// Rename a table in the catalog. diff --git a/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml b/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml new file mode 100644 index 000000000..5c101463f --- /dev/null +++ b/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +version: '3.8' + +services: + rest: + image: tabulario/iceberg-rest:0.10.0 + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog + - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory + - CATALOG_WAREHOUSE=s3://icebergdata/demo + - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO + - CATALOG_S3_ENDPOINT=http://minio:9000 + depends_on: + - minio + links: + - minio:icebergdata.minio + expose: + - 8181 + + minio: + image: minio/minio + environment: + - MINIO_ROOT_USER=admin + - MINIO_ROOT_PASSWORD=password + - MINIO_DOMAIN=minio + expose: + - 9001 + - 9000 + command: [ "server", "/data", "--console-address", ":9001" ] + + mc: + depends_on: + - minio + image: minio/mc + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + entrypoint: > + /bin/sh -c " + until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; + /usr/bin/mc rm -r --force minio/icebergdata; + /usr/bin/mc mb minio/icebergdata; + /usr/bin/mc policy set public minio/icebergdata; + tail -f /dev/null + " \ No newline at end of file diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs new file mode 100644 index 000000000..a4d07955b --- /dev/null +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -0,0 +1,376 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for rest catalog. + +use iceberg::spec::{FormatVersion, NestedField, PrimitiveType, Schema, Type}; +use iceberg::transaction::Transaction; +use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent}; +use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; +use iceberg_test_utils::docker::DockerCompose; +use iceberg_test_utils::{normalize_test_name, set_up}; +use port_scanner::scan_port_addr; +use std::collections::HashMap; +use tokio::time::sleep; + +const REST_CATALOG_PORT: u16 = 8181; + +struct TestFixture { + _docker_compose: DockerCompose, + rest_catalog: RestCatalog, +} + +async fn set_test_fixture(func: &str) -> TestFixture { + set_up(); + let docker_compose = DockerCompose::new( + normalize_test_name(format!("{}_{func}", module_path!())), + format!("{}/testdata/rest_catalog", env!("CARGO_MANIFEST_DIR")), + ); + + // Start docker compose + docker_compose.run(); + + let rest_catalog_ip = docker_compose.get_container_ip("rest"); + + let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT); + loop { + if !scan_port_addr(&read_port) { + log::info!("Waiting for 1s rest catalog to ready..."); + sleep(std::time::Duration::from_millis(1000)).await; + } else { + break; + } + } + + let config = RestCatalogConfig::builder() + .uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT)) + .build(); + let rest_catalog = RestCatalog::new(config).await.unwrap(); + + TestFixture { + _docker_compose: docker_compose, + rest_catalog, + } +} +#[tokio::test] +async fn test_get_non_exist_namespace() { + let fixture = set_test_fixture("test_get_non_exist_namespace").await; + + let result = fixture + .rest_catalog + .get_namespace(&NamespaceIdent::from_strs(["demo"]).unwrap()) + .await; + + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Namespace does not exist")); +} + +#[tokio::test] +async fn test_get_namespace() { + let fixture = set_test_fixture("test_get_namespace").await; + + let ns = Namespace::with_properties( + NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), + HashMap::from([ + ("owner".to_string(), "ray".to_string()), + ("community".to_string(), "apache".to_string()), + ]), + ); + + // Verify that namespace doesn't exist + assert!(fixture.rest_catalog.get_namespace(ns.name()).await.is_err()); + + // Create this namespace + let created_ns = fixture + .rest_catalog + .create_namespace(ns.name(), ns.properties().clone()) + .await + .unwrap(); + + assert_eq!(ns.name(), created_ns.name()); + assert_map_contains(ns.properties(), created_ns.properties()); + + // Check that this namespace already exists + let get_ns = fixture.rest_catalog.get_namespace(ns.name()).await.unwrap(); + assert_eq!(ns.name(), get_ns.name()); + assert_map_contains(ns.properties(), created_ns.properties()); +} + +#[tokio::test] +async fn test_list_namespace() { + let fixture = set_test_fixture("test_list_namespace").await; + + let ns1 = Namespace::with_properties( + NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), + HashMap::from([ + ("owner".to_string(), "ray".to_string()), + ("community".to_string(), "apache".to_string()), + ]), + ); + + let ns2 = Namespace::with_properties( + NamespaceIdent::from_strs(["apple", "macos"]).unwrap(), + HashMap::from([ + ("owner".to_string(), "xuanwo".to_string()), + ("community".to_string(), "apache".to_string()), + ]), + ); + + // Currently this namespace doesn't exist, so it should return error. + assert!(fixture + .rest_catalog + .list_namespaces(Some(&NamespaceIdent::from_strs(["apple"]).unwrap())) + .await + .is_err()); + + // Create namespaces + fixture + .rest_catalog + .create_namespace(ns1.name(), ns1.properties().clone()) + .await + .unwrap(); + fixture + .rest_catalog + .create_namespace(ns2.name(), ns1.properties().clone()) + .await + .unwrap(); + + // List namespace + let mut nss = fixture + .rest_catalog + .list_namespaces(Some(&NamespaceIdent::from_strs(["apple"]).unwrap())) + .await + .unwrap(); + nss.sort(); + + assert_eq!(&nss[0], ns1.name()); + assert_eq!(&nss[1], ns2.name()); +} + +#[tokio::test] +async fn test_list_empty_namespace() { + let fixture = set_test_fixture("test_list_empty_namespace").await; + + let ns_apple = Namespace::with_properties( + NamespaceIdent::from_strs(["apple"]).unwrap(), + HashMap::from([ + ("owner".to_string(), "ray".to_string()), + ("community".to_string(), "apache".to_string()), + ]), + ); + + // Currently this namespace doesn't exist, so it should return error. + assert!(fixture + .rest_catalog + .list_namespaces(Some(ns_apple.name())) + .await + .is_err()); + + // Create namespaces + fixture + .rest_catalog + .create_namespace(ns_apple.name(), ns_apple.properties().clone()) + .await + .unwrap(); + + // List namespace + let nss = fixture + .rest_catalog + .list_namespaces(Some(&NamespaceIdent::from_strs(["apple"]).unwrap())) + .await + .unwrap(); + assert!(nss.is_empty()); +} + +#[tokio::test] +async fn test_list_root_namespace() { + let fixture = set_test_fixture("test_list_root_namespace").await; + + let ns1 = Namespace::with_properties( + NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), + HashMap::from([ + ("owner".to_string(), "ray".to_string()), + ("community".to_string(), "apache".to_string()), + ]), + ); + + let ns2 = Namespace::with_properties( + NamespaceIdent::from_strs(["google", "android"]).unwrap(), + HashMap::from([ + ("owner".to_string(), "xuanwo".to_string()), + ("community".to_string(), "apache".to_string()), + ]), + ); + + // Currently this namespace doesn't exist, so it should return error. + assert!(fixture + .rest_catalog + .list_namespaces(Some(&NamespaceIdent::from_strs(["apple"]).unwrap())) + .await + .is_err()); + + // Create namespaces + fixture + .rest_catalog + .create_namespace(ns1.name(), ns1.properties().clone()) + .await + .unwrap(); + fixture + .rest_catalog + .create_namespace(ns2.name(), ns1.properties().clone()) + .await + .unwrap(); + + // List namespace + let mut nss = fixture.rest_catalog.list_namespaces(None).await.unwrap(); + nss.sort(); + + assert_eq!(&nss[0], &NamespaceIdent::from_strs(["apple"]).unwrap()); + assert_eq!(&nss[1], &NamespaceIdent::from_strs(["google"]).unwrap()); +} + +#[tokio::test] +async fn test_create_table() { + let fixture = set_test_fixture("test_create_table").await; + + let ns = Namespace::with_properties( + NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), + HashMap::from([ + ("owner".to_string(), "ray".to_string()), + ("community".to_string(), "apache".to_string()), + ]), + ); + + // Create namespaces + fixture + .rest_catalog + .create_namespace(ns.name(), ns.properties().clone()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + + let table = fixture + .rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + assert_eq!( + table.identifier(), + &TableIdent::new(ns.name().clone(), "t1".to_string()) + ); + + assert_eq!( + table.metadata().current_schema().as_struct(), + schema.as_struct() + ); + assert_eq!(table.metadata().format_version(), FormatVersion::V2); + assert!(table.metadata().current_snapshot().is_none()); + assert!(table.metadata().history().is_empty()); + assert!(table.metadata().default_sort_order().unwrap().is_unsorted()); + assert!(table + .metadata() + .default_partition_spec() + .unwrap() + .is_unpartitioned()); +} + +#[tokio::test] +async fn test_update_table() { + let fixture = set_test_fixture("test_update_table").await; + + let ns = Namespace::with_properties( + NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), + HashMap::from([ + ("owner".to_string(), "ray".to_string()), + ("community".to_string(), "apache".to_string()), + ]), + ); + + // Create namespaces + fixture + .rest_catalog + .create_namespace(ns.name(), ns.properties().clone()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + // Now we create a table + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + + let table = fixture + .rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + assert_eq!( + table.identifier(), + &TableIdent::new(ns.name().clone(), "t1".to_string()) + ); + + // Update table by committing transaction + let table2 = Transaction::new(&table) + .set_properties(HashMap::from([("prop1".to_string(), "v1".to_string())])) + .unwrap() + .commit(&fixture.rest_catalog) + .await + .unwrap(); + + assert_map_contains( + &HashMap::from([("prop1".to_string(), "v1".to_string())]), + table2.metadata().properties(), + ); +} + +fn assert_map_contains(map1: &HashMap, map2: &HashMap) { + for (k, v) in map1 { + assert!(map2.contains_key(k)); + assert_eq!(map2.get(k).unwrap(), v); + } +} diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 2ddeacea8..b68837593 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -97,7 +97,7 @@ pub trait Catalog: std::fmt::Debug { /// The namespace identifier is a list of strings, where each string is a /// component of the namespace. It's catalog implementer's responsibility to /// handle the namespace identifier correctly. -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct NamespaceIdent(Vec); impl NamespaceIdent { diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index 16395dc72..484ec7e56 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -58,6 +58,17 @@ impl PartitionSpec { pub fn builder() -> PartitionSpecBuilder { PartitionSpecBuilder::default() } + + /// Returns if the partition spec is unpartitioned. + /// + /// A [`PartitionSpec`] is unpartitioned if it has no fields or all fields are [`Transform::Void`] transform. + pub fn is_unpartitioned(&self) -> bool { + self.fields.is_empty() + || self + .fields + .iter() + .all(|f| matches!(f.transform, Transform::Void)) + } } /// Reference to [`UnboundPartitionSpec`]. @@ -143,6 +154,69 @@ mod tests { assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform); } + #[test] + fn test_is_unpartitioned() { + let partition_spec = PartitionSpec::builder() + .with_spec_id(1) + .with_fields(vec![]) + .build() + .unwrap(); + assert!( + partition_spec.is_unpartitioned(), + "Empty partition spec should be unpartitioned" + ); + + let partition_spec = PartitionSpec::builder() + .with_partition_field( + PartitionField::builder() + .source_id(1) + .field_id(1) + .name("id".to_string()) + .transform(Transform::Identity) + .build(), + ) + .with_partition_field( + PartitionField::builder() + .source_id(2) + .field_id(2) + .name("name".to_string()) + .transform(Transform::Void) + .build(), + ) + .with_spec_id(1) + .build() + .unwrap(); + assert!( + !partition_spec.is_unpartitioned(), + "Partition spec with one non void transform should not be unpartitioned" + ); + + let partition_spec = PartitionSpec::builder() + .with_spec_id(1) + .with_partition_field( + PartitionField::builder() + .source_id(1) + .field_id(1) + .name("id".to_string()) + .transform(Transform::Void) + .build(), + ) + .with_partition_field( + PartitionField::builder() + .source_id(2) + .field_id(2) + .name("name".to_string()) + .transform(Transform::Void) + .build(), + ) + .build() + .unwrap(); + assert!( + partition_spec.is_unpartitioned(), + "Partition spec with all void field should be unpartitioned" + ); + } + #[test] fn test_unbound_partition_spec() { let spec = r#" diff --git a/crates/iceberg/src/spec/sort.rs b/crates/iceberg/src/spec/sort.rs index 2150421b7..01a1eddea 100644 --- a/crates/iceberg/src/spec/sort.rs +++ b/crates/iceberg/src/spec/sort.rs @@ -81,6 +81,13 @@ impl SortOrder { pub fn builder() -> SortOrderBuilder { SortOrderBuilder::default() } + + /// Returns true if the sort order is unsorted. + /// + /// A [`SortOrder`] is unsorted if it has no sort fields. + pub fn is_unsorted(&self) -> bool { + self.fields.is_empty() + } } #[cfg(test)] diff --git a/crates/test_utils/Cargo.toml b/crates/test_utils/Cargo.toml new file mode 100644 index 000000000..91210c50f --- /dev/null +++ b/crates/test_utils/Cargo.toml @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iceberg_test_utils" +version = "0.1.0" +edition = "2021" + +[dependencies] +env_logger = { workspace = true } +log = "0.4.20" + +[features] +tests = [] diff --git a/crates/test_utils/src/cmd.rs b/crates/test_utils/src/cmd.rs new file mode 100644 index 000000000..604d4a14d --- /dev/null +++ b/crates/test_utils/src/cmd.rs @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::process::Command; + +pub fn run_command(mut cmd: Command, desc: impl ToString) { + let desc = desc.to_string(); + log::info!("Starting to {}, command: {:?}", &desc, cmd); + let exit = cmd.status().unwrap(); + if exit.success() { + log::info!("{} succeed!", desc) + } else { + panic!("{} failed: {:?}", desc, exit); + } +} + +pub fn get_cmd_output(mut cmd: Command, desc: impl ToString) -> String { + let desc = desc.to_string(); + log::info!("Starting to {}, command: {:?}", &desc, cmd); + let output = cmd.output().unwrap(); + if output.status.success() { + log::info!("{} succeed!", desc); + String::from_utf8(output.stdout).unwrap() + } else { + panic!("{} failed: {:?}", desc, output.status); + } +} diff --git a/crates/test_utils/src/docker.rs b/crates/test_utils/src/docker.rs new file mode 100644 index 000000000..6c5fbef1e --- /dev/null +++ b/crates/test_utils/src/docker.rs @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::cmd::{get_cmd_output, run_command}; +use std::process::Command; + +/// A utility to manage lifecycle of docker compose. +/// +/// It's will start docker compose when calling `run` method, and will be stopped when dropped. +#[derive(Debug)] +pub struct DockerCompose { + project_name: String, + docker_compose_dir: String, +} + +impl DockerCompose { + pub fn new(project_name: impl ToString, docker_compose_dir: impl ToString) -> Self { + Self { + project_name: project_name.to_string(), + docker_compose_dir: docker_compose_dir.to_string(), + } + } + + pub fn project_name(&self) -> &str { + self.project_name.as_str() + } + + pub fn run(&self) { + let mut cmd = Command::new("docker"); + cmd.current_dir(&self.docker_compose_dir); + + cmd.args(vec![ + "compose", + "-p", + self.project_name.as_str(), + "up", + "-d", + "--wait", + "--timeout", + "1200000", + ]); + + run_command( + cmd, + format!( + "Starting docker compose in {}, project name: {}", + self.docker_compose_dir, self.project_name + ), + ) + } + + pub fn get_container_ip(&self, service_name: impl AsRef) -> String { + let container_name = format!("{}-{}-1", self.project_name, service_name.as_ref()); + let mut cmd = Command::new("docker"); + cmd.arg("inspect") + .arg("-f") + .arg("{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}") + .arg(&container_name); + + get_cmd_output(cmd, format!("Get container ip of {container_name}")) + .trim() + .to_string() + } +} + +impl Drop for DockerCompose { + fn drop(&mut self) { + let mut cmd = Command::new("docker"); + cmd.current_dir(&self.docker_compose_dir); + + cmd.args(vec![ + "compose", + "-p", + self.project_name.as_str(), + "down", + "-v", + "--remove-orphans", + ]); + + run_command( + cmd, + format!( + "Stopping docker compose in {}, project name: {}", + self.docker_compose_dir, self.project_name + ), + ) + } +} diff --git a/crates/test_utils/src/lib.rs b/crates/test_utils/src/lib.rs new file mode 100644 index 000000000..4f63b8dd7 --- /dev/null +++ b/crates/test_utils/src/lib.rs @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This crate contains common utilities for testing. +//! +//! It's not intended for use outside of `iceberg-rust`. + +#[cfg(feature = "tests")] +mod cmd; +#[cfg(feature = "tests")] +pub mod docker; + +#[cfg(feature = "tests")] +pub use common::*; + +#[cfg(feature = "tests")] +mod common { + use std::sync::Once; + + static INIT: Once = Once::new(); + pub fn set_up() { + INIT.call_once(env_logger::init); + } + pub fn normalize_test_name(s: impl ToString) -> String { + s.to_string().replace("::", "__").replace('.', "_") + } +}