Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: setup connections with bridge before spawn #277

Merged
merged 14 commits into from
Sep 7, 2023
196 changes: 110 additions & 86 deletions uplink/src/base/bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,8 @@ impl Point for Payload {
}

#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
/// App name and handle for brige to send actions to the app
RegisterActionRoute(String, ActionRouter),
/// Data sent by the app
Data(Payload),
/// Sometime apps can choose to directly send action response instead
Expand Down Expand Up @@ -153,6 +152,42 @@ impl Bridge {
}
}

pub fn register_action_route(&mut self, route: ActionRoute) -> Receiver<Action> {
let (actions_tx, actions_rx) = bounded(1);
self.insert_route(route, actions_tx.clone());

actions_rx
}

pub fn register_action_routes<R: Into<ActionRoute>, V: IntoIterator<Item = R>>(
&mut self,
routes: V,
) -> Option<Receiver<Action>> {
let routes: Vec<ActionRoute> = routes.into_iter().map(|n| n.into()).collect();
if routes.is_empty() {
return None;
}

let (actions_tx, actions_rx) = bounded(1);
for route in routes {
self.insert_route(route, actions_tx.clone());
}

Some(actions_rx)
}

fn insert_route(
&mut self,
ActionRoute { name, timeout }: ActionRoute,
actions_tx: Sender<Action>,
) {
let duration = Duration::from_secs(timeout);
let action_router = ActionRouter { actions_tx, duration };
if self.action_routes.insert(name.clone(), action_router).is_some() {
panic!("Action Route clash: {name}");
}
}

pub fn tx(&mut self) -> BridgeTx {
BridgeTx { events_tx: self.bridge_tx.clone(), shutdown_handle: self.ctrl_tx.clone() }
}
Expand Down Expand Up @@ -212,9 +247,6 @@ impl Bridge {
event = self.bridge_rx.recv_async() => {
let event = event?;
match event {
Event::RegisterActionRoute(name, tx) => {
self.action_routes.insert(name, tx);
}
Event::Data(v) => {
self.streams.forward(v).await;
}
Expand Down Expand Up @@ -358,6 +390,11 @@ impl Bridge {
let mut fwd_action = inflight_action.action.clone();
fwd_action.name = fwd_name.to_owned();

debug!(
"Redirecting action {}: {} ~> {}",
fwd_action.action_id, inflight_action.action.name, fwd_action.name
);

if let Err(e) = self.try_route_action(fwd_action.clone()) {
error!("Failed to route action to app. Error = {:?}", e);
self.forward_action_error(fwd_action, e).await;
Expand Down Expand Up @@ -451,41 +488,6 @@ pub struct BridgeTx {
}

impl BridgeTx {
pub async fn register_action_route(&self, route: ActionRoute) -> Receiver<Action> {
let (actions_tx, actions_rx) = bounded(1);
let ActionRoute { name, timeout } = route;
let duration = Duration::from_secs(timeout);
let action_router = ActionRouter { actions_tx, duration };
let event = Event::RegisterActionRoute(name, action_router);

// Bridge should always be up and hence unwrap is ok
self.events_tx.send_async(event).await.unwrap();
actions_rx
}

pub async fn register_action_routes<R: Into<ActionRoute>, V: IntoIterator<Item = R>>(
&self,
routes: V,
) -> Option<Receiver<Action>> {
let routes: Vec<ActionRoute> = routes.into_iter().map(|n| n.into()).collect();
if routes.is_empty() {
return None;
}

let (actions_tx, actions_rx) = bounded(1);

for route in routes {
let ActionRoute { name, timeout } = route;
let duration = Duration::from_secs(timeout);
let action_router = ActionRouter { actions_tx: actions_tx.clone(), duration };
let event = Event::RegisterActionRoute(name, action_router);
// Bridge should always be up and hence unwrap is ok
self.events_tx.send_async(event).await.unwrap();
}

Some(actions_rx)
}

pub async fn send_payload(&self, payload: Payload) {
let event = Event::Data(payload);
self.events_tx.send_async(event).await.unwrap()
Expand Down Expand Up @@ -537,21 +539,21 @@ mod tests {
}
}

fn start_bridge(config: Arc<Config>) -> (BridgeTx, Sender<Action>, Receiver<Box<dyn Package>>) {
fn create_bridge(config: Arc<Config>) -> (Bridge, Sender<Action>, Receiver<Box<dyn Package>>) {
let (package_tx, package_rx) = bounded(10);
let (metrics_tx, _) = bounded(10);
let (actions_tx, actions_rx) = bounded(10);
let (shutdown_handle, _) = bounded(1);
let bridge = Bridge::new(config, package_tx, metrics_tx, actions_rx, shutdown_handle);

let mut bridge = Bridge::new(config, package_tx, metrics_tx, actions_rx, shutdown_handle);
let bridge_tx = bridge.tx();
(bridge, actions_tx, package_rx)
}

fn spawn_bridge(mut bridge: Bridge) {
std::thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async { bridge.start().await.unwrap() });
});

(bridge_tx, actions_tx, package_rx)
}

struct Responses {
Expand All @@ -575,12 +577,14 @@ mod tests {
let tmpdir = tempdir::TempDir::new("bridge").unwrap();
std::env::set_current_dir(&tmpdir).unwrap();
let config = Arc::new(default_config());
let (bridge_tx, actions_tx, package_rx) = start_bridge(config);
let (mut bridge, actions_tx, package_rx) = create_bridge(config);
let route_1 = ActionRoute { name: "route_1".to_string(), timeout: 10 };
let route_1_rx = bridge_tx.register_action_route(route_1).await;
let route_1_rx = bridge.register_action_route(route_1);

let route_2 = ActionRoute { name: "route_2".to_string(), timeout: 30 };
let route_2_rx = bridge_tx.register_action_route(route_2).await;
let route_2_rx = bridge.register_action_route(route_2);

spawn_bridge(bridge);

std::thread::spawn(move || {
let rt = Runtime::new().unwrap();
Expand Down Expand Up @@ -653,10 +657,12 @@ mod tests {
let tmpdir = tempdir::TempDir::new("bridge").unwrap();
std::env::set_current_dir(&tmpdir).unwrap();
let config = Arc::new(default_config());
let (bridge_tx, actions_tx, package_rx) = start_bridge(config);
let (mut bridge, actions_tx, package_rx) = create_bridge(config);

let test_route = ActionRoute { name: "test".to_string(), timeout: 30 };
let action_rx = bridge_tx.register_action_route(test_route).await;
let action_rx = bridge.register_action_route(test_route);

spawn_bridge(bridge);

std::thread::spawn(move || loop {
let action = action_rx.recv().unwrap();
Expand Down Expand Up @@ -699,10 +705,13 @@ mod tests {
let tmpdir = tempdir::TempDir::new("bridge").unwrap();
std::env::set_current_dir(&tmpdir).unwrap();
let config = Arc::new(default_config());
let (bridge_tx, actions_tx, package_rx) = start_bridge(config);
let (mut bridge, actions_tx, package_rx) = create_bridge(config);

let test_route = ActionRoute { name: "test".to_string(), timeout: 30 };
let action_rx = bridge_tx.register_action_route(test_route).await;
let action_rx = bridge.register_action_route(test_route);
let bridge_tx = bridge.tx();

spawn_bridge(bridge);

std::thread::spawn(move || loop {
let action = action_rx.recv().unwrap();
Expand Down Expand Up @@ -741,31 +750,36 @@ mod tests {
std::env::set_current_dir(&tmpdir).unwrap();
let mut config = default_config();
config.action_redirections.insert("test".to_string(), "redirect".to_string());
let (bridge_tx, actions_tx, package_rx) = start_bridge(Arc::new(config));
let bridge_tx_clone = bridge_tx.clone();
let (mut bridge, actions_tx, package_rx) = create_bridge(Arc::new(config));
let bridge_tx_1 = bridge.tx();
let bridge_tx_2 = bridge.tx();

let test_route = ActionRoute { name: "test".to_string(), timeout: 30 };
let action_rx_1 = bridge.register_action_route(test_route);

let redirect_route = ActionRoute { name: "redirect".to_string(), timeout: 30 };
let action_rx_2 = bridge.register_action_route(redirect_route);

spawn_bridge(bridge);

std::thread::spawn(move || {
let rt = Runtime::new().unwrap();
let test_route = ActionRoute { name: "test".to_string(), timeout: 30 };
let action_rx = rt.block_on(bridge_tx.register_action_route(test_route));
let action = action_rx.recv().unwrap();
let action = action_rx_1.recv().unwrap();
assert_eq!(action.action_id, "1".to_owned());
std::thread::sleep(Duration::from_secs(1));
let response = ActionResponse::progress("1", "Tested", 100);
rt.block_on(bridge_tx.send_action_response(response));
rt.block_on(bridge_tx_1.send_action_response(response));
});

std::thread::spawn(move || {
let rt = Runtime::new().unwrap();
let test_route = ActionRoute { name: "redirect".to_string(), timeout: 30 };
let action_rx = rt.block_on(bridge_tx_clone.register_action_route(test_route));
let action = action_rx.recv().unwrap();
let action = action_rx_2.recv().unwrap();
assert_eq!(action.action_id, "1".to_owned());
let response = ActionResponse::progress("1", "Redirected", 0);
rt.block_on(bridge_tx_clone.send_action_response(response));
rt.block_on(bridge_tx_2.send_action_response(response));
std::thread::sleep(Duration::from_secs(1));
let response = ActionResponse::success("1");
rt.block_on(bridge_tx_clone.send_action_response(response));
rt.block_on(bridge_tx_2.send_action_response(response));
});

std::thread::sleep(Duration::from_secs(1));
Expand Down Expand Up @@ -800,33 +814,38 @@ mod tests {
let tmpdir = tempdir::TempDir::new("bridge").unwrap();
std::env::set_current_dir(&tmpdir).unwrap();
let config = default_config();
let (bridge_tx, actions_tx, package_rx) = start_bridge(Arc::new(config));
let bridge_tx_clone = bridge_tx.clone();
let (mut bridge, actions_tx, package_rx) = create_bridge(Arc::new(config));
let bridge_tx_1 = bridge.tx();
let bridge_tx_2 = bridge.tx();

let tunshell_route = ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: 30 };
let action_rx_1 = bridge.register_action_route(tunshell_route);

let test_route = ActionRoute { name: "test".to_string(), timeout: 30 };
let action_rx_2 = bridge.register_action_route(test_route);

spawn_bridge(bridge);

std::thread::spawn(move || {
let rt = Runtime::new().unwrap();
let tunshell_route = ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: 30 };
let action_rx = rt.block_on(bridge_tx.register_action_route(tunshell_route));
let action = action_rx.recv().unwrap();
let action = action_rx_1.recv().unwrap();
assert_eq!(action.action_id, "1");
let response = ActionResponse::progress(&action.action_id, "Launched", 0);
rt.block_on(bridge_tx.send_action_response(response));
rt.block_on(bridge_tx_1.send_action_response(response));
std::thread::sleep(Duration::from_secs(3));
let response = ActionResponse::success(&action.action_id);
rt.block_on(bridge_tx.send_action_response(response));
rt.block_on(bridge_tx_1.send_action_response(response));
});

std::thread::spawn(move || {
let rt = Runtime::new().unwrap();
let test_route = ActionRoute { name: "test".to_string(), timeout: 30 };
let action_rx = rt.block_on(bridge_tx_clone.register_action_route(test_route));
let action = action_rx.recv().unwrap();
let action = action_rx_2.recv().unwrap();
assert_eq!(action.action_id, "2");
let response = ActionResponse::progress(&action.action_id, "Running", 0);
rt.block_on(bridge_tx_clone.send_action_response(response));
rt.block_on(bridge_tx_2.send_action_response(response));
std::thread::sleep(Duration::from_secs(1));
let response = ActionResponse::success(&action.action_id);
rt.block_on(bridge_tx_clone.send_action_response(response));
rt.block_on(bridge_tx_2.send_action_response(response));
});

std::thread::sleep(Duration::from_secs(1));
Expand Down Expand Up @@ -881,33 +900,38 @@ mod tests {
let tmpdir = tempdir::TempDir::new("bridge").unwrap();
std::env::set_current_dir(&tmpdir).unwrap();
let config = default_config();
let (bridge_tx, actions_tx, package_rx) = start_bridge(Arc::new(config));
let bridge_tx_clone = bridge_tx.clone();
let (mut bridge, actions_tx, package_rx) = create_bridge(Arc::new(config));
let bridge_tx_1 = bridge.tx();
let bridge_tx_2 = bridge.tx();

let test_route = ActionRoute { name: "test".to_string(), timeout: 30 };
let action_rx_1 = bridge.register_action_route(test_route);

let tunshell_route = ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: 30 };
let action_rx = bridge.register_action_route(tunshell_route);

spawn_bridge(bridge);

std::thread::spawn(move || {
let rt = Runtime::new().unwrap();
let test_route = ActionRoute { name: "test".to_string(), timeout: 30 };
let action_rx = rt.block_on(bridge_tx_clone.register_action_route(test_route));
let action = action_rx.recv().unwrap();
let action = action_rx_1.recv().unwrap();
assert_eq!(action.action_id, "1");
let response = ActionResponse::progress(&action.action_id, "Running", 0);
rt.block_on(bridge_tx_clone.send_action_response(response));
rt.block_on(bridge_tx_1.send_action_response(response));
std::thread::sleep(Duration::from_secs(3));
let response = ActionResponse::success(&action.action_id);
rt.block_on(bridge_tx_clone.send_action_response(response));
rt.block_on(bridge_tx_1.send_action_response(response));
});

std::thread::spawn(move || {
let rt = Runtime::new().unwrap();
let test_route = ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: 30 };
let action_rx = rt.block_on(bridge_tx.register_action_route(test_route));
let action = action_rx.recv().unwrap();
assert_eq!(action.action_id, "2");
let response = ActionResponse::progress(&action.action_id, "Launched", 0);
rt.block_on(bridge_tx.send_action_response(response));
rt.block_on(bridge_tx_2.send_action_response(response));
std::thread::sleep(Duration::from_secs(1));
let response = ActionResponse::success(&action.action_id);
rt.block_on(bridge_tx.send_action_response(response));
rt.block_on(bridge_tx_2.send_action_response(response));
});

std::thread::sleep(Duration::from_secs(1));
Expand Down
Loading
Loading