diff --git a/core/src/main/java/io/kestra/core/services/ExecutionLogService.java b/core/src/main/java/io/kestra/core/services/ExecutionLogService.java index b50dacf7c8..84d7580f95 100644 --- a/core/src/main/java/io/kestra/core/services/ExecutionLogService.java +++ b/core/src/main/java/io/kestra/core/services/ExecutionLogService.java @@ -13,10 +13,16 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; +import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.util.List; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import java.util.stream.Stream; +/** + * Service for fetching logs for from an execution. + */ @Singleton public class ExecutionLogService { @@ -31,15 +37,15 @@ public Flux> streamExecutionLogs(final String tenantId, final String executionId, final Level minLevel) { - List levels = LogEntry.findLevelsByMin(minLevel).stream().map(Enum::name).toList(); - final AtomicReference disposable = new AtomicReference<>(); return Flux.>create(emitter -> { // fetch repository first - fetchExecutionExecutionLogs(tenantId, executionId, minLevel, levels) + getExecutionLogs(tenantId, executionId, minLevel, List.of()) .forEach(logEntry -> emitter.next(Event.of(logEntry).id("progress"))); + final List levels = LogEntry.findLevelsByMin(minLevel).stream().map(Enum::name).toList(); + // consume in realtime disposable.set(this.logQueue.receive(either -> { if (either.isRight()) { @@ -47,6 +53,7 @@ public Flux> streamExecutionLogs(final String tenantId, } LogEntry current = either.getLeft(); + if (current.getExecutionId() != null && current.getExecutionId().equals(executionId)) { if (levels.contains(current.getLevel().name())) { emitter.next(Event.of(current).id("progress")); @@ -66,9 +73,42 @@ public Flux> streamExecutionLogs(final String tenantId, }); } - public Stream fetchExecutionExecutionLogs(String tenantId, String executionId, Level minLevel, List levels) { - return logRepository.findByExecutionId(tenantId, executionId, minLevel, Pageable.UNPAGED) + public InputStream getExecutionLogsAsStream(final String tenantId, + final String executionId, + final Level minLevel, + final String taskRunId, + final List taskIds, + final Integer attempt) { + List logs = getExecutionLogs(tenantId, executionId, minLevel, taskRunId, taskIds, attempt); + return new ByteArrayInputStream(logs.stream().map(LogEntry::toPrettyString).collect(Collectors.joining("\n")).getBytes()); + } + + public List getExecutionLogs(final String tenantId, + final String executionId, + final Level minLevel, + final String taskRunId, + final List taskIds, + final Integer attempt) { + if (taskIds != null) { + return taskIds.size() == 1 ? + logRepository.findByExecutionIdAndTaskId(tenantId, executionId, taskIds.getFirst(), minLevel): + getExecutionLogs(tenantId, executionId, minLevel, taskIds).toList(); + } + + if (taskRunId != null) { + return attempt != null ? + logRepository.findByExecutionIdAndTaskRunIdAndAttempt(tenantId, executionId, taskRunId, minLevel, attempt) : + logRepository.findByExecutionIdAndTaskRunId(tenantId, executionId, taskRunId, minLevel); + } + return logRepository.findByExecutionId(tenantId, executionId, minLevel); + } + + public Stream getExecutionLogs(String tenantId, + String executionId, + Level minLevel, + List taskIds) { + return logRepository.findByExecutionId(tenantId, executionId, minLevel) .stream() - .filter(logEntry -> levels.contains(logEntry.getLevel().name())); + .filter(data -> taskIds.isEmpty() || taskIds.contains(data.getTaskId())); } } diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/api/LogController.java b/webserver/src/main/java/io/kestra/webserver/controllers/api/LogController.java index fabff797ff..a0b27623df 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/api/LogController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/api/LogController.java @@ -23,11 +23,10 @@ import org.slf4j.event.Level; import reactor.core.publisher.Flux; -import java.io.ByteArrayInputStream; import java.io.InputStream; import java.time.ZonedDateTime; import java.util.List; -import java.util.stream.Collectors; +import java.util.Optional; import static io.kestra.core.utils.DateUtils.validateTimeline; @@ -76,16 +75,14 @@ public List findByExecution( @Parameter(description = "The task id") @Nullable @QueryValue String taskId, @Parameter(description = "The attempt number") @Nullable @QueryValue Integer attempt ) { - if (taskId != null) { - return logRepository.findByExecutionIdAndTaskId(tenantService.resolveTenant(), executionId, taskId, minLevel); - } else if (taskRunId != null) { - if (attempt != null) { - return logRepository.findByExecutionIdAndTaskRunIdAndAttempt(tenantService.resolveTenant(), executionId, taskRunId, minLevel, attempt); - } - return logRepository.findByExecutionIdAndTaskRunId(tenantService.resolveTenant(), executionId, taskRunId, minLevel); - } else { - return logRepository.findByExecutionId(tenantService.resolveTenant(), executionId, minLevel); - } + return logService.getExecutionLogs( + tenantService.resolveTenant(), + executionId, + minLevel, + taskRunId, + Optional.ofNullable(taskId).map(List::of).orElse(null), + attempt + ); } @ExecuteOn(TaskExecutors.IO) @@ -98,19 +95,14 @@ public StreamedFile download( @Parameter(description = "The task id") @Nullable @QueryValue String taskId, @Parameter(description = "The attempt number") @Nullable @QueryValue Integer attempt ) { - List logEntries; - if (taskId != null) { - logEntries = logRepository.findByExecutionIdAndTaskId(tenantService.resolveTenant(), executionId, taskId, minLevel); - } else if (taskRunId != null) { - if (attempt != null) { - logEntries = logRepository.findByExecutionIdAndTaskRunIdAndAttempt(tenantService.resolveTenant(), executionId, taskRunId, minLevel, attempt); - } else { - logEntries = logRepository.findByExecutionIdAndTaskRunId(tenantService.resolveTenant(), executionId, taskRunId, minLevel); - } - } else { - logEntries = logRepository.findByExecutionId(tenantService.resolveTenant(), executionId, minLevel); - } - InputStream inputStream = new ByteArrayInputStream(logEntries.stream().map(LogEntry::toPrettyString).collect(Collectors.joining("\n")).getBytes()); + InputStream inputStream = logService.getExecutionLogsAsStream( + tenantService.resolveTenant(), + executionId, + minLevel, + taskRunId, + Optional.ofNullable(taskId).map(List::of).orElse(null), + attempt + ); return new StreamedFile(inputStream, MediaType.TEXT_PLAIN_TYPE).attach(executionId + ".log"); }