Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Hardening and throughput improvements #155

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ dotnet_diagnostic.IDE0065.severity = warning # MoveMisplacedUsingDirectives
dotnet_diagnostic.IDE0066.severity = warning # ConvertSwitchStatementToExpression
dotnet_diagnostic.IDE0070.severity = warning # UseSystemHashCode
dotnet_diagnostic.IDE0071.severity = warning # SimplifyInterpolationId
dotnet_diagnostic.IDE0072.severity = warning # PopulateSwitchExpression
dotnet_diagnostic.IDE0072.severity = none # PopulateSwitchExpression
dotnet_diagnostic.IDE0073.severity = warning # FileHeaderMismatch
dotnet_diagnostic.IDE0074.severity = warning # UseCoalesceCompoundAssignment
dotnet_diagnostic.IDE0075.severity = warning # SimplifyConditionalExpression
Expand Down
5 changes: 2 additions & 3 deletions server/ControlPlane/Compute/Kubernetes/Kubernetes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ public static void AddKubernetes(this IHostApplicationBuilder builder)
? KubernetesClientConfiguration.InClusterConfig()
: KubernetesClientConfiguration.BuildConfigFromConfigFile(kubernetesOptions.KubeconfigPath);

var resilienceOptions = new HttpStandardResilienceOptions();
var pipeline = new ResiliencePipelineBuilder<HttpResponseMessage>()
.AddTimeout(resilienceOptions.TotalRequestTimeout)
.AddRetry(resilienceOptions.Retry)
.AddRetry(new HttpStandardResilienceOptions().Retry)
.AddTimeout(TimeSpan.FromSeconds(100))
.Build();

#pragma warning disable EXTEXP0001 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
Expand Down
8 changes: 6 additions & 2 deletions server/ControlPlane/Compute/Kubernetes/RunFinalizer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ namespace Tyger.ControlPlane.Compute.Kubernetes;
/// </summary>
public class RunFinalizer : BackgroundService
{
// A value that is too high will put a lot of load on the Kubernetes API server
// because retrieving logs is a relatively expensive operation.
private const int MaxConcurrentFinalizations = 10;

private readonly Repository _repository;
private readonly RunChangeFeed _changeFeed;

Expand All @@ -40,13 +44,13 @@ public RunFinalizer(Repository repository, RunChangeFeed changeFeed, ILogger<Run

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var channel = Channel.CreateBounded<ObservedRunState>(256);
var channel = Channel.CreateBounded<ObservedRunState>(1024);
_changeFeed.RegisterObserver(channel.Writer);

// Keep track of retry counts for failed finalizations
var failedRuns = new Dictionary<long, int>();

var allTasks = Enumerable.Range(0, 100).Select(async _ =>
var allTasks = Enumerable.Range(0, MaxConcurrentFinalizations).Select(async _ =>
{
try
{
Expand Down
13 changes: 13 additions & 0 deletions server/ControlPlane/Compute/Kubernetes/RunObjects.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ private ObservedRunState GetStatus()
return (containerStatus.State.Terminated.FinishedAt ?? fallbackTime, reason);
}

// Recognize other failure reasons, such as the pod being evicted.
V1Pod? failedPod = JobPods.FirstOrDefault(p => p?.Status?.Phase == "Failed");
if (failedPod != null)
{
return (failedPod.Status.Reason, failedPod.Status.Message) switch
{
("" or null, "" or null) => (fallbackTime, "Failed"),
("" or null, var message) => (fallbackTime, message),
(var reason, "" or null) => (fallbackTime, reason),
(var reason, var message) => (fallbackTime, $"{reason}: {message}"),
};
}

return null;
}

Expand Down
37 changes: 31 additions & 6 deletions server/ControlPlane/Compute/Kubernetes/RunStateObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@ namespace Tyger.ControlPlane.Compute.Kubernetes;
public class RunStateObserver : BackgroundService
{
private const string LeaseName = "run-state-observer";
private const int PartitionCount = 8;
private const int ParitionChannelSize = 1024;

private readonly IKubernetes _kubernetesClient;
private readonly Repository _repository;
private readonly ILoggerFactory _loggingFactory;
private readonly ILogger<RunStateObserver> _logger;
private readonly KubernetesApiOptions _kubernetesOptions;
private readonly Dictionary<long, RunObjects> _cache = [];
private readonly Dictionary<long, List<ChannelWriter<(RunObjects, WatchEventType, V1Pod)>>> _listeners = [];
private readonly List<Channel<(WatchEventType, V1Pod)>> _partitionedPodUpdateChannels = Enumerable.Range(0, PartitionCount).Select(_ => Channel.CreateBounded<(WatchEventType, V1Pod)>(ParitionChannelSize)).ToList();
private Task? _podInformerTask;
private readonly CancellationTokenSource _cancellationTokenSource = new();
private readonly Channel<(WatchEventType eventType, V1Pod resource)> _podUpdatesChannel = Channel.CreateBounded<(WatchEventType, V1Pod)>(new BoundedChannelOptions(10240));
private readonly Channel<(WatchEventType eventType, V1Pod resource)> _podUpdatesChannel = Channel.CreateBounded<(WatchEventType, V1Pod)>(PartitionCount * ParitionChannelSize);
private readonly Channel<int> _onLeaseOwnershipAcquiredChannel = Channel.CreateUnbounded<int>();

private int _latestLeaseToken = 1;
Expand All @@ -39,6 +43,7 @@ public RunStateObserver(IKubernetes kubernetesClient, IOptions<KubernetesApiOpti
_repository = repository;
_kubernetesOptions = kubernetesOptions.Value;
_loggingFactory = loggingFactory;
_logger = loggingFactory.CreateLogger<RunStateObserver>();
}

public override async Task StartAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -108,7 +113,9 @@ async Task ProcessUpdates()

while (!_onLeaseOwnershipAcquiredChannel.Reader.TryPeek(out _) && _podUpdatesChannel.Reader.TryRead(out var update))
{
await OnPodUpdated(update.eventType, update.resource, stoppingToken);
// Run IDs are 64-bit in this class (the run cache is keyed by long), so parse the label as a
// long: int.Parse would throw OverflowException for any ID above int.MaxValue.
// The invariant culture keeps parsing independent of the host's locale settings.
var runIdString = update.resource.GetLabel(RunLabel);
long runId = long.Parse(runIdString, System.Globalization.CultureInfo.InvariantCulture);

// The same run ID always maps to the same partition channel, so all updates for a run
// are queued on a single channel in arrival order.
int partitionIndex = (int)(runId % PartitionCount);
await _partitionedPodUpdateChannels[partitionIndex].Writer.WriteAsync(update, stoppingToken);
}

await Task.WhenAny(_onLeaseOwnershipAcquiredChannel.Reader.WaitToReadAsync(stoppingToken).AsTask(), _podUpdatesChannel.Reader.WaitToReadAsync(stoppingToken).AsTask());
Expand All @@ -117,12 +124,30 @@ async Task ProcessUpdates()

var processUpdatesTask = ProcessUpdates();

var partitionedProcessors = Enumerable.Range(0, PartitionCount).Select(async partitionIndex =>
{
var channel = _partitionedPodUpdateChannels[partitionIndex];
await foreach (var (eventType, pod) in channel.Reader.ReadAllAsync(stoppingToken))
{
var count = channel.Reader.Count;
if (count > 100)
{
_logger.RunStateObserverHighQueueLength(partitionIndex, count);
}

await OnPodUpdated(eventType, pod, stoppingToken);
}
}).ToList();

var allTasks = new List<Task>(partitionedProcessors) { processUpdatesTask, _podInformerTask!, _acquireAndHoldLeaseTask! };

// fail if any fail
await await Task.WhenAny(_podInformerTask!, processUpdatesTask);
await await Task.WhenAny(allTasks);

await _podInformerTask!;
await processUpdatesTask;
await _acquireAndHoldLeaseTask!;
foreach (var task in allTasks)
{
await task;
}
}

private async Task OnAcquireLease(int acquiredLeaseToken, CancellationToken cancellationToken)
Expand Down
3 changes: 3 additions & 0 deletions server/ControlPlane/Compute/LoggerExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ public static partial class LoggerExtensions

/// <summary>
/// Logs, at Information level, that a watch is being restarted after an exception.
/// </summary>
/// <param name="logger">The logger to write the entry to.</param>
/// <param name="exception">The exception logged alongside the message (the message template itself has no placeholder for it).</param>
[LoggerMessage(LogLevel.Information, "Restarting watch after exception")]
public static partial void RestartingWatchAfterException(this ILogger logger, Exception exception);

/// <summary>
/// Logs, at Warning level, that a RunStateObserver channel has a high number of queued items.
/// </summary>
/// <param name="logger">The logger to write the entry to.</param>
/// <param name="partition">Value substituted for <c>{partition}</c> in the message: the channel being reported.</param>
/// <param name="count">Value substituted for <c>{count}</c> in the message: the reported item count.</param>
[LoggerMessage(LogLevel.Warning, "RunStateObserver channel {partition} has high count of {count}")]
public static partial void RunStateObserverHighQueueLength(this ILogger logger, int partition, int count);
}
15 changes: 12 additions & 3 deletions server/ControlPlane/Database/Repository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Tyger.ControlPlane.Database;

public class Repository
{
private const int MaxActiveRuns = 2000;
private const int MaxActiveRuns = 5000;
private const string NewRunChannelName = "new_run";
private const string RunFinalizedChannelName = "run_finalized";
private const string RunChangedChannelName = "run_changed";
Expand Down Expand Up @@ -1442,7 +1442,7 @@ ORDER BY created_at ASC
continue;
}

if (!await connection.WaitAsync(TimeSpan.FromMinutes(1), cancellationToken))
if (!await connection.WaitAsync(TimeSpan.FromSeconds(10), cancellationToken))
{
break;
}
Expand Down Expand Up @@ -1509,10 +1509,19 @@ async Task ProcessPayloads()

while (true)
{
if (await connection.WaitAsync(TimeSpan.FromMinutes(1), cancellationToken))
if (await connection.WaitAsync(TimeSpan.FromSeconds(10), cancellationToken))
{
await ProcessPayloads();
}
else
{
// Ensure the connection is still alive
using var cmd = new NpgsqlCommand("SELECT 1", connection);
await cmd.PrepareAsync(cancellationToken);
await cmd.ExecuteScalarAsync(cancellationToken);

await ProcessPayloads();
}
}
}

Expand Down
Loading