Merge pull request #49 from davincios/feature/stdout-capture
STDOUT Bash Capture
Macronic authored Aug 23, 2024
2 parents e0269ef + 2f6ddd6 commit 270a316
Showing 12 changed files with 127 additions and 13 deletions.
2 changes: 1 addition & 1 deletion install-tracer.sh
@@ -158,7 +158,7 @@ function print_help() {
function set_urls() {
if [ "$ENVIRONMENT" = "develop" ]; then
TRACER_VERSION=$TRACER_VERSION_DEVELOP
SERVICE_URL="https://develop.app.tracer.bio/api/data-collector-api"
SERVICE_URL="https://develop.app.tracer.bio/api/"
fi

TRACER_LINUX_URL="https://github.com/davincios/tracer-daemon/releases/download/${TRACER_VERSION}/tracer-x86_64-unknown-linux-gnu.tar.gz"
3 changes: 2 additions & 1 deletion src/cli/nondaemon_commands.rs
@@ -4,7 +4,7 @@ use anyhow::{Context, Result};
use std::result::Result::Ok;

use crate::{
config_manager::ConfigManager,
config_manager::{ConfigManager, INTERCEPTOR_STDOUT_FILE},
daemon_communication::client::{send_info_request, send_refresh_config_request},
FILE_CACHE_DIR, PID_FILE, REPO_NAME, REPO_OWNER, SOCKET_PATH, STDERR_FILE, STDOUT_FILE,
};
@@ -13,6 +13,7 @@ pub fn clean_up_after_daemon() -> Result<()> {
std::fs::remove_file(PID_FILE).context("Failed to remove pid file")?;
std::fs::remove_file(STDOUT_FILE).context("Failed to remove stdout file")?;
std::fs::remove_file(STDERR_FILE).context("Failed to remove stderr file")?;
let _ = std::fs::remove_file(INTERCEPTOR_STDOUT_FILE).context("Failed to remove stdout file");
std::fs::remove_dir_all(FILE_CACHE_DIR).context("Failed to remove cache directory")?;
Ok(())
}
6 changes: 6 additions & 0 deletions src/config_manager/bashrc_intercept.rs
@@ -10,6 +10,8 @@ use crate::config_manager::target_process::Target;

const INTERCEPTOR_BASHRC_PATH: &str = ".config/tracer/.bashrc";
const INTERCEPTOR_SOURCE_COMMAND: &str = "source ~/.config/tracer/.bashrc";
pub const INTERCEPTOR_STDOUT_FILE: &str = "/tmp/tracerd-stdout";
const INTERCEPTOR_STDOUT_COMMAND: &str = "exec &> >(tee >(awk 'system(\"[ ! -f /tmp/tracerd.pid ]\") == 1' >> \"/tmp/tracerd-stdout\"))\n";

pub fn get_command_interceptor(
current_tracer_exe_path: PathBuf,
@@ -55,6 +57,10 @@ pub fn rewrite_interceptor_bashrc_file(
bashrc_file.write_all(command.as_bytes()).unwrap();
}

bashrc_file
.write_all(INTERCEPTOR_STDOUT_COMMAND.as_bytes())
.unwrap();

Ok(())
}
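
A note on the appended command: INTERCEPTOR_STDOUT_COMMAND redirects the shell's stdout and stderr through tee into a process substitution, where awk appends each line to /tmp/tracerd-stdout only while the daemon's pid file /tmp/tracerd.pid exists (the test [ ! -f /tmp/tracerd.pid ] exits with status 1 in that case, so the awk pattern matches and the line is written through). In effect, interactive shell output is captured only while the tracer daemon is running.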

4 changes: 3 additions & 1 deletion src/config_manager/config.rs
@@ -17,7 +17,7 @@ use crate::config_manager::target_process::Target;
use super::target_process::targets_list;

const DEFAULT_API_KEY: &str = "EAjg7eHtsGnP3fTURcPz1";
const DEFAULT_SERVICE_URL: &str = "https://app.tracer.bio/api/data-collector-api";
const DEFAULT_SERVICE_URL: &str = "https://app.tracer.bio/api";
const DEFAULT_CONFIG_FILE_LOCATION_FROM_HOME: &str = ".config/tracer/tracer.toml";
const PROCESS_POLLING_INTERVAL_MS: u64 = 5;
const BATCH_SUBMISSION_INTERVAL_MS: u64 = 10000;
@@ -128,6 +128,8 @@ impl ConfigManager {
config.service_url = service_url;
}

config.service_url = config.service_url.replace("data-collector-api", ""); // To support legacy (pre-2024/08/23) configs

config
}
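
The rewrite above keeps pre-2024/08/23 configs working: a service_url that still ends in the old data-collector-api path is trimmed back to the base API URL, and endpoint paths are now appended per request (see src/http_client/mod.rs and src/stdout/mod.rs below). A minimal sketch of the mapping, using the default URL above as an illustrative value:

    // Illustrative only: how the legacy rewrite maps an old-style service_url.
    let legacy = String::from("https://app.tracer.bio/api/data-collector-api");
    let migrated = legacy.replace("data-collector-api", "");
    assert_eq!(migrated, "https://app.tracer.bio/api/"); // trailing slash is kept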

1 change: 1 addition & 0 deletions src/config_manager/mod.rs
@@ -1,4 +1,5 @@
mod bashrc_intercept;
mod config;
pub mod target_process;
pub use bashrc_intercept::INTERCEPTOR_STDOUT_FILE;
pub use config::{Config, ConfigManager};
7 changes: 4 additions & 3 deletions src/http_client/mod.rs
@@ -67,13 +67,13 @@ pub async fn send_http_get(
}

pub async fn send_http_body(
service_url: &str,
url: &str,
api_key: &str,
request_body: &Value,
) -> Result<(u16, String)> {
let client = Client::new();
let response = client
.post(service_url)
.post(url)
.header("x-api-key", api_key)
.header("Content-Type", "application/json")
.json(request_body)
@@ -99,8 +99,9 @@ pub async fn send_http_event(service_url: &str, api_key: &str, logs: &Value) ->
let request_body = json!({ "logs": logs_array });
record_all_outgoing_http_calls(service_url, api_key, &request_body).await?;

let url = format!("{}/data-collector-api", service_url);
// Send request
let (status, response_text) = send_http_body(service_url, api_key, &request_body).await?;
let (status, response_text) = send_http_body(&url, api_key, &request_body).await?;

// Log response body
info!(
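
With this change send_http_body is endpoint-agnostic: callers pass a full URL, and send_http_event builds the data-collector-api path itself from the base service_url. The new stdout capture relies on the same pattern. A rough sketch of the calling convention, with illustrative values:

    // Illustrative only: callers now build the endpoint from the base service URL.
    let service_url = "https://app.tracer.bio/api";
    let events_url = format!("{}/data-collector-api", service_url); // used by send_http_event
    let stdout_url = format!("{}/stdout-capture", service_url); // used by StdoutWatcher::poll_stdout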
15 changes: 12 additions & 3 deletions src/main.rs
@@ -8,16 +8,18 @@ mod file_watcher;
mod http_client;
mod metrics;
mod process_watcher;
mod stdout;
mod submit_batched_data;
mod syslog;
mod tracer_client;
mod upload;
use anyhow::{Context, Ok, Result};
use cli::process_cli;
use config_manager::INTERCEPTOR_STDOUT_FILE;
use daemon_communication::server::run_server;
use daemonize::Daemonize;
use std::borrow::BorrowMut;
use syslog::run_lines_read_thread;
use syslog::run_syslog_lines_read_thread;

use std::fs::File;
use std::sync::Arc;
@@ -83,11 +85,16 @@ pub async fn run(workflow_directory_path: String) -> Result<()> {
config.clone(),
));

let lines_task = tokio::spawn(run_lines_read_thread(
let syslog_lines_task = tokio::spawn(run_syslog_lines_read_thread(
SYSLOG_FILE,
tracer_client.lock().await.get_syslog_lines_buffer(),
));

let stdout_lines_task = tokio::spawn(stdout::run_stdout_lines_read_thread(
INTERCEPTOR_STDOUT_FILE,
tracer_client.lock().await.get_stdout_lines_buffer(),
));

tracer_client
.lock()
.await
@@ -120,7 +127,8 @@ pub async fn run(workflow_directory_path: String) -> Result<()> {
tracer_client.lock().await.borrow_mut().poll_files().await?;
}

lines_task.abort();
syslog_lines_task.abort();
stdout_lines_task.abort();

Ok(())
}
@@ -131,6 +139,7 @@ pub async fn monitor_processes_with_tracer_client(tracer_client: &mut TracerClie
// tracer_client.run_cleanup().await?;
tracer_client.poll_process_metrics().await?;
tracer_client.poll_syslog().await?;
tracer_client.poll_stdout().await?;
tracer_client.refresh_sysinfo();
tracer_client.reset_just_started_process_flag();
Ok(())
73 changes: 73 additions & 0 deletions src/stdout/mod.rs
@@ -0,0 +1,73 @@
use std::sync::Arc;

use anyhow::Result;
use linemux::MuxedLines;
use serde_json::json;
use tokio::sync::RwLock;
use tokio_stream::StreamExt;

use crate::{debug_log::Logger, http_client::send_http_body};

// Todo: A lot of code is duplicated between this file and syslog. Maybe we could extract the file reading code into a separate module?
pub struct StdoutWatcher {}

pub async fn run_stdout_lines_read_thread(
file_path: &str,
pending_lines: Arc<RwLock<Vec<String>>>,
) {
let line_reader = MuxedLines::new();

if line_reader.is_err() {
return;
}

let mut line_reader = line_reader.unwrap();

let result = line_reader.add_file(file_path).await;

if result.is_err() {
return;
}

while let Ok(Some(line)) = line_reader.try_next().await {
let mut vec = pending_lines.write().await;
let line = line.line();
vec.push(line.to_string());
}
}

impl StdoutWatcher {
pub fn new() -> StdoutWatcher {
StdoutWatcher {}
}

pub async fn poll_stdout(
&mut self,
service_url: &str,
api_key: &str,
pending_lines: Arc<RwLock<Vec<String>>>,
) -> Result<()> {
let logger = Logger::new();

if pending_lines.read().await.is_empty() {
logger.log("No lines from stdout to send", None).await;
return Ok(());
}

let url = format!("{}/stdout-capture", service_url);

let body = json!({
"lines": *pending_lines.as_ref().read().await
});

logger
.log(&format!("Sending stdout lines: {:?}", body), None)
.await;

pending_lines.write().await.clear();

send_http_body(&url, api_key, &body).await?;

Ok(())
}
}
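
As the Todo at the top of this file notes, run_stdout_lines_read_thread mirrors the syslog reader almost line for line. If the duplication is factored out later, both watchers could share a single tail-following helper along these lines (a sketch only, not part of this commit; the function name is hypothetical and the imports are the same as above):

    // Sketch only: a generic line-follower that the syslog and stdout watchers
    // could both reuse instead of duplicating the MuxedLines setup.
    pub async fn run_file_lines_read_thread(
        file_path: &str,
        pending_lines: Arc<RwLock<Vec<String>>>,
    ) {
        let line_reader = MuxedLines::new();
        if line_reader.is_err() {
            return;
        }
        let mut line_reader = line_reader.unwrap();
        if line_reader.add_file(file_path).await.is_err() {
            return;
        }
        while let Ok(Some(line)) = line_reader.try_next().await {
            pending_lines.write().await.push(line.line().to_string());
        }
    }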
5 changes: 4 additions & 1 deletion src/syslog/mod.rs
@@ -31,7 +31,10 @@ pub struct SyslogWatcher {
pub last_lines: Vec<String>,
}

pub async fn run_lines_read_thread(file_path: &str, pending_lines: Arc<RwLock<Vec<String>>>) {
pub async fn run_syslog_lines_read_thread(
file_path: &str,
pending_lines: Arc<RwLock<Vec<String>>>,
) {
let line_reader = MuxedLines::new();

if line_reader.is_err() {
19 changes: 19 additions & 0 deletions src/tracer_client.rs
@@ -4,6 +4,7 @@ use crate::events::{send_end_run_event, send_start_run_event};
use crate::file_watcher::FileWatcher;
use crate::metrics::SystemMetricsCollector;
use crate::process_watcher::ProcessWatcher;
use crate::stdout::StdoutWatcher;
use crate::submit_batched_data::submit_batched_data;
use crate::syslog::SyslogWatcher;
use crate::FILE_CACHE_DIR;
@@ -39,13 +40,15 @@ pub struct TracerClient {
pub logs: EventRecorder,
process_watcher: ProcessWatcher,
syslog_watcher: SyslogWatcher,
stdout_watcher: StdoutWatcher,
metrics_collector: SystemMetricsCollector,
file_watcher: FileWatcher,
workflow_directory: String,
api_key: String,
service_url: String,
current_run: Option<RunMetadata>,
syslog_lines_buffer: Arc<RwLock<Vec<String>>>,
stdout_lines_buffer: Arc<RwLock<Vec<String>>>,
}

impl TracerClient {
@@ -76,11 +79,13 @@ impl TracerClient {
last_sent: None,
current_run: None,
syslog_watcher: SyslogWatcher::new(),
stdout_watcher: StdoutWatcher::new(),
// Sub managers
logs: EventRecorder::new(),
file_watcher,
workflow_directory,
syslog_lines_buffer: Arc::new(RwLock::new(Vec::new())),
stdout_lines_buffer: Arc::new(RwLock::new(Vec::new())),
process_watcher: ProcessWatcher::new(config.targets),
metrics_collector: SystemMetricsCollector::new(),
})
@@ -106,6 +111,10 @@ impl TracerClient {
self.syslog_lines_buffer.clone()
}

pub fn get_stdout_lines_buffer(&self) -> Arc<RwLock<Vec<String>>> {
self.stdout_lines_buffer.clone()
}

pub async fn submit_batched_data(&mut self) -> Result<()> {
submit_batched_data(
&self.api_key,
@@ -240,6 +249,16 @@ impl TracerClient {
.await
}

pub async fn poll_stdout(&mut self) -> Result<()> {
self.stdout_watcher
.poll_stdout(
&self.service_url,
&self.api_key,
self.get_stdout_lines_buffer(),
)
.await
}

pub fn refresh_sysinfo(&mut self) {
self.system.refresh_all();
}
3 changes: 1 addition & 2 deletions src/upload/presigned_url_put.rs
@@ -10,9 +10,8 @@ pub async fn request_presigned_url(
file_name: &str,
) -> Result<String> {
// Construct the full URL with the query parameter
let presigned_url = service_url.replace("data-collector-api", "upload/presigned-put");
let presigned_url = format!("{}/upload/presigned-put", service_url);
let logger = Logger::new();
logger.log(&presigned_url, None).await;
let mut url = Url::parse(&presigned_url).context("Failed to parse service URL")?;
url.query_pairs_mut().append_pair("fileName", file_name);

2 changes: 1 addition & 1 deletion tracer.toml
@@ -1,4 +1,4 @@
api_key = "<YOUR_API_KEY>"
service_url = "https://app.tracer.bio/api/data-collector-api"
service_url = "https://app.tracer.bio/api"
process_polling_interval_ms = 20
batch_submission_interval_ms = 5000
