Skip to content

Commit

Permalink
fix: handle edge cases while parsing ingester indentifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
Eshanatnight committed Mar 29, 2024
1 parent 187854e commit 564c20c
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 14 deletions.
17 changes: 12 additions & 5 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::handlers::http::logstream::error::StreamError;
use crate::option::CONFIG;

use crate::metrics::prom_utils::Metrics;
use crate::storage::ObjectStorageError;
use actix_web::http::header;
use actix_web::{HttpRequest, Responder};
use http::StatusCode;
Expand Down Expand Up @@ -354,21 +355,27 @@ pub async fn remove_ingester(req: HttpRequest) -> Result<impl Responder, PostErr
let domain_name = to_url_string(domain_name);

if check_liveness(&domain_name).await {
return Err(PostError::Invalid(anyhow::anyhow!("Ingester is Online")));
return Err(PostError::Invalid(anyhow::anyhow!("Node Online")));
}

let ingester_meta_filename = ingester_meta_filename(&domain_name);
let object_store = CONFIG.storage().get_object_store();
let msg = match object_store
.delete_ingester_meta(ingester_meta_filename)
.try_delete_ingester_meta(ingester_meta_filename)
.await
{
Ok(_) => {
format!("Ingester {} Removed", domain_name)
format!("Node {} Removed Successfully", domain_name)
}
Err(err) => {
if matches!(err, ObjectStorageError::IoError(_)) {
format!("Node {} Not Found", domain_name)
} else {
format!("Error Removing Node {}\n Reason: {}", domain_name, err)
}
}
Err(err) => err.to_string(),
};

log::error!("{}", &msg);
log::info!("{}", &msg);
Ok((msg, StatusCode::OK))
}
8 changes: 7 additions & 1 deletion server/src/handlers/http/cluster/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,13 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
}

pub async fn check_liveness(domain_name: &str) -> bool {
let uri = Url::parse(&format!("{}liveness", domain_name)).unwrap();
let uri = match Url::parse(&format!("{}liveness", domain_name)) {
Ok(uri) => uri,
Err(err) => {
log::error!("Node Indentifier Failed To Parse: {}", err);
return false;
}
};

let reqw = reqwest::Client::new()
.get(uri)
Expand Down
4 changes: 2 additions & 2 deletions server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

use crate::handlers::http::cluster::utils::check_liveness;
use crate::handlers::http::cluster::{self, get_ingester_info, remove_ingester};
use crate::handlers::http::cluster::{self, get_ingester_info};
use crate::handlers::http::middleware::RouteExt;
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};

Expand Down Expand Up @@ -156,7 +156,7 @@ impl QueryServer {
web::scope("/{ingester}").service(
web::resource("").route(
web::delete()
.to(remove_ingester)
.to(cluster::remove_ingester)
.authorize(Action::DeleteIngester),
),
),
Expand Down
2 changes: 1 addition & 1 deletion server/src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl ObjectStorage for LocalFS {
Ok(fs::remove_dir_all(path).await?)
}

async fn delete_ingester_meta(
async fn try_delete_ingester_meta(
&self,
ingester_filename: String,
) -> Result<(), ObjectStorageError> {
Expand Down
2 changes: 1 addition & 1 deletion server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub trait ObjectStorage: Sync + 'static {
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
async fn delete_ingester_meta(
async fn try_delete_ingester_meta(
&self,
ingester_filename: String,
) -> Result<(), ObjectStorageError>;
Expand Down
19 changes: 15 additions & 4 deletions server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,14 +482,25 @@ impl ObjectStorage for S3 {
Ok(())
}

async fn delete_ingester_meta(
async fn try_delete_ingester_meta(
&self,
ingester_filename: String,
) -> Result<(), ObjectStorageError> {
let file = RelativePathBuf::from(&ingester_filename);
self.client.delete(&to_object_store_path(&file)).await?;

Ok(())
match self.client.delete(&to_object_store_path(&file)).await {
Ok(_) => Ok(()),
Err(err) => {
// if the object is not found, it is not an error
// the given url path was incorrect
if matches!(err, object_store::Error::NotFound { .. }) {
log::error!("Node does not exist");
Err(err.into())
} else {
log::error!("Error deleting ingester meta file: {:?}", err);
Err(err.into())
}
}
}
}

async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
Expand Down

0 comments on commit 564c20c

Please sign in to comment.