Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BugFix: Fixed query auth #1048

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,9 @@ impl FromArgMatches for Cli {
self.kafka_host = m.get_one::<String>(Self::KAFKA_HOST).cloned();
self.kafka_group = m.get_one::<String>(Self::KAFKA_GROUP).cloned();
self.kafka_client_id = m.get_one::<String>(Self::KAFKA_CLIENT_ID).cloned();
self.kafka_security_protocol = m.get_one::<SslProtocol>(Self::KAFKA_SECURITY_PROTOCOL).cloned();
self.kafka_security_protocol = m
.get_one::<SslProtocol>(Self::KAFKA_SECURITY_PROTOCOL)
.cloned();
self.kafka_partitions = m.get_one::<String>(Self::KAFKA_PARTITIONS).cloned();

self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl FlightService for AirServiceImpl {
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
.to_owned();

update_schema_when_distributed(streams)
update_schema_when_distributed(&streams)
.await
.map_err(|err| Status::internal(err.to_string()))?;

Expand Down Expand Up @@ -212,7 +212,7 @@ impl FlightService for AirServiceImpl {

let permissions = Users.get_permissions(&key);

authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| {
authorize_and_set_filter_tags(&mut query, permissions, &streams).map_err(|_| {
Status::permission_denied("User Does not have permission to access this")
})?;
let time = Instant::now();
Expand Down
25 changes: 20 additions & 5 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
use crate::metadata::STREAM_INFO;
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
use crate::option::{Mode, CONFIG};
use crate::rbac::role::Action;
use crate::rbac::Users;
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
use crate::storage::StreamType;
use crate::storage::{retention::Retention, StorageDir, StreamInfo};
use crate::utils::actix::extract_session_key_from_req;
use crate::{event, stats};

use crate::{metadata, validator};
Expand All @@ -46,6 +49,7 @@ use arrow_schema::{Field, Schema};
use bytes::Bytes;
use chrono::Utc;
use http::{HeaderName, HeaderValue};
use itertools::Itertools;
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
Expand Down Expand Up @@ -85,16 +89,27 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
}

pub async fn list(_: HttpRequest) -> impl Responder {
//list all streams from storage
pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
let key = extract_session_key_from_req(&req)
.map_err(|err| StreamError::Anyhow(anyhow::Error::msg(err.to_string())))?;

// list all streams from storage
let res = CONFIG
.storage()
.get_object_store()
.list_streams()
.await
.unwrap();
.unwrap()
.into_iter()
.filter(|logstream| {
warn!("logstream-\n{logstream:?}");

Users.authorize(key.clone(), Action::ListStream, Some(&logstream.name), None)
== crate::rbac::Response::Authorized
})
.collect_vec();

web::Json(res)
Ok(web::Json(res))
}

pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
Expand Down Expand Up @@ -129,7 +144,7 @@ pub async fn schema(req: HttpRequest) -> Result<impl Responder, StreamError> {
}
Err(err) => return Err(StreamError::from(err)),
};
match update_schema_when_distributed(vec![stream_name.clone()]).await {
match update_schema_when_distributed(&vec![stream_name.clone()]).await {
Ok(_) => {
let schema = STREAM_INFO.schema(&stream_name)?;
Ok((web::Json(schema), StatusCode::OK))
Expand Down
67 changes: 36 additions & 31 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use tracing::error;
use tracing::{error, trace};

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;
Expand Down Expand Up @@ -90,7 +90,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
let _ = raw_logical_plan.visit(&mut visitor);

let tables = visitor.into_inner();
update_schema_when_distributed(tables).await?;
update_schema_when_distributed(&tables).await?;
let mut query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;

let creds = extract_session_key_from_req(&req)?;
Expand All @@ -100,7 +100,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
.first_table_name()
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;

authorize_and_set_filter_tags(&mut query, permissions, &table_name)?;
authorize_and_set_filter_tags(&mut query, permissions, &tables)?;

let time = Instant::now();
let (records, fields) = query.execute(table_name.clone()).await?;
Expand All @@ -122,14 +122,14 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
Ok(response)
}

pub async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), QueryError> {
pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(), QueryError> {
if CONFIG.parseable.mode == Mode::Query {
for table in tables {
if let Ok(new_schema) = fetch_schema(&table).await {
if let Ok(new_schema) = fetch_schema(table).await {
// commit schema merges the schema internally and updates the schema in storage.
commit_schema_to_storage(&table, new_schema.clone()).await?;
commit_schema_to_storage(table, new_schema.clone()).await?;

commit_schema(&table, Arc::new(new_schema))?;
commit_schema(table, Arc::new(new_schema))?;
}
}
}
Expand All @@ -153,41 +153,46 @@ pub async fn create_streams_for_querier() {
}
}

// check auth for each table in the tables vector
pub fn authorize_and_set_filter_tags(
query: &mut LogicalQuery,
permissions: Vec<Permission>,
table_name: &str,
tables: &Vec<String>,
) -> Result<(), QueryError> {
// check authorization of this query if it references physical table;
let mut authorized = false;
let mut tags = Vec::new();

// in permission check if user can run query on the stream.
// also while iterating add any filter tags for this stream
for permission in permissions {
match permission {
Permission::Stream(Action::All, _) => {
authorized = true;
break;
}
Permission::StreamWithTag(Action::Query, ref stream, tag)
if stream == table_name || stream == "*" =>
{
authorized = true;
if let Some(tag) = tag {
tags.push(tag)

trace!("table names in auth- {tables:?}");
for table_name in tables.iter() {
let mut tags = Vec::new();

// in permission check if user can run query on the stream.
// also while iterating add any filter tags for this stream
for permission in &permissions {
match permission {
Permission::Stream(Action::All, _) => {
authorized = true;
break;
}
Permission::StreamWithTag(Action::Query, ref stream, tag)
if stream == table_name || stream == "*" =>
{
authorized = true;
if let Some(tag) = tag {
tags.push(tag.clone())
}
}
_ => (),
}
_ => (),
}
}

if !authorized {
return Err(QueryError::Unauthorized);
}
if !authorized {
return Err(QueryError::Unauthorized);
}

if !tags.is_empty() {
query.filter_tag = Some(tags)
if !tags.is_empty() {
query.filter_tag = Some(tags)
}
}

Ok(())
Expand Down
1 change: 1 addition & 0 deletions src/rbac/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use self::map::SessionKey;
use self::role::{Permission, RoleBuilder};
use self::user::UserType;

#[derive(PartialEq)]
pub enum Response {
Authorized,
UnAuthorized,
Expand Down
2 changes: 1 addition & 1 deletion src/rbac/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ impl RoleBuilder {
| Action::CreateStream
| Action::DeleteStream
| Action::GetStreamInfo
| Action::ListStream
| Action::ListCluster
| Action::ListClusterMetrics
| Action::Deleteingestor
Expand All @@ -134,6 +133,7 @@ impl RoleBuilder {
| Action::DeleteFilter
| Action::GetAnalytics => Permission::Unit(action),
Action::Ingest
| Action::ListStream
| Action::GetSchema
| Action::DetectSchema
| Action::GetStats
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl ObjectStoreFormat {
}
}

#[derive(serde::Serialize, PartialEq)]
#[derive(serde::Serialize, PartialEq, Debug)]
pub struct LogStream {
pub name: String,
}
Expand Down
Loading