From 2f383e764aa2b79e52d562e24eb0d1dce41f5ce7 Mon Sep 17 00:00:00 2001 From: Robert Pack <42610831+roeap@users.noreply.github.com> Date: Thu, 4 Jan 2024 17:40:51 +0100 Subject: [PATCH] feat(object_store): Azure url signing (#5259) * refactor: move current signing to new AzureAuthrizer * feat: generate signed urls with master key * feat: sign with user delegated keys * chore: clippy * pr feedback * chore: clippy * pr feedback II * fix: move sigining test --- .gitignore | 1 + object_store/src/aws/mod.rs | 1 + object_store/src/azure/client.rs | 117 ++++++++++- object_store/src/azure/credential.rs | 295 ++++++++++++++++++++++++--- object_store/src/azure/mod.rs | 103 +++++++++- object_store/src/lib.rs | 23 +++ object_store/src/signer.rs | 16 ++ 7 files changed, 518 insertions(+), 38 deletions(-) diff --git a/.gitignore b/.gitignore index efdd9de5fbf9..0788daea0166 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ parquet/data.parquet justfile .prettierignore .env +.editorconfig # local azurite file __azurite* __blobstorage__ diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 75b43d448bac..20e7b032ab35 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -372,6 +372,7 @@ mod tests { rename_and_copy(&integration).await; stream_get(&integration).await; multipart(&integration, &integration).await; + signing(&integration).await; tagging(&integration, !config.disable_tagging, |p| { let client = Arc::clone(&integration.client); diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 3c71e69da00c..865e0a1a939c 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -46,6 +46,7 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use url::Url; const VERSION_HEADER: &str = "x-ms-version-id"; @@ -101,6 +102,18 @@ pub(crate) enum Error { #[snafu(display("ETag required for conditional update"))] MissingETag, + + #[snafu(display("Error requesting user delegation key: {}", source))] + DelegationKeyRequest { source: crate::client::retry::Error }, + + #[snafu(display("Error getting user delegation key response body: {}", source))] + DelegationKeyResponseBody { source: reqwest::Error }, + + #[snafu(display("Got invalid user delegation key response: {}", source))] + DelegationKeyResponse { source: quick_xml::de::DeError }, + + #[snafu(display("Generating SAS keys with SAS tokens auth is not supported"))] + SASforSASNotSupported, } impl From for crate::Error { @@ -324,6 +337,78 @@ impl AzureClient { Ok(()) } + /// Make a Get User Delegation Key request + /// + async fn get_user_delegation_key( + &self, + start: &DateTime, + end: &DateTime, + ) -> Result { + let credential = self.get_credential().await?; + let url = self.config.service.clone(); + + let start = start.to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + let expiry = end.to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + + let mut body = String::new(); + body.push_str("\n\n"); + body.push_str(&format!( + "\t{start}\n\t{expiry}\n" + )); + body.push_str(""); + + let response = self + .client + .request(Method::POST, url) + .body(body) + .query(&[("restype", "service"), ("comp", "userdelegationkey")]) + .with_azure_authorization(&credential, &self.config.account) + .send_retry(&self.config.retry_config) + .await + .context(DelegationKeyRequestSnafu)? + .bytes() + .await + .context(DelegationKeyResponseBodySnafu)?; + + let response: UserDelegationKey = + quick_xml::de::from_reader(response.reader()).context(DelegationKeyResponseSnafu)?; + + Ok(response) + } + + /// Creat an AzureSigner for generating SAS tokens (pre-signed urls). + /// + /// Depending on the type of credential, this will either use the account key or a user delegation key. + /// Since delegation keys are acquired ad-hoc, the signer aloows for signing multiple urls with the same key. + pub async fn signer(&self, expires_in: Duration) -> Result { + let credential = self.get_credential().await?; + let signed_start = chrono::Utc::now(); + let signed_expiry = signed_start + expires_in; + match credential.as_ref() { + AzureCredential::BearerToken(_) => { + let key = self + .get_user_delegation_key(&signed_start, &signed_expiry) + .await?; + let signing_key = AzureAccessKey::try_new(&key.value)?; + Ok(AzureSigner::new( + signing_key, + self.config.account.clone(), + signed_start, + signed_expiry, + Some(key), + )) + } + AzureCredential::AccessKey(key) => Ok(AzureSigner::new( + key.to_owned(), + self.config.account.clone(), + signed_start, + signed_expiry, + None, + )), + _ => Err(Error::SASforSASNotSupported.into()), + } + } + #[cfg(test)] pub async fn get_blob_tagging(&self, path: &Path) -> Result { let credential = self.get_credential().await?; @@ -600,6 +685,18 @@ impl BlockList { } } +#[derive(Debug, Clone, PartialEq, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub(crate) struct UserDelegationKey { + pub signed_oid: String, + pub signed_tid: String, + pub signed_start: String, + pub signed_expiry: String, + pub signed_service: String, + pub signed_version: String, + pub value: String, +} + #[cfg(test)] mod tests { use bytes::Bytes; @@ -757,8 +854,7 @@ mod tests { "; - let mut _list_blobs_response_internal: ListResultInternal = - quick_xml::de::from_str(S).unwrap(); + let _list_blobs_response_internal: ListResultInternal = quick_xml::de::from_str(S).unwrap(); } #[test] @@ -778,4 +874,21 @@ mod tests { assert_eq!(res, S) } + + #[test] + fn test_delegated_key_response() { + const S: &str = r#" + + String containing a GUID value + String containing a GUID value + String formatted as ISO date + String formatted as ISO date + b + String specifying REST api version to use to create the user delegation key + String containing the user delegation key +"#; + + let _delegated_key_response_internal: UserDelegationKey = + quick_xml::de::from_str(S).unwrap(); + } } diff --git a/object_store/src/azure/credential.rs b/object_store/src/azure/credential.rs index 2b8788d333b2..bfbbde826046 100644 --- a/object_store/src/azure/credential.rs +++ b/object_store/src/azure/credential.rs @@ -24,26 +24,27 @@ use crate::RetryConfig; use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; -use chrono::{DateTime, Utc}; -use reqwest::header::ACCEPT; -use reqwest::{ - header::{ - HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, CONTENT_ENCODING, CONTENT_LANGUAGE, - CONTENT_LENGTH, CONTENT_TYPE, DATE, IF_MATCH, IF_MODIFIED_SINCE, IF_NONE_MATCH, - IF_UNMODIFIED_SINCE, RANGE, - }, - Client, Method, RequestBuilder, +use chrono::{DateTime, SecondsFormat, Utc}; +use reqwest::header::{ + HeaderMap, HeaderName, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_ENCODING, CONTENT_LANGUAGE, + CONTENT_LENGTH, CONTENT_TYPE, DATE, IF_MATCH, IF_MODIFIED_SINCE, IF_NONE_MATCH, + IF_UNMODIFIED_SINCE, RANGE, }; +use reqwest::{Client, Method, Request, RequestBuilder}; use serde::Deserialize; use snafu::{ResultExt, Snafu}; use std::borrow::Cow; +use std::collections::HashMap; +use std::fmt::Debug; use std::process::Command; use std::str; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; use url::Url; -static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2021-08-06"); +use super::client::UserDelegationKey; + +static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2023-11-03"); static VERSION: HeaderName = HeaderName::from_static("x-ms-version"); pub(crate) static BLOB_TYPE: HeaderName = HeaderName::from_static("x-ms-blob-type"); pub(crate) static DELETE_SNAPSHOTS: HeaderName = HeaderName::from_static("x-ms-delete-snapshots"); @@ -83,6 +84,9 @@ pub enum Error { #[snafu(display("Failed to parse azure cli response: {source}"))] AzureCliResponse { source: serde_json::Error }, + + #[snafu(display("Generating SAS keys with SAS tokens auth is not supported"))] + SASforSASNotSupported, } pub type Result = std::result::Result; @@ -97,7 +101,7 @@ impl From for crate::Error { } /// A shared Azure Storage Account Key -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug, Clone, Eq, PartialEq)] pub struct AzureAccessKey(Vec); impl AzureAccessKey { @@ -137,33 +141,86 @@ pub mod authority_hosts { pub const AZURE_PUBLIC_CLOUD: &str = "https://login.microsoftonline.com"; } -pub(crate) trait CredentialExt { - /// Apply authorization to requests against azure storage accounts - /// - fn with_azure_authorization(self, credential: &AzureCredential, account: &str) -> Self; +pub(crate) struct AzureSigner { + signing_key: AzureAccessKey, + start: DateTime, + end: DateTime, + account: String, + delegation_key: Option, } -impl CredentialExt for RequestBuilder { - fn with_azure_authorization(mut self, credential: &AzureCredential, account: &str) -> Self { +impl AzureSigner { + pub fn new( + signing_key: AzureAccessKey, + account: String, + start: DateTime, + end: DateTime, + delegation_key: Option, + ) -> Self { + Self { + signing_key, + account, + start, + end, + delegation_key, + } + } + + pub fn sign(&self, method: &Method, url: &mut Url) -> Result<()> { + let (str_to_sign, query_pairs) = match &self.delegation_key { + Some(delegation_key) => string_to_sign_user_delegation_sas( + url, + method, + &self.account, + &self.start, + &self.end, + delegation_key, + ), + None => string_to_sign_service_sas(url, method, &self.account, &self.start, &self.end), + }; + let auth = hmac_sha256(&self.signing_key.0, str_to_sign); + url.query_pairs_mut().extend_pairs(query_pairs); + url.query_pairs_mut() + .append_pair("sig", BASE64_STANDARD.encode(auth).as_str()); + Ok(()) + } +} + +/// Authorize a [`Request`] with an [`AzureAuthorizer`] +#[derive(Debug)] +pub struct AzureAuthorizer<'a> { + credential: &'a AzureCredential, + account: &'a str, +} + +impl<'a> AzureAuthorizer<'a> { + /// Create a new [`AzureAuthorizer`] + pub fn new(credential: &'a AzureCredential, account: &'a str) -> Self { + AzureAuthorizer { + credential, + account, + } + } + + /// Authorize `request` + pub fn authorize(&self, request: &mut Request) { // rfc2822 string should never contain illegal characters let date = Utc::now(); let date_str = date.format(RFC1123_FMT).to_string(); // we formatted the data string ourselves, so unwrapping should be fine let date_val = HeaderValue::from_str(&date_str).unwrap(); - self = self - .header(DATE, &date_val) - .header(&VERSION, &AZURE_VERSION); + request.headers_mut().insert(DATE, date_val); + request + .headers_mut() + .insert(&VERSION, AZURE_VERSION.clone()); - match credential { + match self.credential { AzureCredential::AccessKey(key) => { - let (client, request) = self.build_split(); - let mut request = request.expect("request valid"); - let signature = generate_authorization( request.headers(), request.url(), request.method(), - account, + self.account, key, ); @@ -173,15 +230,40 @@ impl CredentialExt for RequestBuilder { AUTHORIZATION, HeaderValue::from_str(signature.as_str()).unwrap(), ); - - Self::from_parts(client, request) } - AzureCredential::BearerToken(token) => self.bearer_auth(token), - AzureCredential::SASToken(query_pairs) => self.query(&query_pairs), + AzureCredential::BearerToken(token) => { + request.headers_mut().append( + AUTHORIZATION, + HeaderValue::from_str(format!("Bearer {}", token).as_str()).unwrap(), + ); + } + AzureCredential::SASToken(query_pairs) => { + request + .url_mut() + .query_pairs_mut() + .extend_pairs(query_pairs); + } } } } +pub(crate) trait CredentialExt { + /// Apply authorization to requests against azure storage accounts + /// + fn with_azure_authorization(self, credential: &AzureCredential, account: &str) -> Self; +} + +impl CredentialExt for RequestBuilder { + fn with_azure_authorization(self, credential: &AzureCredential, account: &str) -> Self { + let (client, request) = self.build_split(); + let mut request = request.expect("request valid"); + + AzureAuthorizer::new(credential, account).authorize(&mut request); + + Self::from_parts(client, request) + } +} + /// Generate signed key for authorization via access keys /// fn generate_authorization( @@ -205,6 +287,152 @@ fn add_if_exists<'a>(h: &'a HeaderMap, key: &HeaderName) -> &'a str { .unwrap_or_default() } +fn string_to_sign_sas( + u: &Url, + method: &Method, + account: &str, + start: &DateTime, + end: &DateTime, +) -> (String, String, String, String, String) { + // NOTE: for now only blob signing is supported. + let signed_resource = "b".to_string(); + + // https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#permissions-for-a-directory-container-or-blob + let signed_permissions = match *method { + // read and list permissions + Method::GET => match signed_resource.as_str() { + "c" => "rl", + "b" => "r", + _ => unreachable!(), + }, + // write permissions (also allows crating a new blob in a sub-key) + Method::PUT => "w", + // delete permissions + Method::DELETE => "d", + // other methods are not used in any of the current operations + _ => "", + } + .to_string(); + let signed_start = start.to_rfc3339_opts(SecondsFormat::Secs, true); + let signed_expiry = end.to_rfc3339_opts(SecondsFormat::Secs, true); + let canonicalized_resource = if u.host_str().unwrap_or_default().contains(account) { + format!("/blob/{}{}", account, u.path()) + } else { + // NOTE: in case of the emulator, the account name is not part of the host + // but the path starts with the account name + format!("/blob{}", u.path()) + }; + + ( + signed_resource, + signed_permissions, + signed_start, + signed_expiry, + canonicalized_resource, + ) +} + +/// Create a string to be signed for authorization via [service sas]. +/// +/// [service sas]: https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#version-2020-12-06-and-later +fn string_to_sign_service_sas( + u: &Url, + method: &Method, + account: &str, + start: &DateTime, + end: &DateTime, +) -> (String, HashMap<&'static str, String>) { + let (signed_resource, signed_permissions, signed_start, signed_expiry, canonicalized_resource) = + string_to_sign_sas(u, method, account, start, end); + + let string_to_sign = format!( + "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}", + signed_permissions, + signed_start, + signed_expiry, + canonicalized_resource, + "", // signed identifier + "", // signed ip + "", // signed protocol + &AZURE_VERSION.to_str().unwrap(), // signed version + signed_resource, // signed resource + "", // signed snapshot time + "", // signed encryption scope + "", // rscc - response header: Cache-Control + "", // rscd - response header: Content-Disposition + "", // rsce - response header: Content-Encoding + "", // rscl - response header: Content-Language + "", // rsct - response header: Content-Type + ); + + let mut pairs = HashMap::new(); + pairs.insert("sv", AZURE_VERSION.to_str().unwrap().to_string()); + pairs.insert("sp", signed_permissions); + pairs.insert("st", signed_start); + pairs.insert("se", signed_expiry); + pairs.insert("sr", signed_resource); + + (string_to_sign, pairs) +} + +/// Create a string to be signed for authorization via [user delegation sas]. +/// +/// [user delegation sas]: https://learn.microsoft.com/en-us/rest/api/storageservices/create-user-delegation-sas#version-2020-12-06-and-later +fn string_to_sign_user_delegation_sas( + u: &Url, + method: &Method, + account: &str, + start: &DateTime, + end: &DateTime, + delegation_key: &UserDelegationKey, +) -> (String, HashMap<&'static str, String>) { + let (signed_resource, signed_permissions, signed_start, signed_expiry, canonicalized_resource) = + string_to_sign_sas(u, method, account, start, end); + + let string_to_sign = format!( + "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}", + signed_permissions, + signed_start, + signed_expiry, + canonicalized_resource, + delegation_key.signed_oid, // signed key object id + delegation_key.signed_tid, // signed key tenant id + delegation_key.signed_start, // signed key start + delegation_key.signed_expiry, // signed key expiry + delegation_key.signed_service, // signed key service + delegation_key.signed_version, // signed key version + "", // signed authorized user object id + "", // signed unauthorized user object id + "", // signed correlation id + "", // signed ip + "", // signed protocol + &AZURE_VERSION.to_str().unwrap(), // signed version + signed_resource, // signed resource + "", // signed snapshot time + "", // signed encryption scope + "", // rscc - response header: Cache-Control + "", // rscd - response header: Content-Disposition + "", // rsce - response header: Content-Encoding + "", // rscl - response header: Content-Language + "", // rsct - response header: Content-Type + ); + + let mut pairs = HashMap::new(); + pairs.insert("sv", AZURE_VERSION.to_str().unwrap().to_string()); + pairs.insert("sp", signed_permissions); + pairs.insert("st", signed_start); + pairs.insert("se", signed_expiry); + pairs.insert("sr", signed_resource); + pairs.insert("skoid", delegation_key.signed_oid.clone()); + pairs.insert("sktid", delegation_key.signed_tid.clone()); + pairs.insert("skt", delegation_key.signed_start.clone()); + pairs.insert("ske", delegation_key.signed_expiry.clone()); + pairs.insert("sks", delegation_key.signed_service.clone()); + pairs.insert("skv", delegation_key.signed_version.clone()); + + (string_to_sign, pairs) +} + /// fn string_to_sign(h: &HeaderMap, u: &Url, method: &Method, account: &str) -> String { // content length must only be specified if != 0 @@ -232,7 +460,7 @@ fn string_to_sign(h: &HeaderMap, u: &Url, method: &Method, account: &str) -> Str add_if_exists(h, &IF_UNMODIFIED_SINCE), add_if_exists(h, &RANGE), canonicalize_header(h), - canonicalized_resource(account, u) + canonicalize_resource(account, u) ) } @@ -257,7 +485,7 @@ fn canonicalize_header(headers: &HeaderMap) -> String { } /// -fn canonicalized_resource(account: &str, uri: &Url) -> String { +fn canonicalize_resource(account: &str, uri: &Url) -> String { let mut can_res: String = String::new(); can_res.push('/'); can_res.push_str(account); @@ -681,14 +909,15 @@ impl CredentialProvider for AzureCliCredential { #[cfg(test)] mod tests { - use super::*; - use crate::client::mock_server::MockServer; use futures::executor::block_on; use hyper::body::to_bytes; use hyper::{Body, Response}; use reqwest::{Client, Method}; use tempfile::NamedTempFile; + use super::*; + use crate::client::mock_server::MockServer; + #[tokio::test] async fn test_managed_identity() { let server = MockServer::new(); diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index af0a4cefa13b..712b7a36c56a 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -27,22 +27,26 @@ //! a way to drop old blocks. Instead unused blocks are automatically cleaned up //! after 7 days. use crate::{ - multipart::{PartId, PutPart, WriteMultiPart}, + multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart}, path::Path, + signer::Signer, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, Result, }; use async_trait::async_trait; use bytes::Bytes; use futures::stream::BoxStream; +use reqwest::Method; use std::fmt::Debug; use std::sync::Arc; +use std::time::Duration; use tokio::io::AsyncWrite; +use url::Url; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::CredentialProvider; -pub use credential::authority_hosts; +pub use credential::{authority_hosts, AzureAccessKey, AzureAuthorizer}; mod builder; mod client; @@ -50,7 +54,6 @@ mod credential; /// [`CredentialProvider`] for [`MicrosoftAzure`] pub type AzureCredentialProvider = Arc>; -use crate::multipart::MultiPartStore; pub use builder::{AzureConfigKey, MicrosoftAzureBuilder}; pub use credential::AzureCredential; @@ -67,6 +70,11 @@ impl MicrosoftAzure { pub fn credentials(&self) -> &AzureCredentialProvider { &self.client.config().credentials } + + /// Create a full URL to the resource specified by `path` with this instance's configuration. + fn path_url(&self, path: &Path) -> url::Url { + self.client.config().path_url(path) + } } impl std::fmt::Display for MicrosoftAzure { @@ -128,6 +136,62 @@ impl ObjectStore for MicrosoftAzure { } } +#[async_trait] +impl Signer for MicrosoftAzure { + /// Create a URL containing the relevant [Service SAS] query parameters that authorize a request + /// via `method` to the resource at `path` valid for the duration specified in `expires_in`. + /// + /// [Service SAS]: https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas + /// + /// # Example + /// + /// This example returns a URL that will enable a user to upload a file to + /// "some-folder/some-file.txt" in the next hour. + /// + /// ``` + /// # async fn example() -> Result<(), Box> { + /// # use object_store::{azure::MicrosoftAzureBuilder, path::Path, signer::Signer}; + /// # use reqwest::Method; + /// # use std::time::Duration; + /// # + /// let azure = MicrosoftAzureBuilder::new() + /// .with_account("my-account") + /// .with_access_key("my-access-key") + /// .with_container_name("my-container") + /// .build()?; + /// + /// let url = azure.signed_url( + /// Method::PUT, + /// &Path::from("some-folder/some-file.txt"), + /// Duration::from_secs(60 * 60) + /// ).await?; + /// # Ok(()) + /// # } + /// ``` + async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result { + let mut url = self.path_url(path); + let signer = self.client.signer(expires_in).await?; + signer.sign(&method, &mut url)?; + Ok(url) + } + + async fn signed_urls( + &self, + method: Method, + paths: &[Path], + expires_in: Duration, + ) -> Result> { + let mut urls = Vec::with_capacity(paths.len()); + let signer = self.client.signer(expires_in).await?; + for path in paths { + let mut url = self.path_url(path); + signer.sign(&method, &mut url)?; + urls.push(url); + } + Ok(urls) + } +} + /// Relevant docs: /// In Azure Blob Store, parts are "blocks" /// put_multipart_part -> PUT block @@ -202,6 +266,7 @@ mod tests { stream_get(&integration).await; put_opts(&integration, true).await; multipart(&integration, &integration).await; + signing(&integration).await; let validate = !integration.client.config().disable_tagging; tagging(&integration, validate, |p| { @@ -211,6 +276,38 @@ mod tests { .await } + #[ignore = "Used for manual testing against a real storage account."] + #[tokio::test] + async fn test_user_delegation_key() { + let account = std::env::var("AZURE_ACCOUNT_NAME").unwrap(); + let container = std::env::var("AZURE_CONTAINER_NAME").unwrap(); + let client_id = std::env::var("AZURE_CLIENT_ID").unwrap(); + let client_secret = std::env::var("AZURE_CLIENT_SECRET").unwrap(); + let tenant_id = std::env::var("AZURE_TENANT_ID").unwrap(); + let integration = MicrosoftAzureBuilder::new() + .with_account(account) + .with_container_name(container) + .with_client_id(client_id) + .with_client_secret(client_secret) + .with_tenant_id(&tenant_id) + .build() + .unwrap(); + + let data = Bytes::from("hello world"); + let path = Path::from("file.txt"); + integration.put(&path, data.clone()).await.unwrap(); + + let signed = integration + .signed_url(Method::GET, &path, Duration::from_secs(60)) + .await + .unwrap(); + + let resp = reqwest::get(signed).await.unwrap(); + let loaded = resp.bytes().await.unwrap(); + + assert_eq!(data, loaded); + } + #[test] fn azure_test_config_get_value() { let azure_client_id = "object_store:fake_access_key_id".to_string(); diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index b438254bdd01..ab462cc15607 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -2126,6 +2126,29 @@ mod tests { assert_eq!(meta.size, chunk_size * 2); } + #[cfg(any(feature = "azure", feature = "aws"))] + pub(crate) async fn signing(integration: &T) + where + T: ObjectStore + crate::signer::Signer, + { + use reqwest::Method; + use std::time::Duration; + + let data = Bytes::from("hello world"); + let path = Path::from("file.txt"); + integration.put(&path, data.clone()).await.unwrap(); + + let signed = integration + .signed_url(Method::GET, &path, Duration::from_secs(60)) + .await + .unwrap(); + + let resp = reqwest::get(signed).await.unwrap(); + let loaded = resp.bytes().await.unwrap(); + + assert_eq!(data, loaded); + } + #[cfg(any(feature = "aws", feature = "azure"))] pub(crate) async fn tagging(storage: &dyn ObjectStore, validate: bool, get_tags: F) where diff --git a/object_store/src/signer.rs b/object_store/src/signer.rs index ed92e28799e5..da55c689aef5 100644 --- a/object_store/src/signer.rs +++ b/object_store/src/signer.rs @@ -31,4 +31,20 @@ pub trait Signer: Send + Sync + fmt::Debug + 'static { /// implementation's credentials such that the URL can be handed to something that doesn't have /// access to the object store's credentials, to allow limited access to the object store. async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result; + + /// Generate signed urls for multiple paths. + /// + /// See [`Signer::signed_url`] for more details. + async fn signed_urls( + &self, + method: Method, + paths: &[Path], + expires_in: Duration, + ) -> Result> { + let mut urls = Vec::with_capacity(paths.len()); + for path in paths { + urls.push(self.signed_url(method.clone(), path, expires_in).await?); + } + Ok(urls) + } }