Skip to content

Commit

Permalink
fix: return val for count query
Browse files Browse the repository at this point in the history
  • Loading branch information
Eshanatnight committed Mar 14, 2024
1 parent d2c9ea0 commit 829a72c
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 3 deletions.
7 changes: 5 additions & 2 deletions server/src/handlers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

use actix_cors::Cors;
use arrow_schema::Schema;
use serde_json::Value;

use self::{modal::query_server::QueryServer, query::Query};
Expand Down Expand Up @@ -59,7 +60,7 @@ pub fn base_path_without_preceding_slash() -> String {
base_path().trim_start_matches('/').to_string()
}

pub async fn send_schema_request(stream_name: &str) -> anyhow::Result<Vec<arrow_schema::Schema>> {
pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Schema> {
let mut res = vec![];
let ima = QueryServer::get_ingester_info().await.unwrap();

Expand All @@ -79,7 +80,9 @@ pub async fn send_schema_request(stream_name: &str) -> anyhow::Result<Vec<arrow_
}
}

Ok(res)
let new_schema = Schema::try_merge(res)?;

Ok(new_schema)
}

pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result<Vec<Value>> {
Expand Down
115 changes: 115 additions & 0 deletions server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringi
use datafusion::prelude::*;
use itertools::Itertools;
use once_cell::sync::Lazy;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
Expand Down Expand Up @@ -297,6 +298,45 @@ fn time_from_path(path: &Path) -> DateTime<Utc> {
.unwrap()
}

pub fn flatten_objects_for_count(objects: Vec<Value>) -> Vec<Value> {
if objects.is_empty() {
return objects;
}

// check if all the keys start with "COUNT"
let flag = objects.iter().all(|obj| {
obj.as_object()
.unwrap()
.keys()
.all(|key| key.starts_with("COUNT"))
});

if flag {
let mut accum = 0u64;
let key = objects[0]
.as_object()
.unwrap()
.keys()
.next()
.unwrap()
.clone();

for obj in objects {
let count = obj.as_object().unwrap().keys().fold(0, |acc, key| {
let value = obj.as_object().unwrap().get(key).unwrap().as_u64().unwrap();
acc + value
});
accum += count;
}

vec![json!({
key: accum
})]
} else {
objects
}
}

pub mod error {
use crate::storage::ObjectStorageError;
use datafusion::error::DataFusionError;
Expand All @@ -312,6 +352,10 @@ pub mod error {

#[cfg(test)]
mod tests {
use serde_json::json;

use crate::query::flatten_objects_for_count;

use super::time_from_path;
use std::path::PathBuf;

Expand All @@ -321,4 +365,75 @@ mod tests {
let time = time_from_path(path.as_path());
assert_eq!(time.timestamp(), 1640995200);
}

#[test]
fn test_flat() {
let val = vec![
json!({
"COUNT(*)": 1
}),
json!({
"COUNT(*)": 2
}),
json!({
"COUNT(*)": 3
}),
];

let out = flatten_objects_for_count(val);
assert_eq!(out, vec![json!({"COUNT(*)": 6})]);
}

#[test]
fn test_flat_empty() {
let val = vec![];
let out = flatten_objects_for_count(val.clone());
assert_eq!(val, out);
}

#[test]
fn test_flat_single() {
let val = vec![json!({"COUNT(ALPHA)": 1}), json!({"COUNT(ALPHA)": 2})];
let out = flatten_objects_for_count(val.clone());
assert_eq!(vec![json!({"COUNT(ALPHA)": 3})], out);
}

#[test]
fn test_flat_fail() {
let val = vec![
json!({
"Num": 1
}),
json!({
"Num": 2
}),
json!({
"Num": 3
}),
];

let out = flatten_objects_for_count(val.clone());
assert_eq!(val, out);
}

#[test]
fn test_flat_multi_key() {
let val = vec![
json!({
"Num": 1,
"COUNT(*)": 1
}),
json!({
"Num": 2,
"COUNT(*)": 2
}),
json!({
"Num": 3,
"COUNT(*)": 3
}),
];

let out = flatten_objects_for_count(val.clone());
assert_eq!(val, out);
}
}
3 changes: 2 additions & 1 deletion server/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use serde_json::{json, Value};

use crate::handlers::http::query::query_response_flatten;
use crate::query::flatten_objects_for_count;

pub struct QueryResponse {
pub records: Vec<RecordBatch>,
Expand Down Expand Up @@ -51,6 +51,7 @@ impl QueryResponse {
values.append(&mut imem);
}

let values = flatten_objects_for_count(values);

let response = if self.with_fields {
json!({
Expand Down

0 comments on commit 829a72c

Please sign in to comment.