Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: endpoint for uplink connection status #350

Merged
merged 2 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions uplink/src/base/mqtt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tokio::{select, task};
use std::fs::File;
use std::io::Read;
use std::path::Path;
use std::sync::Mutex;

use crate::{Action, Config};
use rumqttc::{
Expand Down Expand Up @@ -58,13 +59,16 @@ pub struct Mqtt {
/// Control handles
ctrl_rx: Receiver<MqttShutdown>,
ctrl_tx: Sender<MqttShutdown>,
/// True when network is connected
network_up: Arc<Mutex<bool>>,
}

impl Mqtt {
pub fn new(
config: Arc<Config>,
actions_tx: Sender<Action>,
metrics_tx: Sender<MqttMetrics>,
network_up: Arc<Mutex<bool>>,
) -> Mqtt {
// create a new eventloop and reuse it during every reconnection
let options = mqttoptions(&config);
Expand All @@ -81,6 +85,7 @@ impl Mqtt {
metrics_tx,
ctrl_tx,
ctrl_rx,
network_up,
}
}

Expand Down Expand Up @@ -168,6 +173,7 @@ impl Mqtt {
event = self.eventloop.poll() => {
match event {
Ok(Event::Incoming(Incoming::ConnAck(connack))) => {
*self.network_up.lock().unwrap() = true;
info!("Connected to broker. Session present = {}", connack.session_present);
let subscription = self.config.actions_subscription.clone();
let client = self.client();
Expand Down Expand Up @@ -213,6 +219,7 @@ impl Mqtt {
}
}
Err(e) => {
*self.network_up.lock().unwrap() = false;
self.metrics.add_reconnection();
self.check_disconnection_metrics(e);
tokio::time::sleep(Duration::from_secs(3)).await;
Expand Down
2 changes: 1 addition & 1 deletion uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl StorageHandler {
}

/// The uplink Serializer is the component that deals with serializing, compressing and writing data onto disk or Network.
/// In case of network issues, the Serializer enters various states depending on the severeness, managed by [`start()`].
/// In case of network issues, the Serializer enters various states depending on the severeness, managed by [`start()`].
///
/// The Serializer writes data directly to network in **normal mode** with the [`try_publish()`] method on the MQTT client.
/// In case of the network being slow, this fails and we are forced into **slow mode**, where-in new data gets written into
Expand Down
22 changes: 19 additions & 3 deletions uplink/src/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use std::sync::{Arc, Mutex};

use axum::{
extract::State,
http::StatusCode,
http::{response::Builder, StatusCode},
response::IntoResponse,
routing::{post, put},
routing::{get, post, put},
Router,
};
use log::info;
use serde_json::json;
use uplink::base::CtrlTx;

use crate::ReloadHandle;
Expand All @@ -17,6 +18,7 @@ struct StateHandle {
reload_handle: ReloadHandle,
ctrl_tx: CtrlTx,
downloader_disable: Arc<Mutex<bool>>,
network_up: Arc<Mutex<bool>>,
}

#[tokio::main]
Expand All @@ -25,15 +27,17 @@ pub async fn start(
reload_handle: ReloadHandle,
ctrl_tx: CtrlTx,
downloader_disable: Arc<Mutex<bool>>,
network_up: Arc<Mutex<bool>>,
) {
let address = format!("0.0.0.0:{port}");
info!("Starting uplink console server: {address}");
let state = StateHandle { reload_handle, ctrl_tx, downloader_disable };
let state = StateHandle { reload_handle, ctrl_tx, downloader_disable, network_up };
let app = Router::new()
.route("/logs", post(reload_loglevel))
.route("/shutdown", post(shutdown))
.route("/disable_downloader", put(disable_downloader))
.route("/enable_downloader", put(enable_downloader))
.route("/status", get(status))
.with_state(state);

axum::Server::bind(&address.parse().unwrap()).serve(app.into_make_service()).await.unwrap();
Expand Down Expand Up @@ -78,3 +82,15 @@ async fn enable_downloader(State(state): State<StateHandle>) -> impl IntoRespons
StatusCode::ACCEPTED
}
}

// Pushes uplink status as JSON text
async fn status(State(state): State<StateHandle>) -> impl IntoResponse {
Builder::new()
.body(
json!({
"connected": *state.network_up.lock().unwrap(),
})
.to_string(),
)
.unwrap()
}
4 changes: 3 additions & 1 deletion uplink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,13 @@ impl Uplink {
&mut self,
mut bridge: Bridge,
downloader_disable: Arc<Mutex<bool>>,
network_up: Arc<Mutex<bool>>,
) -> Result<CtrlTx, Error> {
let (mqtt_metrics_tx, mqtt_metrics_rx) = bounded(10);
let (ctrl_actions_lane, ctrl_data_lane) = bridge.ctrl_tx();

let mut mqtt = Mqtt::new(self.config.clone(), self.action_tx.clone(), mqtt_metrics_tx);
let mut mqtt =
Mqtt::new(self.config.clone(), self.action_tx.clone(), mqtt_metrics_tx, network_up);
let mqtt_client = mqtt.client();
let ctrl_mqtt = mqtt.ctrl_tx();

Expand Down
7 changes: 4 additions & 3 deletions uplink/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub type ReloadHandle =
use uplink::config::{AppConfig, Config, StreamConfig, MAX_BATCH_SIZE};
use uplink::{simulator, spawn_named_thread, TcpJson, Uplink};

const DEFAULT_CONFIG: &str = r#"
const DEFAULT_CONFIG: &str = r#"
[mqtt]
max_packet_size = 256000
max_inflight = 100
Expand Down Expand Up @@ -329,7 +329,8 @@ fn main() -> Result<(), Error> {
};

let downloader_disable = Arc::new(Mutex::new(false));
let ctrl_tx = uplink.spawn(bridge, downloader_disable.clone())?;
let network_up = Arc::new(Mutex::new(false));
let ctrl_tx = uplink.spawn(bridge, downloader_disable.clone(), network_up.clone())?;

if let Some(config) = config.simulator.clone() {
spawn_named_thread("Simulator", || {
Expand All @@ -341,7 +342,7 @@ fn main() -> Result<(), Error> {
let port = config.console.port;
let ctrl_tx = ctrl_tx.clone();
spawn_named_thread("Uplink Console", move || {
console::start(port, reload_handle, ctrl_tx, downloader_disable)
console::start(port, reload_handle, ctrl_tx, downloader_disable, network_up)
});
}

Expand Down
Loading