Skip to content

Commit

Permalink
feat: Allow FileIO to reuse http client (#544)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo authored Aug 14, 2024
1 parent cbd1844 commit 2137f6b
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
13 changes: 11 additions & 2 deletions crates/iceberg/src/io/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ pub(crate) enum Storage {
/// s3 storage could have `s3://` and `s3a://`.
/// Storing the scheme string here to return the correct path.
scheme_str: String,
/// uses the same client for one FileIO Storage.
///
/// TODO: allow users to configure this client.
client: reqwest::Client,
config: Arc<S3Config>,
},
#[cfg(feature = "storage-gcs")]
Expand All @@ -58,6 +62,7 @@ impl Storage {
#[cfg(feature = "storage-s3")]
Scheme::S3 => Ok(Self::S3 {
scheme_str,
client: reqwest::Client::new(),
config: super::s3_config_parse(props)?.into(),
}),
#[cfg(feature = "storage-gcs")]
Expand Down Expand Up @@ -110,8 +115,12 @@ impl Storage {
}
}
#[cfg(feature = "storage-s3")]
Storage::S3 { scheme_str, config } => {
let op = super::s3_config_build(config, path)?;
Storage::S3 {
scheme_str,
client,
config,
} => {
let op = super::s3_config_build(client, config, path)?;
let op_info = op.info();

// Check prefix of s3 path.
Expand Down
21 changes: 16 additions & 5 deletions crates/iceberg/src/io/storage_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

use std::collections::HashMap;

use opendal::raw::HttpClient;
use opendal::services::S3Config;
use opendal::Operator;
use opendal::{Configurator, Operator};
use url::Url;

use crate::{Error, ErrorKind, Result};
Expand Down Expand Up @@ -106,7 +107,11 @@ pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config
}

/// Build new opendal operator from give path.
pub(crate) fn s3_config_build(cfg: &S3Config, path: &str) -> Result<Operator> {
pub(crate) fn s3_config_build(
client: &reqwest::Client,
cfg: &S3Config,
path: &str,
) -> Result<Operator> {
let url = Url::parse(path)?;
let bucket = url.host_str().ok_or_else(|| {
Error::new(
Expand All @@ -115,7 +120,13 @@ pub(crate) fn s3_config_build(cfg: &S3Config, path: &str) -> Result<Operator> {
)
})?;

let mut cfg = cfg.clone();
cfg.bucket = bucket.to_string();
Ok(Operator::from_config(cfg)?.finish())
let builder = cfg
.clone()
.into_builder()
// Set bucket name.
.bucket(bucket)
// Set http client we want to use.
.http_client(HttpClient::with(client.clone()));

Ok(Operator::new(builder)?.finish())
}

0 comments on commit 2137f6b

Please sign in to comment.