-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for group operations #60
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,3 +20,4 @@ thiserror = "1.0" | |
clap = "2.33" | ||
git-version = "0.3.5" | ||
command-group = "1.0.8" | ||
async-recursion = "1.1.1" | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,15 @@ | ||
use crate::zinit::{config, ZInit}; | ||
use crate::zinit::{self, config, ZInit, ZInitStatus}; | ||
use anyhow::{Context, Result}; | ||
use nix::sys::signal; | ||
use serde::{Deserialize, Serialize}; | ||
use serde_json::{self as encoder, Value}; | ||
use std::collections::HashMap; | ||
use std::env::current_dir; | ||
use std::io::{self, ErrorKind}; | ||
use std::marker::Unpin; | ||
use std::path::{Path, PathBuf}; | ||
use std::str::FromStr; | ||
use tokio::fs; | ||
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufStream}; | ||
use tokio::net::{UnixListener, UnixStream}; | ||
|
||
|
@@ -24,16 +27,30 @@ enum State { | |
Error, | ||
} | ||
|
||
#[derive(Clone, Debug, Deserialize, Serialize)] | ||
#[serde(rename_all = "lowercase", untagged)] | ||
pub enum Status { | ||
Service(ServiceStatus), | ||
Group(GroupStatus), | ||
} | ||
|
||
#[derive(Clone, Debug, Deserialize, Serialize)] | ||
#[serde(rename_all = "lowercase")] | ||
pub struct Status { | ||
pub struct ServiceStatus { | ||
pub name: String, | ||
pub pid: u32, | ||
pub state: String, | ||
pub target: String, | ||
pub after: HashMap<String, String>, | ||
} | ||
|
||
#[derive(Clone, Debug, Deserialize, Serialize)] | ||
#[serde(rename_all = "lowercase")] | ||
pub struct GroupStatus { | ||
pub name: String, | ||
pub services: Vec<ServiceStatus>, | ||
} | ||
|
||
pub struct Api { | ||
zinit: ZInit, | ||
socket: PathBuf, | ||
|
@@ -149,17 +166,55 @@ impl Api { | |
let services = zinit.list().await?; | ||
let mut map: HashMap<String, String> = HashMap::new(); | ||
for service in services { | ||
let state = zinit.status(&service).await?; | ||
map.insert(service, format!("{:?}", state.state)); | ||
if let ZInitStatus::Service(state) = zinit.status(&service).await? { | ||
map.insert(service, format!("{:?}", state.state)); | ||
} | ||
} | ||
|
||
Ok(encoder::to_value(map)?) | ||
} | ||
|
||
async fn monitor<S: AsRef<str>>(name: S, zinit: ZInit) -> Result<Value> { | ||
let (name, service) = config::load(format!("{}.yaml", name.as_ref())) | ||
.context("failed to load service config")?; | ||
zinit.monitor(name, service).await?; | ||
match config::load(format!("{}.yaml", name.as_ref())) { | ||
Ok((name, service)) => zinit.monitor(name, config::Entry::Service(service)).await?, | ||
Err(e) => { | ||
if let Some(err) = e.downcast_ref::<io::Error>() { | ||
if err.kind() != ErrorKind::NotFound { | ||
return Err(e.context("failed to load service config")); | ||
} | ||
} else { | ||
return Err(e.context("failed to load service config")); | ||
} | ||
} | ||
} | ||
let canonical_path = fs::canonicalize(name.as_ref()).await?; | ||
Comment on lines
+180
to
+189
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I prefer instead of using downcast (although it's fine) that instead we create our concrete defined error type (using I am wondering if it's better if you do I am wondering also what will happen if you have
what should take precedence ? should we monitor both, or only the group? do we give errors. etc.. |
||
let path = if !canonical_path.starts_with(current_dir()?) { | ||
bail!("directory outside of zinit configuration directory") | ||
} else { | ||
canonical_path.strip_prefix(current_dir()?)? | ||
}; | ||
let prefix = path.to_str().ok_or(anyhow!("invalid path name"))?; | ||
match config::load_dir_with_prefix(path, prefix.to_string()) { | ||
Ok(services) => { | ||
for (k, v) in services { | ||
if let Err(err) = zinit.monitor(&k, v).await { | ||
error!("failed to monitor service {}: {}", k, err); | ||
}; | ||
} | ||
} | ||
Err(e) => { | ||
if let Some(err) = e.downcast_ref::<io::Error>() { | ||
if err.kind() == ErrorKind::NotFound { | ||
bail!( | ||
"neither {}.yaml nor {} directory was found", | ||
name.as_ref(), | ||
name.as_ref() | ||
) | ||
} | ||
} | ||
Comment on lines
+206
to
+213
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can see this patterns starting to get out of hand and I think we need to define our own errors. Or completely avoid this by doing checks of which file/dir we need to process. |
||
return Err(e.context("failed to load service config")); | ||
} | ||
} | ||
Ok(Value::Null) | ||
} | ||
|
||
|
@@ -206,30 +261,50 @@ impl Api { | |
} | ||
|
||
async fn status<S: AsRef<str>>(name: S, zinit: ZInit) -> Result<Value> { | ||
let status = zinit.status(&name).await?; | ||
|
||
let result = Status { | ||
name: name.as_ref().into(), | ||
pid: status.pid.as_raw() as u32, | ||
state: format!("{:?}", status.state), | ||
target: format!("{:?}", status.target), | ||
after: { | ||
let mut after = HashMap::new(); | ||
for service in status.service.after { | ||
let status = match zinit.status(&service).await { | ||
Ok(dep) => dep.state, | ||
Err(_) => crate::zinit::State::Unknown, | ||
}; | ||
after.insert(service, format!("{:?}", status)); | ||
} | ||
after | ||
}, | ||
let result = match zinit.status(&name).await? { | ||
ZInitStatus::Service(status) => { | ||
Status::Service(zinit_status_to_service_status(name, zinit, status).await) | ||
} | ||
ZInitStatus::Group(group) => Status::Group(GroupStatus { | ||
name: name.as_ref().into(), | ||
services: { | ||
let mut services = vec![]; | ||
for (name, status) in group.services { | ||
services | ||
.push(zinit_status_to_service_status(name, zinit.clone(), status).await) | ||
} | ||
services | ||
}, | ||
}), | ||
}; | ||
|
||
Ok(encoder::to_value(result)?) | ||
} | ||
} | ||
|
||
async fn zinit_status_to_service_status<S: AsRef<str>>( | ||
name: S, | ||
zinit: ZInit, | ||
status: zinit::ServiceStatus, | ||
) -> ServiceStatus { | ||
ServiceStatus { | ||
name: name.as_ref().into(), | ||
pid: status.pid.as_raw() as u32, | ||
state: format!("{:?}", status.state), | ||
target: format!("{:?}", status.target), | ||
after: { | ||
let mut after = HashMap::new(); | ||
for service in status.service.after { | ||
if let Ok(ZInitStatus::Service(status)) = zinit.status(&service).await { | ||
after.insert(service, format!("{:?}", status.state)); | ||
} else { | ||
after.insert(service, format!("{:?}", crate::zinit::State::Unknown)); | ||
} | ||
} | ||
Comment on lines
+297
to
+301
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it was cleaner when we first get the status then format it. It's more readable before and also makes formatting in one place instead of 2 |
||
after | ||
}, | ||
} | ||
} | ||
|
||
pub struct Client { | ||
socket: PathBuf, | ||
} | ||
|
@@ -291,7 +366,8 @@ impl Client { | |
match filter { | ||
None => tokio::io::copy(&mut con, &mut out).await?, | ||
Some(filter) => { | ||
let filter = format!("{}:", filter.as_ref()); | ||
let service_filter = format!("{}:", filter.as_ref()); | ||
let group_filter = format!("{}/", filter.as_ref()); | ||
let mut stream = BufStream::new(con); | ||
loop { | ||
let mut line = String::new(); | ||
|
@@ -303,7 +379,9 @@ impl Client { | |
} | ||
} | ||
|
||
if line[4..].starts_with(&filter) { | ||
if line[4..].starts_with(&service_filter) | ||
|| line[4..].starts_with(&group_filter) | ||
{ | ||
let _ = out.write_all(line.as_bytes()).await; | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,8 @@ use std::path::{Path, PathBuf}; | |
use tokio::fs; | ||
use tokio::time; | ||
|
||
use self::api::Status; | ||
|
||
fn logger(level: log::LevelFilter) -> Result<()> { | ||
let logger = fern::Dispatch::new() | ||
.format(|out, message, record| { | ||
|
@@ -135,10 +137,25 @@ pub async fn restart(socket: &str, name: &str) -> Result<()> { | |
client.stop(name).await?; | ||
//pull status | ||
for _ in 0..20 { | ||
let result = client.status(name).await?; | ||
if result.pid == 0 && result.target == "Down" { | ||
client.start(name).await?; | ||
return Ok(()); | ||
match client.status(name).await? { | ||
Status::Service(result) => { | ||
if result.pid == 0 && result.target == "Down" { | ||
client.start(name).await?; | ||
return Ok(()); | ||
} | ||
} | ||
Status::Group(result) => { | ||
let mut start = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think Then check if |
||
for service in result.services { | ||
if service.pid != 0 || service.target != "Down" { | ||
start = false; | ||
} | ||
} | ||
if start { | ||
client.start(name).await?; | ||
return Ok(()); | ||
} | ||
} | ||
} | ||
time::sleep(std::time::Duration::from_secs(1)).await; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, please don't
Any recursion can be expanded to an iteration. But avoid async recursion because it dramatically consume memory and we don't want zinit to get killed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general I would avoid recursion and definitely in async. The reason is each Future is a state machine that built during compile time. When u do recursion each state machine has a copy of itself, this means recursively this machine size has infinite size (unknown size in compile time) hence this is needed.
But I am 100% sure (even before i see where u need it) this can be rewritten in a way that doesn't need recursion