From e959ab9c712bdf127ee2eab3f66d87baf7c8f011 Mon Sep 17 00:00:00 2001 From: Suresh Kumar Date: Wed, 28 Aug 2024 12:56:10 -0700 Subject: [PATCH 01/10] fix: handle error in JQ rule via error object --- core/main/src/broker/endpoint_broker.rs | 34 +++++++++++++------------ core/main/src/broker/thunder_broker.rs | 8 ++++-- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 7bae230c7..20b7d5c33 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -25,7 +25,7 @@ use ripple_sdk::{ }, extn::extn_client_message::{ExtnEvent, ExtnMessage}, framework::RippleResponse, - log::{error, trace}, + log::{debug, error, trace}, tokio::{ self, sync::mpsc::{self, Receiver, Sender}, @@ -413,7 +413,7 @@ impl EndpointBrokerState { ) { let (id, _updated_request) = self.update_request(&rpc_request, rule.clone(), extn_message); let mut data = JsonRpcApiResponse::default(); - // return em[ty result and handle the rest with jq rule + // return empty result and handle the rest with jq rule let jv: Value = "".into(); data.result = Some(jv); data.id = Some(id); @@ -682,7 +682,12 @@ impl BrokerOutputForwarder { apply_response(v.data.clone(), filter, &rpc_request, &mut v); } } else { - trace!("start_forwarder: null result"); + debug!("start_forwarder: no result {:?}", v); + if let Some(filter) = broker_request.rule.transform.get_transform_data( + super::rules_engine::RuleTransformType::Response, + ) { + apply_response(v.data.clone(), filter, &rpc_request, &mut v); + } } let request_id = rpc_request.ctx.call_id; @@ -793,21 +798,18 @@ fn apply_response( format!("{}_response", rpc_request.ctx.method), ) { Ok(r) => { - trace!( + debug!( "jq rendered output {:?} original input {:?} for filter {}", - r, - v, - result_response_filter + r, v, result_response_filter ); - /* - weird corner case where the filter is "then \"null\"" which is a jq way to return null - */ - if r.to_string().to_lowercase().contains("null") { - v.data.result = Some(Value::Null); - v.data.error = None; - } else if r.to_string().to_lowercase().contains("error") { - v.data.error = Some(r); - v.data.result = None; + if r.is_object() { + if let Some(error) = r.get("error") { + v.data.error = Some(error.clone()); + v.data.result = None; + } else { + v.data.result = Some(r); + v.data.error = None; + } } else { v.data.result = Some(r); v.data.error = None; diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index 454a23287..d85b2958c 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -164,8 +164,12 @@ impl ThunderBroker { let mut new_response = response.clone(); if response.params.is_some() { new_response.result = response.params.clone(); - } else if response.error.is_some() { - new_response.result = response.error.clone(); + + // {"jsonrpc": "2.0","id": 1000, "method": "SecureStorage.get", "params": {"scope": "device", "key": "foo"} } + // {"jsonrpc":"2.0","id":1000,"result":{"code":22,"message":"ERROR_UNKNOWN_KEY"},"error":{"code":22,"message":"ERROR_UNKNOWN_KEY"}} + + //} else if response.error.is_some() { + // new_response.result = response.error.clone(); } new_response } From ed3ef57a2a7d999a08e223b792f8274945cddf51 Mon Sep 17 00:00:00 2001 From: Suresh Kumar Date: Wed, 28 Aug 2024 14:17:51 -0700 Subject: [PATCH 02/10] build: cleanup --- core/main/src/broker/thunder_broker.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index d85b2958c..4872ad063 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -164,12 +164,6 @@ impl ThunderBroker { let mut new_response = response.clone(); if response.params.is_some() { new_response.result = response.params.clone(); - - // {"jsonrpc": "2.0","id": 1000, "method": "SecureStorage.get", "params": {"scope": "device", "key": "foo"} } - // {"jsonrpc":"2.0","id":1000,"result":{"code":22,"message":"ERROR_UNKNOWN_KEY"},"error":{"code":22,"message":"ERROR_UNKNOWN_KEY"}} - - //} else if response.error.is_some() { - // new_response.result = response.error.clone(); } new_response } From ab5a4086f0c794b1edfc4735fc70fdfa38b90b7c Mon Sep 17 00:00:00 2001 From: Suresh Kumar Date: Thu, 29 Aug 2024 10:18:12 -0700 Subject: [PATCH 03/10] build: cleanup --- core/main/src/broker/endpoint_broker.rs | 30 +++++++++++-------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 20b7d5c33..ccdcb1a0b 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -600,6 +600,7 @@ impl BrokerOutputForwarder { let rpc_request = broker_request.rpc.clone(); let session_id = rpc_request.ctx.get_id(); let is_subscription = rpc_request.is_subscription(); + let mut apply_response_needed = false; // Step 1: Create the data if let Some(result) = v.data.result.clone() { @@ -674,15 +675,14 @@ impl BrokerOutputForwarder { "event" : rpc_request.ctx.method })); platform_state.endpoint_state.update_unsubscribe_request(id); - } else if let Some(filter) = - broker_request.rule.transform.get_transform_data( - super::rules_engine::RuleTransformType::Response, - ) - { - apply_response(v.data.clone(), filter, &rpc_request, &mut v); + } else { + apply_response_needed = true; } } else { debug!("start_forwarder: no result {:?}", v); + apply_response_needed = true; + } + if apply_response_needed { if let Some(filter) = broker_request.rule.transform.get_transform_data( super::rules_engine::RuleTransformType::Response, ) { @@ -797,21 +797,17 @@ fn apply_response( &result_response_filter, format!("{}_response", rpc_request.ctx.method), ) { - Ok(r) => { + Ok(jq_out) => { debug!( "jq rendered output {:?} original input {:?} for filter {}", - r, v, result_response_filter + jq_out, v, result_response_filter ); - if r.is_object() { - if let Some(error) = r.get("error") { - v.data.error = Some(error.clone()); - v.data.result = None; - } else { - v.data.result = Some(r); - v.data.error = None; - } + + if jq_out.is_object() && jq_out.get("error").is_some() { + v.data.error = Some(jq_out.get("error").unwrap().clone()); + v.data.result = None; } else { - v.data.result = Some(r); + v.data.result = Some(jq_out); v.data.error = None; } trace!("mutated output {:?}", v); From 63457a86dc0bf5897997e8c339a1772379058c76 Mon Sep 17 00:00:00 2001 From: Suresh Kumar Date: Thu, 29 Aug 2024 10:21:57 -0700 Subject: [PATCH 04/10] build: cleanup --- core/main/src/broker/endpoint_broker.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index ccdcb1a0b..8c497b825 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -25,7 +25,7 @@ use ripple_sdk::{ }, extn::extn_client_message::{ExtnEvent, ExtnMessage}, framework::RippleResponse, - log::{debug, error, trace}, + log::{error, trace}, tokio::{ self, sync::mpsc::{self, Receiver, Sender}, @@ -679,7 +679,7 @@ impl BrokerOutputForwarder { apply_response_needed = true; } } else { - debug!("start_forwarder: no result {:?}", v); + trace!("start_forwarder: no result {:?}", v); apply_response_needed = true; } if apply_response_needed { @@ -798,7 +798,7 @@ fn apply_response( format!("{}_response", rpc_request.ctx.method), ) { Ok(jq_out) => { - debug!( + trace!( "jq rendered output {:?} original input {:?} for filter {}", jq_out, v, result_response_filter ); From 3f3333587dc8083938d7db5943521d04f4e0e855 Mon Sep 17 00:00:00 2001 From: Suresh Kumar Date: Thu, 29 Aug 2024 12:10:39 -0700 Subject: [PATCH 05/10] fix: unittest --- core/main/src/broker/endpoint_broker.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 8c497b825..766956f5a 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -991,20 +991,21 @@ mod tests { let rpc_request = RpcRequest::new("new_method".to_string(), "params".to_string(), ctx); let data = JsonRpcApiResponse::mock(); let mut output: BrokerOutput = BrokerOutput { data: data.clone() }; - let filter = "if .result and .result.success then (.result.stbVersion | split(\"_\") [0]) elif .error then if .error.code == -32601 then {\"error\":\"Unknown method.\"} else \"Error occurred with a different code\" end else \"No result or recognizable error\" end".to_string(); + let filter = "if .result and .result.success then (.result.stbVersion | split(\"_\") [0]) elif .error then if .error.code == -32601 then {error: { code: -1, message: \"Unknown method.\" }} else \"Error occurred with a different code\" end else \"No result or recognizable error\" end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.error = Some(error); apply_response(response, filter, &rpc_request, &mut output); + //let msg = output.data.error.unwrap().get("message").unwrap().clone(); assert_eq!( - output.data.error.unwrap(), - json!({"error":"Unknown method."}) + output.data.error.unwrap().get("message").unwrap().clone(), + json!("Unknown method.".to_string()) ); // securestorage.get code 22 in error response let error = json!({"code":22,"message":"test error code 22"}); let data = JsonRpcApiResponse::mock(); let mut output: BrokerOutput = BrokerOutput { data: data.clone() }; - let filter = "if .result and .result.success then .result.value elif .error.code==22 or .error.code==43 then \"null\" else .error end".to_string(); + let filter = "if .result and .result.success then .result.value elif .error.code==22 or .error.code==43 then null else .error end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.error = Some(error); apply_response(response, filter, &rpc_request, &mut output); @@ -1015,7 +1016,7 @@ mod tests { let error = json!({"code":300,"message":"test error code 300"}); let data = JsonRpcApiResponse::mock(); let mut output: BrokerOutput = BrokerOutput { data: data.clone() }; - let filter = "if .result and .result.success then .result.value elif .error.code==22 or .error.code==43 then \"null\" else .error end".to_string(); + let filter = "if .result and .result.success then .result.value elif .error.code==22 or .error.code==43 then null else { error: .error } end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.error = Some(error.clone()); apply_response(response, filter, &rpc_request, &mut output); @@ -1122,7 +1123,7 @@ mod tests { let result = json!({"success":true}); let data = JsonRpcApiResponse::mock(); let mut output: BrokerOutput = BrokerOutput { data: data.clone() }; - let filter = "if .result.success then \"null\" else { code: -32100, message: \"couldn't set skip restriction\" } end".to_string(); + let filter = "if .result.success then null else { code: -32100, message: \"couldn't set skip restriction\" } end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); apply_response(response, filter, &rpc_request, &mut output); From 709bc7f442c1bb9ed7d1b51513b0828be93ac5d0 Mon Sep 17 00:00:00 2001 From: Suresh Kumar Date: Thu, 29 Aug 2024 12:26:23 -0700 Subject: [PATCH 06/10] fix: update info message Rule not available --- core/main/src/broker/rules_engine.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/main/src/broker/rules_engine.rs b/core/main/src/broker/rules_engine.rs index 8db2fad25..459358113 100644 --- a/core/main/src/broker/rules_engine.rs +++ b/core/main/src/broker/rules_engine.rs @@ -195,7 +195,7 @@ impl RuleEngine { rule.transform.apply_context(rpc_request); return Some(rule); } else { - info!("Rule not available for {}", rpc_request.method); + info!("Rule not available for {}, hence falling back to extension handler", rpc_request.method); } None } From 26e20305892db2a5993221d82be067e12eab3355 Mon Sep 17 00:00:00 2001 From: Suresh Kumar Date: Thu, 29 Aug 2024 12:28:04 -0700 Subject: [PATCH 07/10] build: fmt --- core/main/src/broker/endpoint_broker.rs | 4 +++- core/main/src/broker/rules_engine.rs | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 766956f5a..b1d3e6307 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -800,7 +800,9 @@ fn apply_response( Ok(jq_out) => { trace!( "jq rendered output {:?} original input {:?} for filter {}", - jq_out, v, result_response_filter + jq_out, + v, + result_response_filter ); if jq_out.is_object() && jq_out.get("error").is_some() { diff --git a/core/main/src/broker/rules_engine.rs b/core/main/src/broker/rules_engine.rs index 459358113..c2de717b0 100644 --- a/core/main/src/broker/rules_engine.rs +++ b/core/main/src/broker/rules_engine.rs @@ -195,7 +195,10 @@ impl RuleEngine { rule.transform.apply_context(rpc_request); return Some(rule); } else { - info!("Rule not available for {}, hence falling back to extension handler", rpc_request.method); + info!( + "Rule not available for {}, hence falling back to extension handler", + rpc_request.method + ); } None } From eddaa9f6712b5b782f9b28d2469bbe8af5491401 Mon Sep 17 00:00:00 2001 From: Suresh Kumar Date: Mon, 2 Sep 2024 21:17:50 -0700 Subject: [PATCH 08/10] fix: remove success --- core/main/src/broker/endpoint_broker.rs | 133 ++++++++++++++++++------ core/main/src/broker/thunder_broker.rs | 36 ++++++- 2 files changed, 134 insertions(+), 35 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index b1d3e6307..38f943082 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -25,7 +25,7 @@ use ripple_sdk::{ }, extn::extn_client_message::{ExtnEvent, ExtnMessage}, framework::RippleResponse, - log::{error, trace}, + log::{debug as trace, error}, tokio::{ self, sync::mpsc::{self, Receiver, Sender}, @@ -584,14 +584,15 @@ impl BrokerOutputForwarder { let event_utility_clone = event_utility.clone(); tokio::spawn(async move { - while let Some(mut v) = rx.recv().await { + while let Some(output) = rx.recv().await { + let mut response = output.data.clone(); let mut is_event = false; // First validate the id check if it could be an event - let id = if let Some(e) = v.get_event() { + let id = if let Some(e) = output.get_event() { is_event = true; Some(e) } else { - v.data.id + response.id }; if let Some(id) = id { @@ -603,13 +604,17 @@ impl BrokerOutputForwarder { let mut apply_response_needed = false; // Step 1: Create the data - if let Some(result) = v.data.result.clone() { + if let Some(mut result) = response.result.clone() { if is_event { + //if !process_event(response.clone()) { + // continue; + //} + /* */ apply_rule_for_event( &broker_request, - &result, + &mut result, &rpc_request, - &mut v, + &mut response, ); if !apply_filter(&broker_request, &result, &rpc_request) { continue; @@ -635,14 +640,14 @@ impl BrokerOutputForwarder { ) .await { - v.data.result = Some(value.expect("REASON")); + response.result = Some(value.expect("REASON")); } - v.data.id = Some(request_id); + response.id = Some(request_id); let message = ApiMessage { request_id: request_id.to_string(), protocol, - jsonrpc_msg: serde_json::to_string(&v.data) + jsonrpc_msg: serde_json::to_string(&response) .unwrap(), }; @@ -666,11 +671,12 @@ impl BrokerOutputForwarder { ); } } + /* */ } else if is_subscription { if sub_processed { continue; } - v.data.result = Some(json!({ + response.result = Some(json!({ "listening" : rpc_request.is_listening(), "event" : rpc_request.ctx.method })); @@ -678,26 +684,32 @@ impl BrokerOutputForwarder { } else { apply_response_needed = true; } - } else { - trace!("start_forwarder: no result {:?}", v); + } else if response.error.is_some() { + trace!("processing error {:?}", response); // XXX: apply_response_needed = true; + } else { + trace!("No result and No error?! {:?}", response); + // GIGO + response.result = Some(Value::Null); + println!("====== response: {:?}", response) } + if apply_response_needed { if let Some(filter) = broker_request.rule.transform.get_transform_data( super::rules_engine::RuleTransformType::Response, ) { - apply_response(v.data.clone(), filter, &rpc_request, &mut v); + apply_response(&mut response, filter, &rpc_request); } } let request_id = rpc_request.ctx.call_id; - v.data.id = Some(request_id); + response.id = Some(request_id); // Step 2: Create the message let message = ApiMessage { request_id: request_id.to_string(), protocol: rpc_request.ctx.protocol.clone(), - jsonrpc_msg: serde_json::to_string(&v.data).unwrap(), + jsonrpc_msg: serde_json::to_string(&response).unwrap(), }; // Step 3: Handle Non Extension @@ -706,7 +718,7 @@ impl BrokerOutputForwarder { platform_state.endpoint_state.get_extn_message(id, is_event) { if is_event { - forward_extn_event(&extn_message, v.data, &platform_state) + forward_extn_event(&extn_message, response, &platform_state) .await; } else { return_extn_response(message, extn_message) @@ -724,10 +736,14 @@ impl BrokerOutputForwarder { .await } } else { - error!("start_forwarder:{} request not found {:?}", line!(), v); + error!( + "start_forwarder:{} request not found {:?}", + line!(), + response + ); } } else { - error!("Error couldnt broker the event {:?}", v) + error!("Error couldnt broker the event {:?}", response) } } }); @@ -785,12 +801,12 @@ async fn forward_extn_event( } fn apply_response( - rcp_response: JsonRpcApiResponse, + response: &mut JsonRpcApiResponse, result_response_filter: String, rpc_request: &RpcRequest, - v: &mut BrokerOutput, + //v: &mut BrokerOutput, ) { - match serde_json::to_value(rcp_response) { + match serde_json::to_value(response.clone()) { Ok(input) => { match jq_compile( input, @@ -801,38 +817,86 @@ fn apply_response( trace!( "jq rendered output {:?} original input {:?} for filter {}", jq_out, - v, + response, result_response_filter ); if jq_out.is_object() && jq_out.get("error").is_some() { - v.data.error = Some(jq_out.get("error").unwrap().clone()); - v.data.result = None; + response.error = Some(jq_out.get("error").unwrap().clone()); + response.result = None; } else { - v.data.result = Some(jq_out); - v.data.error = None; + response.result = Some(jq_out); + response.error = None; } - trace!("mutated output {:?}", v); + trace!("mutated output {:?}", response); } Err(e) => { - v.data.error = Some(json!(e.to_string())); + response.error = Some(json!(e.to_string())); error!("jq_compile error {:?}", e); } } } Err(e) => { - v.data.error = Some(json!(e.to_string())); + response.error = Some(json!(e.to_string())); error!("json rpc response error {:?}", e); } } } +fn process_event(response: &mut JsonRpcApiResponse) -> bool { + println!("====== process_event: {:?}", response); + /* + apply_rule_for_event(&broker_request, &result, &rpc_request, &mut v); + if !apply_filter(&broker_request, &result, &rpc_request) { + false; + } + // check if the request transform has event_decorator_method + if let Some(decorator_method) = broker_request.rule.transform.event_decorator_method.clone() { + if let Some(func) = event_utility_clone.get_function(&decorator_method) { + // spawn a tokio thread to run the function and continue the main thread. + let session_id = rpc_request.ctx.get_id(); + let request_id = rpc_request.ctx.call_id; + let protocol = rpc_request.ctx.protocol.clone(); + let platform_state_c = platform_state.clone(); + let ctx = rpc_request.ctx.clone(); + tokio::spawn(async move { + if let Ok(value) = + func(platform_state_c.clone(), ctx.clone(), Some(result.clone())).await + { + response.result = Some(value.expect("REASON")); + } + response.id = Some(request_id); + + let message = ApiMessage { + request_id: request_id.to_string(), + protocol, + jsonrpc_msg: serde_json::to_string(&data).unwrap(), + }; + + if let Some(session) = platform_state_c + .session_state + .get_session_for_connection_id(&session_id) + { + return_api_message_for_transport(session, message, platform_state_c).await + } + }); + false; + } else { + error!("Failed to invoke decorator method {:?}", decorator_method); + } + } + */ + true +} + fn apply_rule_for_event( broker_request: &BrokerRequest, - result: &Value, + result: &mut Value, rpc_request: &RpcRequest, - v: &mut BrokerOutput, + data: &mut JsonRpcApiResponse, + //v: &mut BrokerOutput, ) { + println!("=====> apply_rule_for_event: {:?}", result); if let Some(filter) = broker_request .rule .transform @@ -843,9 +907,11 @@ fn apply_rule_for_event( &filter, format!("{}_event", rpc_request.ctx.method), ) { - v.data.result = Some(r); + data.result = Some(r); + //*result = r; //Some(r); } } + println!("<===== apply_rule_for_event: {:?}", result); } fn apply_filter(broker_request: &BrokerRequest, result: &Value, rpc_request: &RpcRequest) -> bool { @@ -856,6 +922,7 @@ fn apply_filter(broker_request: &BrokerRequest, result: &Value, rpc_request: &Rp format!("{}_event filter", rpc_request.ctx.method), ) { if r.is_null() { + println!("======> apply_filter returns null"); return false; } else { // get bool value for r and return diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index 4872ad063..697a6aa35 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -30,7 +30,7 @@ use ripple_sdk::{ tokio::{self, sync::mpsc}, utils::error::RippleError, }; -use serde_json::json; +use serde_json::{json, Value}; use std::{ sync::{Arc, RwLock}, vec, @@ -161,10 +161,42 @@ impl ThunderBroker { } fn update_response(response: &JsonRpcApiResponse) -> JsonRpcApiResponse { - let mut new_response = response.clone(); + let mut new_response = JsonRpcApiResponse::default(); + new_response.id = response.id; + new_response.jsonrpc = response.jsonrpc.clone(); + + println!("=====> update_response: {:?}", response); if response.params.is_some() { + new_response = response.clone(); new_response.result = response.params.clone(); + } else if response.error.is_some() { + new_response.error = response.error.clone(); + } else { + if let Some(result) = response.result.clone() { + if result.is_object() { + let mut new_result = result.as_object().unwrap().clone(); + for (key, value) in result.as_object().unwrap() { + if key.eq("success") { + if let Some(succes) = value.as_bool() { + if succes == false { + let error: Value = + r#"{ "code": 1, "message": "ERROR_GENERAL"}"#.into(); + new_response.error = Some(error); + new_response.result = None; + } else { + let _ = new_result.remove(key); + } + } + break; + } + } + new_response.result = Some(Value::Object(new_result)); + } else { + new_response.result = response.result.clone(); + } + } } + println!("<===== update_response: {:?}", new_response); new_response } From 4f3eccf05a6765aaac1e7d269077d07a1cb459ef Mon Sep 17 00:00:00 2001 From: Suresh Kumar Date: Mon, 2 Sep 2024 21:31:14 -0700 Subject: [PATCH 09/10] fix: return null for emyt object --- core/main/src/broker/thunder_broker.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index 697a6aa35..1e3140887 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -190,7 +190,9 @@ impl ThunderBroker { break; } } - new_response.result = Some(Value::Object(new_result)); + if !new_result.is_empty() { + new_response.result = Some(Value::Object(new_result)); + } } else { new_response.result = response.result.clone(); } From 47639c4509a56ccdb74babeb9d44783003f45322 Mon Sep 17 00:00:00 2001 From: Suresh Kumar Date: Tue, 3 Sep 2024 15:19:37 -0700 Subject: [PATCH 10/10] build: cleanup --- core/main/src/broker/endpoint_broker.rs | 174 ++++++++---------------- core/main/src/broker/rules_engine.rs | 5 +- 2 files changed, 54 insertions(+), 125 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 38f943082..7bae230c7 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -25,7 +25,7 @@ use ripple_sdk::{ }, extn::extn_client_message::{ExtnEvent, ExtnMessage}, framework::RippleResponse, - log::{debug as trace, error}, + log::{error, trace}, tokio::{ self, sync::mpsc::{self, Receiver, Sender}, @@ -413,7 +413,7 @@ impl EndpointBrokerState { ) { let (id, _updated_request) = self.update_request(&rpc_request, rule.clone(), extn_message); let mut data = JsonRpcApiResponse::default(); - // return empty result and handle the rest with jq rule + // return em[ty result and handle the rest with jq rule let jv: Value = "".into(); data.result = Some(jv); data.id = Some(id); @@ -584,15 +584,14 @@ impl BrokerOutputForwarder { let event_utility_clone = event_utility.clone(); tokio::spawn(async move { - while let Some(output) = rx.recv().await { - let mut response = output.data.clone(); + while let Some(mut v) = rx.recv().await { let mut is_event = false; // First validate the id check if it could be an event - let id = if let Some(e) = output.get_event() { + let id = if let Some(e) = v.get_event() { is_event = true; Some(e) } else { - response.id + v.data.id }; if let Some(id) = id { @@ -601,20 +600,15 @@ impl BrokerOutputForwarder { let rpc_request = broker_request.rpc.clone(); let session_id = rpc_request.ctx.get_id(); let is_subscription = rpc_request.is_subscription(); - let mut apply_response_needed = false; // Step 1: Create the data - if let Some(mut result) = response.result.clone() { + if let Some(result) = v.data.result.clone() { if is_event { - //if !process_event(response.clone()) { - // continue; - //} - /* */ apply_rule_for_event( &broker_request, - &mut result, + &result, &rpc_request, - &mut response, + &mut v, ); if !apply_filter(&broker_request, &result, &rpc_request) { continue; @@ -640,14 +634,14 @@ impl BrokerOutputForwarder { ) .await { - response.result = Some(value.expect("REASON")); + v.data.result = Some(value.expect("REASON")); } - response.id = Some(request_id); + v.data.id = Some(request_id); let message = ApiMessage { request_id: request_id.to_string(), protocol, - jsonrpc_msg: serde_json::to_string(&response) + jsonrpc_msg: serde_json::to_string(&v.data) .unwrap(), }; @@ -671,45 +665,34 @@ impl BrokerOutputForwarder { ); } } - /* */ } else if is_subscription { if sub_processed { continue; } - response.result = Some(json!({ + v.data.result = Some(json!({ "listening" : rpc_request.is_listening(), "event" : rpc_request.ctx.method })); platform_state.endpoint_state.update_unsubscribe_request(id); - } else { - apply_response_needed = true; + } else if let Some(filter) = + broker_request.rule.transform.get_transform_data( + super::rules_engine::RuleTransformType::Response, + ) + { + apply_response(v.data.clone(), filter, &rpc_request, &mut v); } - } else if response.error.is_some() { - trace!("processing error {:?}", response); // XXX: - apply_response_needed = true; } else { - trace!("No result and No error?! {:?}", response); - // GIGO - response.result = Some(Value::Null); - println!("====== response: {:?}", response) - } - - if apply_response_needed { - if let Some(filter) = broker_request.rule.transform.get_transform_data( - super::rules_engine::RuleTransformType::Response, - ) { - apply_response(&mut response, filter, &rpc_request); - } + trace!("start_forwarder: null result"); } let request_id = rpc_request.ctx.call_id; - response.id = Some(request_id); + v.data.id = Some(request_id); // Step 2: Create the message let message = ApiMessage { request_id: request_id.to_string(), protocol: rpc_request.ctx.protocol.clone(), - jsonrpc_msg: serde_json::to_string(&response).unwrap(), + jsonrpc_msg: serde_json::to_string(&v.data).unwrap(), }; // Step 3: Handle Non Extension @@ -718,7 +701,7 @@ impl BrokerOutputForwarder { platform_state.endpoint_state.get_extn_message(id, is_event) { if is_event { - forward_extn_event(&extn_message, response, &platform_state) + forward_extn_event(&extn_message, v.data, &platform_state) .await; } else { return_extn_response(message, extn_message) @@ -736,14 +719,10 @@ impl BrokerOutputForwarder { .await } } else { - error!( - "start_forwarder:{} request not found {:?}", - line!(), - response - ); + error!("start_forwarder:{} request not found {:?}", line!(), v); } } else { - error!("Error couldnt broker the event {:?}", response) + error!("Error couldnt broker the event {:?}", v) } } }); @@ -801,102 +780,59 @@ async fn forward_extn_event( } fn apply_response( - response: &mut JsonRpcApiResponse, + rcp_response: JsonRpcApiResponse, result_response_filter: String, rpc_request: &RpcRequest, - //v: &mut BrokerOutput, + v: &mut BrokerOutput, ) { - match serde_json::to_value(response.clone()) { + match serde_json::to_value(rcp_response) { Ok(input) => { match jq_compile( input, &result_response_filter, format!("{}_response", rpc_request.ctx.method), ) { - Ok(jq_out) => { + Ok(r) => { trace!( "jq rendered output {:?} original input {:?} for filter {}", - jq_out, - response, + r, + v, result_response_filter ); - - if jq_out.is_object() && jq_out.get("error").is_some() { - response.error = Some(jq_out.get("error").unwrap().clone()); - response.result = None; + /* + weird corner case where the filter is "then \"null\"" which is a jq way to return null + */ + if r.to_string().to_lowercase().contains("null") { + v.data.result = Some(Value::Null); + v.data.error = None; + } else if r.to_string().to_lowercase().contains("error") { + v.data.error = Some(r); + v.data.result = None; } else { - response.result = Some(jq_out); - response.error = None; + v.data.result = Some(r); + v.data.error = None; } - trace!("mutated output {:?}", response); + trace!("mutated output {:?}", v); } Err(e) => { - response.error = Some(json!(e.to_string())); + v.data.error = Some(json!(e.to_string())); error!("jq_compile error {:?}", e); } } } Err(e) => { - response.error = Some(json!(e.to_string())); + v.data.error = Some(json!(e.to_string())); error!("json rpc response error {:?}", e); } } } -fn process_event(response: &mut JsonRpcApiResponse) -> bool { - println!("====== process_event: {:?}", response); - /* - apply_rule_for_event(&broker_request, &result, &rpc_request, &mut v); - if !apply_filter(&broker_request, &result, &rpc_request) { - false; - } - // check if the request transform has event_decorator_method - if let Some(decorator_method) = broker_request.rule.transform.event_decorator_method.clone() { - if let Some(func) = event_utility_clone.get_function(&decorator_method) { - // spawn a tokio thread to run the function and continue the main thread. - let session_id = rpc_request.ctx.get_id(); - let request_id = rpc_request.ctx.call_id; - let protocol = rpc_request.ctx.protocol.clone(); - let platform_state_c = platform_state.clone(); - let ctx = rpc_request.ctx.clone(); - tokio::spawn(async move { - if let Ok(value) = - func(platform_state_c.clone(), ctx.clone(), Some(result.clone())).await - { - response.result = Some(value.expect("REASON")); - } - response.id = Some(request_id); - - let message = ApiMessage { - request_id: request_id.to_string(), - protocol, - jsonrpc_msg: serde_json::to_string(&data).unwrap(), - }; - - if let Some(session) = platform_state_c - .session_state - .get_session_for_connection_id(&session_id) - { - return_api_message_for_transport(session, message, platform_state_c).await - } - }); - false; - } else { - error!("Failed to invoke decorator method {:?}", decorator_method); - } - } - */ - true -} - fn apply_rule_for_event( broker_request: &BrokerRequest, - result: &mut Value, + result: &Value, rpc_request: &RpcRequest, - data: &mut JsonRpcApiResponse, - //v: &mut BrokerOutput, + v: &mut BrokerOutput, ) { - println!("=====> apply_rule_for_event: {:?}", result); if let Some(filter) = broker_request .rule .transform @@ -907,11 +843,9 @@ fn apply_rule_for_event( &filter, format!("{}_event", rpc_request.ctx.method), ) { - data.result = Some(r); - //*result = r; //Some(r); + v.data.result = Some(r); } } - println!("<===== apply_rule_for_event: {:?}", result); } fn apply_filter(broker_request: &BrokerRequest, result: &Value, rpc_request: &RpcRequest) -> bool { @@ -922,7 +856,6 @@ fn apply_filter(broker_request: &BrokerRequest, result: &Value, rpc_request: &Rp format!("{}_event filter", rpc_request.ctx.method), ) { if r.is_null() { - println!("======> apply_filter returns null"); return false; } else { // get bool value for r and return @@ -1060,21 +993,20 @@ mod tests { let rpc_request = RpcRequest::new("new_method".to_string(), "params".to_string(), ctx); let data = JsonRpcApiResponse::mock(); let mut output: BrokerOutput = BrokerOutput { data: data.clone() }; - let filter = "if .result and .result.success then (.result.stbVersion | split(\"_\") [0]) elif .error then if .error.code == -32601 then {error: { code: -1, message: \"Unknown method.\" }} else \"Error occurred with a different code\" end else \"No result or recognizable error\" end".to_string(); + let filter = "if .result and .result.success then (.result.stbVersion | split(\"_\") [0]) elif .error then if .error.code == -32601 then {\"error\":\"Unknown method.\"} else \"Error occurred with a different code\" end else \"No result or recognizable error\" end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.error = Some(error); apply_response(response, filter, &rpc_request, &mut output); - //let msg = output.data.error.unwrap().get("message").unwrap().clone(); assert_eq!( - output.data.error.unwrap().get("message").unwrap().clone(), - json!("Unknown method.".to_string()) + output.data.error.unwrap(), + json!({"error":"Unknown method."}) ); // securestorage.get code 22 in error response let error = json!({"code":22,"message":"test error code 22"}); let data = JsonRpcApiResponse::mock(); let mut output: BrokerOutput = BrokerOutput { data: data.clone() }; - let filter = "if .result and .result.success then .result.value elif .error.code==22 or .error.code==43 then null else .error end".to_string(); + let filter = "if .result and .result.success then .result.value elif .error.code==22 or .error.code==43 then \"null\" else .error end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.error = Some(error); apply_response(response, filter, &rpc_request, &mut output); @@ -1085,7 +1017,7 @@ mod tests { let error = json!({"code":300,"message":"test error code 300"}); let data = JsonRpcApiResponse::mock(); let mut output: BrokerOutput = BrokerOutput { data: data.clone() }; - let filter = "if .result and .result.success then .result.value elif .error.code==22 or .error.code==43 then null else { error: .error } end".to_string(); + let filter = "if .result and .result.success then .result.value elif .error.code==22 or .error.code==43 then \"null\" else .error end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.error = Some(error.clone()); apply_response(response, filter, &rpc_request, &mut output); @@ -1192,7 +1124,7 @@ mod tests { let result = json!({"success":true}); let data = JsonRpcApiResponse::mock(); let mut output: BrokerOutput = BrokerOutput { data: data.clone() }; - let filter = "if .result.success then null else { code: -32100, message: \"couldn't set skip restriction\" } end".to_string(); + let filter = "if .result.success then \"null\" else { code: -32100, message: \"couldn't set skip restriction\" } end".to_string(); let mut response = JsonRpcApiResponse::mock(); response.result = Some(result); apply_response(response, filter, &rpc_request, &mut output); diff --git a/core/main/src/broker/rules_engine.rs b/core/main/src/broker/rules_engine.rs index c2de717b0..8db2fad25 100644 --- a/core/main/src/broker/rules_engine.rs +++ b/core/main/src/broker/rules_engine.rs @@ -195,10 +195,7 @@ impl RuleEngine { rule.transform.apply_context(rpc_request); return Some(rule); } else { - info!( - "Rule not available for {}, hence falling back to extension handler", - rpc_request.method - ); + info!("Rule not available for {}", rpc_request.method); } None }