Skip to content

Commit

Permalink
run TCP proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
slivingston committed Sep 17, 2024
1 parent 1b7e17b commit 721f5b6
Showing 1 changed file with 72 additions and 8 deletions.
80 changes: 72 additions & 8 deletions src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub struct ContainerAddress {
ip: String,
port: Port,
hostkey: String,
subprocess: Option<std::process::Child>,
}

struct SshTunnel {
Expand Down Expand Up @@ -370,6 +371,39 @@ impl CurrentInstance {
Err("host key not found".into())
}

fn start_proxy(
cargs: &[String],
timeout: u64,
) -> Result<(std::process::Child, Port), Box<dyn std::error::Error>> {
if cargs[0] != "rrhttp" {
return Err("only rrhttp proxy supported".into());
}
let mut child = Command::new(&cargs[0])
.args(cargs[1..].iter())
.stdout(Stdio::piped())
.spawn()?;
let stdout = child.stdout.as_mut().unwrap();
let max_duration = std::time::Duration::from_secs(timeout);
let sleep_time = std::time::Duration::from_secs(1);
let now = std::time::Instant::now();
let mut acc: Vec<u8> = vec![];
while now.elapsed() <= max_duration {
let mut buf = [0; 32];
let n = stdout.read(&mut buf)?;
for b in buf.iter().take(n) {
if *b == 0x0a {
let line = String::from_utf8(acc)?;
let parts: Vec<&str> = line.split(':').collect();
let port = Port::from_str(parts[1])?;
return Ok((child, port));
}
acc.push(*b);
}
std::thread::sleep(sleep_time);
}
Err("port not found".into())
}

fn start_sshtun(
&self,
container_addr: ContainerAddress,
Expand Down Expand Up @@ -592,18 +626,38 @@ impl CurrentInstance {
error!("{}", err);
}
}

if self.wdeployment.cprovider == CProvider::Proxy {
if let Some(subprocess) = tunnel.container_addr.subprocess.as_mut() {
if let Err(err) = subprocess.kill() {
warn!("proxy kill: : {}", err);
}
match subprocess.wait() {
Ok(s) => {
if !s.success() {
warn!("exit code: {:?}", s.code());
}
}
Err(err) => {
error!("{}", err);
}
}
}
}
}
*tunnel_ref = None;
}

fn destroy(mut instance: CurrentInstance) {
instance.stop_tunnel();

let name = instance.get_local_name().unwrap();
if let Err(err) = Self::destroy_container(&instance.wdeployment, &name) {
error!("Deployment fault! Caught from destroy_container(): {}", err);
instance.declare_status(InstanceStatus::Fault);
return;
if instance.wdeployment.cprovider != CProvider::Proxy {
let name = instance.get_local_name().unwrap();
if let Err(err) = Self::destroy_container(&instance.wdeployment, &name) {
error!("Deployment fault! Caught from destroy_container(): {}", err);
instance.declare_status(InstanceStatus::Fault);
return;
}
}

instance.clear_status();
Expand All @@ -619,6 +673,7 @@ impl CurrentInstance {
let ip: String;
let port: Port;
let hostkey: String;
let mut subprocess = None;
if cprovider == CProvider::Docker
|| cprovider == CProvider::DockerRootless
|| cprovider == CProvider::Podman
Expand Down Expand Up @@ -778,12 +833,21 @@ impl CurrentInstance {
} else if cprovider == CProvider::Lxd {
return Err(Error::new("lxd cprovider not implemented yet"));
} else if cprovider == CProvider::Proxy {
return Err(Error::new("proxy cprovider not implemented yet"));
let res = CurrentInstance::start_proxy(&wdeployment.cargs, 5)?;
port = res.1;
ip = "127.0.0.1".into();
hostkey = "".into();
subprocess = Some(res.0);
} else {
return Err(Error::new(format!("unknown cprovider: {}", cprovider)));
}

Ok(ContainerAddress { ip, port, hostkey })
Ok(ContainerAddress {
ip,
port,
hostkey,
subprocess,
})
}

pub fn destroy_container(
Expand Down Expand Up @@ -1119,7 +1183,7 @@ mod tests {
"id": "8449a67a-fe0d-42b3-9f2d-89c9aa2e9410",
"owner": "frodo",
"cprovider": "proxy",
"cargs": [],
"cargs": ["rrhttp", "127.0.0.1:8080"],
"init_inside": [],
"terminate": [],
"container_name": "rrc"
Expand Down

0 comments on commit 721f5b6

Please sign in to comment.