diff --git a/api/src/apps/mod.rs b/api/src/apps/mod.rs index 50d1edc..be05693 100644 --- a/api/src/apps/mod.rs +++ b/api/src/apps/mod.rs @@ -341,7 +341,7 @@ impl AppsService { pub async fn stream_logs<'a>( &'a self, app_name: &'a AppName, - service_name: &'a String, + service_name: &'a str, limit: usize, ) -> BoxStream<'a, Result> { self.infrastructure @@ -447,6 +447,7 @@ mod tests { use crate::models::{EnvironmentVariable, ServiceBuilder}; use crate::sc; use chrono::Utc; + use futures::StreamExt; use secstr::SecUtf8; use std::hash::Hash; use std::io::Write; @@ -708,6 +709,49 @@ Log msg 3 of service-a of app master Ok(()) } + #[tokio::test] + async fn should_stream_logs_from_infrastructure() -> Result<(), AppsServiceError> { + let config = Config::default(); + let infrastructure = Box::new(Dummy::new()); + let apps = AppsService::new(config, infrastructure)?; + + let app_name = AppName::from_str("master").unwrap(); + + apps.create_or_update( + &app_name, + &AppStatusChangeId::new(), + None, + &vec![sc!("service-a"), sc!("service-b")], + ) + .await?; + + let service_for_logs = String::from("service-a"); + let mut count = 0; + let mut log_chunk = apps.stream_logs(&app_name, &service_for_logs, 5).await; + while let Some(result) = log_chunk.next().await { + match result { + Ok(events) => match events { + LogEvents::Message(message) => assert_eq!( + message, + format!("Log msg 1 of service-a of app master\nLog msg 2 of service-a of app master") + ), + LogEvents::Line(line) => { + assert_eq!( + line, + format!("Streaming log msg {count} of service-a of app master") + ); + count += 1; + } + }, + Err(_e) => { + break; + } + } + } + + Ok(()) + } + #[tokio::test] async fn should_deploy_companions() -> Result<(), AppsServiceError> { let config = config_from_str!( diff --git a/api/src/apps/routes.rs b/api/src/apps/routes.rs index cd6c2bf..6800334 100644 --- a/api/src/apps/routes.rs +++ b/api/src/apps/routes.rs @@ -207,7 +207,7 @@ async fn logs( } }, }; - let limit = limit.unwrap_or(20_000); + let limit = limit.unwrap_or_default(); let log_chunk = apps .get_logs(&app_name, &service_name, &since, limit) @@ -228,7 +228,7 @@ async fn logs( )] async fn stream_logs<'a>( app_name: Result, - service_name: String, + service_name: &'a str, limit: Option, apps: &'a State>, ) -> EventStream![Event + 'a] { @@ -236,11 +236,10 @@ async fn stream_logs<'a>( let limit = limit.unwrap_or(20_000); EventStream! { - let mut log_chunk = Box::pin( + let mut log_chunk = apps .stream_logs(&app_name, &service_name, limit) - .await, - ); + .await; while let Some(result) = log_chunk.as_mut().next().await { match result { Ok(events) => { diff --git a/api/src/infrastructure/docker.rs b/api/src/infrastructure/docker.rs index 34b18e1..fd20820 100644 --- a/api/src/infrastructure/docker.rs +++ b/api/src/infrastructure/docker.rs @@ -798,20 +798,24 @@ impl Infrastructure for DockerInfrastructure { container.id, from ); - + let limit_string = if limit == 0 { + String::from("all") + } else { + limit.to_string() + }; let log_options = match from { Some(from) => LogsOptions::builder() .since(from) .stdout(true) .stderr(true) .timestamps(true) - .tail("all") + .tail(&limit_string) .build(), None => LogsOptions::builder() .stdout(true) .stderr(true) .timestamps(true) - .tail(&limit.to_string()) + .tail(&limit_string) .build(), }; @@ -848,8 +852,8 @@ impl Infrastructure for DockerInfrastructure { fn stream_logs<'a>( &'a self, - app_name: &'a String, - service_name: &'a String, + app_name: &'a str, + service_name: &'a str, limit: usize, ) -> BoxStream<'a, Result> { Box::pin(stream! { diff --git a/api/src/infrastructure/dummy_infrastructure.rs b/api/src/infrastructure/dummy_infrastructure.rs index 8821499..4e9b68f 100644 --- a/api/src/infrastructure/dummy_infrastructure.rs +++ b/api/src/infrastructure/dummy_infrastructure.rs @@ -203,21 +203,19 @@ impl Infrastructure for DummyInfrastructure { fn stream_logs<'a>( &'a self, - app_name: &'a String, - service_name: &'a String, - _limit: usize, + app_name: &'a str, + service_name: &'a str, + limit: usize, ) -> BoxStream<'a, Result> { Box::pin(stream! { - let logs = vec![ - format!("Log msg 1 of {} of app {}\n", service_name, app_name), - format!("Log msg 2 of {} of app {}\n", service_name, app_name), - format!("Log msg 3 of {} of app {}\n", service_name, app_name), - ]; + let logs = format!("Log msg 1 of {service_name} of app {app_name}\nLog msg 2 of {service_name} of app {app_name}"); + + yield Ok(LogEvents::Message(logs)); - for log in logs { - yield Ok(LogEvents::Line(log)); - } + for i in 0..limit { + yield Ok(LogEvents::Line(format!("Streaming log msg {i} of {service_name} of app {app_name}"))); + } }) } diff --git a/api/src/infrastructure/infrastructure.rs b/api/src/infrastructure/infrastructure.rs index de49ef8..b96c4d0 100644 --- a/api/src/infrastructure/infrastructure.rs +++ b/api/src/infrastructure/infrastructure.rs @@ -82,8 +82,8 @@ pub trait Infrastructure: Send + Sync { /// Returns the logs streamed as events. fn stream_logs<'a>( &'a self, - app_name: &'a String, - service_name: &'a String, + app_name: &'a str, + service_name: &'a str, limit: usize, ) -> BoxStream<'a, Result>; diff --git a/api/src/infrastructure/kubernetes/infrastructure.rs b/api/src/infrastructure/kubernetes/infrastructure.rs index 1e25139..3fa4a3b 100644 --- a/api/src/infrastructure/kubernetes/infrastructure.rs +++ b/api/src/infrastructure/kubernetes/infrastructure.rs @@ -700,7 +700,11 @@ impl Infrastructure for KubernetesInfrastructure { .timestamp() }) .filter(|since_seconds| since_seconds > &0), - tail_lines: Some(limit.try_into().unwrap()), + tail_lines: if limit == 0 { + None + } else { + Some(limit.try_into().unwrap()) + }, ..Default::default() }; @@ -712,7 +716,6 @@ impl Infrastructure for KubernetesInfrastructure { let logs = logs .split('\n') .enumerate() - .filter(move |(index, _)| index < &limit) .filter(|(_, line)| !line.is_empty()) .map(|(_, line)| { let mut iter = line.splitn(2, ' '); @@ -734,8 +737,8 @@ impl Infrastructure for KubernetesInfrastructure { fn stream_logs<'a>( &'a self, - app_name: &'a String, - service_name: &'a String, + app_name: &'a str, + service_name: &'a str, limit: usize, ) -> BoxStream<'a, Result> { Box::pin(stream! { diff --git a/api/src/infrastructure/kubernetes/payloads.rs b/api/src/infrastructure/kubernetes/payloads.rs index aac3830..3ba02fe 100644 --- a/api/src/infrastructure/kubernetes/payloads.rs +++ b/api/src/infrastructure/kubernetes/payloads.rs @@ -358,7 +358,7 @@ pub fn deployment_payload( env, volume_mounts, ports: Some(vec![ContainerPort { - container_port: service.port() as i32, + container_port: 80 as i32, ..Default::default() }]), resources, diff --git a/api/src/models/logs_chunks.rs b/api/src/models/logs_chunks.rs index d5ef120..f5c3e00 100644 --- a/api/src/models/logs_chunks.rs +++ b/api/src/models/logs_chunks.rs @@ -26,6 +26,7 @@ use chrono::{DateTime, FixedOffset, Utc}; use std::convert::From; +#[derive(Debug)] pub struct LogChunk { since: DateTime, until: DateTime, diff --git a/frontend/src/LogsDialog.vue b/frontend/src/LogsDialog.vue index 7fc0049..4198d76 100644 --- a/frontend/src/LogsDialog.vue +++ b/frontend/src/LogsDialog.vue @@ -32,8 +32,8 @@   Download Logs - +