Skip to content

Commit

Permalink
Record run start time (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnstairs authored Nov 19, 2024
1 parent 9586806 commit 235f2fb
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 14 deletions.
7 changes: 6 additions & 1 deletion cli/integrationtest/controlplane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,16 @@ func TestEndToEnd(t *testing.T) {
"-b", fmt.Sprintf("input=%s", inputBufferId),
"-b", fmt.Sprintf("output=%s", outputBufferId))

waitForRunSuccess(t, runId)
run := waitForRunSuccess(t, runId)

output := runCommandSucceeds(t, "sh", "-c", fmt.Sprintf(`tyger buffer read "%s"`, outputSasUri))

require.Equal("Hello: Bonjour", output)

require.NotNil(run.StartedAt)
require.GreaterOrEqual(*run.StartedAt, run.CreatedAt)
require.NotNil(run.FinishedAt)
require.GreaterOrEqual(*run.FinishedAt, *run.StartedAt)
}

func TestEndToEndWithAutomaticallyCreatedBuffers(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions cli/integrationtest/expected_openapi_spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,11 @@ components:
description: The time the run was created. Populated by the system.
format: date-time
nullable: true
startedAt:
type: string
description: The time the run's job started. Populated by the system.
format: date-time
nullable: true
finishedAt:
type: string
description: The time the run finished. Populated by the system.
Expand Down
1 change: 1 addition & 0 deletions cli/internal/controlplane/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ type RunMetadata struct {
StatusReason string `json:"statusReason,omitempty"`
RunningCount *int `json:"runningCount,omitempty"`
CreatedAt time.Time `json:"createdAt,omitempty"`
StartedAt *time.Time `json:"startedAt,omitempty"`
FinishedAt *time.Time `json:"finishedAt,omitempty"`
}

Expand Down
15 changes: 15 additions & 0 deletions cli/internal/dataplane/relayserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ func RelayInputServer(
}

defer complete()

defer func() {
flusher, ok := w.(http.Flusher)
if ok {
flusher.Flush()
}
}()

if outputWriter == io.Discard {
log.Warn().Msg("Discarding input data")
w.WriteHeader(http.StatusAccepted)
Expand Down Expand Up @@ -120,6 +128,13 @@ func RelayOutputServer(
}
defer complete()

defer func() {
flusher, ok := w.(http.Flusher)
if ok {
flusher.Flush()
}
}()

_, err := io.Copy(w, inputReader)
if err != nil {
log.Error().Err(err).Msg("transfer failed")
Expand Down
8 changes: 6 additions & 2 deletions server/ControlPlane/Compute/Docker/DockerRunCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ public async Task<Run> CreateRun(Run run, string? idempotencyKey, CancellationTo
run = await Repository.CreateRun(run, idempotencyKey, cancellationToken);

var bufferMap = await GetBufferMap(jobCodespec.Buffers, run.Job.Buffers!, cancellationToken);

string mainContainerName = $"tyger-run-{run.Id}-main";
string mainContainerName = MainContainerName(run.Id!.Value);

if (run.Job.Buffers != null)
{
Expand Down Expand Up @@ -471,6 +470,11 @@ void WriteTombstone()
return run with { Status = RunStatus.Running };
}

internal static string MainContainerName(long runId)
{
return $"tyger-run-{runId}-main";
}

private async Task CreateAndStartContainer(CreateContainerParameters sidecarContainerParameters, CancellationToken cancellationToken)
{
var createResponse = await _client.Containers.CreateContainerAsync(sidecarContainerParameters, cancellationToken);
Expand Down
19 changes: 14 additions & 5 deletions server/ControlPlane/Compute/Docker/DockerRunReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,17 @@ async Task ScheduleFirstUpdate()

public static Run UpdateRunFromContainers(Run run, IReadOnlyList<ContainerInspectResponse> containers)
{
var mainContainerName = $"/{DockerRunCreator.MainContainerName(run.Id!.Value)}"; // inspect always returns names with a leading slash

var mainContainer = containers.FirstOrDefault(c => c.Name == mainContainerName);
if (mainContainer != null)
{
run = run with { StartedAt = mainContainer?.State.StartedAt == null ? null : DateTimeOffset.Parse(mainContainer.State.StartedAt) };
}

if (run.Status is RunStatus.Canceled)
{
return run;
return run with { Status = RunStatus.Canceled };
}

var socketCount = containers.Aggregate(
Expand All @@ -159,14 +167,15 @@ public static Run UpdateRunFromContainers(Run run, IReadOnlyList<ContainerInspec
return run with { Status = RunStatus.Failed };
}

if (containers.Any(c => c.State.Status == "exited" && c.State.ExitCode != 0))
ContainerInspectResponse? exited;
if ((exited = containers.FirstOrDefault(c => c.State.Status == "exited" && c.State.ExitCode != 0)) != null)
{
return run with { Status = RunStatus.Failed };
return run with { Status = RunStatus.Failed, FinishedAt = DateTimeOffset.Parse(exited.State.FinishedAt) };
}

if (containers.All(c => c.State.Status == "exited" && c.State.ExitCode == 0))
{
return run with { Status = RunStatus.Succeeded };
return run with { Status = RunStatus.Succeeded, FinishedAt = DateTimeOffset.Parse(containers.Max(c => c.State.FinishedAt)!) };
}

if (socketCount > 0)
Expand All @@ -175,7 +184,7 @@ public static Run UpdateRunFromContainers(Run run, IReadOnlyList<ContainerInspec
// If the main container has opened a socket, we consider the run successful if all other containers have exited successfully
if (mainSocketContainer != null && containers.Where(c => c.ID != mainSocketContainer.ID).All(c => c.State.Status == "exited" && c.State.ExitCode == 0))
{
return run with { Status = RunStatus.Succeeded };
return run with { Status = RunStatus.Succeeded, FinishedAt = DateTimeOffset.Parse(containers.Max(c => c.State.FinishedAt)!) };
}
}

Expand Down
15 changes: 10 additions & 5 deletions server/ControlPlane/Compute/Kubernetes/RunObjects.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ public void ClearCachedMetadata()

private ObservedRunState GetStatus()
{
var startedAt = GetStartedTime();

if (GetFailureTimeAndReason() is (var failureTime, var reason))
{
return new(Id, RunStatus.Failed, JobPods.Length, WorkerPods.Length)
{
StatusReason = reason,
StartedAt = startedAt,
FinishedAt = failureTime,
};
}
Expand All @@ -69,6 +72,7 @@ private ObservedRunState GetStatus()
{
return new(Id, RunStatus.Succeeded, JobPods.Length, WorkerPods.Length)
{
StartedAt = startedAt,
FinishedAt = successTime,
};
}
Expand All @@ -77,24 +81,25 @@ private ObservedRunState GetStatus()

if (runningCount > 0)
{
return new(Id, RunStatus.Running, JobPods.Length, WorkerPods.Length) { RunningCount = runningCount };
return new(Id, RunStatus.Running, JobPods.Length, WorkerPods.Length) { StartedAt = startedAt, RunningCount = runningCount };
}

return new(Id, RunStatus.Pending, JobPods.Length, WorkerPods.Length);
return new(Id, RunStatus.Pending, JobPods.Length, WorkerPods.Length) { StartedAt = startedAt };
}

private (DateTimeOffset, string)? GetFailureTimeAndReason()
{
var fallbackTime = DateTimeOffset.UtcNow;
var containerStatus = JobPods
.Where(p => p?.Status?.ContainerStatuses != null)
.SelectMany(p => p!.Status.ContainerStatuses)
.Where(cs => cs.State?.Terminated?.ExitCode is not null and not 0)
.MinBy(cs => cs.State.Terminated.FinishedAt!.Value);
.MinBy(cs => cs.State.Terminated.FinishedAt ?? fallbackTime); // sometimes FinishedAt is null https://github.com/kubernetes/kubernetes/issues/104107

if (containerStatus != null)
{
var reason = $"{(containerStatus.Name == "main" ? "Main" : "Sidecar")} exited with code {containerStatus.State.Terminated.ExitCode}";
return (containerStatus.State.Terminated.FinishedAt!.Value, reason);
return (containerStatus.State.Terminated.FinishedAt ?? fallbackTime, reason);
}

return null;
Expand All @@ -105,7 +110,7 @@ private ObservedRunState GetStatus()
DateTimeOffset? startedTime = null;
foreach (var pod in JobPods)
{
if (pod == null)
if (pod?.Status?.ContainerStatuses == null)
{
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion server/ControlPlane/Database/Repository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ FOR UPDATE

var runJson = reader.GetString(0);
var run = JsonSerializer.Deserialize<Run>(runJson, _serializerOptions) ?? throw new InvalidOperationException("Failed to deserialize run.");
updatedRun = run with { Status = state.Status, StatusReason = state.StatusReason, RunningCount = state.RunningCount, FinishedAt = state.FinishedAt, Job = run.Job with { NodePool = state.JobNodePool }, Worker = run.Worker == null ? null : run.Worker with { NodePool = state.WorkerNodePool } };
updatedRun = state.ApplyToRun(run);
if (updatedRun.Equals(run))
{
return;
Expand Down
6 changes: 6 additions & 0 deletions server/ControlPlane/Model/Model.cs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,11 @@ public record Run : ModelBase
/// </summary>
public DateTimeOffset? CreatedAt { get; init; }

/// <summary>
/// The time the run's job started. Populated by the system.
/// </summary>
public DateTimeOffset? StartedAt { get; init; }

/// <summary>
/// The time the run finished. Populated by the system.
/// </summary>
Expand Down Expand Up @@ -486,6 +491,7 @@ public Run WithoutSystemProperties()
StatusReason = null,
RunningCount = null,
CreatedAt = default,
StartedAt = default,
FinishedAt = null,
};
}
Expand Down
4 changes: 4 additions & 0 deletions server/ControlPlane/Model/ObservedRunState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public ObservedRunState(Run run, DateTimeOffset? databaseUpdatedAt)
{
StatusReason = run.StatusReason;
RunningCount = run.RunningCount;
StartedAt = run.StartedAt;
FinishedAt = run.FinishedAt;
JobNodePool = run.Job.NodePool;
WorkerNodePool = run.Worker?.NodePool;
Expand All @@ -35,6 +36,8 @@ public ObservedRunState(Run run, DateTimeOffset? databaseUpdatedAt)

public int? RunningCount { get; init; }

public DateTimeOffset? StartedAt { get; init; }

public DateTimeOffset? FinishedAt { get; init; }

public string? JobNodePool { get; init; }
Expand All @@ -51,6 +54,7 @@ public readonly Run ApplyToRun(Run run)
Status = Status,
StatusReason = StatusReason,
RunningCount = RunningCount,
StartedAt = StartedAt,
FinishedAt = FinishedAt,
Job = run.Job with { NodePool = JobNodePool },
Worker = run.Worker == null ? null : run.Worker with { NodePool = WorkerNodePool }
Expand Down

0 comments on commit 235f2fb

Please sign in to comment.