Skip to content

Commit

Permalink
feat serve: add a DIE command to ask to exit
Browse files Browse the repository at this point in the history
  • Loading branch information
c-cube committed Aug 23, 2024
1 parent b177c05 commit 2165491
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Each client process should open one connection to `tldrs` and send these message
| `OPEN <trace-id>` | mandatory first message |
| `{"ph": "X", …}` | a normal TEF event |
| `EMIT_TEF <path/to/trace.json>` | optional last message |
| `DIE` | ask tldrs to exit asap |


All processes in a single program run must open the same `trace_id` (a utf-8 safe identifier
Expand Down
4 changes: 4 additions & 0 deletions src/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub enum Msg<'a> {
Add {
json: &'a str,
},
/// Client asks whole daemon to die
Die,
ParseError {
msg: &'static str,
},
Expand All @@ -36,6 +38,8 @@ pub fn decode_line<'a>(line: &'a str) -> Msg<'a> {
Open {
trace_id: rest.trim(),
}
} else if line == "DIE" {
Die
} else if let Some(rest) = line.strip_prefix("SYMLINK ") {
Symlink { file: rest.trim() }
} else if let Some(rest) = line.strip_prefix("EMIT_TEF ") {
Expand Down
36 changes: 36 additions & 0 deletions src/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,27 @@ struct State {
files: Mutex<HashMap<TraceID, Arc<TraceFile>>>,
}

impl State {
fn close_all_force(&self) {
let mut files = self.files.lock().unwrap();
for (_, f) in files.drain() {
if let Err(err) = {
let mut out = f.out.lock().unwrap();
out.flush()
} {
log::error!("Error while flushing {:?}: {:?}", f.path, err)
}
}
}
}

impl Drop for State {
fn drop(&mut self) {
// remove socket file
log::debug!("removing socket file {:?}", self.socket_path);
let _ = fs::remove_file(&self.socket_path);

self.close_all_force();
}
}

Expand Down Expand Up @@ -117,6 +133,10 @@ fn handle_client(st: Arc<State>, mut client: impl BufRead) -> Result<()> {

let mut line = String::new();
loop {
if !st.active.load(atomic::Ordering::SeqCst) {
break;
}

line.clear();
let msg = match client.read_line(&mut line) {
Err(e) => {
Expand All @@ -130,6 +150,22 @@ fn handle_client(st: Arc<State>, mut client: impl BufRead) -> Result<()> {
log::debug!("got msg {:?}", &msg);
match msg {
msg::Msg::Empty => (),
msg::Msg::Die => {
// exit gracefully
log::info!("client asked us to quit");
st.active.store(false, atomic::Ordering::SeqCst);

st.close_all_force();

// if we don't exit in 10s, die less cleanly
thread::spawn(|| {
thread::sleep(Duration::from_secs(10));
log::warn!("timeout, dying the hard way");
std::process::exit(1);
});

break;
}
msg::Msg::Open { trace_id } => {
log::debug!("Opening trace file for trace_id={trace_id:?}");
trace_file = Some(st.get_trace_file(trace_id)?);
Expand Down

0 comments on commit 2165491

Please sign in to comment.