Skip to content

Commit

Permalink
Broker fixes for token refreshes and AccessDeniedException (#3161)
Browse files Browse the repository at this point in the history
  • Loading branch information
luketomlinson authored Feb 21, 2024
1 parent 6603bfb commit 3449d5f
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 4 deletions.
24 changes: 23 additions & 1 deletion src/Runner.Common/BrokerServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public interface IBrokerServer : IRunnerService
Task DeleteSessionAsync(CancellationToken cancellationToken);

Task<TaskAgentMessage> GetRunnerMessageAsync(Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate, CancellationToken token);

Task UpdateConnectionIfNeeded(Uri serverUri, VssCredentials credentials);
}

public sealed class BrokerServer : RunnerService, IBrokerServer
Expand Down Expand Up @@ -59,7 +61,7 @@ public Task<TaskAgentMessage> GetRunnerMessageAsync(Guid? sessionId, TaskAgentSt
{
CheckConnection();
var brokerSession = RetryRequest<TaskAgentMessage>(
async () => await _brokerHttpClient.GetRunnerMessageAsync(sessionId, version, status, os, architecture, disableUpdate, cancellationToken), cancellationToken);
async () => await _brokerHttpClient.GetRunnerMessageAsync(sessionId, version, status, os, architecture, disableUpdate, cancellationToken), cancellationToken, shouldRetry: ShouldRetryException);

return brokerSession;
}
Expand All @@ -69,5 +71,25 @@ public async Task DeleteSessionAsync(CancellationToken cancellationToken)
CheckConnection();
await _brokerHttpClient.DeleteSessionAsync(cancellationToken);
}

public Task UpdateConnectionIfNeeded(Uri serverUri, VssCredentials credentials)
{
if (_brokerUri != serverUri || !_hasConnection)
{
return ConnectAsync(serverUri, credentials);
}

return Task.CompletedTask;
}

public bool ShouldRetryException(Exception ex)
{
if (ex is AccessDeniedException ade && ade.ErrorCode == 1)
{
return false;
}

return true;
}
}
}
5 changes: 3 additions & 2 deletions src/Runner.Listener/MessageListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public async Task<Boolean> CreateSessionAsync(CancellationToken token)
if (_session.BrokerMigrationMessage != null)
{
Trace.Info("Runner session is in migration mode: Creating Broker session with BrokerBaseUrl: {0}", _session.BrokerMigrationMessage.BrokerBaseUrl);
await _brokerServer.ConnectAsync(_session.BrokerMigrationMessage.BrokerBaseUrl, _creds);

await _brokerServer.UpdateConnectionIfNeeded(_session.BrokerMigrationMessage.BrokerBaseUrl, _creds);
_session = await _brokerServer.CreateSessionAsync(taskAgentSession, token);
_isBrokerSession = true;
}
Expand Down Expand Up @@ -256,7 +257,7 @@ public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)

var migrationMessage = JsonUtility.FromString<BrokerMigrationMessage>(message.Body);

await _brokerServer.ConnectAsync(migrationMessage.BrokerBaseUrl, _creds);
await _brokerServer.UpdateConnectionIfNeeded(migrationMessage.BrokerBaseUrl, _creds);
message = await _brokerServer.GetRunnerMessageAsync(_session.SessionId,
runnerStatus,
BuildConstants.RunnerPackage.Version,
Expand Down
7 changes: 6 additions & 1 deletion src/Sdk/WebApi/WebApi/BrokerHttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,14 @@ public async Task<TaskAgentMessage> GetRunnerMessageAsync(
return result.Value;
}

// the only time we throw a `Forbidden` exception from Listener /messages is when the runner is
// disable_update and is too old to poll
if (result.StatusCode == HttpStatusCode.Forbidden)
{
throw new AccessDeniedException(result.Error);
throw new AccessDeniedException($"{result.Error} Runner version v{runnerVersion} is deprecated and cannot receive messages.")
{
ErrorCode = 1
};
}

throw new Exception($"Failed to get job message: {result.Error}");
Expand Down

0 comments on commit 3449d5f

Please sign in to comment.