Skip to content

Commit

Permalink
Hardening and throughput improvements
Browse files (browse the repository at this point in the history)
  • Loading branch information
johnstairs committed Nov 25, 2024
1 parent 75a5cff commit 03cc90f
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ dotnet_diagnostic.IDE0065.severity = warning # MoveMisplacedUsingDirectives
dotnet_diagnostic.IDE0066.severity = warning # ConvertSwitchStatementToExpression
dotnet_diagnostic.IDE0070.severity = warning # UseSystemHashCode
dotnet_diagnostic.IDE0071.severity = warning # SimplifyInterpolationId
dotnet_diagnostic.IDE0072.severity = warning # PopulateSwitchExpression
dotnet_diagnostic.IDE0072.severity = none # PopulateSwitchExpression
dotnet_diagnostic.IDE0073.severity = warning # FileHeaderMismatch
dotnet_diagnostic.IDE0074.severity = warning # UseCoalesceCompoundAssignment
dotnet_diagnostic.IDE0075.severity = warning # SimplifyConditionalExpression
Expand Down
5 changes: 2 additions & 3 deletions server/ControlPlane/Compute/Kubernetes/Kubernetes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ public static void AddKubernetes(this IHostApplicationBuilder builder)
? KubernetesClientConfiguration.InClusterConfig()
: KubernetesClientConfiguration.BuildConfigFromConfigFile(kubernetesOptions.KubeconfigPath);

var resilienceOptions = new HttpStandardResilienceOptions();
var pipeline = new ResiliencePipelineBuilder<HttpResponseMessage>()
.AddTimeout(resilienceOptions.TotalRequestTimeout)
.AddRetry(resilienceOptions.Retry)
.AddRetry(new HttpStandardResilienceOptions().Retry)
.AddTimeout(TimeSpan.FromSeconds(100))
.Build();

#pragma warning disable EXTEXP0001 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
Expand Down
8 changes: 6 additions & 2 deletions server/ControlPlane/Compute/Kubernetes/RunFinalizer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ namespace Tyger.ControlPlane.Compute.Kubernetes;
/// </summary>
public class RunFinalizer : BackgroundService
{
// A value that is too high will put a lot of load on the Kubernetes API server
// because retrieving logs is a relatively expensive operation.
private const int MaxConcurrentFinalizations = 10;

private readonly Repository _repository;
private readonly RunChangeFeed _changeFeed;

Expand All @@ -40,13 +44,13 @@ public RunFinalizer(Repository repository, RunChangeFeed changeFeed, ILogger<Run

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var channel = Channel.CreateBounded<ObservedRunState>(256);
var channel = Channel.CreateBounded<ObservedRunState>(1024);
_changeFeed.RegisterObserver(channel.Writer);

// Keep track of retry counts for failed finalizations
var failedRuns = new Dictionary<long, int>();

var allTasks = Enumerable.Range(0, 100).Select(async _ =>
var allTasks = Enumerable.Range(0, MaxConcurrentFinalizations).Select(async _ =>
{
try
{
Expand Down
13 changes: 13 additions & 0 deletions server/ControlPlane/Compute/Kubernetes/RunObjects.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ private ObservedRunState GetStatus()
return (containerStatus.State.Terminated.FinishedAt ?? fallbackTime, reason);
}

// Recognize other failure reasons, such as the pod being evicted.
V1Pod? failedPod = JobPods.FirstOrDefault(p => p?.Status?.Phase == "Failed");
if (failedPod != null)
{
return (failedPod.Status.Reason, failedPod.Status.Message) switch
{
("" or null, "" or null) => (fallbackTime, "Failed"),
("" or null, var message) => (fallbackTime, message),
(var reason, "" or null) => (fallbackTime, reason),
(var reason, var message) => (fallbackTime, $"{reason}: {message}"),
};
}

return null;
}

Expand Down
37 changes: 31 additions & 6 deletions server/ControlPlane/Compute/Kubernetes/RunStateObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@ namespace Tyger.ControlPlane.Compute.Kubernetes;
public class RunStateObserver : BackgroundService
{
private const string LeaseName = "run-state-observer";
private const int PartitionCount = 8;
private const int ParitionChannelSize = 1024;

private readonly IKubernetes _kubernetesClient;
private readonly Repository _repository;
private readonly ILoggerFactory _loggingFactory;
private readonly ILogger<RunStateObserver> _logger;
private readonly KubernetesApiOptions _kubernetesOptions;
private readonly Dictionary<long, RunObjects> _cache = [];
private readonly Dictionary<long, List<ChannelWriter<(RunObjects, WatchEventType, V1Pod)>>> _listeners = [];
private readonly List<Channel<(WatchEventType, V1Pod)>> _partitionedPodUpdateChannels = Enumerable.Range(0, PartitionCount).Select(_ => Channel.CreateBounded<(WatchEventType, V1Pod)>(ParitionChannelSize)).ToList();
private Task? _podInformerTask;
private readonly CancellationTokenSource _cancellationTokenSource = new();
private readonly Channel<(WatchEventType eventType, V1Pod resource)> _podUpdatesChannel = Channel.CreateBounded<(WatchEventType, V1Pod)>(new BoundedChannelOptions(10240));
private readonly Channel<(WatchEventType eventType, V1Pod resource)> _podUpdatesChannel = Channel.CreateBounded<(WatchEventType, V1Pod)>(PartitionCount * ParitionChannelSize);
private readonly Channel<int> _onLeaseOwnershipAcquiredChannel = Channel.CreateUnbounded<int>();

private int _latestLeaseToken = 1;
Expand All @@ -39,6 +43,7 @@ public RunStateObserver(IKubernetes kubernetesClient, IOptions<KubernetesApiOpti
_repository = repository;
_kubernetesOptions = kubernetesOptions.Value;
_loggingFactory = loggingFactory;
_logger = loggingFactory.CreateLogger<RunStateObserver>();
}

public override async Task StartAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -108,7 +113,9 @@ async Task ProcessUpdates()

while (!_onLeaseOwnershipAcquiredChannel.Reader.TryPeek(out _) && _podUpdatesChannel.Reader.TryRead(out var update))
{
await OnPodUpdated(update.eventType, update.resource, stoppingToken);
var runIdString = update.resource.GetLabel(RunLabel);
int partitionIndex = int.Parse(runIdString) % PartitionCount;
await _partitionedPodUpdateChannels[partitionIndex].Writer.WriteAsync(update, stoppingToken);
}

await Task.WhenAny(_onLeaseOwnershipAcquiredChannel.Reader.WaitToReadAsync(stoppingToken).AsTask(), _podUpdatesChannel.Reader.WaitToReadAsync(stoppingToken).AsTask());
Expand All @@ -117,12 +124,30 @@ async Task ProcessUpdates()

var processUpdatesTask = ProcessUpdates();

var partitionedProcessors = Enumerable.Range(0, PartitionCount).Select(async partitionIndex =>
{
var channel = _partitionedPodUpdateChannels[partitionIndex];
await foreach (var (eventType, pod) in channel.Reader.ReadAllAsync(stoppingToken))
{
var count = channel.Reader.Count;
if (count > 100)
{
_logger.RunStateObserverHighQueueLength(partitionIndex, count);
}

await OnPodUpdated(eventType, pod, stoppingToken);
}
}).ToList();

var allTasks = new List<Task>(partitionedProcessors) { processUpdatesTask, _podInformerTask!, _acquireAndHoldLeaseTask! };

// Fail fast: await whichever task completes first so that, if it faulted, its exception propagates immediately.
await await Task.WhenAny(_podInformerTask!, processUpdatesTask);
await await Task.WhenAny(allTasks);

await _podInformerTask!;
await processUpdatesTask;
await _acquireAndHoldLeaseTask!;
foreach (var task in allTasks)
{
await task;
}
}

private async Task OnAcquireLease(int acquiredLeaseToken, CancellationToken cancellationToken)
Expand Down
3 changes: 3 additions & 0 deletions server/ControlPlane/Compute/LoggerExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ public static partial class LoggerExtensions

[LoggerMessage(LogLevel.Information, "Restarting watch after exception")]
public static partial void RestartingWatchAfterException(this ILogger logger, Exception exception);

[LoggerMessage(LogLevel.Warning, "RunStateObserver channel {partition} has high count of {count}")]
public static partial void RunStateObserverHighQueueLength(this ILogger logger, int partition, int count);
}
15 changes: 12 additions & 3 deletions server/ControlPlane/Database/Repository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Tyger.ControlPlane.Database;

public class Repository
{
private const int MaxActiveRuns = 2000;
private const int MaxActiveRuns = 5000;
private const string NewRunChannelName = "new_run";
private const string RunFinalizedChannelName = "run_finalized";
private const string RunChangedChannelName = "run_changed";
Expand Down Expand Up @@ -1442,7 +1442,7 @@ ORDER BY created_at ASC
continue;
}

if (!await connection.WaitAsync(TimeSpan.FromMinutes(1), cancellationToken))
if (!await connection.WaitAsync(TimeSpan.FromSeconds(10), cancellationToken))
{
break;
}
Expand Down Expand Up @@ -1509,10 +1509,19 @@ async Task ProcessPayloads()

while (true)
{
if (await connection.WaitAsync(TimeSpan.FromMinutes(1), cancellationToken))
if (await connection.WaitAsync(TimeSpan.FromSeconds(10), cancellationToken))
{
await ProcessPayloads();
}
else
{
// Ensure the connection is still alive
using var cmd = new NpgsqlCommand("SELECT 1", connection);
await cmd.PrepareAsync(cancellationToken);
await cmd.ExecuteScalarAsync(cancellationToken);

await ProcessPayloads();
}
}
}

Expand Down

0 comments on commit 03cc90f

Please sign in to comment.