Skip to content

Commit

Permalink
Update: /cluster/info endpoint to also return staging path (#713)
Browse files Browse the repository at this point in the history
  • Loading branch information
Eshanatnight authored Mar 26, 2024
1 parent b1a8262 commit 81b241d
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 12 deletions.
20 changes: 20 additions & 0 deletions server/src/handlers/http/about.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,26 @@ use crate::{
};
use std::path::PathBuf;

/// {
/// "version": current_version,
/// "uiVersion": ui_version,
/// "commit": commit,
/// "deploymentId": deployment_id,
/// "updateAvailable": update_available,
/// "latestVersion": latest_release,
/// "llmActive": is_llm_active,
/// "llmProvider": llm_provider,
/// "oidcActive": is_oidc_active,
/// "license": "AGPL-3.0-only",
/// "mode": mode,
/// "staging": staging,
/// "cache": cache_details,
/// "grpcPort": grpc_port,
/// "store": {
/// "type": CONFIG.get_storage_mode_string(),
/// "path": store_endpoint
/// }
/// }
pub async fn about() -> Json<serde_json::Value> {
let meta = StorageMetadata::global();

Expand Down
73 changes: 61 additions & 12 deletions server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use itertools::Itertools;
use relative_path::RelativePathBuf;
use reqwest::Response;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::sync::Arc;
use url::Url;

Expand Down Expand Up @@ -194,20 +195,62 @@ impl QueryServer {
let mut infos = vec![];

for ingester in ingester_infos {
let uri = Url::parse(&format!("{}liveness", ingester.domain_name))
.expect("should always be a valid url");
let uri = Url::parse(&format!(
"{}{}/about",
ingester.domain_name,
base_path_without_preceding_slash()
))
.expect("should always be a valid url");

let reqw = reqwest::Client::new()
let resp = reqwest::Client::new()
.get(uri)
.header(header::AUTHORIZATION, ingester.token.clone())
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;

let (reachable, staging_path, error, status) = if let Ok(resp) = resp {
let status = Some(resp.status().to_string());

let resp_data = resp.bytes().await.map_err(|err| {
log::error!("Fatal: failed to parse ingester info to bytes: {:?}", err);
StreamError::Custom {
msg: format!("failed to parse ingester info to bytes: {:?}", err),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
})?;

let sp = serde_json::from_slice::<JsonValue>(&resp_data)
.map_err(|err| {
log::error!("Fatal: failed to parse ingester info: {:?}", err);
StreamError::Custom {
msg: format!("failed to parse ingester info: {:?}", err),
status: StatusCode::INTERNAL_SERVER_ERROR,
}
})?
.get("staging")
.unwrap()
.as_str()
.unwrap()
.to_string();

(true, sp, None, status)
} else {
(
false,
"".to_owned(),
resp.as_ref().err().map(|e| e.to_string()),
resp.unwrap_err().status().map(|s| s.to_string()),
)
};

infos.push(ClusterInfo::new(
&ingester.domain_name,
reqw.is_ok(),
reqw.as_ref().err().map(|e| e.to_string()),
reqw.ok().map(|r| r.status().to_string()),
reachable,
staging_path,
CONFIG.storage().get_endpoint(),
error,
status,
));
}

Expand Down Expand Up @@ -375,8 +418,8 @@ impl QueryServer {
let client = reqwest::Client::new();
let res = client
.get(url)
.header("Content-Type", "application/json")
.header("Authorization", ingester.token)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingester.token)
.send()
.await
.map_err(|err| {
Expand Down Expand Up @@ -424,8 +467,8 @@ impl QueryServer {
let client = reqwest::Client::new();
let res = client
.put(url)
.header("Content-Type", "application/json")
.header("Authorization", ingester.token)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingester.token)
.send()
.await
.map_err(|err| {
Expand Down Expand Up @@ -473,8 +516,8 @@ impl QueryServer {
let client = reqwest::Client::new();
let resp = client
.delete(url)
.header("Content-Type", "application/json")
.header("Authorization", ingester.token)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingester.token)
.send()
.await
.map_err(|err| {
Expand Down Expand Up @@ -649,6 +692,8 @@ impl StorageStats {
struct ClusterInfo {
domain_name: String,
reachable: bool,
staging_path: String,
storage_path: String,
error: Option<String>, // error message if the ingester is not reachable
status: Option<String>, // status message if the ingester is reachable
}
Expand All @@ -657,12 +702,16 @@ impl ClusterInfo {
fn new(
domain_name: &str,
reachable: bool,
staging_path: String,
storage_path: String,
error: Option<String>,
status: Option<String>,
) -> Self {
Self {
domain_name: domain_name.to_string(),
reachable,
staging_path,
storage_path,
error,
status,
}
Expand Down

0 comments on commit 81b241d

Please sign in to comment.