Skip to content

Commit

Permalink
add delete ingester endpoint
Browse files Browse the repository at this point in the history
1. Add delete_ingester_meta func in ObjectStorage Trait
2. Update Permission Actions
3. Update PostError
4. Add delete Ingeter Endpoint ->
`api/v1/cluster/ingester_ip%3Aingester_port`
  • Loading branch information
Eshanatnight committed Mar 28, 2024
1 parent 904fc8a commit b140428
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 4 deletions.
29 changes: 28 additions & 1 deletion server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@

pub mod utils;

use crate::handlers::http::cluster::utils::{
check_liveness, ingester_meta_filename, to_url_string,
};
use crate::handlers::http::ingest::PostError;
use crate::handlers::http::logstream::error::StreamError;
use crate::option::CONFIG;

use crate::metrics::prom_utils::Metrics;
use actix_web::http::header;
use actix_web::Responder;
use actix_web::{HttpRequest, Responder};
use http::StatusCode;
use itertools::Itertools;
use relative_path::RelativePathBuf;
Expand Down Expand Up @@ -382,3 +385,27 @@ pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> {

Ok(arr)
}

pub async fn remove_ingester(req: HttpRequest) -> Result<impl Responder, PostError> {
let domain_name: String = req.match_info().get("ingester").unwrap().parse().unwrap();
let domain_name = to_url_string(domain_name);

if check_liveness(&domain_name).await {
return Err(PostError::Invalid(anyhow::anyhow!("Ingester is Online")));
}

let ingester_meta_filename = ingester_meta_filename(&domain_name);
let object_store = CONFIG.storage().get_object_store();
let msg = match object_store
.delete_ingester_meta(ingester_meta_filename)
.await
{
Ok(_) => {
format!("Ingester {} Removed", domain_name)
}
Err(err) => err.to_string(),
};

log::error!("{}", &msg);
Ok((msg, StatusCode::OK))
}
22 changes: 22 additions & 0 deletions server/src/handlers/http/cluster/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,25 @@ pub async fn send_stats_request(

Ok(Some(res))
}

/// domain_name needs to be http://ip:port
pub fn ingester_meta_filename(domain_name: &str) -> String {
if domain_name.starts_with("http://") | domain_name.starts_with("https://") {
let url = Url::parse(domain_name).unwrap();
return format!(
"ingester.{}.{}.json",
url.host_str().unwrap(),
url.port().unwrap()
);
}
format!("ingester.{}.json", domain_name)
}

pub fn to_url_string(str: String) -> String {
// if the str is already a url i am guessing that it will end in '/'
if str.starts_with("http://") || str.starts_with("https://") {
return str;
}

format!("http://{}/", str)
}
4 changes: 4 additions & 0 deletions server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::handlers::{
};
use crate::metadata::STREAM_INFO;
use crate::option::{Mode, CONFIG};
use crate::storage::ObjectStorageError;
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};

use super::logstream::error::CreateStreamError;
Expand Down Expand Up @@ -172,6 +173,8 @@ pub enum PostError {
CreateStream(#[from] CreateStreamError),
#[error("Error: {0}")]
CustomError(String),
#[error("ObjectStorageError: {0}")]
ObjectStorageError(#[from] ObjectStorageError),
}

impl actix_web::ResponseError for PostError {
Expand All @@ -187,6 +190,7 @@ impl actix_web::ResponseError for PostError {
PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::StreamNotFound(_) => StatusCode::NOT_FOUND,
PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand Down
14 changes: 13 additions & 1 deletion server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

use crate::handlers::http::cluster::utils::check_liveness;
use crate::handlers::http::cluster::{self, get_ingester_info};
use crate::handlers::http::cluster::{self, get_ingester_info, remove_ingester};
use crate::handlers::http::middleware::RouteExt;
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};

Expand Down Expand Up @@ -136,19 +136,31 @@ impl QueryServer {
fn get_cluster_info_web_scope() -> actix_web::Scope {
web::scope("/cluster")
.service(
// GET "/cluster/info" ==> Get info of the cluster
web::resource("/info").route(
web::get()
.to(cluster::get_cluster_info)
.authorize(Action::ListCluster),
),
)
// GET "/cluster/metrics" ==> Get metrics of the cluster
.service(
web::resource("/metrics").route(
web::get()
.to(cluster::get_cluster_metrics)
.authorize(Action::ListClusterMetrics),
),
)
// DELETE "/cluster/{ingester_domain:port}" ==> Delete an ingester from the cluster
.service(
web::scope("/{ingester}").service(
web::resource("").route(
web::delete()
.to(remove_ingester)
.authorize(Action::DeleteIngester),
),
),
)
}

/// initialize the server, run migrations as needed and start the server
Expand Down
2 changes: 2 additions & 0 deletions server/src/rbac/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub enum Action {
QueryLLM,
ListCluster,
ListClusterMetrics,
DeleteIngester,
All,
}

Expand Down Expand Up @@ -112,6 +113,7 @@ impl RoleBuilder {
| Action::All => Permission::Stream(action, self.stream.clone().unwrap()),
Action::ListCluster => Permission::Unit(action),
Action::ListClusterMetrics => Permission::Unit(action),
Action::DeleteIngester => Permission::Unit(action),
};
perms.push(perm);
}
Expand Down
8 changes: 8 additions & 0 deletions server/src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,14 @@ impl ObjectStorage for LocalFS {
Ok(fs::remove_dir_all(path).await?)
}

async fn delete_ingester_meta(
&self,
ingester_filename: String,
) -> Result<(), ObjectStorageError> {
let path = self.root.join(ingester_filename);
Ok(fs::remove_file(path).await?)
}

async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
let ignore_dir = &["lost+found"];
let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
Expand Down
5 changes: 4 additions & 1 deletion server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ pub trait ObjectStorage: Sync + 'static {
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;

async fn delete_ingester_meta(
&self,
ingester_filename: String,
) -> Result<(), ObjectStorageError>;
/// Returns the amount of time taken by the `ObjectStore` to perform a get
/// call.
async fn get_latency(&self) -> Duration {
Expand Down
14 changes: 13 additions & 1 deletion server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum};
use object_store::limit::LimitStore;
use object_store::path::Path as StorePath;
use object_store::{ClientOptions, ObjectStore};
use relative_path::RelativePath;
use relative_path::{RelativePath, RelativePathBuf};
use tokio::fs::OpenOptions;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

Expand Down Expand Up @@ -483,6 +483,18 @@ impl ObjectStorage for S3 {
Ok(())
}

async fn delete_ingester_meta(
&self,
ingester_filename: String,
) -> Result<(), ObjectStorageError> {
// considering ingester_file is in root
// to be changed when metacleanup is done
let file = RelativePathBuf::from(&ingester_filename);
self.client.delete(&to_object_store_path(&file)).await?;

Ok(())
}

async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
let streams = self._list_streams().await?;

Expand Down

0 comments on commit b140428

Please sign in to comment.