Skip to content

Commit

Permalink
add signal handler for tracer enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
Ziy1-Tan committed Oct 19, 2024
1 parent b017ce8 commit 0d9905f
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 39 deletions.
5 changes: 3 additions & 2 deletions vmm/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ protobuf = "3.2"
async-trait = "0.1"
regex = "1.5.6"

tracing-log = "0.2.0"
tracing = "0.1.40"
tracing-opentelemetry = "0.21.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "tracing-log"] }

opentelemetry = { version = "0.20.0", features = ["rt-tokio"] }
opentelemetry-otlp = "0.13.0"

signal-hook = "0.3.1"

[build-dependencies]
ttrpc-codegen = { git = "https://github.com/kuasar-io/ttrpc-rust.git", branch = "v0.7.1-kuasar" }
tonic-build = "0.7.2"
68 changes: 61 additions & 7 deletions vmm/common/src/tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,37 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

use std::sync::atomic::{AtomicBool, Ordering};

use anyhow::anyhow;
use lazy_static::lazy_static;
use log::{debug, info, warn};
use nix::libc::{SIGINT, SIGTERM, SIGUSR1};
use opentelemetry::{
global,
sdk::{trace, trace::Tracer, Resource},
sdk::{
trace::{self, Tracer},
Resource,
},
trace::noop::NoopTracer,
};
use signal_hook::iterator::Signals;
use tracing_subscriber::{
layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, Registry,
};
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer, Registry};

lazy_static! {
static ref ENABLED: AtomicBool = AtomicBool::new(false);
}

pub fn enabled() -> bool {
ENABLED.load(Ordering::Relaxed)
}

// 设置 ENABLED 的值
pub fn set_enabled(enabled: bool) {
ENABLED.store(enabled, Ordering::Relaxed);
}

pub fn setup_tracing(
log_level: &str,
Expand All @@ -29,18 +54,21 @@ pub fn setup_tracing(
let env_filter = init_logger_filter(log_level)
.map_err(|e| anyhow!("failed to init logger filter: {}", e))?;

tracing_log::LogTracer::init().map_err(|e| anyhow!("failed to init LogTracer: {}", e))?;

let mut layers = vec![tracing_subscriber::fmt::layer().boxed()];

if enable_tracing {
let tracer = init_otlp_tracer(otlp_service_name)
.map_err(|e| anyhow!("failed to init otlp tracer: {}", e))?;
layers.push(tracing_opentelemetry::layer().with_tracer(tracer).boxed());
} else {
layers.push(
tracing_opentelemetry::layer()
.with_tracer(NoopTracer::new())
.boxed(),
);
}

let subscriber = Registry::default().with(env_filter).with(layers);
tracing::subscriber::set_global_default(subscriber)
.map_err(|e| anyhow!("failed to set global subscriber: {}", e))?;
Registry::default().with(env_filter).with(layers).init();
Ok(())
}

Expand All @@ -63,5 +91,31 @@ pub fn init_otlp_tracer(otlp_service_name: &str) -> anyhow::Result<Tracer> {
}

pub fn shutdown_tracing() {
set_enabled(false);
global::shutdown_tracer_provider();
}

pub async fn handle_signals(log_level: &str, otlp_service_name: &str) {
let mut signals = Signals::new([SIGTERM, SIGINT, SIGUSR1]).expect("new signal failed");
for sig in signals.forever() {
match sig {
SIGUSR1 => {
set_enabled(!enabled());
let _ = setup_tracing(log_level, enabled(), otlp_service_name);
}
SIGINT | SIGTERM => {
// 处理退出信号,执行清理操作并退出
info!("Received exit signal, stopping tracing and exiting...");
shutdown_tracing();
std::process::exit(0); // 优雅退出
}
_ => {
if let Ok(sig) = nix::sys::signal::Signal::try_from(sig) {
debug!("received {}", sig);
} else {
warn!("received invalid signal {}", sig);
}
}
}
}
}
2 changes: 1 addition & 1 deletion vmm/sandbox/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 8 additions & 6 deletions vmm/sandbox/src/bin/cloud_hypervisor/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@ async fn main() {

// Update args log level if it not presents args but in config.
let enable_tracing = config.sandbox.enable_tracing;
tracer::setup_tracing(
&args.log_level.unwrap_or(config.sandbox.log_level()),
enable_tracing,
"kuasar-vmm-sandboxer-clh-service",
)
.unwrap();
let log_level = args.log_level.unwrap_or(config.sandbox.log_level());
let service_name = "kuasar-vmm-sandboxer-clh-service";
tracer::setup_tracing(&log_level, enable_tracing, service_name).unwrap();
tracer::set_enabled(enable_tracing);

let mut sandboxer: KuasarSandboxer<CloudHypervisorVMFactory, CloudHypervisorHooks> =
KuasarSandboxer::new(
Expand All @@ -50,6 +48,10 @@ async fn main() {
CloudHypervisorHooks::default(),
);

tokio::spawn(async move {
tracer::handle_signals(&log_level, service_name).await;
});

// Do recovery job
sandboxer.recover(&args.dir).await;

Expand Down
14 changes: 8 additions & 6 deletions vmm/sandbox/src/bin/qemu/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,21 @@ async fn main() {
};

let enable_tracing = config.sandbox.enable_tracing;
tracer::setup_tracing(
&config.sandbox.log_level(),
enable_tracing,
"kuasar-vmm-sandboxer-qemu-service",
)
.unwrap();
let log_level = config.sandbox.log_level();
let service_name = "kuasar-vmm-sandboxer-qemu-service";
tracer::setup_tracing(&log_level, enable_tracing, service_name).unwrap();
tracer::set_enabled(enable_tracing);

let sandboxer: KuasarSandboxer<QemuVMFactory, QemuHooks> = KuasarSandboxer::new(
config.sandbox,
config.hypervisor.clone(),
QemuHooks::new(config.hypervisor),
);

tokio::spawn(async move {
tracer::handle_signals(&log_level, service_name).await;
});

// Run the sandboxer
containerd_sandbox::run(
"kuasar-vmm-sandboxer-qemu",
Expand Down
14 changes: 8 additions & 6 deletions vmm/sandbox/src/bin/stratovirt/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,21 @@ async fn main() {

// Update args log level if it not presents args but in config.
let enable_tracing = config.sandbox.enable_tracing;
tracer::setup_tracing(
&config.sandbox.log_level(),
enable_tracing,
"kuasar-vmm-sandboxer-stratovirt-service",
)
.unwrap();
let log_level = config.sandbox.log_level();
let service_name = "kuasar-vmm-sandboxer-stratovirt-service";
tracer::setup_tracing(&log_level, enable_tracing, service_name).unwrap();
tracer::set_enabled(enable_tracing);

let mut sandboxer: KuasarSandboxer<StratoVirtVMFactory, StratoVirtHooks> = KuasarSandboxer::new(
config.sandbox,
config.hypervisor.clone(),
StratoVirtHooks::new(config.hypervisor),
);

tokio::spawn(async move {
tracer::handle_signals(&log_level, service_name).await;
});

// Do recovery job
sandboxer.recover(&args.dir).await;

Expand Down
2 changes: 1 addition & 1 deletion vmm/task/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 25 additions & 10 deletions vmm/task/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use nix::{
sys::wait::{self, WaitPidFlag, WaitStatus},
unistd::Pid,
};
use opentelemetry::trace::noop::NoopTracer;
use signal_hook_tokio::Signals;
use streaming::STREAMING_SERVICE;
use tokio::sync::mpsc::channel;
Expand Down Expand Up @@ -145,6 +146,7 @@ lazy_static! {

async fn initialize(config: &TaskConfig) -> anyhow::Result<()> {
init_logger(&config.log_level, config.enable_tracing)?;
tracer::set_enabled(config.enable_tracing);

info!("Task server start with config: {:?}", config);

Expand Down Expand Up @@ -176,12 +178,18 @@ fn init_logger(log_level: &str, enable_tracing: bool) -> anyhow::Result<()> {
.add_directive(format!("containerd_shim={}", log_level).parse()?)
.add_directive(format!("vmm_task={}", log_level).parse()?);

tracing_log::LogTracer::init()?;

let mut layers = vec![tracing_subscriber::fmt::layer().boxed()];

if enable_tracing {
let tracer = init_otlp_tracer("kuasar-vmm-task-service")?;
let tracer = init_otlp_tracer("kuasar-vmm-task-service")
.map_err(|e| anyhow!("failed to init otlp tracer: {}", e))?;
layers.push(tracing_opentelemetry::layer().with_tracer(tracer).boxed());
} else {
layers.push(
tracing_opentelemetry::layer()
.with_tracer(NoopTracer::new())
.boxed(),
);
}

let subscriber = Registry::default().with(env_filter).with(layers);
Expand Down Expand Up @@ -223,7 +231,13 @@ async fn main() {
exit(-1);
}

let signals = match Signals::new([libc::SIGTERM, libc::SIGINT, libc::SIGPIPE, libc::SIGCHLD]) {
let signals = match Signals::new([
libc::SIGTERM,
libc::SIGINT,
libc::SIGPIPE,
libc::SIGCHLD,
libc::SIGUSR1,
]) {
Ok(s) => s,
Err(e) => {
error!("new signal failed: {:?}", e);
Expand All @@ -232,11 +246,7 @@ async fn main() {
};

info!("Task server successfully started, waiting for exit signal...");
handle_signals(signals).await;

if config.enable_tracing {
tracer::shutdown_tracing();
}
handle_signals(signals, &config.log_level).await;
}

// Do some initialization before everything starts.
Expand All @@ -250,11 +260,12 @@ async fn early_init_call() -> Result<()> {
Ok(())
}

async fn handle_signals(signals: Signals) {
async fn handle_signals(signals: Signals, log_level: &str) {
let mut signals = signals.fuse();
while let Some(sig) = signals.next().await {
match sig {
libc::SIGTERM | libc::SIGINT => {
tracer::shutdown_tracing();
debug!("received {}", sig);
}
libc::SIGCHLD => loop {
Expand Down Expand Up @@ -321,6 +332,10 @@ async fn handle_signals(signals: Signals) {
} // stick until exit
}
},
libc::SIGUSR1 => {
tracer::set_enabled(!tracer::enabled());
let _ = init_logger(log_level, tracer::enabled());
}
_ => {
if let Ok(sig) = nix::sys::signal::Signal::try_from(sig) {
debug!("received {}", sig);
Expand Down

0 comments on commit 0d9905f

Please sign in to comment.