Skip to content

Commit

Permalink
Working Commit 2
Browse files Browse the repository at this point in the history
  • Loading branch information
samuchila committed Jan 12, 2024
1 parent 1195a0c commit 2471975
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 50 deletions.
46 changes: 45 additions & 1 deletion api/src/apps/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ impl AppsService {
pub async fn stream_logs<'a>(
&'a self,
app_name: &'a AppName,
service_name: &'a String,
service_name: &'a str,
limit: usize,
) -> BoxStream<'a, Result<LogEvents, failure::Error>> {
self.infrastructure
Expand Down Expand Up @@ -447,6 +447,7 @@ mod tests {
use crate::models::{EnvironmentVariable, ServiceBuilder};
use crate::sc;
use chrono::Utc;
use futures::StreamExt;
use secstr::SecUtf8;
use std::hash::Hash;
use std::io::Write;
Expand Down Expand Up @@ -708,6 +709,49 @@ Log msg 3 of service-a of app master
Ok(())
}

#[tokio::test]
async fn should_stream_logs_from_infrastructure() -> Result<(), AppsServiceError> {
let config = Config::default();
let infrastructure = Box::new(Dummy::new());
let apps = AppsService::new(config, infrastructure)?;

let app_name = AppName::from_str("master").unwrap();

apps.create_or_update(
&app_name,
&AppStatusChangeId::new(),
None,
&vec![sc!("service-a"), sc!("service-b")],
)
.await?;

let service_for_logs = String::from("service-a");
let mut count = 0;
let mut log_chunk = apps.stream_logs(&app_name, &service_for_logs, 5).await;
while let Some(result) = log_chunk.next().await {
match result {
Ok(events) => match events {
LogEvents::Message(message) => assert_eq!(
message,
format!("Log msg 1 of service-a of app master\nLog msg 2 of service-a of app master")
),
LogEvents::Line(line) => {
assert_eq!(
line,
format!("Streaming log msg {count} of service-a of app master")
);
count += 1;
}
},
Err(_e) => {
break;
}
}
}

Ok(())
}

#[tokio::test]
async fn should_deploy_companions() -> Result<(), AppsServiceError> {
let config = config_from_str!(
Expand Down
9 changes: 4 additions & 5 deletions api/src/apps/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ async fn logs(
}
},
};
let limit = limit.unwrap_or(20_000);
let limit = limit.unwrap_or_default();

let log_chunk = apps
.get_logs(&app_name, &service_name, &since, limit)
Expand All @@ -228,19 +228,18 @@ async fn logs(
)]
async fn stream_logs<'a>(
app_name: Result<AppName, AppNameError>,
service_name: String,
service_name: &'a str,
limit: Option<usize>,
apps: &'a State<Arc<Apps>>,
) -> EventStream![Event + 'a] {
let app_name = app_name.unwrap();
let limit = limit.unwrap_or(20_000);

EventStream! {
let mut log_chunk = Box::pin(
let mut log_chunk =
apps
.stream_logs(&app_name, &service_name, limit)
.await,
);
.await;
while let Some(result) = log_chunk.as_mut().next().await {
match result {
Ok(events) => {
Expand Down
14 changes: 9 additions & 5 deletions api/src/infrastructure/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,20 +798,24 @@ impl Infrastructure for DockerInfrastructure {
container.id,
from
);

let limit_string = if limit == 0 {
String::from("all")
} else {
limit.to_string()
};
let log_options = match from {
Some(from) => LogsOptions::builder()
.since(from)
.stdout(true)
.stderr(true)
.timestamps(true)
.tail("all")
.tail(&limit_string)
.build(),
None => LogsOptions::builder()
.stdout(true)
.stderr(true)
.timestamps(true)
.tail(&limit.to_string())
.tail(&limit_string)
.build(),
};

Expand Down Expand Up @@ -848,8 +852,8 @@ impl Infrastructure for DockerInfrastructure {

fn stream_logs<'a>(
&'a self,
app_name: &'a String,
service_name: &'a String,
app_name: &'a str,
service_name: &'a str,
limit: usize,
) -> BoxStream<'a, Result<LogEvents, failure::Error>> {
Box::pin(stream! {
Expand Down
20 changes: 9 additions & 11 deletions api/src/infrastructure/dummy_infrastructure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,21 +203,19 @@ impl Infrastructure for DummyInfrastructure {

fn stream_logs<'a>(
&'a self,
app_name: &'a String,
service_name: &'a String,
_limit: usize,
app_name: &'a str,
service_name: &'a str,
limit: usize,
) -> BoxStream<'a, Result<LogEvents, failure::Error>> {
Box::pin(stream! {
let logs = vec![
format!("Log msg 1 of {} of app {}\n", service_name, app_name),
format!("Log msg 2 of {} of app {}\n", service_name, app_name),
format!("Log msg 3 of {} of app {}\n", service_name, app_name),
];
let logs = format!("Log msg 1 of {service_name} of app {app_name}\nLog msg 2 of {service_name} of app {app_name}");

yield Ok(LogEvents::Message(logs));

for log in logs {
yield Ok(LogEvents::Line(log));
}

for i in 0..limit {
yield Ok(LogEvents::Line(format!("Streaming log msg {i} of {service_name} of app {app_name}")));
}
})
}

Expand Down
4 changes: 2 additions & 2 deletions api/src/infrastructure/infrastructure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ pub trait Infrastructure: Send + Sync {
/// Returns the logs streamed as events.
fn stream_logs<'a>(
&'a self,
app_name: &'a String,
service_name: &'a String,
app_name: &'a str,
service_name: &'a str,
limit: usize,
) -> BoxStream<'a, Result<LogEvents, failure::Error>>;

Expand Down
11 changes: 7 additions & 4 deletions api/src/infrastructure/kubernetes/infrastructure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,11 @@ impl Infrastructure for KubernetesInfrastructure {
.timestamp()
})
.filter(|since_seconds| since_seconds > &0),
tail_lines: Some(limit.try_into().unwrap()),
tail_lines: if limit == 0 {
None
} else {
Some(limit.try_into().unwrap())
},
..Default::default()
};

Expand All @@ -712,7 +716,6 @@ impl Infrastructure for KubernetesInfrastructure {
let logs = logs
.split('\n')
.enumerate()
.filter(move |(index, _)| index < &limit)
.filter(|(_, line)| !line.is_empty())
.map(|(_, line)| {
let mut iter = line.splitn(2, ' ');
Expand All @@ -734,8 +737,8 @@ impl Infrastructure for KubernetesInfrastructure {

fn stream_logs<'a>(
&'a self,
app_name: &'a String,
service_name: &'a String,
app_name: &'a str,
service_name: &'a str,
limit: usize,
) -> BoxStream<'a, Result<LogEvents, failure::Error>> {
Box::pin(stream! {
Expand Down
2 changes: 1 addition & 1 deletion api/src/infrastructure/kubernetes/payloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ pub fn deployment_payload(
env,
volume_mounts,
ports: Some(vec![ContainerPort {
container_port: service.port() as i32,
container_port: 80 as i32,
..Default::default()
}]),
resources,
Expand Down
1 change: 1 addition & 0 deletions api/src/models/logs_chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
use chrono::{DateTime, FixedOffset, Utc};
use std::convert::From;

#[derive(Debug)]
pub struct LogChunk {
since: DateTime<FixedOffset>,
until: DateTime<FixedOffset>,
Expand Down
39 changes: 18 additions & 21 deletions frontend/src/LogsDialog.vue
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
&nbsp;
Download Logs</button>
</div>
<DynamicScroller ref="scroller" :items="logLines" :min-item-size="54" :item-size="itemSize" class="ra-logs"
:emit-update="true" :buffer="600">
<DynamicScroller ref="scroller" :items="logLines" :min-item-size="24" :item-size="itemSize" class="ra-logs"
:emit-update="true" :buffer="800">
<template v-slot="{ item, index, active }">
<DynamicScrollerItem :item="item" :active="active" :size-dependencies="[item.line,]" :data-index="index"
:data-active="active">
Expand All @@ -48,26 +48,23 @@
</template>

<style>
@import 'vue-virtual-scroller/dist/vue-virtual-scroller.css';
@import 'vue-virtual-scroller/dist/vue-virtual-scroller.css';
.ra-logs {
height: 80vh;
overflow: auto;
display: flex;
flex-direction: column;
background-color: black;
color: white;
font-family: var(--font-family-monospace);
.ra-logs {
height: 80vh;
overflow: auto;
padding: 0.5rem;
}
background-color: black;
color: white;
font-family: var(--font-family-monospace);
.ra-log-line {
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
height: 20px;
}
padding: 0.5rem;
}
.ra-log-line {
white-space: nowrap;
height: 20px;
}
</style>

<script>
Expand Down Expand Up @@ -114,14 +111,15 @@
},
},
mounted() {
this.$refs.dialog.open();
this.fetchLogs(this.currentPageLink);
this.$refs.scroller.$el.addEventListener('scroll', this.handleScroll);
},
beforeDestroy() {
if (this.eventSource) {
this.eventSource.close();
}
this.$refs.scroller.$el.removeEventListener('scroll');
this.$refs.scroller.$el.removeEventListener('scroll',this.handleScroll);
},
methods: {
fetchLogs(newRequestUri, reload = false) {
Expand All @@ -136,7 +134,6 @@
requestUri = newRequestUri;
this.eventSource = new EventSource(requestUri);
this.eventSource.onopen = () => {
this.$refs.dialog.open();
retryCount = 0;
};
Expand Down

0 comments on commit 2471975

Please sign in to comment.