From d18a8d9c75e35dab0712f41ec2d1952e5dc35b65 Mon Sep 17 00:00:00 2001
From: anant
Date: Sun, 22 Dec 2024 14:22:34 +0530
Subject: [PATCH] BugFix: Fixed query auth

The query auth flow and the permission assignment for `ListStream` have changed.
---
 src/cli.rs                     |  4 +-
 src/handlers/airplane.rs       |  4 +-
 src/handlers/http/logstream.rs | 25 ++++++++++---
 src/handlers/http/query.rs     | 67 ++++++++++++++++++----------------
 src/rbac/mod.rs                |  1 +
 src/rbac/role.rs               |  2 +-
 src/storage/mod.rs             |  2 +-
 7 files changed, 64 insertions(+), 41 deletions(-)

diff --git a/src/cli.rs b/src/cli.rs
index 38648c7ad..1205fdfb2 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -529,7 +529,9 @@ impl FromArgMatches for Cli {
         self.kafka_host = m.get_one::(Self::KAFKA_HOST).cloned();
         self.kafka_group = m.get_one::(Self::KAFKA_GROUP).cloned();
         self.kafka_client_id = m.get_one::(Self::KAFKA_CLIENT_ID).cloned();
-        self.kafka_security_protocol = m.get_one::(Self::KAFKA_SECURITY_PROTOCOL).cloned();
+        self.kafka_security_protocol = m
+            .get_one::(Self::KAFKA_SECURITY_PROTOCOL)
+            .cloned();
         self.kafka_partitions = m.get_one::(Self::KAFKA_PARTITIONS).cloned();
 
         self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned();
diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs
index e910c7035..65c4cf400 100644
--- a/src/handlers/airplane.rs
+++ b/src/handlers/airplane.rs
@@ -157,7 +157,7 @@ impl FlightService for AirServiceImpl {
             .ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
             .to_owned();
 
-        update_schema_when_distributed(streams)
+        update_schema_when_distributed(&streams)
             .await
             .map_err(|err| Status::internal(err.to_string()))?;
 
@@ -212,7 +212,7 @@ impl FlightService for AirServiceImpl {
 
         let permissions = Users.get_permissions(&key);
 
-        authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| {
+        authorize_and_set_filter_tags(&mut query, permissions, &streams).map_err(|_| {
             Status::permission_denied("User Does not have permission to access this")
         })?;
         let time = Instant::now();
diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index 98ab11ab8..7afa681ae 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -32,9 +32,12 @@ use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
 use crate::metadata::STREAM_INFO;
 use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
 use crate::option::{Mode, CONFIG};
+use crate::rbac::role::Action;
+use crate::rbac::Users;
 use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
 use crate::storage::StreamType;
 use crate::storage::{retention::Retention, StorageDir, StreamInfo};
+use crate::utils::actix::extract_session_key_from_req;
 use crate::{event, stats};
 use crate::{metadata, validator};
 
@@ -46,6 +49,7 @@ use arrow_schema::{Field, Schema};
 use bytes::Bytes;
 use chrono::Utc;
 use http::{HeaderName, HeaderValue};
+use itertools::Itertools;
 use serde_json::Value;
 use std::collections::HashMap;
 use std::fs;
@@ -85,16 +89,27 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
     Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
 }
 
-pub async fn list(_: HttpRequest) -> impl Responder {
-    //list all streams from storage
+pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
+    let key = extract_session_key_from_req(&req)
+        .map_err(|err| StreamError::Anyhow(anyhow::Error::msg(err.to_string())))?;
+
+    // list all streams from storage
     let res = CONFIG
         .storage()
         .get_object_store()
         .list_streams()
         .await
-        .unwrap();
+
.unwrap() + .into_iter() + .filter(|logstream| { + warn!("logstream-\n{logstream:?}"); + + Users.authorize(key.clone(), Action::ListStream, Some(&logstream.name), None) + == crate::rbac::Response::Authorized + }) + .collect_vec(); - web::Json(res) + Ok(web::Json(res)) } pub async fn detect_schema(body: Bytes) -> Result { @@ -129,7 +144,7 @@ pub async fn schema(req: HttpRequest) -> Result { } Err(err) => return Err(StreamError::from(err)), }; - match update_schema_when_distributed(vec![stream_name.clone()]).await { + match update_schema_when_distributed(&vec![stream_name.clone()]).await { Ok(_) => { let schema = STREAM_INFO.schema(&stream_name)?; Ok((web::Json(schema), StatusCode::OK)) diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 27414b9d0..23eca77df 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -29,7 +29,7 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::time::Instant; -use tracing::error; +use tracing::{error, trace}; use crate::event::error::EventError; use crate::handlers::http::fetch_schema; @@ -90,7 +90,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result Result) -> Result<(), QueryError> { +pub async fn update_schema_when_distributed(tables: &Vec) -> Result<(), QueryError> { if CONFIG.parseable.mode == Mode::Query { for table in tables { - if let Ok(new_schema) = fetch_schema(&table).await { + if let Ok(new_schema) = fetch_schema(table).await { // commit schema merges the schema internally and updates the schema in storage. - commit_schema_to_storage(&table, new_schema.clone()).await?; + commit_schema_to_storage(table, new_schema.clone()).await?; - commit_schema(&table, Arc::new(new_schema))?; + commit_schema(table, Arc::new(new_schema))?; } } } @@ -153,41 +153,46 @@ pub async fn create_streams_for_querier() { } } +// check auth for each table in the tables vector pub fn authorize_and_set_filter_tags( query: &mut LogicalQuery, permissions: Vec, - table_name: &str, + tables: &Vec, ) -> Result<(), QueryError> { // check authorization of this query if it references physical table; let mut authorized = false; - let mut tags = Vec::new(); - - // in permission check if user can run query on the stream. - // also while iterating add any filter tags for this stream - for permission in permissions { - match permission { - Permission::Stream(Action::All, _) => { - authorized = true; - break; - } - Permission::StreamWithTag(Action::Query, ref stream, tag) - if stream == table_name || stream == "*" => - { - authorized = true; - if let Some(tag) = tag { - tags.push(tag) + + trace!("table names in auth- {tables:?}"); + for table_name in tables.iter() { + let mut tags = Vec::new(); + + // in permission check if user can run query on the stream. 
+ // also while iterating add any filter tags for this stream + for permission in &permissions { + match permission { + Permission::Stream(Action::All, _) => { + authorized = true; + break; } + Permission::StreamWithTag(Action::Query, ref stream, tag) + if stream == table_name || stream == "*" => + { + authorized = true; + if let Some(tag) = tag { + tags.push(tag.clone()) + } + } + _ => (), } - _ => (), } - } - if !authorized { - return Err(QueryError::Unauthorized); - } + if !authorized { + return Err(QueryError::Unauthorized); + } - if !tags.is_empty() { - query.filter_tag = Some(tags) + if !tags.is_empty() { + query.filter_tag = Some(tags) + } } Ok(()) diff --git a/src/rbac/mod.rs b/src/rbac/mod.rs index 1679d7ffa..51b46c958 100644 --- a/src/rbac/mod.rs +++ b/src/rbac/mod.rs @@ -33,6 +33,7 @@ use self::map::SessionKey; use self::role::{Permission, RoleBuilder}; use self::user::UserType; +#[derive(PartialEq)] pub enum Response { Authorized, UnAuthorized, diff --git a/src/rbac/role.rs b/src/rbac/role.rs index f94c8f171..39b54bcdc 100644 --- a/src/rbac/role.rs +++ b/src/rbac/role.rs @@ -117,7 +117,6 @@ impl RoleBuilder { | Action::CreateStream | Action::DeleteStream | Action::GetStreamInfo - | Action::ListStream | Action::ListCluster | Action::ListClusterMetrics | Action::Deleteingestor @@ -134,6 +133,7 @@ impl RoleBuilder { | Action::DeleteFilter | Action::GetAnalytics => Permission::Unit(action), Action::Ingest + | Action::ListStream | Action::GetSchema | Action::DetectSchema | Action::GetStats diff --git a/src/storage/mod.rs b/src/storage/mod.rs index a018c2b1c..b06dd368f 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -202,7 +202,7 @@ impl ObjectStoreFormat { } } -#[derive(serde::Serialize, PartialEq)] +#[derive(serde::Serialize, PartialEq, Debug)] pub struct LogStream { pub name: String, }
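
For reference, below is a minimal, self-contained sketch of the per-table authorization that this patch introduces in `authorize_and_set_filter_tags`: every table referenced by the query must be individually permitted, and any filter tags found along the way are collected for the caller. The types and the helper name here (`Action`, `Permission`, `Unauthorized`, `authorize_tables`) are simplified stand-ins, not the actual parseable definitions, and the sketch resets the `authorized` flag for each table to make the "all referenced streams must be readable" intent explicit.

// Simplified stand-ins for the RBAC types used in src/handlers/http/query.rs.
#[derive(Debug)]
enum Action {
    All,
    Query,
}

#[derive(Debug)]
enum Permission {
    // Blanket permission on an action (the real type carries more variants).
    Stream(Action, String),
    // Permission on a named stream, optionally restricted to a filter tag.
    StreamWithTag(Action, String, Option<String>),
}

#[derive(Debug)]
struct Unauthorized;

/// Authorize every table referenced by a query. The query is allowed only if each
/// table is readable by the user; collected filter tags are returned so the caller
/// can attach them to the query (the real code sets `query.filter_tag`).
fn authorize_tables(
    permissions: &[Permission],
    tables: &[String],
) -> Result<Vec<String>, Unauthorized> {
    let mut filter_tags = Vec::new();

    for table_name in tables {
        let mut authorized = false;

        for permission in permissions {
            match permission {
                // A blanket `All` permission short-circuits the check for this table.
                Permission::Stream(Action::All, _) => {
                    authorized = true;
                    break;
                }
                // Query permission on this stream (or on `*`), optionally tag-scoped.
                Permission::StreamWithTag(Action::Query, stream, tag)
                    if stream == table_name || stream == "*" =>
                {
                    authorized = true;
                    if let Some(tag) = tag {
                        filter_tags.push(tag.clone());
                    }
                }
                _ => {}
            }
        }

        // A single unauthorized table rejects the whole query.
        if !authorized {
            return Err(Unauthorized);
        }
    }

    Ok(filter_tags)
}

fn main() {
    let permissions = vec![Permission::StreamWithTag(
        Action::Query,
        "frontend_logs".to_string(),
        Some("team=web".to_string()),
    )];

    // Allowed: the query only touches `frontend_logs`, and its tag is collected.
    let tables = vec!["frontend_logs".to_string()];
    assert_eq!(
        authorize_tables(&permissions, &tables).unwrap(),
        vec!["team=web".to_string()]
    );

    // Rejected: the query also touches `backend_logs`, which this user cannot read.
    let tables = vec!["frontend_logs".to_string(), "backend_logs".to_string()];
    assert!(authorize_tables(&permissions, &tables).is_err());

    // A blanket `All` permission authorizes any combination of tables.
    let admin = vec![Permission::Stream(Action::All, "*".to_string())];
    assert!(authorize_tables(&admin, &tables).is_ok());

    println!("per-table authorization behaves as expected");
}

The same per-stream idea drives the `ListStream` change: the `list` handler now calls `Users.authorize(key, Action::ListStream, Some(&logstream.name), None)` for each stream and keeps only the ones that come back `Authorized` (hence the new `PartialEq` derive on `Response`), and `ListStream` moves out of the unit-permission group into the stream-scoped group in `RoleBuilder`.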