
Commit

deepsource analysis fix
nikhilsinhaparseable committed May 21, 2024
1 parent 30353ab commit fc1d9db
Showing 3 changed files with 69 additions and 47 deletions.
15 changes: 9 additions & 6 deletions server/src/handlers/http/cluster/mod.rs
@@ -575,12 +575,15 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
log::info!("Cluster metrics fetched successfully from all ingestors");
if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
let stream_name = INTERNAL_STREAM_NAME;
if let Ok(()) = ingest_internal_stream(
stream_name.to_string(),
bytes::Bytes::from(metrics_bytes),
)
.await
{

if matches!(
ingest_internal_stream(
stream_name.to_string(),
bytes::Bytes::from(metrics_bytes),
)
.await,
Ok(())
) {
log::info!(
"Cluster metrics successfully ingested into internal stream"
);
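The rewrite above keeps the behaviour but swaps the pattern-only `if let Ok(()) = … { … }` for `if matches!(…, Ok(()))`, turning the success check into an ordinary boolean condition, which is presumably what the DeepSource finding asked for. A minimal, self-contained sketch of the before/after pattern, using a placeholder `ingest` function rather than the real `ingest_internal_stream` signature:

// Sketch of the pattern changed in this hunk. `ingest` is a stand-in,
// not the actual ingest_internal_stream API.
fn ingest(payload: &[u8]) -> Result<(), String> {
    if payload.is_empty() {
        return Err("empty payload".to_string());
    }
    Ok(())
}

fn main() {
    let payload = b"cluster metrics";

    // Before: pattern-only `if let`, which static analysis tends to flag.
    if let Ok(()) = ingest(payload) {
        println!("ingested (if let form)");
    }

    // After: `matches!` evaluates to a bool, so the check reads as a plain condition.
    if matches!(ingest(payload), Ok(())) {
        println!("ingested (matches! form)");
    }
}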
2 changes: 1 addition & 1 deletion server/src/handlers/http/modal/query_server.rs
@@ -186,7 +186,7 @@ impl QueryServer {
analytics::init_analytics_scheduler()?;
}

if let Ok(()) = init_cluster_metrics_schedular() {
if matches!(init_cluster_metrics_schedular(), Ok(())) {
log::info!("Cluster metrics scheduler started successfully");
}
let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
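The same `matches!` rewrite is applied to the scheduler start-up check: the success message is logged only when `init_cluster_metrics_schedular()` (spelled as in the source) returns `Ok(())`. Since the pattern binds nothing, `init_cluster_metrics_schedular().is_ok()` would be an equivalent form; the commit standardizes on `matches!`, which also extends naturally to non-unit patterns.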
99 changes: 59 additions & 40 deletions server/src/metrics/prom_utils.rs
@@ -105,86 +105,89 @@ impl Metrics {
if let PromValue::Gauge(val) = sample.value {
prom_dress.parseable_events_ingested += val;
}
} else if &sample.metric == "parseable_events_ingested_size" {
}
if &sample.metric == "parseable_events_ingested_size" {
if let PromValue::Gauge(val) = sample.value {
prom_dress.parseable_events_ingested_size += val;
}
} else if &sample.metric == "parseable_lifetime_events_ingested" {
}
if &sample.metric == "parseable_lifetime_events_ingested" {
if let PromValue::Gauge(val) = sample.value {
prom_dress.parseable_lifetime_events_ingested += val;
}
} else if &sample.metric == "parseable_lifetime_events_ingested_size" {
}
if &sample.metric == "parseable_lifetime_events_ingested_size" {
if let PromValue::Gauge(val) = sample.value {
prom_dress.parseable_lifetime_events_ingested_size += val;
}
} else if &sample.metric == "parseable_deleted_events_ingested" {
}
if &sample.metric == "parseable_deleted_events_ingested" {
if let PromValue::Gauge(val) = sample.value {
prom_dress.parseable_deleted_events_ingested += val;
}
} else if &sample.metric == "parseable_deleted_events_ingested_size" {
}
if &sample.metric == "parseable_deleted_events_ingested_size" {
if let PromValue::Gauge(val) = sample.value {
prom_dress.parseable_deleted_events_ingested_size += val;
}
} else if sample.metric == "parseable_staging_files" {
}
if sample.metric == "parseable_staging_files" {
if let PromValue::Gauge(val) = sample.value {
prom_dress.parseable_staging_files += val;
}
} else if sample.metric == "process_resident_memory_bytes" {
}
if sample.metric == "process_resident_memory_bytes" {
if let PromValue::Gauge(val) = sample.value {
prom_dress.process_resident_memory_bytes += val;
}
} else if sample.metric == "parseable_storage_size" {
if sample.labels.get("type").expect("type is present") == "data" {
if let PromValue::Gauge(val) = sample.value {
prom_dress.parseable_storage_size.data += val;
}
} else if sample.labels.get("type").expect("type is present") == "staging" {
if let PromValue::Gauge(val) = sample.value {
prom_dress.parseable_storage_size.staging += val;
}
}
if sample.metric == "parseable_storage_size"
&& sample.labels.get("type").expect("type is present") == "data"
{
if let PromValue::Gauge(val) = sample.value {
prom_dress.parseable_storage_size.data += val;
}
}
if sample.metric == "parseable_storage_size"
&& sample.labels.get("type").expect("type is present") == "staging"
{
if let PromValue::Gauge(val) = sample.value {
prom_dress.parseable_storage_size.staging += val;
}
} else if sample.metric == "parseable_lifetime_events_storage_size" {
if sample.labels.get("type").expect("type is present") == "data" {
if let PromValue::Gauge(val) = sample.value {
prom_dress.parseable_lifetime_storage_size.data += val;
}
}

if sample.metric == "parseable_lifetime_events_storage_size"
&& sample.labels.get("type").expect("type is present") == "data"
{
if let PromValue::Gauge(val) = sample.value {
prom_dress.parseable_lifetime_storage_size.data += val;
}
} else if sample.metric == "parseable_deleted_events_storage_size"
}
if sample.metric == "parseable_deleted_events_storage_size"
&& sample.labels.get("type").expect("type is present") == "data"
{
if let PromValue::Gauge(val) = sample.value {
prom_dress.parseable_deleted_storage_size.data += val;
}
}
}
let about_api_json = Self::from_about_api_response(ingestor_metadata.clone())
let (commit_id, staging, cache) = Self::from_about_api_response(ingestor_metadata.clone())
.await
.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
PostError::Invalid(err.into())
})?;
let commit_id = about_api_json
.get("commit")
.and_then(|x| x.as_str())
.unwrap_or_default();
let staging = about_api_json
.get("staging")
.and_then(|x| x.as_str())
.unwrap_or_default();
let cache = about_api_json
.get("cache")
.and_then(|x| x.as_str())
.unwrap_or_default();
prom_dress.commit = commit_id.to_string();
prom_dress.staging = staging.to_string();
prom_dress.cache = cache.to_string();

prom_dress.commit = commit_id;
prom_dress.staging = staging;
prom_dress.cache = cache;

Ok(prom_dress)
}

pub async fn from_about_api_response(
ingestor_metadata: IngestorMetadata,
) -> Result<serde_json::Value, PostError> {
) -> Result<(String, String, String), PostError> {
let uri = Url::parse(&format!(
"{}{}/about",
&ingestor_metadata.domain_name,
@@ -203,7 +203,7 @@ impl Metrics {
let about_api_json = res.text().await.map_err(PostError::NetworkError)?;
let about_api_json: serde_json::Value =
serde_json::from_str(&about_api_json).map_err(PostError::SerdeError)?;
Ok(about_api_json)
let commit_id = about_api_json
.get("commit")
.and_then(|x| x.as_str())
.unwrap_or_default();
let staging = about_api_json
.get("staging")
.and_then(|x| x.as_str())
.unwrap_or_default();
let cache = about_api_json
.get("cache")
.and_then(|x| x.as_str())
.unwrap_or_default();
Ok((
commit_id.to_string(),
staging.to_string(),
cache.to_string(),
))
} else {
log::warn!(
"Failed to fetch about API response from ingestor: {}\n",
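The prom_utils.rs refactor does two things. First, the long `else if` chain over `sample.metric` becomes a series of independent `if` blocks, with the storage-size checks folding the `type` label into the condition; since a sample's metric name matches at most one block, the behaviour is unchanged. Second, `from_about_api_response` now extracts the `commit`, `staging`, and `cache` fields itself and returns a `(String, String, String)` tuple instead of handing the raw `serde_json::Value` back to the caller, so knowledge of the JSON shape lives in one place. A minimal sketch of that extraction, assuming the `/about` response is a JSON object with string fields of those names; `AboutError` here is a stand-in for the real `PostError`:

use serde_json::Value;

// Stand-in error type; the real code maps failures into PostError variants.
#[derive(Debug)]
struct AboutError(String);

// Sketch of the tuple-returning extraction introduced in this commit, assuming the
// /about payload carries string "commit", "staging", and "cache" fields.
fn parse_about(body: &str) -> Result<(String, String, String), AboutError> {
    let about: Value = serde_json::from_str(body).map_err(|e| AboutError(e.to_string()))?;

    // Missing or non-string fields fall back to "", mirroring unwrap_or_default() in the diff.
    let field = |name: &str| -> String {
        about
            .get(name)
            .and_then(|v| v.as_str())
            .unwrap_or_default()
            .to_string()
    };

    Ok((field("commit"), field("staging"), field("cache")))
}

fn main() {
    let body = r#"{"commit":"fc1d9db","staging":"/tmp/staging","cache":"disabled"}"#;
    let (commit, staging, cache) = parse_about(body).expect("valid about payload");
    println!("commit={commit} staging={staging} cache={cache}");
}

Keeping the parsing inside `from_about_api_response` lets the caller destructure the tuple and assign `prom_dress.commit`, `prom_dress.staging`, and `prom_dress.cache` directly, as the first hunk above shows.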
