Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix schedule concurrency errors #2277

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions libs/net/services/Helpers/ApiService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,41 @@ protected async Task<T> RetryRequestAsync<T>(Func<Task<T>> callbackDelegate)
return await RetryRequestAsync<T>(callbackDelegate);
}
}

/// <summary>
/// Keep trying a request if the failure is caused by an optimistic concurrency error.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="callbackDelegate"></param>
/// <returns></returns>
public async Task<T> HandleConcurrencyAsync<T>(Func<Task<T>> callbackDelegate)
{
// Keep trying to update the record and handle concurrency errors.
while (true)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Essentially keep trying until successful. It will only error out if the exception is not a optimistic concurrency error.

{
try
{
return await callbackDelegate();
}
catch (HttpClientRequestException ex)
{
// If it's a concurrency error, keep trying. Otherwise throw the error.
this.Logger.LogError(ex, "Failed to complete request. Determining if this is a concurrency error.");
var data = ex.Data["Body"] as string;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find a safe way to determine if the key exists in the dictionary. Regrettably .net uses a IDictionary object that doesn't have those features.

if (!String.IsNullOrWhiteSpace(data))
{
var json = JsonSerializer.Deserialize<API.Models.ErrorResponseModel>(data, _serializerOptions);
if (json != null && json.Type == nameof(Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException))
{
// A concurrency error can be resolved by loading the latest and reapplying the values.
continue;
}
}
// It wasn't a concurrency error, throw as a real failure.
throw;
}
}
}
#endregion

#region Kafka Methods
Expand Down Expand Up @@ -826,13 +861,19 @@ protected async Task<T> RetryRequestAsync<T>(Func<Task<T>> callbackDelegate)

/// <summary>
/// Make a request to the API to update the event schedule for the specified 'model'.
/// The most common issue with this endpoint is concurrency errors. Retrying won't fix that, so use the HandleConcurrency function and set retry = false.
/// </summary>
/// <param name="model"></param>
/// <param name="retry"></param>
/// <returns></returns>
public async Task<API.Areas.Services.Models.EventSchedule.EventScheduleModel?> UpdateEventScheduleAsync(API.Areas.Services.Models.EventSchedule.EventScheduleModel model)
public async Task<API.Areas.Services.Models.EventSchedule.EventScheduleModel?> UpdateEventScheduleAsync(API.Areas.Services.Models.EventSchedule.EventScheduleModel model, bool retry = true)
{
var url = this.Options.ApiUrl.Append($"services/events/schedules/{model.Id}");
return await RetryRequestAsync(async () => await this.OpenClient.PutAsync<API.Areas.Services.Models.EventSchedule.EventScheduleModel?>(url, JsonContent.Create(model)));

if (retry)
return await RetryRequestAsync(async () => await this.OpenClient.PutAsync<API.Areas.Services.Models.EventSchedule.EventScheduleModel?>(url, JsonContent.Create(model)));
else
return await this.OpenClient.PutAsync<API.Areas.Services.Models.EventSchedule.EventScheduleModel?>(url, JsonContent.Create(model));
}
#endregion

Expand Down
13 changes: 11 additions & 2 deletions libs/net/services/Helpers/IApiService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@ public interface IApiService
#endregion

#region Helper Methods
public Task<T> HandleRequestFailure<T>(Func<Task<T>> callbackDelegate, bool ignoreError, T defaultResponse);
Task<T> HandleRequestFailure<T>(Func<Task<T>> callbackDelegate, bool ignoreError, T defaultResponse);

/// <summary>
/// Keep trying a request if the failure is caused by an optimistic concurrency error.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="callbackDelegate"></param>
/// <returns></returns>
Task<T> HandleConcurrencyAsync<T>(Func<Task<T>> callbackDelegate);
#endregion

#region Kafka
Expand Down Expand Up @@ -436,8 +444,9 @@ public interface IApiService
/// Make a request to the API to update the event schedule for the specified 'model'.
/// </summary>
/// <param name="model"></param>
/// <param name="retry"></param>
/// <returns></returns>
Task<API.Areas.Services.Models.EventSchedule.EventScheduleModel?> UpdateEventScheduleAsync(API.Areas.Services.Models.EventSchedule.EventScheduleModel model);
Task<API.Areas.Services.Models.EventSchedule.EventScheduleModel?> UpdateEventScheduleAsync(API.Areas.Services.Models.EventSchedule.EventScheduleModel model, bool retry = true);
#endregion

#region AV Overview
Expand Down
11 changes: 7 additions & 4 deletions services/net/event-handler/EventHandlerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,13 @@ private async Task ProcessEventHandlerAsync(ConsumeResult<string, EventScheduleR
await this.Api.RemoveContentFromFolder(eventSchedule.FolderId.Value);
this.Logger.LogInformation("Event schedule cleaned folder. Key: {key}, Event ID: {eventId}", result.Message.Key, request.EventScheduleId);

// Need to fetch the latest because it could have been updated recently.
eventSchedule = await this.Api.GetEventScheduleAsync(request.EventScheduleId) ?? throw new NoContentException("Event schedule no longer exists");
eventSchedule.LastRanOn = DateTime.UtcNow;
await this.Api.UpdateEventScheduleAsync(eventSchedule);
await this.Api.HandleConcurrencyAsync<API.Areas.Services.Models.EventSchedule.EventScheduleModel?>(async () =>
{
// Need to fetch the latest because it could have been updated recently.
eventSchedule = await this.Api.GetEventScheduleAsync(request.EventScheduleId) ?? throw new NoContentException($"Event schedule {eventSchedule.Id}:{eventSchedule.Name} does not exist.");
eventSchedule.LastRanOn = DateTime.UtcNow;
return await this.Api.UpdateEventScheduleAsync(eventSchedule, false);
});
}
else
{
Expand Down
9 changes: 6 additions & 3 deletions services/net/reporting/ReportingManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,12 @@ private async Task ProcessReportAsync(ConsumeResult<string, ReportRequestModel>
// If the request originated from the scheduler service, update the last ran one.
if (request.EventScheduleId.HasValue)
{
var scheduledEvent = await this.Api.GetEventScheduleAsync(request.EventScheduleId.Value) ?? throw new NoContentException($"Event schedule '{request.EventScheduleId}' does not exist.");
scheduledEvent.LastRanOn = DateTime.UtcNow;
await this.Api.UpdateEventScheduleAsync(scheduledEvent);
await this.Api.HandleConcurrencyAsync<API.Areas.Services.Models.EventSchedule.EventScheduleModel?>(async () =>
{
var eventSchedule = await this.Api.GetEventScheduleAsync(request.EventScheduleId.Value) ?? throw new NoContentException($"Event schedule {request.EventScheduleId.Value} does not exist.");
eventSchedule.LastRanOn = DateTime.UtcNow;
return await this.Api.UpdateEventScheduleAsync(eventSchedule, false);
});
}
}

Expand Down
8 changes: 6 additions & 2 deletions services/net/scheduler/SchedulerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,12 @@ public override async Task RunAsync()
else
await GenerateEventScheduleRequestAsync(scheduledEvent);

scheduledEvent.RequestSentOn = DateTime.UtcNow;
await this.Api.UpdateEventScheduleAsync(scheduledEvent);
await this.Api.HandleConcurrencyAsync<EventScheduleModel?>(async () =>
{
scheduledEvent = await this.Api.GetEventScheduleAsync(scheduledEvent.Id) ?? throw new NoContentException($"Event schedule {scheduledEvent.Id}:{scheduledEvent.Name} does not exist.");
scheduledEvent.RequestSentOn = DateTime.UtcNow;
return await this.Api.UpdateEventScheduleAsync(scheduledEvent, false);
});
}
}
}
Expand Down
Loading